diff options
author | Florian Dold <florian.dold@gmail.com> | 2012-12-05 21:41:09 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2012-12-05 21:41:09 +0000 |
commit | aac85d938153d2f181d4bfd08eb734be980bab43 (patch) | |
tree | 806ee375f14540e08e3b4b71582a13a731d7192a /src/consensus/consensus_api.c | |
parent | 612f87ce7ff13706d291c441de26eaf15ded5199 (diff) | |
download | gnunet-aac85d938153d2f181d4bfd08eb734be980bab43.tar.gz gnunet-aac85d938153d2f181d4bfd08eb734be980bab43.zip |
consensus api, consensus service (local), peer driver and ibf sketch
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r-- | src/consensus/consensus_api.c | 238 |
1 files changed, 199 insertions, 39 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 90b0fdf16..2479c019c 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -24,6 +24,7 @@ | |||
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_util_lib.h" | ||
27 | #include "gnunet_protocols.h" | 28 | #include "gnunet_protocols.h" |
28 | #include "gnunet_client_lib.h" | 29 | #include "gnunet_client_lib.h" |
29 | #include "gnunet_consensus_service.h" | 30 | #include "gnunet_consensus_service.h" |
@@ -32,6 +33,13 @@ | |||
32 | 33 | ||
33 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) | 34 | #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) |
34 | 35 | ||
36 | struct ElementAck | ||
37 | { | ||
38 | struct ElementAck *next; | ||
39 | struct ElementAck *prev; | ||
40 | int keep; | ||
41 | struct GNUNET_CONSENSUS_Element *element; | ||
42 | }; | ||
35 | 43 | ||
36 | /** | 44 | /** |
37 | * Handle for the service. | 45 | * Handle for the service. |
@@ -113,20 +121,138 @@ struct GNUNET_CONSENSUS_Handle | |||
113 | * Deadline for the conclude operation. | 121 | * Deadline for the conclude operation. |
114 | */ | 122 | */ |
115 | struct GNUNET_TIME_Absolute conclude_deadline; | 123 | struct GNUNET_TIME_Absolute conclude_deadline; |
124 | |||
125 | struct ElementAck *ack_head; | ||
126 | struct ElementAck *ack_tail; | ||
127 | |||
128 | /** | ||
129 | * Set to GNUNET_YES if the begin message has been transmitted to the service | ||
130 | */ | ||
131 | int begin_sent; | ||
132 | |||
133 | /** | ||
134 | * Set to GNUNET_YES it the begin message should be transmitted to the service | ||
135 | */ | ||
136 | int begin_requested; | ||
116 | }; | 137 | }; |
117 | 138 | ||
118 | 139 | ||
140 | static size_t | ||
141 | transmit_ack (void *cls, size_t size, void *buf); | ||
142 | |||
143 | static size_t | ||
144 | transmit_insert (void *cls, size_t size, void *buf); | ||
145 | |||
146 | static size_t | ||
147 | transmit_conclude (void *cls, size_t size, void *buf); | ||
148 | |||
149 | static size_t | ||
150 | transmit_begin (void *cls, size_t size, void *buf); | ||
151 | |||
152 | |||
153 | /** | ||
154 | * Call notify_transmit_ready for ack if necessary and possible. | ||
155 | */ | ||
156 | static void | ||
157 | ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus) | ||
158 | { | ||
159 | if ((NULL == consensus->th) && (NULL != consensus->ack_head)) | ||
160 | { | ||
161 | consensus->th = | ||
162 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
163 | sizeof (struct GNUNET_CONSENSUS_AckMessage), | ||
164 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
165 | GNUNET_NO, &transmit_ack, consensus); | ||
166 | } | ||
167 | } | ||
168 | |||
169 | |||
170 | /** | ||
171 | * Call notify_transmit_ready for ack if necessary and possible. | ||
172 | */ | ||
173 | static void | ||
174 | ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus) | ||
175 | { | ||
176 | if ((NULL == consensus->th) && (NULL != consensus->insert_element)) | ||
177 | { | ||
178 | consensus->th = | ||
179 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
180 | sizeof (struct GNUNET_CONSENSUS_ElementMessage) + | ||
181 | consensus->insert_element->size, | ||
182 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
183 | GNUNET_NO, &transmit_insert, consensus); | ||
184 | } | ||
185 | } | ||
186 | |||
187 | |||
188 | /** | ||
189 | * Call notify_transmit_ready for ack if necessary and possible. | ||
190 | */ | ||
191 | static void | ||
192 | ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus) | ||
193 | { | ||
194 | if ((NULL == consensus->th) && (NULL != consensus->conclude_cb)) | ||
195 | { | ||
196 | consensus->th = | ||
197 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
198 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), | ||
199 | GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline), | ||
200 | GNUNET_NO, &transmit_conclude, consensus); | ||
201 | } | ||
202 | } | ||
203 | |||
204 | |||
205 | /** | ||
206 | * Call notify_transmit_ready for ack if necessary and possible. | ||
207 | */ | ||
208 | static void | ||
209 | ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) | ||
210 | { | ||
211 | if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) && | ||
212 | (GNUNET_NO == consensus->begin_sent)) | ||
213 | { | ||
214 | consensus->th = | ||
215 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
216 | sizeof (struct GNUNET_MessageHeader), | ||
217 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
218 | GNUNET_NO, &transmit_begin, consensus); | ||
219 | } | ||
220 | } | ||
221 | |||
222 | /** | ||
223 | * Called when the server has sent is a new element | ||
224 | * | ||
225 | * @param consensus consensus handle | ||
226 | * @param msg element message | ||
227 | */ | ||
119 | static void | 228 | static void |
120 | handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, | 229 | handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, |
121 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 230 | struct GNUNET_CONSENSUS_ElementMessage *msg) |
122 | { | 231 | { |
123 | struct GNUNET_CONSENSUS_Element element; | 232 | struct GNUNET_CONSENSUS_Element element; |
233 | struct ElementAck *ack; | ||
234 | int ret; | ||
235 | |||
124 | element.type = msg->element_type; | 236 | element.type = msg->element_type; |
125 | element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 237 | element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
126 | element.data = &msg[1]; | 238 | element.data = &msg[1]; |
127 | consensus->new_element_cb (consensus->new_element_cls, &element); | 239 | |
240 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); | ||
241 | ack = GNUNET_malloc (sizeof (struct ElementAck)); | ||
242 | ack->keep = ret; | ||
243 | GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack); | ||
244 | |||
245 | ntr_ack (consensus); | ||
128 | } | 246 | } |
129 | 247 | ||
248 | |||
249 | /** | ||
250 | * Called when the server has announced | ||
251 | * that the conclusion is over. | ||
252 | * | ||
253 | * @param consensus consensus handle | ||
254 | * @param msg conclude done message | ||
255 | */ | ||
130 | static void | 256 | static void |
131 | handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, | 257 | handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, |
132 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) | 258 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) |
@@ -170,7 +296,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
170 | return; | 296 | return; |
171 | } | 297 | } |
172 | 298 | ||
173 | switch (ntohs(msg->type)) | 299 | switch (ntohs (msg->type)) |
174 | { | 300 | { |
175 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: | 301 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: |
176 | handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); | 302 | handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); |
@@ -200,6 +326,43 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
200 | * @return number of bytes written to buf | 326 | * @return number of bytes written to buf |
201 | */ | 327 | */ |
202 | static size_t | 328 | static size_t |
329 | transmit_ack (void *cls, size_t size, void *buf) | ||
330 | { | ||
331 | struct GNUNET_CONSENSUS_AckMessage *msg; | ||
332 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
333 | |||
334 | consensus = (struct GNUNET_CONSENSUS_Handle *) cls; | ||
335 | |||
336 | GNUNET_assert (NULL != consensus->ack_head); | ||
337 | |||
338 | msg = (struct GNUNET_CONSENSUS_AckMessage *) buf; | ||
339 | msg->keep = consensus->ack_head->keep; | ||
340 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); | ||
341 | msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage)); | ||
342 | |||
343 | consensus->ack_head = consensus->ack_head->next; | ||
344 | |||
345 | consensus->th = NULL; | ||
346 | |||
347 | ntr_insert (consensus); | ||
348 | ntr_ack (consensus); | ||
349 | ntr_conclude (consensus); | ||
350 | |||
351 | return sizeof (struct GNUNET_CONSENSUS_AckMessage); | ||
352 | } | ||
353 | |||
354 | /** | ||
355 | * Function called to notify a client about the connection | ||
356 | * begin ready to queue more data. "buf" will be | ||
357 | * NULL and "size" zero if the connection was closed for | ||
358 | * writing in the meantime. | ||
359 | * | ||
360 | * @param cls closure | ||
361 | * @param size number of bytes available in buf | ||
362 | * @param buf where the callee should write the message | ||
363 | * @return number of bytes written to buf | ||
364 | */ | ||
365 | static size_t | ||
203 | transmit_insert (void *cls, size_t size, void *buf) | 366 | transmit_insert (void *cls, size_t size, void *buf) |
204 | { | 367 | { |
205 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 368 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
@@ -227,6 +390,7 @@ transmit_insert (void *cls, size_t size, void *buf) | |||
227 | consensus->insert_element->data, | 390 | consensus->insert_element->data, |
228 | consensus->insert_element->size); | 391 | consensus->insert_element->size); |
229 | 392 | ||
393 | consensus->insert_element = NULL; | ||
230 | 394 | ||
231 | idc = consensus->idc; | 395 | idc = consensus->idc; |
232 | consensus->idc = NULL; | 396 | consensus->idc = NULL; |
@@ -234,6 +398,11 @@ transmit_insert (void *cls, size_t size, void *buf) | |||
234 | consensus->idc_cls = NULL; | 398 | consensus->idc_cls = NULL; |
235 | idc (idc_cls, GNUNET_YES); | 399 | idc (idc_cls, GNUNET_YES); |
236 | 400 | ||
401 | |||
402 | ntr_ack (consensus); | ||
403 | ntr_insert (consensus); | ||
404 | ntr_conclude (consensus); | ||
405 | |||
237 | return msize; | 406 | return msize; |
238 | } | 407 | } |
239 | 408 | ||
@@ -273,18 +442,14 @@ transmit_join (void *cls, size_t size, void *buf) | |||
273 | msg->header.size = htons (msize); | 442 | msg->header.size = htons (msize); |
274 | msg->session_id = consensus->session_id; | 443 | msg->session_id = consensus->session_id; |
275 | msg->num_peers = htons (consensus->num_peers); | 444 | msg->num_peers = htons (consensus->num_peers); |
276 | memcpy(&msg[1], | 445 | if (0 != msg->num_peers) |
277 | consensus->peers, | 446 | memcpy(&msg[1], |
278 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 447 | consensus->peers, |
448 | consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
279 | 449 | ||
280 | if (consensus->insert_element != NULL) | 450 | ntr_insert (consensus); |
281 | { | 451 | ntr_begin (consensus); |
282 | consensus->th = | 452 | ntr_conclude (consensus); |
283 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
284 | msize, | ||
285 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
286 | GNUNET_NO, &transmit_insert, consensus); | ||
287 | } | ||
288 | 453 | ||
289 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, | 454 | GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, |
290 | GNUNET_TIME_UNIT_FOREVER_REL); | 455 | GNUNET_TIME_UNIT_FOREVER_REL); |
@@ -325,6 +490,8 @@ transmit_conclude (void *cls, size_t size, void *buf) | |||
325 | msg->timeout = | 490 | msg->timeout = |
326 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); | 491 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); |
327 | 492 | ||
493 | ntr_ack (consensus); | ||
494 | |||
328 | return msize; | 495 | return msize; |
329 | } | 496 | } |
330 | 497 | ||
@@ -359,6 +526,10 @@ transmit_begin (void *cls, size_t size, void *buf) | |||
359 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); | 526 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); |
360 | msg->size = htons (msize); | 527 | msg->size = htons (msize); |
361 | 528 | ||
529 | ntr_ack (consensus); | ||
530 | ntr_insert (consensus); | ||
531 | ntr_conclude (consensus); | ||
532 | |||
362 | return msize; | 533 | return msize; |
363 | } | 534 | } |
364 | 535 | ||
@@ -421,8 +592,8 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, | |||
421 | GNUNET_TIME_UNIT_FOREVER_REL, | 592 | GNUNET_TIME_UNIT_FOREVER_REL, |
422 | GNUNET_NO, &transmit_join, consensus); | 593 | GNUNET_NO, &transmit_join, consensus); |
423 | 594 | ||
424 | GNUNET_assert (consensus->th != NULL); | ||
425 | 595 | ||
596 | GNUNET_assert (consensus->th != NULL); | ||
426 | return consensus; | 597 | return consensus; |
427 | } | 598 | } |
428 | 599 | ||
@@ -444,9 +615,9 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
444 | GNUNET_CONSENSUS_InsertDoneCallback idc, | 615 | GNUNET_CONSENSUS_InsertDoneCallback idc, |
445 | void *idc_cls) | 616 | void *idc_cls) |
446 | { | 617 | { |
447 | |||
448 | GNUNET_assert (NULL == consensus->idc); | 618 | GNUNET_assert (NULL == consensus->idc); |
449 | GNUNET_assert (NULL == consensus->insert_element); | 619 | GNUNET_assert (NULL == consensus->insert_element); |
620 | GNUNET_assert (NULL == consensus->conclude_cb); | ||
450 | 621 | ||
451 | consensus->idc = idc; | 622 | consensus->idc = idc; |
452 | consensus->idc_cls = idc_cls; | 623 | consensus->idc_cls = idc_cls; |
@@ -454,17 +625,10 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
454 | 625 | ||
455 | if (consensus->joined == 0) | 626 | if (consensus->joined == 0) |
456 | { | 627 | { |
457 | GNUNET_assert (NULL != consensus->th); | ||
458 | return; | 628 | return; |
459 | } | 629 | } |
460 | 630 | ||
461 | GNUNET_assert (NULL == consensus->th); | 631 | ntr_insert (consensus); |
462 | |||
463 | consensus->th = | ||
464 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | ||
465 | element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage), | ||
466 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
467 | GNUNET_NO, &transmit_insert, consensus); | ||
468 | } | 632 | } |
469 | 633 | ||
470 | 634 | ||
@@ -478,12 +642,12 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus) | |||
478 | { | 642 | { |
479 | GNUNET_assert (NULL == consensus->idc); | 643 | GNUNET_assert (NULL == consensus->idc); |
480 | GNUNET_assert (NULL == consensus->insert_element); | 644 | GNUNET_assert (NULL == consensus->insert_element); |
645 | GNUNET_assert (GNUNET_NO == consensus->begin_requested); | ||
646 | GNUNET_assert (GNUNET_NO == consensus->begin_sent); | ||
481 | 647 | ||
482 | consensus->th = | 648 | consensus->begin_requested = GNUNET_YES; |
483 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | 649 | |
484 | sizeof (struct GNUNET_MessageHeader), | 650 | ntr_begin (consensus); |
485 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
486 | GNUNET_NO, &transmit_begin, consensus); | ||
487 | } | 651 | } |
488 | 652 | ||
489 | 653 | ||
@@ -503,22 +667,17 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
503 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 667 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
504 | void *conclude_cls) | 668 | void *conclude_cls) |
505 | { | 669 | { |
506 | GNUNET_assert (NULL == consensus->th); | 670 | GNUNET_assert (NULL != conclude); |
507 | GNUNET_assert (NULL == consensus->conclude_cb); | 671 | GNUNET_assert (NULL == consensus->conclude_cb); |
508 | 672 | ||
509 | consensus->conclude_cls = conclude_cls; | 673 | consensus->conclude_cls = conclude_cls; |
510 | consensus->conclude_cb = conclude; | 674 | consensus->conclude_cb = conclude; |
511 | consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); | 675 | consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); |
512 | 676 | ||
513 | consensus->th = | 677 | |
514 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, | 678 | /* if transmitting the conclude message is not possible right now, transmit_join |
515 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), | 679 | * or transmit_ack will handle it */ |
516 | timeout, | 680 | ntr_conclude (consensus); |
517 | GNUNET_NO, &transmit_conclude, consensus); | ||
518 | if (NULL == consensus->th) | ||
519 | { | ||
520 | conclude(conclude_cls, 0, NULL); | ||
521 | } | ||
522 | } | 681 | } |
523 | 682 | ||
524 | 683 | ||
@@ -536,7 +695,8 @@ GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) | |||
536 | GNUNET_CLIENT_disconnect (consensus->client); | 695 | GNUNET_CLIENT_disconnect (consensus->client); |
537 | consensus->client = NULL; | 696 | consensus->client = NULL; |
538 | } | 697 | } |
539 | GNUNET_free (consensus->peers); | 698 | if (NULL != consensus->peers) |
699 | GNUNET_free (consensus->peers); | ||
540 | GNUNET_free (consensus); | 700 | GNUNET_free (consensus); |
541 | } | 701 | } |
542 | 702 | ||