aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/consensus_api.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2012-12-05 21:41:09 +0000
committerFlorian Dold <florian.dold@gmail.com>2012-12-05 21:41:09 +0000
commitaac85d938153d2f181d4bfd08eb734be980bab43 (patch)
tree806ee375f14540e08e3b4b71582a13a731d7192a /src/consensus/consensus_api.c
parent612f87ce7ff13706d291c441de26eaf15ded5199 (diff)
downloadgnunet-aac85d938153d2f181d4bfd08eb734be980bab43.tar.gz
gnunet-aac85d938153d2f181d4bfd08eb734be980bab43.zip
consensus api, consensus service (local), peer driver and ibf sketch
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r--src/consensus/consensus_api.c238
1 files changed, 199 insertions, 39 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 90b0fdf16..2479c019c 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -24,6 +24,7 @@
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h"
27#include "gnunet_protocols.h" 28#include "gnunet_protocols.h"
28#include "gnunet_client_lib.h" 29#include "gnunet_client_lib.h"
29#include "gnunet_consensus_service.h" 30#include "gnunet_consensus_service.h"
@@ -32,6 +33,13 @@
32 33
33#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
34 35
36struct ElementAck
37{
38 struct ElementAck *next;
39 struct ElementAck *prev;
40 int keep;
41 struct GNUNET_CONSENSUS_Element *element;
42};
35 43
36/** 44/**
37 * Handle for the service. 45 * Handle for the service.
@@ -113,20 +121,138 @@ struct GNUNET_CONSENSUS_Handle
113 * Deadline for the conclude operation. 121 * Deadline for the conclude operation.
114 */ 122 */
115 struct GNUNET_TIME_Absolute conclude_deadline; 123 struct GNUNET_TIME_Absolute conclude_deadline;
124
125 struct ElementAck *ack_head;
126 struct ElementAck *ack_tail;
127
128 /**
129 * Set to GNUNET_YES if the begin message has been transmitted to the service
130 */
131 int begin_sent;
132
133 /**
134 * Set to GNUNET_YES it the begin message should be transmitted to the service
135 */
136 int begin_requested;
116}; 137};
117 138
118 139
140static size_t
141transmit_ack (void *cls, size_t size, void *buf);
142
143static size_t
144transmit_insert (void *cls, size_t size, void *buf);
145
146static size_t
147transmit_conclude (void *cls, size_t size, void *buf);
148
149static size_t
150transmit_begin (void *cls, size_t size, void *buf);
151
152
153/**
154 * Call notify_transmit_ready for ack if necessary and possible.
155 */
156static void
157ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus)
158{
159 if ((NULL == consensus->th) && (NULL != consensus->ack_head))
160 {
161 consensus->th =
162 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
163 sizeof (struct GNUNET_CONSENSUS_AckMessage),
164 GNUNET_TIME_UNIT_FOREVER_REL,
165 GNUNET_NO, &transmit_ack, consensus);
166 }
167}
168
169
170/**
171 * Call notify_transmit_ready for ack if necessary and possible.
172 */
173static void
174ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus)
175{
176 if ((NULL == consensus->th) && (NULL != consensus->insert_element))
177 {
178 consensus->th =
179 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
180 sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
181 consensus->insert_element->size,
182 GNUNET_TIME_UNIT_FOREVER_REL,
183 GNUNET_NO, &transmit_insert, consensus);
184 }
185}
186
187
188/**
189 * Call notify_transmit_ready for ack if necessary and possible.
190 */
191static void
192ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
193{
194 if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
195 {
196 consensus->th =
197 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
198 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
199 GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
200 GNUNET_NO, &transmit_conclude, consensus);
201 }
202}
203
204
205/**
206 * Call notify_transmit_ready for ack if necessary and possible.
207 */
208static void
209ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
210{
211 if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) &&
212 (GNUNET_NO == consensus->begin_sent))
213 {
214 consensus->th =
215 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
216 sizeof (struct GNUNET_MessageHeader),
217 GNUNET_TIME_UNIT_FOREVER_REL,
218 GNUNET_NO, &transmit_begin, consensus);
219 }
220}
221
222/**
223 * Called when the server has sent is a new element
224 *
225 * @param consensus consensus handle
226 * @param msg element message
227 */
119static void 228static void
120handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, 229handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
121 struct GNUNET_CONSENSUS_ElementMessage *msg) 230 struct GNUNET_CONSENSUS_ElementMessage *msg)
122{ 231{
123 struct GNUNET_CONSENSUS_Element element; 232 struct GNUNET_CONSENSUS_Element element;
233 struct ElementAck *ack;
234 int ret;
235
124 element.type = msg->element_type; 236 element.type = msg->element_type;
125 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 237 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
126 element.data = &msg[1]; 238 element.data = &msg[1];
127 consensus->new_element_cb (consensus->new_element_cls, &element); 239
240 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
241 ack = GNUNET_malloc (sizeof (struct ElementAck));
242 ack->keep = ret;
243 GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
244
245 ntr_ack (consensus);
128} 246}
129 247
248
249/**
250 * Called when the server has announced
251 * that the conclusion is over.
252 *
253 * @param consensus consensus handle
254 * @param msg conclude done message
255 */
130static void 256static void
131handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, 257handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus,
132 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 258 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
@@ -170,7 +296,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
170 return; 296 return;
171 } 297 }
172 298
173 switch (ntohs(msg->type)) 299 switch (ntohs (msg->type))
174 { 300 {
175 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: 301 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT:
176 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); 302 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
@@ -200,6 +326,43 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
200 * @return number of bytes written to buf 326 * @return number of bytes written to buf
201 */ 327 */
202static size_t 328static size_t
329transmit_ack (void *cls, size_t size, void *buf)
330{
331 struct GNUNET_CONSENSUS_AckMessage *msg;
332 struct GNUNET_CONSENSUS_Handle *consensus;
333
334 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
335
336 GNUNET_assert (NULL != consensus->ack_head);
337
338 msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
339 msg->keep = consensus->ack_head->keep;
340 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
341 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
342
343 consensus->ack_head = consensus->ack_head->next;
344
345 consensus->th = NULL;
346
347 ntr_insert (consensus);
348 ntr_ack (consensus);
349 ntr_conclude (consensus);
350
351 return sizeof (struct GNUNET_CONSENSUS_AckMessage);
352}
353
354/**
355 * Function called to notify a client about the connection
356 * begin ready to queue more data. "buf" will be
357 * NULL and "size" zero if the connection was closed for
358 * writing in the meantime.
359 *
360 * @param cls closure
361 * @param size number of bytes available in buf
362 * @param buf where the callee should write the message
363 * @return number of bytes written to buf
364 */
365static size_t
203transmit_insert (void *cls, size_t size, void *buf) 366transmit_insert (void *cls, size_t size, void *buf)
204{ 367{
205 struct GNUNET_CONSENSUS_ElementMessage *msg; 368 struct GNUNET_CONSENSUS_ElementMessage *msg;
@@ -227,6 +390,7 @@ transmit_insert (void *cls, size_t size, void *buf)
227 consensus->insert_element->data, 390 consensus->insert_element->data,
228 consensus->insert_element->size); 391 consensus->insert_element->size);
229 392
393 consensus->insert_element = NULL;
230 394
231 idc = consensus->idc; 395 idc = consensus->idc;
232 consensus->idc = NULL; 396 consensus->idc = NULL;
@@ -234,6 +398,11 @@ transmit_insert (void *cls, size_t size, void *buf)
234 consensus->idc_cls = NULL; 398 consensus->idc_cls = NULL;
235 idc (idc_cls, GNUNET_YES); 399 idc (idc_cls, GNUNET_YES);
236 400
401
402 ntr_ack (consensus);
403 ntr_insert (consensus);
404 ntr_conclude (consensus);
405
237 return msize; 406 return msize;
238} 407}
239 408
@@ -273,18 +442,14 @@ transmit_join (void *cls, size_t size, void *buf)
273 msg->header.size = htons (msize); 442 msg->header.size = htons (msize);
274 msg->session_id = consensus->session_id; 443 msg->session_id = consensus->session_id;
275 msg->num_peers = htons (consensus->num_peers); 444 msg->num_peers = htons (consensus->num_peers);
276 memcpy(&msg[1], 445 if (0 != msg->num_peers)
277 consensus->peers, 446 memcpy(&msg[1],
278 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); 447 consensus->peers,
448 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
279 449
280 if (consensus->insert_element != NULL) 450 ntr_insert (consensus);
281 { 451 ntr_begin (consensus);
282 consensus->th = 452 ntr_conclude (consensus);
283 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
284 msize,
285 GNUNET_TIME_UNIT_FOREVER_REL,
286 GNUNET_NO, &transmit_insert, consensus);
287 }
288 453
289 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 454 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
290 GNUNET_TIME_UNIT_FOREVER_REL); 455 GNUNET_TIME_UNIT_FOREVER_REL);
@@ -325,6 +490,8 @@ transmit_conclude (void *cls, size_t size, void *buf)
325 msg->timeout = 490 msg->timeout =
326 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline)); 491 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
327 492
493 ntr_ack (consensus);
494
328 return msize; 495 return msize;
329} 496}
330 497
@@ -359,6 +526,10 @@ transmit_begin (void *cls, size_t size, void *buf)
359 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN); 526 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
360 msg->size = htons (msize); 527 msg->size = htons (msize);
361 528
529 ntr_ack (consensus);
530 ntr_insert (consensus);
531 ntr_conclude (consensus);
532
362 return msize; 533 return msize;
363} 534}
364 535
@@ -421,8 +592,8 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
421 GNUNET_TIME_UNIT_FOREVER_REL, 592 GNUNET_TIME_UNIT_FOREVER_REL,
422 GNUNET_NO, &transmit_join, consensus); 593 GNUNET_NO, &transmit_join, consensus);
423 594
424 GNUNET_assert (consensus->th != NULL);
425 595
596 GNUNET_assert (consensus->th != NULL);
426 return consensus; 597 return consensus;
427} 598}
428 599
@@ -444,9 +615,9 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
444 GNUNET_CONSENSUS_InsertDoneCallback idc, 615 GNUNET_CONSENSUS_InsertDoneCallback idc,
445 void *idc_cls) 616 void *idc_cls)
446{ 617{
447
448 GNUNET_assert (NULL == consensus->idc); 618 GNUNET_assert (NULL == consensus->idc);
449 GNUNET_assert (NULL == consensus->insert_element); 619 GNUNET_assert (NULL == consensus->insert_element);
620 GNUNET_assert (NULL == consensus->conclude_cb);
450 621
451 consensus->idc = idc; 622 consensus->idc = idc;
452 consensus->idc_cls = idc_cls; 623 consensus->idc_cls = idc_cls;
@@ -454,17 +625,10 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
454 625
455 if (consensus->joined == 0) 626 if (consensus->joined == 0)
456 { 627 {
457 GNUNET_assert (NULL != consensus->th);
458 return; 628 return;
459 } 629 }
460 630
461 GNUNET_assert (NULL == consensus->th); 631 ntr_insert (consensus);
462
463 consensus->th =
464 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
465 element->size + sizeof (struct GNUNET_CONSENSUS_ElementMessage),
466 GNUNET_TIME_UNIT_FOREVER_REL,
467 GNUNET_NO, &transmit_insert, consensus);
468} 632}
469 633
470 634
@@ -478,12 +642,12 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
478{ 642{
479 GNUNET_assert (NULL == consensus->idc); 643 GNUNET_assert (NULL == consensus->idc);
480 GNUNET_assert (NULL == consensus->insert_element); 644 GNUNET_assert (NULL == consensus->insert_element);
645 GNUNET_assert (GNUNET_NO == consensus->begin_requested);
646 GNUNET_assert (GNUNET_NO == consensus->begin_sent);
481 647
482 consensus->th = 648 consensus->begin_requested = GNUNET_YES;
483 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 649
484 sizeof (struct GNUNET_MessageHeader), 650 ntr_begin (consensus);
485 GNUNET_TIME_UNIT_FOREVER_REL,
486 GNUNET_NO, &transmit_begin, consensus);
487} 651}
488 652
489 653
@@ -503,22 +667,17 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
503 GNUNET_CONSENSUS_ConcludeCallback conclude, 667 GNUNET_CONSENSUS_ConcludeCallback conclude,
504 void *conclude_cls) 668 void *conclude_cls)
505{ 669{
506 GNUNET_assert (NULL == consensus->th); 670 GNUNET_assert (NULL != conclude);
507 GNUNET_assert (NULL == consensus->conclude_cb); 671 GNUNET_assert (NULL == consensus->conclude_cb);
508 672
509 consensus->conclude_cls = conclude_cls; 673 consensus->conclude_cls = conclude_cls;
510 consensus->conclude_cb = conclude; 674 consensus->conclude_cb = conclude;
511 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout); 675 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
512 676
513 consensus->th = 677
514 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 678 /* if transmitting the conclude message is not possible right now, transmit_join
515 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage), 679 * or transmit_ack will handle it */
516 timeout, 680 ntr_conclude (consensus);
517 GNUNET_NO, &transmit_conclude, consensus);
518 if (NULL == consensus->th)
519 {
520 conclude(conclude_cls, 0, NULL);
521 }
522} 681}
523 682
524 683
@@ -536,7 +695,8 @@ GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
536 GNUNET_CLIENT_disconnect (consensus->client); 695 GNUNET_CLIENT_disconnect (consensus->client);
537 consensus->client = NULL; 696 consensus->client = NULL;
538 } 697 }
539 GNUNET_free (consensus->peers); 698 if (NULL != consensus->peers)
699 GNUNET_free (consensus->peers);
540 GNUNET_free (consensus); 700 GNUNET_free (consensus);
541} 701}
542 702