aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/consensus_api.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2012-11-21 20:56:49 +0000
committerFlorian Dold <florian.dold@gmail.com>2012-11-21 20:56:49 +0000
commit6b1559589726297aa372048b4e388d2e1473a6f6 (patch)
treee4edfd3bc5865905de4db27229427ac998484824 /src/consensus/consensus_api.c
parent84d88f3ddc67f50c4493a20f33883c9242fd0d57 (diff)
downloadgnunet-6b1559589726297aa372048b4e388d2e1473a6f6.tar.gz
gnunet-6b1559589726297aa372048b4e388d2e1473a6f6.zip
started implementing consensus api and service
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r--src/consensus/consensus_api.c502
1 files changed, 502 insertions, 0 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
new file mode 100644
index 000000000..0d7e6c8e4
--- /dev/null
+++ b/src/consensus/consensus_api.c
@@ -0,0 +1,502 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file consensus/consensus_api.c
23 * @brief
24 * @author Florian Dold
25 */
26#include "platform.h"
27#include "gnunet_protocols.h"
28#include "gnunet_client_lib.h"
29#include "gnunet_consensus_service.h"
30#include "consensus.h"
31
32
33#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
34
35
36/**
37 * Handle for the service.
38 */
39struct GNUNET_CONSENSUS_Handle
40{
41 /**
42 * Configuration to use.
43 */
44 const struct GNUNET_CONFIGURATION_Handle *cfg;
45
46 /**
47 * Socket (if available).
48 */
49 struct GNUNET_CLIENT_Connection *client;
50
51 /**
52 * Callback for new elements. Not called for elements added locally.
53 */
54 GNUNET_CONSENSUS_NewElementCallback new_element_cb;
55
56 /**
57 * Closure for new_element_cb
58 */
59 void *new_element_cls;
60
61 /**
62 * Session identifier for the consensus session.
63 */
64 struct GNUNET_HashCode session_id;
65
66 /**
67 * Number of peers in the consensus. Optionally includes the local peer.
68 */
69 int num_peers;
70
71 /**
72 * Peer identities of peers in the consensus. Optionally includes the local peer.
73 */
74 struct GNUNET_PeerIdentity *peers;
75
76 /**
77 * Currently active transmit request.
78 */
79 struct GNUNET_CLIENT_TransmitHandle *th;
80
81 /**
82 * GNUNES_YES iff the join message has been sent to the service.
83 */
84 int joined;
85
86 /**
87 * Called when the current insertion operation finishes.
88 * NULL if there is no insert operation active.
89 */
90 GNUNET_CONSENSUS_InsertDoneCallback idc;
91
92 /**
93 * Closure for the insert done callback.
94 */
95 void *idc_cls;
96
97 /**
98 * An element that was requested to be inserted.
99 */
100 struct GNUNET_CONSENSUS_Element *insert_element;
101
102 /**
103 * Called when the conclude operation finishes or fails.
104 */
105 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
106
107 /**
108 * Closure for the conclude callback.
109 */
110 void *conclude_cls;
111
112 /**
113 * Deadline for the conclude operation.
114 */
115 struct GNUNET_TIME_Absolute conclude_deadline;
116};
117
118
119static void
120handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121 struct GNUNET_CONSENSUS_ElementMessage *msg)
122{
123 struct GNUNET_CONSENSUS_Element element;
124 element.type = msg->element_type;
125 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126 element.data = &msg[1];
127 consensus->new_element_cb(consensus->new_element_cls, &element);
128}
129
130static void
131handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
133{
134 GNUNET_assert (NULL != consensus->conclude_cb);
135 consensus->conclude_cb(consensus->conclude_cls,
136 msg->num_peers,
137 (struct GNUNET_PeerIdentity *) &msg[1]);
138 consensus->conclude_cb = NULL;
139}
140
141
142
143/**
144 * Type of a function to call when we receive a message
145 * from the service.
146 *
147 * @param cls closure
148 * @param msg message received, NULL on timeout or fatal error
149 */
150static void
151message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
152{
153 struct GNUNET_CONSENSUS_Handle *consensus = cls;
154 GNUNET_CONSENSUS_InsertDoneCallback idc;
155 void *idc_cls;
156
157 if (msg == NULL)
158 {
159 /* Error, timeout, death */
160 GNUNET_CLIENT_disconnect (consensus->client);
161 consensus->client = NULL;
162 consensus->new_element_cb(NULL, NULL);
163 if (NULL != consensus->idc)
164 {
165 consensus->idc(consensus->idc_cls, GNUNET_NO);
166 consensus->idc = NULL;
167 consensus->idc_cls = NULL;
168 }
169 return;
170 }
171
172 switch (ntohs(msg->type))
173 {
174 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT_ACK:
175 idc = consensus->idc;
176 consensus->idc = NULL;
177 idc_cls = consensus->idc_cls;
178 consensus->idc_cls = NULL;
179 idc(idc_cls, GNUNET_YES);
180 break;
181 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
182 handle_new_element(consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
183 break;
184 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
185 handle_conclude_done(consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
186 break;
187 default:
188 LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring");
189 }
190 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
191 GNUNET_TIME_UNIT_FOREVER_REL);
192}
193
194
195
196
197/**
198 * Function called to notify a client about the connection
199 * begin ready to queue more data. "buf" will be
200 * NULL and "size" zero if the connection was closed for
201 * writing in the meantime.
202 *
203 * @param cls closure
204 * @param size number of bytes available in buf
205 * @param buf where the callee should write the message
206 * @return number of bytes written to buf
207 */
208static size_t
209transmit_insert (void *cls, size_t size, void *buf)
210{
211 struct GNUNET_CONSENSUS_ElementMessage *msg;
212 struct GNUNET_CONSENSUS_Handle *consensus;
213 int msize;
214
215 GNUNET_assert (NULL != buf);
216
217 consensus = cls;
218
219 GNUNET_assert (NULL != consensus->insert_element);
220
221 consensus->th = NULL;
222
223
224 msg = buf;
225
226 msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
227 consensus->insert_element->size;
228
229 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
230 msg->header.size = htons (msize);
231 memcpy(&msg[1],
232 consensus->insert_element->data,
233 consensus->insert_element->size);
234
235 return msize;
236}
237
238
239/**
240 * Function called to notify a client about the connection
241 * begin ready to queue more data. "buf" will be
242 * NULL and "size" zero if the connection was closed for
243 * writing in the meantime.
244 *
245 * @param cls closure
246 * @param size number of bytes available in buf
247 * @param buf where the callee should write the message
248 * @return number of bytes written to buf
249 */
250static size_t
251transmit_join (void *cls, size_t size, void *buf)
252{
253 struct GNUNET_CONSENSUS_JoinMessage *msg;
254 struct GNUNET_CONSENSUS_Handle *consensus;
255 int msize;
256
257 LOG(GNUNET_ERROR_TYPE_DEBUG, "transmitting CLIENT_JOIN to service\n");
258
259 GNUNET_assert (NULL != buf);
260
261 consensus = cls;
262 consensus->th = NULL;
263 consensus->joined = 1;
264
265 msg = buf;
266
267 msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) +
268 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity);
269
270 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN);
271 msg->header.size = htons (msize);
272 msg->session_id = consensus->session_id;
273 msg->num_peers = htons (consensus->num_peers);
274 memcpy(&msg[1],
275 consensus->peers,
276 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
277
278 if (consensus->insert_element != NULL)
279 {
280 consensus->th =
281 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
282 msize,
283 GNUNET_TIME_UNIT_FOREVER_REL,
284 GNUNET_NO, &transmit_insert, consensus);
285 }
286
287
288 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
289 GNUNET_TIME_UNIT_FOREVER_REL);
290
291 return msize;
292}
293
294
295/**
296 * Function called to notify a client about the connection
297 * begin ready to queue more data. "buf" will be
298 * NULL and "size" zero if the connection was closed for
299 * writing in the meantime.
300 *
301 * @param cls closure
302 * @param size number of bytes available in buf
303 * @param buf where the callee should write the message
304 * @return number of bytes written to buf
305 */
306static size_t
307transmit_conclude (void *cls, size_t size, void *buf)
308{
309 struct GNUNET_CONSENSUS_ConcludeMessage *msg;
310 struct GNUNET_CONSENSUS_Handle *consensus;
311 int msize;
312
313 GNUNET_assert (NULL != buf);
314
315 consensus = cls;
316 consensus->th = NULL;
317
318 msg = buf;
319
320 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
321
322 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
323 msg->header.size = htons (msize);
324 msg->timeout =
325 GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
326
327 return msize;
328}
329
330
331
332
333/**
334 * Create a consensus session.
335 *
336 * @param cfg
337 * @param num_peers
338 * @param peers array of peers participating in this consensus session
339 * Inclusion of the local peer is optional.
340 * @param session_id session identifier
341 * Allows a group of peers to have more than consensus session.
342 * @param num_initial_elements number of entries in the 'initial_elements' array
343 * @param initial_elements our elements for the consensus (each of 'element_size'
344 * @param new_element callback, called when a new element is added to the set by
345 * another peer
346 * @param new_element_cls closure for new_element
347 * @return handle to use, NULL on error
348 */
349struct GNUNET_CONSENSUS_Handle *
350GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
351 unsigned int num_peers,
352 const struct GNUNET_PeerIdentity *peers,
353 const struct GNUNET_HashCode *session_id,
354 /*
355 unsigned int num_initial_elements,
356 const struct GNUNET_CONSENSUS_Element **initial_elements,
357 */
358 GNUNET_CONSENSUS_NewElementCallback new_element,
359 void *new_element_cls)
360{
361 struct GNUNET_CONSENSUS_Handle *consensus;
362 size_t join_message_size;
363
364
365 consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle));
366 consensus->cfg = cfg;
367 consensus->new_element_cb = new_element;
368 consensus->new_element_cls = new_element_cls;
369 consensus->num_peers = num_peers;
370 consensus->session_id = *session_id;
371
372
373
374 if (0 == num_peers)
375 {
376 consensus->peers = NULL;
377 }
378 else if (num_peers > 0)
379 {
380
381 consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
382 }
383 else
384 {
385 GNUNET_break (0);
386 }
387
388
389 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
390
391 GNUNET_assert (consensus->client != NULL);
392
393 join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) +
394 (num_peers * sizeof (struct GNUNET_PeerIdentity));
395
396 consensus->th =
397 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
398 join_message_size,
399 GNUNET_TIME_UNIT_FOREVER_REL,
400 GNUNET_NO, &transmit_join, consensus);
401
402
403 GNUNET_assert (consensus->th != NULL);
404
405 return consensus;
406}
407
408
409
410/**
411 * Insert an element in the set being reconsiled. Must not be called after
412 * "GNUNET_CONSENSUS_conclude".
413 *
414 * @param consensus handle for the consensus session
415 * @param element the element to be inserted
416 * @param idc function called when we are done with this element and it
417 * is thus allowed to call GNUNET_CONSENSUS_insert again
418 * @param idc_cls closure for 'idc'
419 */
420void
421GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
422 const struct GNUNET_CONSENSUS_Element *element,
423 GNUNET_CONSENSUS_InsertDoneCallback idc,
424 void *idc_cls)
425{
426
427 GNUNET_assert (NULL == consensus->idc);
428 GNUNET_assert (NULL == consensus->insert_element);
429
430 consensus->idc = idc;
431 consensus->idc_cls = idc_cls;
432 consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
433
434 if (consensus->joined == 0)
435 {
436 GNUNET_assert (NULL != consensus->th);
437 return;
438 }
439
440 GNUNET_assert (NULL == consensus->th);
441
442 consensus->th =
443 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
444 element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
445 GNUNET_TIME_UNIT_FOREVER_REL,
446 GNUNET_NO, &transmit_insert, consensus);
447}
448
449
450/**
451 * We are finished inserting new elements into the consensus;
452 * try to conclude the consensus within a given time window.
453 *
454 * @param consensus consensus session
455 * @param timeout timeout after which the conculde callback
456 * must be called
457 * @param conclude called when the conclusion was successful
458 * @param conclude_cls closure for the conclude callback
459 */
460void
461GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
462 struct GNUNET_TIME_Relative timeout,
463 GNUNET_CONSENSUS_ConcludeCallback conclude,
464 void *conclude_cls)
465{
466 GNUNET_assert (NULL == consensus->th);
467 GNUNET_assert (NULL == consensus->conclude_cb);
468
469 consensus->conclude_cls = conclude_cls;
470 consensus->conclude_cb = conclude;
471 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
472
473 consensus->th =
474 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
475 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
476 timeout,
477 GNUNET_NO, &transmit_conclude, consensus);
478 if (NULL == consensus->th)
479 {
480 conclude(conclude_cls, 0, NULL);
481 }
482}
483
484
485/**
486 * Destroy a consensus handle (free all state associated with
487 * it, no longer call any of the callbacks).
488 *
489 * @param consensus handle to destroy
490 */
491void
492GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
493{
494 if (consensus->client != NULL)
495 {
496 GNUNET_CLIENT_disconnect (consensus->client);
497 consensus->client = NULL;
498 }
499 GNUNET_free (consensus->peers);
500 GNUNET_free (consensus);
501}
502