aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-01-03 00:43:57 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-01-03 00:43:57 +0000
commit845316cb543af7e4c77709acce4df79f3e0dc162 (patch)
tree629739608b0bd0f645d82fd41d90ccb2cb617d09 /src/consensus
parent3def510831309a5a0a5f50a2250911d6b592a87e (diff)
downloadgnunet-845316cb543af7e4c77709acce4df79f3e0dc162.tar.gz
gnunet-845316cb543af7e4c77709acce4df79f3e0dc162.zip
implemented the modified consensus api, started implementing p2p protocol for consensus
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/Makefile.am1
-rw-r--r--src/consensus/consensus.h9
-rw-r--r--src/consensus/consensus_api.c481
-rw-r--r--src/consensus/gnunet-consensus-start-peers.c3
-rw-r--r--src/consensus/gnunet-consensus.c21
-rw-r--r--src/consensus/gnunet-service-consensus.c424
-rw-r--r--src/consensus/test_consensus_api.c30
7 files changed, 556 insertions, 413 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index d6901245f..1beaa0c62 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -57,6 +57,7 @@ gnunet_service_consensus_SOURCES = \
57gnunet_service_consensus_LDADD = \ 57gnunet_service_consensus_LDADD = \
58 $(top_builddir)/src/util/libgnunetutil.la \ 58 $(top_builddir)/src/util/libgnunetutil.la \
59 $(top_builddir)/src/core/libgnunetcore.la \ 59 $(top_builddir)/src/core/libgnunetcore.la \
60 $(top_builddir)/src/mesh/libgnunetmesh.la \
60 $(GN_LIBINTL) 61 $(GN_LIBINTL)
61 62
62libgnunetconsensus_la_SOURCES = \ 63libgnunetconsensus_la_SOURCES = \
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h
index d76c6b769..75b90b0f9 100644
--- a/src/consensus/consensus.h
+++ b/src/consensus/consensus.h
@@ -52,7 +52,15 @@ struct GNUNET_CONSENSUS_ConcludeMessage
52 */ 52 */
53 struct GNUNET_MessageHeader header; 53 struct GNUNET_MessageHeader header;
54 54
55 /**
56 * Timeout for conclude
57 */
55 struct GNUNET_TIME_RelativeNBO timeout; 58 struct GNUNET_TIME_RelativeNBO timeout;
59
60 /**
61 * Minimum group size required for a consensus group.
62 */
63 uint32_t min_group_size;
56}; 64};
57 65
58 66
@@ -102,6 +110,7 @@ struct GNUNET_CONSENSUS_AckMessage
102 */ 110 */
103 uint8_t keep; 111 uint8_t keep;
104 112
113 /* FIXME: add message hash? */
105}; 114};
106 115
107GNUNET_NETWORK_STRUCT_END 116GNUNET_NETWORK_STRUCT_END
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 2479c019c..25c76b358 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -33,14 +33,43 @@
33 33
34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) 34#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__)
35 35
36struct ElementAck 36/**
37 * Actions that can be queued.
38 */
39struct QueuedMessage
37{ 40{
38 struct ElementAck *next; 41 /**
39 struct ElementAck *prev; 42 * Queued messages are stored in a doubly linked list.
40 int keep; 43 */
41 struct GNUNET_CONSENSUS_Element *element; 44 struct QueuedMessage *next;
45
46 /**
47 * Queued messages are stored in a doubly linked list.
48 */
49 struct QueuedMessage *prev;
50
51 /**
52 * The actual queued message.
53 */
54 struct GNUNET_MessageHeader *msg;
55
56 /**
57 * Size of the message in msg.
58 */
59 size_t size;
60
61 /**
62 * Will be called after transmit, if not NULL
63 */
64 GNUNET_CONSENSUS_InsertDoneCallback idc;
65
66 /**
67 * Closure for idc
68 */
69 void *idc_cls;
42}; 70};
43 71
72
44/** 73/**
45 * Handle for the service. 74 * Handle for the service.
46 */ 75 */
@@ -52,14 +81,14 @@ struct GNUNET_CONSENSUS_Handle
52 const struct GNUNET_CONFIGURATION_Handle *cfg; 81 const struct GNUNET_CONFIGURATION_Handle *cfg;
53 82
54 /** 83 /**
55 * Socket (if available). 84 * Client connected to the consensus service, may be NULL if not connected.
56 */ 85 */
57 struct GNUNET_CLIENT_Connection *client; 86 struct GNUNET_CLIENT_Connection *client;
58 87
59 /** 88 /**
60 * Callback for new elements. Not called for elements added locally. 89 * Callback for new elements. Not called for elements added locally.
61 */ 90 */
62 GNUNET_CONSENSUS_NewElementCallback new_element_cb; 91 GNUNET_CONSENSUS_ElementCallback new_element_cb;
63 92
64 /** 93 /**
65 * Closure for new_element_cb 94 * Closure for new_element_cb
@@ -67,7 +96,7 @@ struct GNUNET_CONSENSUS_Handle
67 void *new_element_cls; 96 void *new_element_cls;
68 97
69 /** 98 /**
70 * Session identifier for the consensus session. 99 * The (local) session identifier for the consensus session.
71 */ 100 */
72 struct GNUNET_HashCode session_id; 101 struct GNUNET_HashCode session_id;
73 102
@@ -77,9 +106,9 @@ struct GNUNET_CONSENSUS_Handle
77 int num_peers; 106 int num_peers;
78 107
79 /** 108 /**
80 * Peer identities of peers in the consensus. Optionally includes the local peer. 109 * Peer identities of peers participating in the consensus, includes the local peer.
81 */ 110 */
82 struct GNUNET_PeerIdentity *peers; 111 struct GNUNET_PeerIdentity **peers;
83 112
84 /** 113 /**
85 * Currently active transmit request. 114 * Currently active transmit request.
@@ -92,22 +121,11 @@ struct GNUNET_CONSENSUS_Handle
92 int joined; 121 int joined;
93 122
94 /** 123 /**
95 * Called when the current insertion operation finishes.
96 * NULL if there is no insert operation active.
97 */
98 GNUNET_CONSENSUS_InsertDoneCallback idc;
99
100 /**
101 * Closure for the insert done callback. 124 * Closure for the insert done callback.
102 */ 125 */
103 void *idc_cls; 126 void *idc_cls;
104 127
105 /** 128 /**
106 * An element that was requested to be inserted.
107 */
108 struct GNUNET_CONSENSUS_Element *insert_element;
109
110 /**
111 * Called when the conclude operation finishes or fails. 129 * Called when the conclude operation finishes or fails.
112 */ 130 */
113 GNUNET_CONSENSUS_ConcludeCallback conclude_cb; 131 GNUNET_CONSENSUS_ConcludeCallback conclude_cb;
@@ -122,103 +140,92 @@ struct GNUNET_CONSENSUS_Handle
122 */ 140 */
123 struct GNUNET_TIME_Absolute conclude_deadline; 141 struct GNUNET_TIME_Absolute conclude_deadline;
124 142
125 struct ElementAck *ack_head; 143 unsigned int conclude_min_size;
126 struct ElementAck *ack_tail;
127
128 /**
129 * Set to GNUNET_YES if the begin message has been transmitted to the service
130 */
131 int begin_sent;
132 144
133 /** 145 struct QueuedMessage *messages_head;
134 * Set to GNUNET_YES it the begin message should be transmitted to the service 146 struct QueuedMessage *messages_tail;
135 */
136 int begin_requested;
137}; 147};
138 148
139 149
140static size_t
141transmit_ack (void *cls, size_t size, void *buf);
142
143static size_t
144transmit_insert (void *cls, size_t size, void *buf);
145
146static size_t
147transmit_conclude (void *cls, size_t size, void *buf);
148
149static size_t
150transmit_begin (void *cls, size_t size, void *buf);
151
152 150
153/** 151/**
154 * Call notify_transmit_ready for ack if necessary and possible. 152 * Schedule transmitting the next message.
153 *
154 * @param consensus consensus handle
155 */ 155 */
156static void 156static void
157ntr_ack (struct GNUNET_CONSENSUS_Handle *consensus) 157schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus);
158{
159 if ((NULL == consensus->th) && (NULL != consensus->ack_head))
160 {
161 consensus->th =
162 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
163 sizeof (struct GNUNET_CONSENSUS_AckMessage),
164 GNUNET_TIME_UNIT_FOREVER_REL,
165 GNUNET_NO, &transmit_ack, consensus);
166 }
167}
168 158
169 159
170/** 160/**
171 * Call notify_transmit_ready for ack if necessary and possible. 161 * Function called to notify a client about the connection
162 * begin ready to queue more data. "buf" will be
163 * NULL and "size" zero if the connection was closed for
164 * writing in the meantime.
165 *
166 * @param cls closure
167 * @param size number of bytes available in buf
168 * @param buf where the callee should write the message
169 * @return number of bytes written to buf
172 */ 170 */
173static void 171static size_t transmit_queued (void *cls, size_t size,
174ntr_insert (struct GNUNET_CONSENSUS_Handle *consensus) 172 void *buf)
175{ 173{
176 if ((NULL == consensus->th) && (NULL != consensus->insert_element)) 174 struct GNUNET_CONSENSUS_Handle *consensus;
175 struct QueuedMessage *qmsg;
176 size_t ret_size;
177
178 printf("transmitting queued\n");
179
180 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
181 qmsg = consensus->messages_head;
182 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
183 GNUNET_assert (qmsg);
184
185 if (NULL == buf)
177 { 186 {
178 consensus->th = 187 if (NULL != qmsg->idc)
179 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 188 {
180 sizeof (struct GNUNET_CONSENSUS_ElementMessage) + 189 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
181 consensus->insert_element->size, 190 }
182 GNUNET_TIME_UNIT_FOREVER_REL,
183 GNUNET_NO, &transmit_insert, consensus);
184 } 191 }
185}
186
187 192
188/** 193 memcpy (buf, qmsg->msg, qmsg->size);
189 * Call notify_transmit_ready for ack if necessary and possible. 194 ret_size = qmsg->size;
190 */ 195 if (NULL != qmsg->idc)
191static void
192ntr_conclude (struct GNUNET_CONSENSUS_Handle *consensus)
193{
194 if ((NULL == consensus->th) && (NULL != consensus->conclude_cb))
195 { 196 {
196 consensus->th = 197 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
197 GNUNET_CLIENT_notify_transmit_ready (consensus->client,
198 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage),
199 GNUNET_TIME_absolute_get_remaining (consensus->conclude_deadline),
200 GNUNET_NO, &transmit_conclude, consensus);
201 } 198 }
199 GNUNET_free (qmsg->msg);
200 GNUNET_free (qmsg);
201
202 schedule_transmit (consensus);
203
204 return ret_size;
202} 205}
203 206
204 207
205/** 208/**
206 * Call notify_transmit_ready for ack if necessary and possible. 209 * Schedule transmitting the next message.
210 *
211 * @param consensus consensus handle
207 */ 212 */
208static void 213static void
209ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus) 214schedule_transmit (struct GNUNET_CONSENSUS_Handle *consensus)
210{ 215{
211 if ((NULL == consensus->th) && (GNUNET_YES == consensus->begin_requested) && 216 if (NULL != consensus->th)
212 (GNUNET_NO == consensus->begin_sent)) 217 return;
218
219 if (NULL != consensus->messages_head)
213 { 220 {
214 consensus->th = 221 LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
215 GNUNET_CLIENT_notify_transmit_ready (consensus->client, 222 GNUNET_CLIENT_notify_transmit_ready (consensus->client, consensus->messages_head->size,
216 sizeof (struct GNUNET_MessageHeader), 223 GNUNET_TIME_UNIT_FOREVER_REL,
217 GNUNET_TIME_UNIT_FOREVER_REL, 224 GNUNET_NO, &transmit_queued, consensus);
218 GNUNET_NO, &transmit_begin, consensus);
219 } 225 }
220} 226}
221 227
228
222/** 229/**
223 * Called when the server has sent is a new element 230 * Called when the server has sent is a new element
224 * 231 *
@@ -226,11 +233,12 @@ ntr_begin (struct GNUNET_CONSENSUS_Handle *consensus)
226 * @param msg element message 233 * @param msg element message
227 */ 234 */
228static void 235static void
229handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus, 236handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
230 struct GNUNET_CONSENSUS_ElementMessage *msg) 237 struct GNUNET_CONSENSUS_ElementMessage *msg)
231{ 238{
232 struct GNUNET_CONSENSUS_Element element; 239 struct GNUNET_CONSENSUS_Element element;
233 struct ElementAck *ack; 240 struct GNUNET_CONSENSUS_AckMessage *ack_msg;
241 struct QueuedMessage *queued_msg;
234 int ret; 242 int ret;
235 243
236 element.type = msg->element_type; 244 element.type = msg->element_type;
@@ -238,11 +246,15 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
238 element.data = &msg[1]; 246 element.data = &msg[1];
239 247
240 ret = consensus->new_element_cb (consensus->new_element_cls, &element); 248 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
241 ack = GNUNET_malloc (sizeof (struct ElementAck));
242 ack->keep = ret;
243 GNUNET_CONTAINER_DLL_insert_tail (consensus->ack_head, consensus->ack_tail,ack);
244 249
245 ntr_ack (consensus); 250 queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage));
251 queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1];
252
253 ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
254 ack_msg->keep = ret;
255
256 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail,
257 queued_msg);
246} 258}
247 259
248 260
@@ -254,13 +266,12 @@ handle_new_element(struct GNUNET_CONSENSUS_Handle *consensus,
254 * @param msg conclude done message 266 * @param msg conclude done message
255 */ 267 */
256static void 268static void
257handle_conclude_done(struct GNUNET_CONSENSUS_Handle *consensus, 269handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
258 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 270 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
259{ 271{
260 GNUNET_assert (NULL != consensus->conclude_cb); 272 GNUNET_assert (NULL != consensus->conclude_cb);
261 consensus->conclude_cb(consensus->conclude_cls, 273 consensus->conclude_cb (consensus->conclude_cls,
262 msg->num_peers, 274 0, NULL);
263 (struct GNUNET_PeerIdentity *) &msg[1]);
264 consensus->conclude_cb = NULL; 275 consensus->conclude_cb = NULL;
265} 276}
266 277
@@ -287,12 +298,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
287 GNUNET_CLIENT_disconnect (consensus->client); 298 GNUNET_CLIENT_disconnect (consensus->client);
288 consensus->client = NULL; 299 consensus->client = NULL;
289 consensus->new_element_cb (NULL, NULL); 300 consensus->new_element_cb (NULL, NULL);
290 if (NULL != consensus->idc)
291 {
292 consensus->idc(consensus->idc_cls, GNUNET_NO);
293 consensus->idc = NULL;
294 consensus->idc_cls = NULL;
295 }
296 return; 301 return;
297 } 302 }
298 303
@@ -305,108 +310,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
305 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); 310 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg);
306 break; 311 break;
307 default: 312 default:
308 LOG(GNUNET_ERROR_TYPE_WARNING, "did not understand message type sent by service, ignoring"); 313 GNUNET_break (0);
309 } 314 }
310 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 315 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
311 GNUNET_TIME_UNIT_FOREVER_REL); 316 GNUNET_TIME_UNIT_FOREVER_REL);
312} 317}
313 318
314
315
316
317/**
318 * Function called to notify a client about the connection
319 * begin ready to queue more data. "buf" will be
320 * NULL and "size" zero if the connection was closed for
321 * writing in the meantime.
322 *
323 * @param cls closure
324 * @param size number of bytes available in buf
325 * @param buf where the callee should write the message
326 * @return number of bytes written to buf
327 */
328static size_t
329transmit_ack (void *cls, size_t size, void *buf)
330{
331 struct GNUNET_CONSENSUS_AckMessage *msg;
332 struct GNUNET_CONSENSUS_Handle *consensus;
333
334 consensus = (struct GNUNET_CONSENSUS_Handle *) cls;
335
336 GNUNET_assert (NULL != consensus->ack_head);
337
338 msg = (struct GNUNET_CONSENSUS_AckMessage *) buf;
339 msg->keep = consensus->ack_head->keep;
340 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
341 msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_AckMessage));
342
343 consensus->ack_head = consensus->ack_head->next;
344
345 consensus->th = NULL;
346
347 ntr_insert (consensus);
348 ntr_ack (consensus);
349 ntr_conclude (consensus);
350
351 return sizeof (struct GNUNET_CONSENSUS_AckMessage);
352}
353
354/**
355 * Function called to notify a client about the connection
356 * begin ready to queue more data. "buf" will be
357 * NULL and "size" zero if the connection was closed for
358 * writing in the meantime.
359 *
360 * @param cls closure
361 * @param size number of bytes available in buf
362 * @param buf where the callee should write the message
363 * @return number of bytes written to buf
364 */
365static size_t
366transmit_insert (void *cls, size_t size, void *buf)
367{
368 struct GNUNET_CONSENSUS_ElementMessage *msg;
369 struct GNUNET_CONSENSUS_Handle *consensus;
370 GNUNET_CONSENSUS_InsertDoneCallback idc;
371 int msize;
372 void *idc_cls;
373
374 GNUNET_assert (NULL != buf);
375
376 consensus = cls;
377
378 GNUNET_assert (NULL != consensus->insert_element);
379
380 consensus->th = NULL;
381
382 msg = buf;
383
384 msize = sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
385 consensus->insert_element->size;
386
387 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
388 msg->header.size = htons (msize);
389 memcpy (&msg[1],
390 consensus->insert_element->data,
391 consensus->insert_element->size);
392
393 consensus->insert_element = NULL;
394
395 idc = consensus->idc;
396 consensus->idc = NULL;
397 idc_cls = consensus->idc_cls;
398 consensus->idc_cls = NULL;
399 idc (idc_cls, GNUNET_YES);
400
401
402 ntr_ack (consensus);
403 ntr_insert (consensus);
404 ntr_conclude (consensus);
405
406 return msize;
407}
408
409
410/** 319/**
411 * Function called to notify a client about the connection 320 * Function called to notify a client about the connection
412 * begin ready to queue more data. "buf" will be 321 * begin ready to queue more data. "buf" will be
@@ -427,7 +336,7 @@ transmit_join (void *cls, size_t size, void *buf)
427 336
428 GNUNET_assert (NULL != buf); 337 GNUNET_assert (NULL != buf);
429 338
430 LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); 339 LOG (GNUNET_ERROR_TYPE_INFO, "transmitting join message\n");
431 340
432 consensus = cls; 341 consensus = cls;
433 consensus->th = NULL; 342 consensus->th = NULL;
@@ -447,9 +356,7 @@ transmit_join (void *cls, size_t size, void *buf)
447 consensus->peers, 356 consensus->peers,
448 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); 357 consensus->num_peers * sizeof (struct GNUNET_PeerIdentity));
449 358
450 ntr_insert (consensus); 359 schedule_transmit (consensus);
451 ntr_begin (consensus);
452 ntr_conclude (consensus);
453 360
454 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, 361 GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus,
455 GNUNET_TIME_UNIT_FOREVER_REL); 362 GNUNET_TIME_UNIT_FOREVER_REL);
@@ -457,88 +364,11 @@ transmit_join (void *cls, size_t size, void *buf)
457 return msize; 364 return msize;
458} 365}
459 366
460
461/**
462 * Function called to notify a client about the connection
463 * begin ready to queue more data. "buf" will be
464 * NULL and "size" zero if the connection was closed for
465 * writing in the meantime.
466 *
467 * @param cls closure
468 * @param size number of bytes available in buf
469 * @param buf where the callee should write the message
470 * @return number of bytes written to buf
471 */
472static size_t
473transmit_conclude (void *cls, size_t size, void *buf)
474{
475 struct GNUNET_CONSENSUS_ConcludeMessage *msg;
476 struct GNUNET_CONSENSUS_Handle *consensus;
477 int msize;
478
479 GNUNET_assert (NULL != buf);
480
481 consensus = cls;
482 consensus->th = NULL;
483
484 msg = buf;
485
486 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
487
488 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
489 msg->header.size = htons (msize);
490 msg->timeout =
491 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining(consensus->conclude_deadline));
492
493 ntr_ack (consensus);
494
495 return msize;
496}
497
498
499/**
500 * Function called to notify a client about the connection
501 * begin ready to queue more data. "buf" will be
502 * NULL and "size" zero if the connection was closed for
503 * writing in the meantime.
504 *
505 * @param cls the consensus handle
506 * @param size number of bytes available in buf
507 * @param buf where the callee should write the message
508 * @return number of bytes written to buf
509 */
510static size_t
511transmit_begin (void *cls, size_t size, void *buf)
512{
513 struct GNUNET_MessageHeader *msg;
514 struct GNUNET_CONSENSUS_Handle *consensus;
515 int msize;
516
517 GNUNET_assert (NULL != buf);
518
519 consensus = cls;
520 consensus->th = NULL;
521
522 msg = buf;
523
524 msize = sizeof (struct GNUNET_MessageHeader);
525
526 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN);
527 msg->size = htons (msize);
528
529 ntr_ack (consensus);
530 ntr_insert (consensus);
531 ntr_conclude (consensus);
532
533 return msize;
534}
535
536
537/** 367/**
538 * Create a consensus session. 368 * Create a consensus session.
539 * 369 *
540 * @param cfg 370 * @param cfg configuration to use for connecting to the consensus service
541 * @param num_peers 371 * @param num_peers number of peers in the peers array
542 * @param peers array of peers participating in this consensus session 372 * @param peers array of peers participating in this consensus session
543 * Inclusion of the local peer is optional. 373 * Inclusion of the local peer is optional.
544 * @param session_id session identifier 374 * @param session_id session identifier
@@ -553,7 +383,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
553 unsigned int num_peers, 383 unsigned int num_peers,
554 const struct GNUNET_PeerIdentity *peers, 384 const struct GNUNET_PeerIdentity *peers,
555 const struct GNUNET_HashCode *session_id, 385 const struct GNUNET_HashCode *session_id,
556 GNUNET_CONSENSUS_NewElementCallback new_element_cb, 386 GNUNET_CONSENSUS_ElementCallback new_element_cb,
557 void *new_element_cls) 387 void *new_element_cls)
558{ 388{
559 struct GNUNET_CONSENSUS_Handle *consensus; 389 struct GNUNET_CONSENSUS_Handle *consensus;
@@ -567,17 +397,10 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
567 consensus->session_id = *session_id; 397 consensus->session_id = *session_id;
568 398
569 if (0 == num_peers) 399 if (0 == num_peers)
570 {
571 consensus->peers = NULL; 400 consensus->peers = NULL;
572 }
573 else if (num_peers > 0) 401 else if (num_peers > 0)
574 { 402 consensus->peers =
575 consensus->peers = GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); 403 GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity));
576 }
577 else
578 {
579 GNUNET_break (0);
580 }
581 404
582 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); 405 consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
583 406
@@ -615,45 +438,37 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
615 GNUNET_CONSENSUS_InsertDoneCallback idc, 438 GNUNET_CONSENSUS_InsertDoneCallback idc,
616 void *idc_cls) 439 void *idc_cls)
617{ 440{
618 GNUNET_assert (NULL == consensus->idc); 441 struct QueuedMessage *qmsg;
619 GNUNET_assert (NULL == consensus->insert_element); 442 struct GNUNET_CONSENSUS_ElementMessage *element_msg;
620 GNUNET_assert (NULL == consensus->conclude_cb); 443 size_t element_msg_size;
621 444
622 consensus->idc = idc; 445 LOG (GNUNET_ERROR_TYPE_INFO, "inserting, size=%llu\n", element->size);
623 consensus->idc_cls = idc_cls;
624 consensus->insert_element = GNUNET_memdup(element, sizeof (struct GNUNET_CONSENSUS_Element) + element->size);
625 446
626 if (consensus->joined == 0) 447 element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) +
627 { 448 element->size);
628 return;
629 }
630 449
631 ntr_insert (consensus); 450 element_msg = GNUNET_malloc (element_msg_size);
632} 451 element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT);
452 element_msg->header.size = htons (element_msg_size);
453 memcpy (&element_msg[1], element->data, element->size);
633 454
455 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
456 qmsg->msg = (struct GNUNET_MessageHeader *) element_msg;
457 qmsg->size = element_msg_size;
458 qmsg->idc = idc;
459 qmsg->idc_cls = idc_cls;
634 460
635/** 461 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
636 * Begin reconciling elements with other peers.
637 *
638 * @param consensus handle for the consensus session
639 */
640void
641GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
642{
643 GNUNET_assert (NULL == consensus->idc);
644 GNUNET_assert (NULL == consensus->insert_element);
645 GNUNET_assert (GNUNET_NO == consensus->begin_requested);
646 GNUNET_assert (GNUNET_NO == consensus->begin_sent);
647
648 consensus->begin_requested = GNUNET_YES;
649 462
650 ntr_begin (consensus); 463 schedule_transmit (consensus);
651} 464}
652 465
653 466
654/** 467/**
655 * We are finished inserting new elements into the consensus; 468 * We are done with inserting new elements into the consensus;
656 * try to conclude the consensus within a given time window. 469 * try to conclude the consensus within a given time window.
470 * After conclude has been called, no further elements may be
471 * inserted by the client.
657 * 472 *
658 * @param consensus consensus session 473 * @param consensus consensus session
659 * @param timeout timeout after which the conculde callback 474 * @param timeout timeout after which the conculde callback
@@ -664,20 +479,32 @@ GNUNET_CONSENSUS_begin (struct GNUNET_CONSENSUS_Handle *consensus)
664void 479void
665GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, 480GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
666 struct GNUNET_TIME_Relative timeout, 481 struct GNUNET_TIME_Relative timeout,
482 unsigned int min_group_size_in_consensus,
667 GNUNET_CONSENSUS_ConcludeCallback conclude, 483 GNUNET_CONSENSUS_ConcludeCallback conclude,
668 void *conclude_cls) 484 void *conclude_cls)
669{ 485{
486 struct QueuedMessage *qmsg;
487 struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg;
488
670 GNUNET_assert (NULL != conclude); 489 GNUNET_assert (NULL != conclude);
671 GNUNET_assert (NULL == consensus->conclude_cb); 490 GNUNET_assert (NULL == consensus->conclude_cb);
672 491
673 consensus->conclude_cls = conclude_cls; 492 consensus->conclude_cls = conclude_cls;
674 consensus->conclude_cb = conclude; 493 consensus->conclude_cb = conclude;
675 consensus->conclude_deadline = GNUNET_TIME_relative_to_absolute(timeout);
676 494
495 conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
496 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
497 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
498 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
499 conclude_msg->min_group_size = min_group_size_in_consensus;
500
501 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
502 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
503 qmsg->size = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
504
505 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg);
677 506
678 /* if transmitting the conclude message is not possible right now, transmit_join 507 schedule_transmit (consensus);
679 * or transmit_ack will handle it */
680 ntr_conclude (consensus);
681} 508}
682 509
683 510
diff --git a/src/consensus/gnunet-consensus-start-peers.c b/src/consensus/gnunet-consensus-start-peers.c
index fb7f047ae..8b516393f 100644
--- a/src/consensus/gnunet-consensus-start-peers.c
+++ b/src/consensus/gnunet-consensus-start-peers.c
@@ -147,6 +147,9 @@ run (void *cls, char *const *args, const char *cfgfile,
147 NULL, 147 NULL,
148 test_master, 148 test_master,
149 NULL); 149 NULL);
150
151
152 printf("hello there!\n");
150} 153}
151 154
152 155
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index bc518657e..cd267f5ec 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -62,10 +62,10 @@ stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
62 */ 62 */
63static void 63static void
64conclude_cb (void *cls, 64conclude_cb (void *cls,
65 unsigned int num_peers_in_consensus, 65 unsigned int consensus_group_count,
66 const struct GNUNET_PeerIdentity *peers_in_consensus) 66 const struct GNUNET_CONSENSUS_Group *groups)
67{ 67{
68 printf("reached conclusion with %d peers\n", num_peers_in_consensus); 68 printf("reached conclusion\n");
69 GNUNET_SCHEDULER_shutdown (); 69 GNUNET_SCHEDULER_shutdown ();
70} 70}
71 71
@@ -111,7 +111,7 @@ stdin_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
111 if (feof (stdin)) 111 if (feof (stdin))
112 { 112 {
113 printf ("concluding ...\n"); 113 printf ("concluding ...\n");
114 GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, conclude_cb, NULL); 114 GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, 0, conclude_cb, NULL);
115 } 115 }
116 return; 116 return;
117 } 117 }
@@ -144,6 +144,12 @@ static int
144cb (void *cls, 144cb (void *cls,
145 struct GNUNET_CONSENSUS_Element *element) 145 struct GNUNET_CONSENSUS_Element *element)
146{ 146{
147 if (NULL == element)
148 {
149 printf("error receiving from consensus\n");
150 GNUNET_SCHEDULER_shutdown ();
151 return GNUNET_NO;
152 }
147 printf("got element\n"); 153 printf("got element\n");
148 return GNUNET_YES; 154 return GNUNET_YES;
149} 155}
@@ -178,10 +184,12 @@ run (void *cls, char *const *args, const char *cfgfile,
178 184
179 if (NULL == session_id_str) 185 if (NULL == session_id_str)
180 { 186 {
181 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given\n"); 187 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no session id given (missing -s/--session-id)\n");
182 return; 188 return;
183 } 189 }
184 190
191 GNUNET_CRYPTO_hash (session_id_str, strlen (session_id_str), &sid);
192
185 for (count = 0; NULL != args[count]; count++); 193 for (count = 0; NULL != args[count]; count++);
186 194
187 if (0 != count) 195 if (0 != count)
@@ -213,9 +221,6 @@ run (void *cls, char *const *args, const char *cfgfile,
213 &sid, 221 &sid,
214 &cb, NULL); 222 &cb, NULL);
215 223
216 GNUNET_CONSENSUS_begin (consensus);
217
218
219 stdin_fh = GNUNET_DISK_get_handle_from_native (stdin); 224 stdin_fh = GNUNET_DISK_get_handle_from_native (stdin);
220 stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh, 225 stdin_tid = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, stdin_fh,
221 &stdin_cb, NULL); 226 &stdin_cb, NULL);
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 195efc681..1b394db19 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -16,12 +16,19 @@
16 along with GNUnet; see the file COPYING. If not, write to the 16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330, 17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19 */ 19*/
20
20 21
22/**
23 * @file consensus/gnunet-service-consensus.c
24 * @brief
25 * @author Florian Dold
26 */
21 27
22#include "platform.h" 28#include "platform.h"
23#include "gnunet_common.h" 29#include "gnunet_common.h"
24#include "gnunet_protocols.h" 30#include "gnunet_protocols.h"
31#include "gnunet_applications.h"
25#include "gnunet_util_lib.h" 32#include "gnunet_util_lib.h"
26#include "gnunet_consensus_service.h" 33#include "gnunet_consensus_service.h"
27#include "gnunet_core_service.h" 34#include "gnunet_core_service.h"
@@ -57,6 +64,24 @@ struct PendingElement
57}; 64};
58 65
59 66
67/*
68 * A peer that is also in a consensus session.
69 * Note that 'this' peer is not in the list.
70 */
71struct ConsensusPeer
72{
73 struct GNUNET_PeerIdentity *peer_id;
74
75 /**
76 * Incoming tunnel from the peer.
77 */
78 struct GNUNET_MESH_Tunnel *incoming_tunnel;
79
80 struct InvertibleBloomFilter *last_ibf;
81
82};
83
84
60/** 85/**
61 * A consensus session consists of one local client and the remote authorities. 86 * A consensus session consists of one local client and the remote authorities.
62 */ 87 */
@@ -84,18 +109,14 @@ struct ConsensusSession
84 struct GNUNET_HashCode *global_id; 109 struct GNUNET_HashCode *global_id;
85 110
86 /** 111 /**
87 * Corresponding server handle. 112 * Local client in this consensus session.
113 * There is only one client per consensus session.
88 */ 114 */
89 struct GNUNET_SERVER_Client *client; 115 struct GNUNET_SERVER_Client *client;
90 116
91 /** 117 /**
92 * Client wants to receive and send updates.
93 */
94 int begin;
95
96 /**
97 * Values in the consensus set of this session, 118 * Values in the consensus set of this session,
98 * all of them either have been sent or approved by the client. 119 * all of them either have been sent by or approved by the client.
99 */ 120 */
100 struct GNUNET_CONTAINER_MultiHashMap *values; 121 struct GNUNET_CONTAINER_MultiHashMap *values;
101 122
@@ -110,12 +131,12 @@ struct ConsensusSession
110 struct PendingElement *transmit_pending_tail; 131 struct PendingElement *transmit_pending_tail;
111 132
112 /** 133 /**
113 * Elements that have not been sent to the client yet. 134 * Elements that have not been approved (or rejected) by the client yet.
114 */ 135 */
115 struct PendingElement *approval_pending_head; 136 struct PendingElement *approval_pending_head;
116 137
117 /** 138 /**
118 * Elements that have not been sent to the client yet. 139 * Elements that have not been approved (or rejected) by the client yet.
119 */ 140 */
120 struct PendingElement *approval_pending_tail; 141 struct PendingElement *approval_pending_tail;
121 142
@@ -136,9 +157,47 @@ struct ConsensusSession
136 int conclude_sent; 157 int conclude_sent;
137 158
138 /** 159 /**
160 * Minimum number of peers to form a consensus group
161 */
162 int conclude_group_min;
163
164 /**
165 * Current round of the conclusion
166 */
167 int current_round;
168
169 /**
170 * Soft deadline for conclude.
171 * Speed up the speed of the consensus at the cost of consensus quality, as
172 * the time approached or crosses the deadline.
173 */
174 struct GNUNET_TIME_Absolute conclude_deadline;
175
176 /**
139 * Number of other peers in the consensus 177 * Number of other peers in the consensus
140 */ 178 */
141 int num_peers; 179 unsigned int num_peers;
180
181 /**
182 * Other peers in the consensus, array of ConsensusPeer
183 */
184 struct ConsensusPeer *peers;
185
186 /**
187 * Tunnel for broadcasting to all other authorities
188 */
189 struct GNUNET_MESH_Tunnel *broadcast_tunnel;
190
191 /**
192 * Time limit for one round of pairwise exchange.
193 * FIXME: should not actually be a constant
194 */
195 struct GNUNET_TIME_Relative round_time;
196
197 /**
198 * Task identifier for the round timeout task
199 */
200 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
142}; 201};
143 202
144 203
@@ -167,10 +226,21 @@ static struct GNUNET_SERVER_Handle *srv;
167 */ 226 */
168static struct GNUNET_PeerIdentity *my_peer; 227static struct GNUNET_PeerIdentity *my_peer;
169 228
229/**
230 * Handle to the mesh service.
231 */
232static struct GNUNET_MESH_Handle *mesh;
233
234/**
235 * Handle to the core service. Only used during service startup, will be NULL after that.
236 */
237static struct GNUNET_CORE_Handle *core;
238
170static void 239static void
171disconnect_client (struct GNUNET_SERVER_Client *client) 240disconnect_client (struct GNUNET_SERVER_Client *client)
172{ 241{
173 /* FIXME */ 242 GNUNET_SERVER_client_disconnect (client);
243 /* FIXME: free data structures that this client owns */
174} 244}
175 245
176static void 246static void
@@ -185,7 +255,6 @@ compute_global_id (struct GNUNET_HashCode *dst,
185 *dst = *local_id; 255 *dst = *local_id;
186 for (i = 0; i < num_peers; ++i) 256 for (i = 0; i < num_peers; ++i)
187 { 257 {
188 /* FIXME: maybe hash_xor/hash allow aliased source/target, and we can get by without tmp */
189 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); 258 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp);
190 *dst = tmp; 259 *dst = tmp;
191 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); 260 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp);
@@ -255,7 +324,7 @@ send_next (struct ConsensusSession *session)
255 324
256 if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO)) 325 if ((session->conclude_requested == GNUNET_YES) && (session->conclude_sent == GNUNET_NO))
257 { 326 {
258 /* just the conclude message with no other authorities in the dummy */ 327 /* FIXME */
259 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage); 328 msize = sizeof (struct GNUNET_CONSENSUS_ConcludeMessage);
260 session->th = 329 session->th =
261 GNUNET_SERVER_notify_transmit_ready (session->client, msize, 330 GNUNET_SERVER_notify_transmit_ready (session->client, msize,
@@ -274,6 +343,39 @@ send_next (struct ConsensusSession *session)
274 343
275 344
276/** 345/**
346 * Method called whenever a peer has disconnected from the tunnel.
347 * Implementations of this callback must NOT call
348 * GNUNET_MESH_tunnel_destroy immediately, but instead schedule those
349 * to run in some other task later. However, calling
350 * "GNUNET_MESH_notify_transmit_ready_cancel" is allowed.
351 *
352 * @param cls closure
353 * @param peer peer identity the tunnel stopped working with
354 */
355static void
356disconnect_handler (void *cls, const struct GNUNET_PeerIdentity *peer)
357{
358 /* FIXME: how do we handle this */
359}
360
361
362/**
363 * Method called whenever a peer has connected to the tunnel.
364 *
365 * @param cls closure
366 * @param peer peer identity the tunnel was created to, NULL on timeout
367 * @param atsi performance data for the connection
368 */
369static void
370connect_handler (void *cls,
371 const struct GNUNET_PeerIdentity *peer,
372 const struct GNUNET_ATS_Information *atsi)
373{
374 /* not much we can do here, now we know the other peer has been added to our broadcast tunnel */
375}
376
377
378/**
277 * Called when a client wants to join a consensus session. 379 * Called when a client wants to join a consensus session.
278 * 380 *
279 * @param cls unused 381 * @param cls unused
@@ -288,18 +390,24 @@ client_join (void *cls,
288 struct GNUNET_HashCode global_id; 390 struct GNUNET_HashCode global_id;
289 const struct GNUNET_CONSENSUS_JoinMessage *msg; 391 const struct GNUNET_CONSENSUS_JoinMessage *msg;
290 struct ConsensusSession *session; 392 struct ConsensusSession *session;
393 unsigned int i;
291 394
292 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n"); 395 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client joining\n");
293 396
294 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m; 397 msg = (struct GNUNET_CONSENSUS_JoinMessage *) m;
295 398
399 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session id is %s\n", GNUNET_h2s (&msg->session_id));
400
296 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers); 401 compute_global_id (&global_id, &msg->session_id, (struct GNUNET_PeerIdentity *) &m[1], msg->num_peers);
297 402
403 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "computed global id is %s\n", GNUNET_h2s (&global_id));
404
298 session = sessions_head; 405 session = sessions_head;
299 while (NULL != session) 406 while (NULL != session)
300 { 407 {
301 if (client == session->client) 408 if (client == session->client)
302 { 409 {
410
303 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n"); 411 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client already in session\n");
304 disconnect_client (client); 412 disconnect_client (client);
305 return; 413 return;
@@ -310,6 +418,7 @@ client_join (void *cls,
310 disconnect_client (client); 418 disconnect_client (client);
311 return; 419 return;
312 } 420 }
421 session = session->next;
313 } 422 }
314 423
315 GNUNET_SERVER_client_keep (client); 424 GNUNET_SERVER_client_keep (client);
@@ -320,11 +429,40 @@ client_join (void *cls,
320 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode)); 429 session->global_id = GNUNET_memdup (&global_id, sizeof (struct GNUNET_HashCode));
321 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO); 430 session->values = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
322 session->client = client; 431 session->client = client;
432 /* FIXME: should not be a constant, but chosen adaptively */
433 session->round_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
323 434
324 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); 435 session->broadcast_tunnel = GNUNET_MESH_tunnel_create (mesh, session, connect_handler, disconnect_handler, session);
436
437 session->num_peers = 0;
438
439 /* count the peers that are not the local peer */
440 for (i = 0; i < msg->num_peers; i++)
441 {
442 struct GNUNET_PeerIdentity *peers;
443 peers = (struct GNUNET_PeerIdentity *) &msg[1];
444 if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
445 session->num_peers++;
446 }
447
448 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeer));
449
450 /* copy the peer identities and add peers to broadcast tunnel */
451 for (i = 0; i < msg->num_peers; i++)
452 {
453 struct GNUNET_PeerIdentity *peers;
454 peers = (struct GNUNET_PeerIdentity *) &msg[1];
455 if (0 != memcmp (&peers[i], my_peer, sizeof (struct GNUNET_PeerIdentity)))
456 {
457 *session->peers->peer_id = peers[i];
458 GNUNET_MESH_peer_request_connect_add (session->broadcast_tunnel, &peers[i]);
459 }
460 }
325 461
326 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n"); 462 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "created new session\n");
327 463
464 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
465
328 GNUNET_SERVER_receive_done (client, GNUNET_OK); 466 GNUNET_SERVER_receive_done (client, GNUNET_OK);
329} 467}
330 468
@@ -381,40 +519,28 @@ client_insert (void *cls,
381 519
382 520
383/** 521/**
384 * Called when a client wants to begin 522 * Do one round of the conclusion.
523 * Start by broadcasting the set difference estimator (IBF strata).
524 *
385 */ 525 */
386void 526void
387client_begin (void *cls, 527conclude_do_round (struct ConsensusSession *session)
388 struct GNUNET_SERVER_Client *client,
389 const struct GNUNET_MessageHeader *message)
390{ 528{
391 struct ConsensusSession *session; 529 /* FIXME */
392 530}
393 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client requested begin\n");
394
395 session = sessions_head;
396 while (NULL != session)
397 {
398 if (session->client == client)
399 break;
400 }
401
402 if (NULL == session)
403 {
404 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to 'begin', but client is not in any session\n");
405 GNUNET_SERVER_client_disconnect (client);
406 return;
407 }
408
409 session->begin = GNUNET_YES;
410 531
411 send_next (session);
412 532
413 GNUNET_SERVER_receive_done (client, GNUNET_OK); 533/**
534 * Cancel the current round if necessary, decide to run another round or
535 * terminate.
536 */
537void
538conclude_round_done (struct ConsensusSession *session)
539{
540 /* FIXME */
414} 541}
415 542
416 543
417
418/** 544/**
419 * Called when a client performs the conclude operation. 545 * Called when a client performs the conclude operation.
420 */ 546 */
@@ -425,6 +551,8 @@ client_conclude (void *cls,
425{ 551{
426 struct ConsensusSession *session; 552 struct ConsensusSession *session;
427 553
554 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
555
428 session = sessions_head; 556 session = sessions_head;
429 while ((session != NULL) && (session->client != client)) 557 while ((session != NULL) && (session->client != client))
430 { 558 {
@@ -432,12 +560,25 @@ client_conclude (void *cls,
432 } 560 }
433 if (NULL == session) 561 if (NULL == session)
434 { 562 {
563 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client not found\n");
564 GNUNET_SERVER_client_disconnect (client);
565 return;
566 }
567
568 if (GNUNET_YES == session->conclude_requested)
569 {
570 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client requested conclude twice\n");
435 GNUNET_SERVER_client_disconnect (client); 571 GNUNET_SERVER_client_disconnect (client);
436 return; 572 return;
437 } 573 }
574
438 session->conclude_requested = GNUNET_YES; 575 session->conclude_requested = GNUNET_YES;
439 send_next (session); 576
577 conclude_do_round (session);
578
440 GNUNET_SERVER_receive_done (client, GNUNET_OK); 579 GNUNET_SERVER_receive_done (client, GNUNET_OK);
580
581 send_next (session);
441} 582}
442 583
443 584
@@ -462,10 +603,8 @@ static void
462disconnect_core (void *cls, 603disconnect_core (void *cls,
463 const struct GNUNET_SCHEDULER_TaskContext *tc) 604 const struct GNUNET_SCHEDULER_TaskContext *tc)
464{ 605{
465 struct GNUNET_CORE_Handle *core;
466 core = (struct GNUNET_CORE_Handle *) cls;
467 GNUNET_CORE_disconnect (core); 606 GNUNET_CORE_disconnect (core);
468 607 core = NULL;
469 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n"); 608 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnected from core\n");
470} 609}
471 610
@@ -478,8 +617,6 @@ core_startup (void *cls,
478 static const struct GNUNET_SERVER_MessageHandler handlers[] = { 617 static const struct GNUNET_SERVER_MessageHandler handlers[] = {
479 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, 618 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
480 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, 619 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
481 {&client_begin, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_BEGIN,
482 sizeof (struct GNUNET_MessageHeader)},
483 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, 620 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
484 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, 621 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
485 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, 622 {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK,
@@ -489,13 +626,173 @@ core_startup (void *cls,
489 626
490 GNUNET_SERVER_add_handlers (srv, handlers); 627 GNUNET_SERVER_add_handlers (srv, handlers);
491 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity)); 628 my_peer = GNUNET_memdup(peer, sizeof (struct GNUNET_PeerIdentity));
629 /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */
492 GNUNET_SCHEDULER_add_now (&disconnect_core, core); 630 GNUNET_SCHEDULER_add_now (&disconnect_core, core);
493 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); 631 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n");
494} 632}
495 633
496 634
635
636/**
637 * Method called whenever another peer has added us to a tunnel
638 * the other peer initiated.
639 * Only called (once) upon reception of data with a message type which was
640 * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy
641 * causes te tunnel to be ignored and no further notifications are sent about
642 * the same tunnel.
643 *
644 * @param cls closure
645 * @param tunnel new handle to the tunnel
646 * @param initiator peer that started the tunnel
647 * @param atsi performance information for the tunnel
648 * @return initial tunnel context for the tunnel
649 * (can be NULL -- that's not an error)
650 */
651static void *
652new_tunnel (void *cls,
653 struct GNUNET_MESH_Tunnel *tunnel,
654 const struct GNUNET_PeerIdentity *initiator,
655 const struct GNUNET_ATS_Information *atsi)
656{
657 /* there's nothing we can do here, as we don't have the global consensus id yet */
658 return NULL;
659}
660
661
662/**
663 * Function called whenever an inbound tunnel is destroyed. Should clean up
664 * any associated state. This function is NOT called if the client has
665 * explicitly asked for the tunnel to be destroyed using
666 * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
667 * the tunnel.
668 *
669 * @param cls closure (set from GNUNET_MESH_connect)
670 * @param tunnel connection to the other end (henceforth invalid)
671 * @param tunnel_ctx place where local state associated
672 * with the tunnel is stored
673 */
674static void
675cleaner (void *cls, const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
676{
677 /* FIXME: what to do here? */
678}
679
680
681
497/** 682/**
498 * Process consensus requests. 683 * Called to clean up, after a shutdown has been requested.
684 *
685 * @param cls closure
686 * @param tc context information (why was this task triggered now)
687 */
688static void
689shutdown_task (void *cls,
690 const struct GNUNET_SCHEDULER_TaskContext *tc)
691{
692 /* mesh requires all the tunnels to be destroyed manually */
693 while (NULL != sessions_head)
694 {
695 struct ConsensusSession *session;
696 session = sessions_head;
697 GNUNET_MESH_tunnel_destroy (sessions_head->broadcast_tunnel);
698 sessions_head = sessions_head->next;
699 GNUNET_free (session);
700 }
701
702 if (NULL != mesh)
703 {
704 GNUNET_MESH_disconnect (mesh);
705 mesh = NULL;
706 }
707 if (NULL != core)
708 {
709 GNUNET_CORE_disconnect (core);
710 core = NULL;
711 }
712}
713
714
715
716/**
717 * Functions with this signature are called whenever a message is
718 * received.
719 *
720 * @param cls closure (set from GNUNET_MESH_connect)
721 * @param tunnel connection to the other end
722 * @param tunnel_ctx place to store local state associated with the tunnel
723 * @param sender who sent the message
724 * @param message the actual message
725 * @param atsi performance data for the connection
726 * @return GNUNET_OK to keep the connection open,
727 * GNUNET_SYSERR to close it (signal serious error)
728 */
729static int
730p2p_delta_estimate (void *cls,
731 struct GNUNET_MESH_Tunnel * tunnel,
732 void **tunnel_ctx,
733 const struct GNUNET_PeerIdentity *sender,
734 const struct GNUNET_MessageHeader *message,
735 const struct GNUNET_ATS_Information *atsi)
736{
737 /* FIXME */
738 return GNUNET_OK;
739}
740
741
742/**
743 * Functions with this signature are called whenever a message is
744 * received.
745 *
746 * @param cls closure (set from GNUNET_MESH_connect)
747 * @param tunnel connection to the other end
748 * @param tunnel_ctx place to store local state associated with the tunnel
749 * @param sender who sent the message
750 * @param message the actual message
751 * @param atsi performance data for the connection
752 * @return GNUNET_OK to keep the connection open,
753 * GNUNET_SYSERR to close it (signal serious error)
754 */
755static int
756p2p_difference_digest (void *cls,
757 struct GNUNET_MESH_Tunnel * tunnel,
758 void **tunnel_ctx,
759 const struct GNUNET_PeerIdentity *sender,
760 const struct GNUNET_MessageHeader *message,
761 const struct GNUNET_ATS_Information *atsi)
762{
763 /* FIXME */
764 return GNUNET_OK;
765}
766
767
768/**
769 * Functions with this signature are called whenever a message is
770 * received.
771 *
772 * @param cls closure (set from GNUNET_MESH_connect)
773 * @param tunnel connection to the other end
774 * @param tunnel_ctx place to store local state associated with the tunnel
775 * @param sender who sent the message
776 * @param message the actual message
777 * @param atsi performance data for the connection
778 * @return GNUNET_OK to keep the connection open,
779 * GNUNET_SYSERR to close it (signal serious error)
780 */
781static int
782p2p_elements_and_requests (void *cls,
783 struct GNUNET_MESH_Tunnel * tunnel,
784 void **tunnel_ctx,
785 const struct GNUNET_PeerIdentity *sender,
786 const struct GNUNET_MessageHeader *message,
787 const struct GNUNET_ATS_Information *atsi)
788{
789 /* FIXME */
790 return GNUNET_OK;
791}
792
793
794/**
795 * Start processing consensus requests.
499 * 796 *
500 * @param cls closure 797 * @param cls closure
501 * @param server the initialized server 798 * @param server the initialized server
@@ -504,22 +801,38 @@ core_startup (void *cls,
504static void 801static void
505run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) 802run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
506{ 803{
507 struct GNUNET_CORE_Handle *my_core;
508 static const struct GNUNET_CORE_MessageHandler handlers[] = { 804 static const struct GNUNET_CORE_MessageHandler handlers[] = {
509 {NULL, 0, 0} 805 {NULL, 0, 0}
510 }; 806 };
807 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
808 {p2p_delta_estimate, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE, 0},
809 {p2p_difference_digest, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST, 0},
810 {p2p_elements_and_requests, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_AND_REQUESTS, 0},
811 {NULL, 0, 0}
812 };
813 static const GNUNET_MESH_ApplicationType app_types[] = {
814 GNUNET_APPLICATION_TYPE_CONSENSUS,
815 GNUNET_APPLICATION_TYPE_END
816 };
511 817
512 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); 818 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
513 819
514 cfg = c; 820 cfg = c;
515 srv = server; 821 srv = server;
516 my_core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers); 822
517 GNUNET_assert (NULL != my_core); 823 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
824
825 mesh = GNUNET_MESH_connect (cfg, NULL, new_tunnel, cleaner, mesh_handlers, app_types);
826 GNUNET_assert (NULL != mesh);
827
828 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
829 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, handlers);
830 GNUNET_assert (NULL != core);
518} 831}
519 832
520 833
521/** 834/**
522 * The main function for the statistics service. 835 * The main function for the consensus service.
523 * 836 *
524 * @param argc number of arguments from the command line 837 * @param argc number of arguments from the command line
525 * @param argv command line arguments 838 * @param argv command line arguments
@@ -528,7 +841,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
528int 841int
529main (int argc, char *const *argv) 842main (int argc, char *const *argv)
530{ 843{
531 return (GNUNET_OK == 844 int ret;
532 GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL)) ? 0 : 1; 845 ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL);
846 return (GNUNET_OK == ret) ? 0 : 1;
533} 847}
534 848
diff --git a/src/consensus/test_consensus_api.c b/src/consensus/test_consensus_api.c
index 8a263171f..e752983c2 100644
--- a/src/consensus/test_consensus_api.c
+++ b/src/consensus/test_consensus_api.c
@@ -27,14 +27,9 @@
27#include "gnunet_testing_lib.h" 27#include "gnunet_testing_lib.h"
28 28
29 29
30static struct GNUNET_CONSENSUS_Handle *consensus1; 30static struct GNUNET_CONSENSUS_Handle *consensus;
31static struct GNUNET_CONSENSUS_Handle *consensus2;
32 31
33static int concluded1; 32static int insert;
34static int concluded2;
35
36static int insert1;
37static int insert2;
38 33
39static struct GNUNET_HashCode session_id; 34static struct GNUNET_HashCode session_id;
40 35
@@ -48,20 +43,12 @@ static void conclude_done (void *cls,
48 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n"); 43 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "concluded\n");
49} 44}
50 45
51static void 46static int
52on_new_element (void *cls, 47on_new_element (void *cls,
53 struct GNUNET_CONSENSUS_Element *element) 48 struct GNUNET_CONSENSUS_Element *element)
54{ 49{
55 struct GNUNET_CONSENSUS_Handle *consensus; 50 GNUNET_assert (0);
56 51 return GNUNET_YES;
57 GNUNET_assert (NULL != element);
58
59 consensus = *(struct GNUNET_CONSENSUS_Handle **) cls;
60
61 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
62
63 GNUNET_CONSENSUS_conclude (consensus, GNUNET_TIME_UNIT_FOREVER_REL, &conclude_done, consensus);
64
65} 52}
66 53
67static void 54static void
@@ -71,7 +58,6 @@ insert_done (void *cls, int success)
71} 58}
72 59
73 60
74
75static void 61static void
76run (void *cls, 62run (void *cls,
77 const struct GNUNET_CONFIGURATION_Handle *cfg, 63 const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -89,11 +75,9 @@ run (void *cls,
89 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n"); 75 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "testing consensus api\n");
90 76
91 GNUNET_CRYPTO_hash (str, strlen (str), &session_id); 77 GNUNET_CRYPTO_hash (str, strlen (str), &session_id);
92 consensus1 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus1); 78 consensus = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus);
79 GNUNET_assert (consensus != NULL);
93 /* 80 /*
94 consensus2 = GNUNET_CONSENSUS_create (cfg, 0, NULL, &session_id, on_new_element, &consensus2);
95 GNUNET_assert (consensus1 != NULL);
96 GNUNET_assert (consensus2 != NULL);
97 GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1); 81 GNUNET_CONSENSUS_insert (consensus1, &el1, &insert_done, &consensus1);
98 GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2); 82 GNUNET_CONSENSUS_insert (consensus2, &el2, &insert_done, &consensus2);
99 */ 83 */