aboutsummaryrefslogtreecommitdiff
path: root/src/consensus/consensus_api.c
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-01-03 00:43:57 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-01-03 00:43:57 +0000
commit845316cb543af7e4c77709acce4df79f3e0dc162 (patch)
tree629739608b0bd0f645d82fd41d90ccb2cb617d09 /src/consensus/consensus_api.c
parent3def510831309a5a0a5f50a2250911d6b592a87e (diff)
downloadgnunet-845316cb543af7e4c77709acce4df79f3e0dc162.tar.gz
gnunet-845316cb543af7e4c77709acce4df79f3e0dc162.zip
implemented the modified consensus api, started implementing p2p protocol for consensus
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r--src/consensus/consensus_api.c481
1 files changed, 154 insertions, 327 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 2479c019c..25c76b358 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -33,14 +33,43 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35 35
36struct ElementAck 36/**
37 * Actions that can be queued.
38 */
39struct QueuedMessage
37{ 40{
38 struct ElementAck *next; 41 /**
39 struct ElementAck *prev; 42 * Queued messages are stored in a doubly linked list.
40 int keep; 43 */
41 struct GNUNET_CONSENSUS_Element *element; 44 struct QueuedMessage *next;
45
46 /**
47 * Queued messages are stored in a doubly linked list.
48 */
49 struct QueuedMessage *prev;
50
51 /**
52 * The actual queued message.
53 */
54 struct GNUNET_MessageHeader *msg;
55
56 /**
57 * Size of the message in msg.
58 */
59 size_t size;
60
61 /**
62 * Will be called after transmit, if not NULL
63 */
64 GNUNET_CONSENSUS_InsertDoneCallback idc;
65
66 /**
67 * Closure for idc
68 */
69 void *idc_cls;
42}; 70};
43 71
72
44/** 73/**
45 * Handle for the service. 74 * Handle for the service.
46 */ 75 */
@@ -52,14 +81,14 @@ struct GNUNET_CONSENSUS_Handle
52 const struct GNUNET_CONFIGURATION_Handle *cfg; 81 const struct GNUNET_CONFIGURATION_Handle *cfg;
53 82
54 /** 83 /**
55 * Socket (if available). 84 * Client connected to the consensus service, may be NULL if not connected.
56 */ 85 */
57 struct GNUNET_CLIENT_Connection *client; 86 struct GNUNET_CLIENT_Connection *client;
58 87
59 /** 88 /**
60 * Callback for new elements. Not called for elements added locally. 89 * Callback for new elements. Not called for elements added locally.
61 */ 90 */
62 GNUNET_CONSENSUS_NewElementCallback new_element_cb; 91 GNUNET_CONSENSUS_ElementCallback new_element_cb;
63 92
64 /** 93 /**
65 * Closure for new_element_cb 94 * Closure for new_element_cb
@@ -67,7 +96,7 @@ struct GNUNET_CONSENSUS_Handle
67 void *new_element_cls; 96 void *new_element_cls;
68 97
69 /** 98 /**
70 * Session identifier for the consensus session. 99 * The (local) session identifier for the consensus session.
71 */ 100 */
72 struct GNUNET_HashCode session_id; 101 struct GNUNET_HashCode session_id;
73 102
@@ -77,9 +106,9 @@ struct GNUNET_CONSENSUS_Handle
77 int num_peers; 106 int num_peers;
78 107
79 /** 108 /**
80 * Peer identities of peers in the consensus. Optionally includes the local peer. 109 * Peer identities of peers participating in the consensus, includes the local peer.
81 */ 110 */
82 struct GNUNET_PeerIdentity *peers; 111 struct GNUNET_PeerIdentity **peers;
83 112
84 /** 113 /**
85 * Currently active transmit request. 114 * Currently active transmit request.
@@ -92,22 +121,11 @@ struct GNUNET_CONSENSUS_Handle
92 int joined; 121 int joined;
93 122
94 /** 123 /**
95 * Called when the current insertion operation finishes.
96 * NULL if there is no insert operation active.
97 */
98 GNUNET_CONSENSUS_InsertDoneCallback idc;
99
100 /**
101 * Closure for the insert done callback. 124 * Closure for the insert done callback.
102 */ 125 */
103 void *idc_cls; 126 void *idc_cls;
104 127
105 /** 128 /**
106 * An element that was requested to be inserted.
107 */
108 struct GNUNET_CONSENSUS_Element *insert_element;
109
110 /**
111 * Called when the conclude operation finishes or fails. 129 * Called when the conclude operation finishes or fails.
112 */ 130 */
113 GNUNET_CONSENSUS_ConcludeCallback conclude_cb; 131 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -122,103 +140,92 @@ struct GNUNET_CONSENSUS_Handle
122 */ 140 */
123 struct GNUNET_TIME_Absolute conclude_deadline; 141 struct GNUNET_TIME_Absolute conclude_deadline;
124 142
125 struct ElementAck *ack_head; 143 unsigned int conclude_min_size;
126 struct ElementAck *ack_tail;
127
128 /**
129 * Set to GNUNET_YES if the begin message has been transmitted to the service
130 */
131 int begin_sent;
132 144
133 /** 145 struct QueuedMessage *messages_head;
134 * Set to GNUNET_YES it the begin message should be transmitted to the service 146 struct QueuedMessage *messages_tail;
135 */
136 int begin_requested;
137}; 147};
138 148
139 149
140static size_t
141transmit_ack (void *cls, size_t size, void *buf);
142
143static size_t
144transmit_insert (void *cls, size_t size, void *buf);
145
146static size_t
147transmit_conclude (void *cls, size_t size, void *buf);
148
149static size_t
150transmit_begin (void *cls, size_t size, void *buf);
151
152 150
153/** 151/**
154 * Call notify_transmit_ready for ack if necessary and possible. 152 * Schedule transmitting the next message.
153 *
154 * @param consensus consensus handle
155 */ 155 */
156static void 156static void
157ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus) 157schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
158{
159 if ((NULL == consensus->th) && (NULL != consensus->ack_head))
160 {
161 consensus->th =
162 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
163 sizeof (struct GNUNET_CONSENSUS_AckMessage),
164 GNUNET_TIME_UNIT_FOREVER_REL,
165 GNUNET_NO, &transmit_ack, consensus);
166 }
167}
168 158
169 159
170/** 160/**
171 * Call notify_transmit_ready for ack if necessary and possible. 161 * Function called to notify a client about the connection
162 * begin ready to queue more data. "buf" will be
163 * NULL and "size" zero if the connection was closed for
164 * writing in the meantime.
165 *
166 * @param cls closure
167 * @param size number of bytes available in buf
168 * @param buf where the callee should write the message
169 * @return number of bytes written to buf
172 */ 170 */
173static void 171static size_t transmit_queued (void *cls, size_t size,
174ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus) 172 void *buf)
175{ 173{
176 if ((NULL == consensus->th) && (NULL != consensus->insert_element)) 174 struct GNUNET_CONSENSUS_Handle *consensus;
175 struct QueuedMessage *qmsg;
176 size_t ret_size;
177
178 printf("transmitting queued\n");
179
180 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
181 qmsg = consensus->messages_head;
182 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
183 GNUNET_assert (qmsg);
184
185 if (NULL == buf)
177 { 186 {
178 consensus->th = 187 if (NULL != qmsg->idc)
179 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 188 {
180 sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 189 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
181 consensus->insert_element->size, 190 }
182 GNUNET_TIME_UNIT_FOREVER_REL,
183 GNUNET_NO, &transmit_insert, consensus);
184 } 191 }
185}
186
187 192
188/** 193 memcpy (buf, qmsg->msg, qmsg->size);
189 * Call notify_transmit_ready for ack if necessary and possible. 194 ret_size = qmsg->size;
190 */ 195 if (NULL != qmsg->idc)
191static void
192ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
193{
194 if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
195 { 196 {
196 consensus->th = 197 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
197 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
198 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
199 GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
200 GNUNET_NO, &transmit_conclude, consensus);
201 } 198 }
199 GNUNET_free (qmsg->msg);
200 GNUNET_free (qmsg);
201
202 schedule_transmit (consensus);
203
204 return ret_size;
202} 205}
203 206
204 207
205/** 208/**
206 * Call notify_transmit_ready for ack if necessary and possible. 209 * Schedule transmitting the next message.
210 *
211 * @param consensus consensus handle
207 */ 212 */
208static void 213static void
209ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) 214schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
210{ 215{
211 if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) && 216 if (NULL != consensus->th)
212 (GNUNET_NO == consensus->begin_sent)) 217 return;
218
219 if (NULL != consensus->messages_head)
213 { 220 {
214 consensus->th = 221 LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
215 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 222 GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size,
216 sizeof (struct GNUNET_MessageHeader), 223 GNUNET_TIME_UNIT_FOREVER_REL,
217 GNUNET_TIME_UNIT_FOREVER_REL, 224 GNUNET_NO, &transmit_queued, consensus);
218 GNUNET_NO, &transmit_begin, consensus);
219 } 225 }
220} 226}
221 227
228
222/** 229/**
223 * Called when the server has sent is a new element 230 * Called when the server has sent is a new element
224 * 231 *
@@ -226,11 +233,12 @@ ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
226 * @param msg element message 233 * @param msg element message
227 */ 234 */
228static void 235static void
229handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, 236handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
230 struct GNUNET_CONSENSUS_ElementMessage *msg) 237 struct GNUNET_CONSENSUS_ElementMessage *msg)
231{ 238{
232 struct GNUNET_CONSENSUS_Element element; 239 struct GNUNET_CONSENSUS_Element element;
233 struct ElementAck *ack; 240 struct GNUNET_CONSENSUS_AckMessage *ack_msg;
241 struct QueuedMessage *queued_msg;
234 int ret; 242 int ret;
235 243
236 element.type = msg->element_type; 244 element.type = msg->element_type;
@@ -238,11 +246,15 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
238 element.data = &msg[1]; 246 element.data = &msg[1];
239 247
240 ret = consensus->new_element_cb (consensus->new_element_cls, &element); 248 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
241 ack = GNUNET_malloc (sizeof (struct ElementAck));
242 ack->keep = ret;
243 GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
244 249
245 ntr_ack (consensus); 250 queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage));
251 queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1];
252
253 ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
254 ack_msg->keep = ret;
255
256 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail,
257 queued_msg);
246} 258}
247 259
248 260
@@ -254,13 +266,12 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
254 * @param msg conclude done message 266 * @param msg conclude done message
255 */ 267 */
256static void 268static void
257handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, 269handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
258 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 270 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
259{ 271{
260 GNUNET_assert (NULL != consensus->conclude_cb); 272 GNUNET_assert (NULL != consensus->conclude_cb);
261 consensus->conclude_cb(consensus->conclude_cls, 273 consensus->conclude_cb (consensus->conclude_cls,
262 msg->num_peers, 274 0, NULL);
263 (struct GNUNET_PeerIdentity *) &msg[1]);
264 consensus->conclude_cb = NULL; 275 consensus->conclude_cb = NULL;
265} 276}
266 277
@@ -287,12 +298,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
287 GNUNET_CLIENT_disconnect (consensus->client); 298 GNUNET_CLIENT_disconnect (consensus->client);
288 consensus->client = NULL; 299 consensus->client = NULL;
289 consensus->new_element_cb (NULL, NULL); 300 consensus->new_element_cb (NULL, NULL);
290 if (NULL != consensus->idc)
291 {
292 consensus->idc(consensus->idc_cls, GNUNET_NO);
293 consensus->idc = NULL;
294 consensus->idc_cls = NULL;
295 }
296 return; 301 return;
297 } 302 }
298 303
@@ -305,108 +310,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
305 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); 310 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
306 break; 311 break;
307 default: 312 default:
308 LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); 313 GNUNET_break (0);
309 } 314 }
310 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 315 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
311 GNUNET_TIME_UNIT_FOREVER_REL); 316 GNUNET_TIME_UNIT_FOREVER_REL);
312} 317}
313 318
314
315
316
317/**
318 * Function called to notify a client about the connection
319 * begin ready to queue more data. "buf" will be
320 * NULL and "size" zero if the connection was closed for
321 * writing in the meantime.
322 *
323 * @param cls closure
324 * @param size number of bytes available in buf
325 * @param buf where the callee should write the message
326 * @return number of bytes written to buf
327 */
328static size_t
329transmit_ack (void *cls, size_t size, void *buf)
330{
331 struct GNUNET_CONSENSUS_AckMessage *msg;
332 struct GNUNET_CONSENSUS_Handle *consensus;
333
334 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
335
336 GNUNET_assert (NULL != consensus->ack_head);
337
338 msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
339 msg->keep = consensus->ack_head->keep;
340 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
341 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
342
343 consensus->ack_head = consensus->ack_head->next;
344
345 consensus->th = NULL;
346
347 ntr_insert (consensus);
348 ntr_ack (consensus);
349 ntr_conclude (consensus);
350
351 return sizeof (struct GNUNET_CONSENSUS_AckMessage);
352}
353
354/**
355 * Function called to notify a client about the connection
356 * begin ready to queue more data. "buf" will be
357 * NULL and "size" zero if the connection was closed for
358 * writing in the meantime.
359 *
360 * @param cls closure
361 * @param size number of bytes available in buf
362 * @param buf where the callee should write the message
363 * @return number of bytes written to buf
364 */
365static size_t
366transmit_insert (void *cls, size_t size, void *buf)
367{
368 struct GNUNET_CONSENSUS_ElementMessage *msg;
369 struct GNUNET_CONSENSUS_Handle *consensus;
370 GNUNET_CONSENSUS_InsertDoneCallback idc;
371 int msize;
372 void *idc_cls;
373
374 GNUNET_assert (NULL != buf);
375
376 consensus = cls;
377
378 GNUNET_assert (NULL != consensus->insert_element);
379
380 consensus->th = NULL;
381
382 msg = buf;
383
384 msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
385 consensus->insert_element->size;
386
387 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
388 msg->header.size = htons (msize);
389 memcpy (&msg[1],
390 consensus->insert_element->data,
391 consensus->insert_element->size);
392
393 consensus->insert_element = NULL;
394
395 idc = consensus->idc;
396 consensus->idc = NULL;
397 idc_cls = consensus->idc_cls;
398 consensus->idc_cls = NULL;
399 idc (idc_cls, GNUNET_YES);
400
401
402 ntr_ack (consensus);
403 ntr_insert (consensus);
404 ntr_conclude (consensus);
405
406 return msize;
407}
408
409
410/** 319/**
411 * Function called to notify a client about the connection 320 * Function called to notify a client about the connection
412 * begin ready to queue more data. "buf" will be 321 * begin ready to queue more data. "buf" will be
@@ -427,7 +336,7 @@ transmit_join (void *cls, size_t size, void *buf)
427 336
428 GNUNET_assert (NULL != buf); 337 GNUNET_assert (NULL != buf);
429 338
430 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); 339 LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n");
431 340
432 consensus = cls; 341 consensus = cls;
433 consensus->th = NULL; 342 consensus->th = NULL;
@@ -447,9 +356,7 @@ transmit_join (void *cls, size_t size, void *buf)
447 consensus->peers, 356 consensus->peers,
448 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); 357 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
449 358
450 ntr_insert (consensus); 359 schedule_transmit (consensus);
451 ntr_begin (consensus);
452 ntr_conclude (consensus);
453 360
454 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 361 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
455 GNUNET_TIME_UNIT_FOREVER_REL); 362 GNUNET_TIME_UNIT_FOREVER_REL);
@@ -457,88 +364,11 @@ transmit_join (void *cls, size_t size, void *buf)
457 return msize; 364 return msize;
458} 365}
459 366
460
461/**
462 * Function called to notify a client about the connection
463 * begin ready to queue more data. "buf" will be
464 * NULL and "size" zero if the connection was closed for
465 * writing in the meantime.
466 *
467 * @param cls closure
468 * @param size number of bytes available in buf
469 * @param buf where the callee should write the message
470 * @return number of bytes written to buf
471 */
472static size_t
473transmit_conclude (void *cls, size_t size, void *buf)
474{
475 struct GNUNET_CONSENSUS_ConcludeMessage *msg;
476 struct GNUNET_CONSENSUS_Handle *consensus;
477 int msize;
478
479 GNUNET_assert (NULL != buf);
480
481 consensus = cls;
482 consensus->th = NULL;
483
484 msg = buf;
485
486 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
487
488 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
489 msg->header.size = htons (msize);
490 msg->timeout =
491 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
492
493 ntr_ack (consensus);
494
495 return msize;
496}
497
498
499/**
500 * Function called to notify a client about the connection
501 * begin ready to queue more data. "buf" will be
502 * NULL and "size" zero if the connection was closed for
503 * writing in the meantime.
504 *
505 * @param cls the consensus handle
506 * @param size number of bytes available in buf
507 * @param buf where the callee should write the message
508 * @return number of bytes written to buf
509 */
510static size_t
511transmit_begin (void *cls, size_t size, void *buf)
512{
513 struct GNUNET_MessageHeader *msg;
514 struct GNUNET_CONSENSUS_Handle *consensus;
515 int msize;
516
517 GNUNET_assert (NULL != buf);
518
519 consensus = cls;
520 consensus->th = NULL;
521
522 msg = buf;
523
524 msize = sizeof (struct GNUNET_MessageHeader);
525
526 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
527 msg->size = htons (msize);
528
529 ntr_ack (consensus);
530 ntr_insert (consensus);
531 ntr_conclude (consensus);
532
533 return msize;
534}
535
536
537/** 367/**
538 * Create a consensus session. 368 * Create a consensus session.
539 * 369 *
540 * @param cfg 370 * @param cfg configuration to use for connecting to the consensus service
541 * @param num_peers 371 * @param num_peers number of peers in the peers array
542 * @param peers array of peers participating in this consensus session 372 * @param peers array of peers participating in this consensus session
543 * Inclusion of the local peer is optional. 373 * Inclusion of the local peer is optional.
544 * @param session_id session identifier 374 * @param session_id session identifier
@@ -553,7 +383,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
553 unsigned int num_peers, 383 unsigned int num_peers,
554 const struct GNUNET_PeerIdentity *peers, 384 const struct GNUNET_PeerIdentity *peers,
555 const struct GNUNET_HashCode *session_id, 385 const struct GNUNET_HashCode *session_id,
556 GNUNET_CONSENSUS_NewElementCallback new_element_cb, 386 GNUNET_CONSENSUS_ElementCallback new_element_cb,
557 void *new_element_cls) 387 void *new_element_cls)
558{ 388{
559 struct GNUNET_CONSENSUS_Handle *consensus; 389 struct GNUNET_CONSENSUS_Handle *consensus;
@@ -567,17 +397,10 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
567 consensus->session_id = *session_id; 397 consensus->session_id = *session_id;
568 398
569 if (0 == num_peers) 399 if (0 == num_peers)
570 {
571 consensus->peers = NULL; 400 consensus->peers = NULL;
572 }
573 else if (num_peers > 0) 401 else if (num_peers > 0)
574 { 402 consensus->peers =
575 consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); 403 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
576 }
577 else
578 {
579 GNUNET_break (0);
580 }
581 404
582 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); 405 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
583 406
@@ -615,45 +438,37 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
615 GNUNET_CONSENSUS_InsertDoneCallback idc, 438 GNUNET_CONSENSUS_InsertDoneCallback idc,
616 void *idc_cls) 439 void *idc_cls)
617{ 440{
618 GNUNET_assert (NULL == consensus->idc); 441 struct QueuedMessage *qmsg;
619 GNUNET_assert (NULL == consensus->insert_element); 442 struct GNUNET_CONSENSUS_ElementMessage *element_msg;
620 GNUNET_assert (NULL == consensus->conclude_cb); 443 size_t element_msg_size;
621 444
622 consensus->idc = idc; 445 LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size);
623 consensus->idc_cls = idc_cls;
624 consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
625 446
626 if (consensus->joined == 0) 447 element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
627 { 448 element->size);
628 return;
629 }
630 449
631 ntr_insert (consensus); 450 element_msg = GNUNET_malloc (element_msg_size);
632} 451 element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
452 element_msg->header.size = htons (element_msg_size);
453 memcpy (&element_msg[1], element->data, element->size);
633 454
455 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
456 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
457 qmsg->size = element_msg_size;
458 qmsg->idc = idc;
459 qmsg->idc_cls = idc_cls;
634 460
635/** 461 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
636 * Begin reconciling elements with other peers.
637 *
638 * @param consensus handle for the consensus session
639 */
640void
641GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
642{
643 GNUNET_assert (NULL == consensus->idc);
644 GNUNET_assert (NULL == consensus->insert_element);
645 GNUNET_assert (GNUNET_NO == consensus->begin_requested);
646 GNUNET_assert (GNUNET_NO == consensus->begin_sent);
647
648 consensus->begin_requested = GNUNET_YES;
649 462
650 ntr_begin (consensus); 463 schedule_transmit (consensus);
651} 464}
652 465
653 466
654/** 467/**
655 * We are finished inserting new elements into the consensus; 468 * We are done with inserting new elements into the consensus;
656 * try to conclude the consensus within a given time window. 469 * try to conclude the consensus within a given time window.
470 * After conclude has been called, no further elements may be
471 * inserted by the client.
657 * 472 *
658 * @param consensus consensus session 473 * @param consensus consensus session
659 * @param timeout timeout after which the conculde callback 474 * @param timeout timeout after which the conculde callback
@@ -664,20 +479,32 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
664void 479void
665GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, 480GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
666 struct GNUNET_TIME_Relative timeout, 481 struct GNUNET_TIME_Relative timeout,
482 unsigned int min_group_size_in_consensus,
667 GNUNET_CONSENSUS_ConcludeCallback conclude, 483 GNUNET_CONSENSUS_ConcludeCallback conclude,
668 void *conclude_cls) 484 void *conclude_cls)
669{ 485{
486 struct QueuedMessage *qmsg;
487 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
488
670 GNUNET_assert (NULL != conclude); 489 GNUNET_assert (NULL != conclude);
671 GNUNET_assert (NULL == consensus->conclude_cb); 490 GNUNET_assert (NULL == consensus->conclude_cb);
672 491
673 consensus->conclude_cls = conclude_cls; 492 consensus->conclude_cls = conclude_cls;
674 consensus->conclude_cb = conclude; 493 consensus->conclude_cb = conclude;
675 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
676 494
495 conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
496 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
497 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
498 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
499 conclude_msg->min_group_size = min_group_size_in_consensus;
500
501 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
502 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
503 qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
504
505 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
677 506
678 /* if transmitting the conclude message is not possible right now, transmit_join 507 schedule_transmit (consensus);
679 * or transmit_ack will handle it */
680 ntr_conclude (consensus);
681} 508}
682 509
683 510