aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
commit68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch)
tree3442e4f25de90eab67c4f9813cb6e433c50b7482 /src/consensus
parentfae7f583f2e11cac15fefcbefef64287ab6915d3 (diff)
downloadgnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.tar.gz
gnunet-68403fa780bf94ace2ebc13c2c09463cbbc0b57c.zip
- conclude for SET
- consensus with SET
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/Makefile.am4
-rw-r--r--src/consensus/consensus_api.c6
-rw-r--r--src/consensus/consensus_protocol.h11
-rw-r--r--src/consensus/gnunet-consensus.c6
-rw-r--r--src/consensus/gnunet-service-consensus.c407
-rw-r--r--src/consensus/test_consensus.conf8
6 files changed, 256 insertions, 186 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index a0edb1d65..914fbdef8 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \
61 $(top_builddir)/src/mesh/libgnunetmesh.la \ 61 $(top_builddir)/src/mesh/libgnunetmesh.la \
62 $(top_builddir)/src/set/libgnunetset.la \ 62 $(top_builddir)/src/set/libgnunetset.la \
63 $(GN_LIBINTL) 63 $(GN_LIBINTL)
64gnunet_service_consensus_DEPENDENCIES = \
65 $(top_builddir)/src/set/libgnunetset.la
64 66
65gnunet_service_evil_consensus_SOURCES = \ 67gnunet_service_evil_consensus_SOURCES = \
66 gnunet-service-consensus.c 68 gnunet-service-consensus.c
@@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \
71 $(top_builddir)/src/mesh/libgnunetmesh.la \ 73 $(top_builddir)/src/mesh/libgnunetmesh.la \
72 $(top_builddir)/src/set/libgnunetset.la \ 74 $(top_builddir)/src/set/libgnunetset.la \
73 $(GN_LIBINTL) 75 $(GN_LIBINTL)
76gnunet_service_evil_consensus_DEPENDENCIES = \
77 $(top_builddir)/src/set/libgnunetset.la
74gnunet_service_evil_consensus_CFLAGS = -DEVIL 78gnunet_service_evil_consensus_CFLAGS = -DEVIL
75 79
76libgnunetconsensus_la_SOURCES = \ 80libgnunetconsensus_la_SOURCES = \
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 635a610ca..e3ddb4913 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
236 */ 236 */
237static void 237static void
238handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, 238handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
239 struct GNUNET_CONSENSUS_ElementMessage *msg) 239 struct GNUNET_CONSENSUS_ElementMessage *msg)
240{ 240{
241 struct GNUNET_CONSENSUS_Element element; 241 struct GNUNET_SET_Element element;
242 242
243 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); 243 LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
244 244
@@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
424 */ 424 */
425void 425void
426GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, 426GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
427 const struct GNUNET_CONSENSUS_Element *element, 427 const struct GNUNET_SET_Element *element,
428 GNUNET_CONSENSUS_InsertDoneCallback idc, 428 GNUNET_CONSENSUS_InsertDoneCallback idc,
429 void *idc_cls) 429 void *idc_cls)
430{ 430{
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index af4d74100..4e0673c7d 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN
38/** 38/**
39 * Sent as context message for set reconciliation. 39 * Sent as context message for set reconciliation.
40 */ 40 */
41struct ConsensusRoundMessage 41struct GNUNET_CONSENSUS_RoundContextMessage
42{ 42{
43 /**
44 * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
45 */
43 struct GNUNET_MessageHeader header; 46 struct GNUNET_MessageHeader header;
44 uint8_t round; 47 uint32_t round;
45 uint8_t exp_round; 48 uint32_t exp_round;
46 uint8_t exp_subround; 49 uint32_t exp_subround;
47}; 50};
48 51
49GNUNET_NETWORK_STRUCT_END 52GNUNET_NETWORK_STRUCT_END
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index d8c1b14ee..60db8c61a 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -133,7 +133,7 @@ do_consensus ()
133 { 133 {
134 int j; 134 int j;
135 struct GNUNET_HashCode *val; 135 struct GNUNET_HashCode *val;
136 struct GNUNET_CONSENSUS_Element *element; 136 struct GNUNET_SET_Element *element;
137 generate_indices(unique_indices); 137 generate_indices(unique_indices);
138 138
139 val = GNUNET_malloc (sizeof *val); 139 val = GNUNET_malloc (sizeof *val);
@@ -151,6 +151,8 @@ do_consensus ()
151 } 151 }
152 } 152 }
153 153
154 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n");
155
154 for (i = 0; i < num_peers; i++) 156 for (i = 0; i < num_peers; i++)
155 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); 157 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]);
156} 158}
@@ -194,7 +196,7 @@ connect_complete (void *cls,
194 196
195static void 197static void
196new_element_cb (void *cls, 198new_element_cb (void *cls,
197 const struct GNUNET_CONSENSUS_Element *element) 199 const struct GNUNET_SET_Element *element)
198{ 200{
199 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); 201 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
200} 202}
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 44edeb215..21123b24f 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet 2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors) 3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -43,9 +43,9 @@
43 43
44 44
45/** 45/**
46 * Number of exponential rounds, used in the inventory and completion round. 46 * Number of exponential rounds, used in the exp and completion round.
47 */ 47 */
48#define NUM_EXP_ROUNDS (4) 48#define NUM_EXP_ROUNDS 4
49 49
50/* forward declarations */ 50/* forward declarations */
51 51
@@ -155,17 +155,17 @@ struct ConsensusSession
155 * Permutation of peers for the current round, 155 * Permutation of peers for the current round,
156 * maps logical index (for current round) to physical index (location in info array) 156 * maps logical index (for current round) to physical index (location in info array)
157 */ 157 */
158 int *shuffle; 158 uint32_t *shuffle;
159 159
160 /** 160 /**
161 * Current round of the exponential scheme. 161 * Current round of the exponential scheme.
162 */ 162 */
163 int exp_round; 163 uint32_t exp_round;
164 164
165 /** 165 /**
166 * Current sub-round of the exponential scheme. 166 * Current sub-round of the exponential scheme.
167 */ 167 */
168 int exp_subround; 168 uint32_t exp_subround;
169 169
170 /** 170 /**
171 * The partner for the current exp-round 171 * The partner for the current exp-round
@@ -201,17 +201,6 @@ struct ConsensusPeerInformation
201 struct GNUNET_PeerIdentity peer_id; 201 struct GNUNET_PeerIdentity peer_id;
202 202
203 /** 203 /**
204 * Do we connect to the peer, or does the peer connect to us?
205 * Only valid for all-to-all phases
206 */
207 int is_outgoing;
208
209 /**
210 * Did we receive/send a consensus hello?
211 */
212 int hello;
213
214 /**
215 * Back-reference to the consensus session, 204 * Back-reference to the consensus session,
216 * to that ConsensusPeerInformation can be used as a closure 205 * to that ConsensusPeerInformation can be used as a closure
217 */ 206 */
@@ -223,22 +212,14 @@ struct ConsensusPeerInformation
223 int exp_subround_finished; 212 int exp_subround_finished;
224 213
225 /** 214 /**
226 * GNUNET_YES if we synced inventory with this peer; 215 * Set operation we are currently executing with this peer.
227 * GNUNET_NO otherwise.
228 */
229 int inventory_synced;
230
231 /**
232 * Round this peer seems to be in, according to the last SE we got.
233 * Necessary to store this, as we sometimes need to respond to a request from an
234 * older round, while we are already in the next round.
235 */ 216 */
236 enum ConsensusRound apparent_round; 217 struct GNUNET_SET_OperationHandle *set_op;
237 218
238 /** 219 /**
239 * Set operation we are currently executing with this peer. 220 * Has conclude been called on the set_op?
240 */ 221 */
241 struct GNUNET_SET_OperationHandle *set_op; 222 int set_op_concluded;
242}; 223};
243 224
244 225
@@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv;
268static struct GNUNET_PeerIdentity my_peer; 249static struct GNUNET_PeerIdentity my_peer;
269 250
270 251
271/*
272static int 252static int
273exp_subround_finished (const struct ConsensusSession *session) 253have_exp_subround_finished (const struct ConsensusSession *session)
274{ 254{
275 int not_finished; 255 int not_finished;
276 not_finished = 0; 256 not_finished = 0;
@@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session)
284 return GNUNET_YES; 264 return GNUNET_YES;
285 return GNUNET_NO; 265 return GNUNET_NO;
286} 266}
287*/
288 267
289 268
290 269
291/*
292static int
293inventory_round_finished (struct ConsensusSession *session)
294{
295 int i;
296 int finished;
297 finished = 0;
298 for (i = 0; i < session->num_peers; i++)
299 if (GNUNET_YES == session->info[i].inventory_synced)
300 finished++;
301 if (finished >= (session->num_peers / 2))
302 return GNUNET_YES;
303 return GNUNET_NO;
304}
305*/
306 270
307/** 271/**
308 * Destroy a session, free all resources associated with it. 272 * Destroy a session, free all resources associated with it.
@@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session)
341 305
342 306
343/** 307/**
344 * Start the inventory round, contact all peers we are supposed to contact.
345 *
346 * @param session the current session
347 */
348static void
349start_inventory (struct ConsensusSession *session)
350{
351 int i;
352 int last;
353
354 for (i = 0; i < session->num_peers; i++)
355 session->info[i].is_outgoing = GNUNET_NO;
356
357 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
358 i = (session->local_peer_idx + 1) % session->num_peers;
359 while (i != last)
360 {
361 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
362 session->info[i].is_outgoing = GNUNET_YES;
363 // embrace_peer (&session->info[i]);
364 i = (i + 1) % session->num_peers;
365 }
366 // tie-breaker for even number of peers
367 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
368 {
369 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
370 session->info[last].is_outgoing = GNUNET_YES;
371 // embrace_peer (&session->info[last]);
372 }
373}
374
375
376/**
377 * Start the next round. 308 * Start the next round.
378 * This function can be invoked as a timeout task, or called manually (tc will be NULL then). 309 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
379 * 310 *
@@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
407 subround_over (session, NULL); 338 subround_over (session, NULL);
408 break; 339 break;
409 case CONSENSUS_ROUND_EXCHANGE: 340 case CONSENSUS_ROUND_EXCHANGE:
410 /* handle two peers specially */ 341 /* FIXME: send all elements to client */
411 if (session->num_peers <= 2)
412 {
413 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
414 //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
415 //send_client_conclude_done (session);
416 session->current_round = CONSENSUS_ROUND_FINISH;
417 return;
418 }
419 session->current_round = CONSENSUS_ROUND_INVENTORY;
420 start_inventory (session);
421 break;
422 case CONSENSUS_ROUND_INVENTORY:
423 session->current_round = CONSENSUS_ROUND_COMPLETION;
424 session->exp_round = 0;
425 subround_over (session, NULL);
426 break;
427 case CONSENSUS_ROUND_COMPLETION:
428 session->current_round = CONSENSUS_ROUND_FINISH; 342 session->current_round = CONSENSUS_ROUND_FINISH;
429 //send_client_conclude_done (session);
430 break;
431 default: 343 default:
432 GNUNET_assert (0); 344 GNUNET_assert (0);
433 } 345 }
@@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
435 347
436 348
437/** 349/**
438 * Adapt the shuffle of the session for the current round. 350 * Create a new permutation for the session's peers in session->shuffle.
351 * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
352 * both the global session id and the current round index.
353 *
354 * @param session the session to create the new permutation for
439 */ 355 */
440static void 356static void
441shuffle (struct ConsensusSession *session) 357shuffle (struct ConsensusSession *session)
442{ 358{
443 /* adapted from random_permute in util/crypto_random.c */ 359 uint32_t i;
444 /* FIXME 360 uint32_t randomness[session->num_peers-1];
445 unsigned int *ret; 361
446 unsigned int i; 362 if (NULL == session->shuffle)
447 unsigned int tmp; 363 session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
448 uint32_t x; 364
365 GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t),
366 &session->global_id, sizeof (struct GNUNET_HashCode));
367
368 for (i = 0; i < session->num_peers; i++)
369 session->shuffle[i] = i;
449 370
450 GNUNET_assert (n > 0); 371 for (i = session->num_peers - 1; i > 0; i--)
451 ret = GNUNET_malloc (n * sizeof (unsigned int));
452 for (i = 0; i < n; i++)
453 ret[i] = i;
454 for (i = n - 1; i > 0; i--)
455 { 372 {
456 x = GNUNET_CRYPTO_random_u32 (mode, i + 1); 373 uint32_t x;
457 tmp = ret[x]; 374 uint32_t tmp;
458 ret[x] = ret[i]; 375 x = randomness[i-1];
459 ret[i] = tmp; 376 tmp = session->shuffle[x];
377 session->shuffle[x] = session->shuffle[i];
378 session->shuffle[i] = tmp;
460 } 379 }
461 */
462} 380}
463 381
464 382
465/** 383/**
466 * Find and set the partner_incoming and partner_outgoing of our peer, 384 * Find and set the partner_incoming and partner_outgoing of our peer,
467 * one of them may not exist in most cases. 385 * one of them may not exist (and thus set to NULL) if the number of peers
386 * in the session is not a power of two.
468 * 387 *
469 * @param session the consensus session 388 * @param session the consensus session
470 */ 389 */
471static void 390static void
472find_partners (struct ConsensusSession *session) 391find_partners (struct ConsensusSession *session)
473{ 392{
474 int mark[session->num_peers]; 393 int arc;
475 int i; 394 int partner_idx;
476 395 int largest_arc;
477 memset (mark, 0, session->num_peers * sizeof (int)); 396 int num_ghosts;
478 session->partner_incoming = session->partner_outgoing = NULL; 397
479 for (i = 0; i < session->num_peers; i++) 398 /* distance to neighboring peer in current subround */
399 arc = 1 << session->exp_subround;
400 partner_idx = (session->local_peer_idx + arc) % session->num_peers;
401 largest_arc = 1;
402 while (largest_arc < session->num_peers)
403 largest_arc <<= 1;
404 num_ghosts = largest_arc - session->num_peers;
405
406 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts);
407
408 if (0 == (session->local_peer_idx & arc))
480 { 409 {
481 int arc; 410 /* we are outgoing */
482 if (0 != mark[i]) 411 session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
483 continue; 412 /* are we a 'ghost' of a peer that would exist if
484 arc = (i + (1 << session->exp_subround)) % session->num_peers; 413 * the number of peers was a power of two, and thus have to partner
485 mark[i] = mark[arc] = 1; 414 * with an additional peer?
486 GNUNET_assert (i != arc); 415 */
487 if (i == session->local_peer_idx) 416 if (session->local_peer_idx < num_ghosts)
488 { 417 {
489 GNUNET_assert (NULL == session->partner_outgoing); 418 int ghost_partner_idx;
490 session->partner_outgoing = &session->info[session->shuffle[arc]]; 419 ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers;
491 session->partner_outgoing->exp_subround_finished = GNUNET_NO; 420 /* platform dependent; modulo sometimes returns negative values */
421 if (ghost_partner_idx < 0)
422 ghost_partner_idx += arc;
423 session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]];
492 } 424 }
493 if (arc == session->local_peer_idx) 425 else
494 { 426 {
495 GNUNET_assert (NULL == session->partner_incoming); 427 session->partner_incoming = NULL;
496 session->partner_incoming = &session->info[session->shuffle[i]];
497 session->partner_incoming->exp_subround_finished = GNUNET_NO;
498 } 428 }
499 } 429 }
430 else
431 {
432 session->partner_outgoing = NULL;
433 session->partner_incoming = &session->info[session->shuffle[partner_idx]];
434 }
500} 435}
501 436
502 437
@@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session)
508 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK 443 * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
509 * @param status see enum GNUNET_SET_Status 444 * @param status see enum GNUNET_SET_Status
510 */ 445 */
511static void set_result_cb (void *cls, 446static void
512 const struct GNUNET_SET_Element *element, 447set_result_cb (void *cls,
513 enum GNUNET_SET_Status status) 448 const struct GNUNET_SET_Element *element,
449 enum GNUNET_SET_Status status)
514{ 450{
515 /* FIXME */ 451 struct ConsensusPeerInformation *cpi = cls;
452
453 switch (status)
454 {
455 case GNUNET_SET_STATUS_OK:
456 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
457 break;
458 case GNUNET_SET_STATUS_FAILURE:
459 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
460 break;
461 case GNUNET_SET_STATUS_HALF_DONE:
462 case GNUNET_SET_STATUS_DONE:
463 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
464 cpi->exp_subround_finished = GNUNET_YES;
465 if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
466 subround_over (cpi->session, NULL);
467 return;
468 default:
469 GNUNET_break (0);
470 return;
471 }
472
473 switch (cpi->session->current_round)
474 {
475 case CONSENSUS_ROUND_EXCHANGE:
476 GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
477 break;
478 default:
479 GNUNET_break (0);
480 return;
481 }
516} 482}
517 483
518 484
@@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
540 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 506 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
541 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 507 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
542 } 508 }
543 /* check if we are done with the log phase, 2-peer consensus only does one log round */
544 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
545 ((session->num_peers == 2) && (session->exp_round == 1)))
546 {
547 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
548 round_over (session, NULL);
549 return;
550 }
551 if (session->exp_round == 0) 509 if (session->exp_round == 0)
552 { 510 {
553 /* initialize everything for the log-rounds */ 511 /* initialize everything for the log-rounds */
@@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
575 533
576 if (NULL != session->partner_outgoing) 534 if (NULL != session->partner_outgoing)
577 { 535 {
536 struct GNUNET_CONSENSUS_RoundContextMessage *msg;
537 msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
538 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
539 msg->header.size = htons (sizeof *msg);
540 msg->round = htonl (session->current_round);
541 msg->exp_round = htonl (session->exp_round);
542 msg->exp_subround = htonl (session->exp_subround);
543
578 if (NULL != session->partner_outgoing->set_op) 544 if (NULL != session->partner_outgoing->set_op)
545 {
579 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); 546 GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
547 }
580 session->partner_outgoing->set_op = 548 session->partner_outgoing->set_op =
581 GNUNET_SET_evaluate (session->element_set, 549 GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
582 &session->partner_outgoing->peer_id,
583 &session->global_id, 550 &session->global_id,
584 NULL, /* FIXME */ 551 (struct GNUNET_MessageHeader *) msg,
585 0, /* FIXME */ 552 0, /* FIXME */
586 GNUNET_SET_RESULT_ADDED, 553 GNUNET_SET_RESULT_ADDED,
587 set_result_cb, session); 554 set_result_cb, session->partner_outgoing);
588 555 GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set);
589 556 session->partner_outgoing->set_op_concluded = GNUNET_YES;
590 } 557 }
591 558
592#ifdef GNUNET_EXTRA_LOGGING 559#ifdef GNUNET_EXTRA_LOGGING
@@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod
642 int i; 609 int i;
643 struct GNUNET_HashCode tmp; 610 struct GNUNET_HashCode tmp;
644 611
612 /* FIXME: use kdf? */
613
645 session->global_id = *session_id; 614 session->global_id = *session_id;
646 for (i = 0; i < session->num_peers; ++i) 615 for (i = 0; i < session->num_peers; ++i)
647 { 616 {
@@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session,
727} 696}
728 697
729 698
730
731
732
733/** 699/**
734 * Called when another peer wants to do a set operation with the 700 * Called when another peer wants to do a set operation with the
735 * local peer. 701 * local peer.
@@ -750,7 +716,67 @@ set_listen_cb (void *cls,
750 const struct GNUNET_MessageHeader *context_msg, 716 const struct GNUNET_MessageHeader *context_msg,
751 struct GNUNET_SET_Request *request) 717 struct GNUNET_SET_Request *request)
752{ 718{
753 /* FIXME */ 719 struct ConsensusSession *session = cls;
720 struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
721 struct ConsensusPeerInformation *cpi;
722 int index;
723
724 if (NULL == context_msg)
725 {
726 GNUNET_break_op (0);
727 return;
728 }
729
730 index = get_peer_idx (other_peer, session);
731
732 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey));
733
734 if (index < 0)
735 {
736 GNUNET_break_op (0);
737 return;
738 }
739
740 cpi = &session->info[index];
741
742 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index);
743
744 switch (session->current_round)
745 {
746 case CONSENSUS_ROUND_EXCHANGE:
747 if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
748 {
749 GNUNET_break_op (0);
750 return;
751 }
752 if (ntohl (msg->exp_round) < session->exp_round)
753 {
754 GNUNET_break_op (0);
755 return;
756 }
757 if (ntohl (msg->exp_subround) < session->exp_subround)
758 {
759 GNUNET_break_op (0);
760 return;
761 }
762 if (NULL != cpi->set_op)
763 GNUNET_SET_operation_cancel (cpi->set_op);
764 cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
765 set_result_cb, &session->info[index]);
766 if (ntohl (msg->exp_subround) == session->exp_subround)
767 {
768 cpi->set_op_concluded = GNUNET_YES;
769 GNUNET_SET_conclude (cpi->set_op, session->element_set);
770 }
771 else
772 {
773 cpi->set_op_concluded = GNUNET_NO;
774 }
775 break;
776 default:
777 GNUNET_break_op (0);
778 return;
779 }
754} 780}
755 781
756 782
@@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session,
769 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); 795 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
770 compute_global_id (session, &join_msg->session_id); 796 compute_global_id (session, &join_msg->session_id);
771 797
772 /* check if some local client already owns the session. */ 798 /* check if some local client already owns the session.
799 * it is only legal to have a session with an existing global id
800 * if all other sessions with this global id are finished.*/
773 other_session = sessions_head; 801 other_session = sessions_head;
774 while (NULL != other_session) 802 while (NULL != other_session)
775 { 803 {
@@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session,
789 817
790 session->local_peer_idx = get_peer_idx (&my_peer, session); 818 session->local_peer_idx = get_peer_idx (&my_peer, session);
791 GNUNET_assert (-1 != session->local_peer_idx); 819 GNUNET_assert (-1 != session->local_peer_idx);
820 session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
821 GNUNET_assert (NULL != session->element_set);
792 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, 822 session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
793 &session->global_id, 823 &session->global_id,
794 set_listen_cb, session); 824 set_listen_cb, session);
@@ -827,6 +857,8 @@ client_join (void *cls,
827{ 857{
828 struct ConsensusSession *session; 858 struct ConsensusSession *session;
829 859
860 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n");
861
830 session = get_session_by_client (client); 862 session = get_session_by_client (client);
831 if (NULL != session) 863 if (NULL != session)
832 { 864 {
@@ -835,9 +867,13 @@ client_join (void *cls,
835 return; 867 return;
836 } 868 }
837 session = GNUNET_new (struct ConsensusSession); 869 session = GNUNET_new (struct ConsensusSession);
870 session->client = client;
838 GNUNET_SERVER_client_keep (client); 871 GNUNET_SERVER_client_keep (client);
839 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); 872 GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
840 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); 873 initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
874 GNUNET_SERVER_receive_done (client, GNUNET_OK);
875
876 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n");
841} 877}
842 878
843 879
@@ -858,12 +894,7 @@ client_insert (void *cls,
858 struct GNUNET_SET_Element *element; 894 struct GNUNET_SET_Element *element;
859 ssize_t element_size; 895 ssize_t element_size;
860 896
861 session = sessions_head; 897 session = get_session_by_client (client);
862 while (NULL != session)
863 {
864 if (session->client == client)
865 break;
866 }
867 898
868 if (NULL == session) 899 if (NULL == session)
869 { 900 {
@@ -886,6 +917,7 @@ client_insert (void *cls,
886 GNUNET_break (0); 917 GNUNET_break (0);
887 return; 918 return;
888 } 919 }
920
889 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); 921 element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
890 element->type = msg->element_type; 922 element->type = msg->element_type;
891 element->size = element_size; 923 element->size = element_size;
@@ -893,6 +925,8 @@ client_insert (void *cls,
893 element->data = &element[1]; 925 element->data = &element[1];
894 GNUNET_SET_add_element (session->element_set, element, NULL, NULL); 926 GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
895 GNUNET_SERVER_receive_done (client, GNUNET_OK); 927 GNUNET_SERVER_receive_done (client, GNUNET_OK);
928
929 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx);
896} 930}
897 931
898 932
@@ -911,10 +945,10 @@ client_conclude (void *cls,
911 struct ConsensusSession *session; 945 struct ConsensusSession *session;
912 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; 946 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
913 947
914 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
915 948
949 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
950 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
916 session = get_session_by_client (client); 951 session = get_session_by_client (client);
917
918 if (NULL == session) 952 if (NULL == session)
919 { 953 {
920 /* client not found */ 954 /* client not found */
@@ -922,14 +956,12 @@ client_conclude (void *cls,
922 GNUNET_SERVER_client_disconnect (client); 956 GNUNET_SERVER_client_disconnect (client);
923 return; 957 return;
924 } 958 }
925
926 if (CONSENSUS_ROUND_BEGIN != session->current_round) 959 if (CONSENSUS_ROUND_BEGIN != session->current_round)
927 { 960 {
928 /* client requested conclude twice */ 961 /* client requested conclude twice */
929 GNUNET_break (0); 962 GNUNET_break (0);
930 return; 963 return;
931 } 964 }
932
933 if (session->num_peers <= 1) 965 if (session->num_peers <= 1)
934 { 966 {
935 //send_client_conclude_done (session); 967 //send_client_conclude_done (session);
@@ -937,7 +969,7 @@ client_conclude (void *cls,
937 else 969 else
938 { 970 {
939 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); 971 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
940 /* the 'begin' round is over, start with the next, real round */ 972 /* the 'begin' round is over, start with the next, actual round */
941 round_over (session, NULL); 973 round_over (session, NULL);
942 } 974 }
943 975
@@ -964,6 +996,29 @@ shutdown_task (void *cls,
964 996
965 997
966/** 998/**
999 * Clean up after a client after it is
1000 * disconnected (either by us or by itself)
1001 *
1002 * @param cls closure, unused
1003 * @param client the client to clean up after
1004 */
1005void
1006handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
1007{
1008 struct ConsensusSession *session;
1009
1010 session = get_session_by_client (client);
1011 if (NULL == session)
1012 return;
1013 if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
1014 (CONSENSUS_ROUND_FINISH == session->current_round))
1015 destroy_session (session);
1016 else
1017 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
1018}
1019
1020
1021/**
967 * Start processing consensus requests. 1022 * Start processing consensus requests.
968 * 1023 *
969 * @param cls closure 1024 * @param cls closure
@@ -971,13 +1026,14 @@ shutdown_task (void *cls,
971 * @param c configuration to use 1026 * @param c configuration to use
972 */ 1027 */
973static void 1028static void
974run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) 1029run (void *cls, struct GNUNET_SERVER_Handle *server,
1030 const struct GNUNET_CONFIGURATION_Handle *c)
975{ 1031{
976 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { 1032 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
977 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
978 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
979 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, 1033 {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
980 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, 1034 sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
1035 {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
1036 {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
981 {NULL, NULL, 0, 0} 1037 {NULL, NULL, 0, 0}
982 }; 1038 };
983 1039
@@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
992 } 1048 }
993 GNUNET_SERVER_add_handlers (server, server_handlers); 1049 GNUNET_SERVER_add_handlers (server, server_handlers);
994 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 1050 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1051 GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
995 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); 1052 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
996} 1053}
997 1054
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 437636c99..37facf84e 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,7 +5,7 @@ HOSTNAME = localhost
5HOME = $SERVICEHOME 5HOME = $SERVICEHOME
6BINARY = gnunet-service-consensus 6BINARY = gnunet-service-consensus
7#PREFIX = gdbserver :12345 7#PREFIX = gdbserver :12345
8PREFIX = valgrind --leak-check=full 8PREFIX = valgrind
9ACCEPT_FROM = 127.0.0.1; 9ACCEPT_FROM = 127.0.0.1;
10ACCEPT_FROM6 = ::1; 10ACCEPT_FROM6 = ::1;
11UNIXPATH = /tmp/gnunet-service-consensus.sock 11UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -19,7 +19,11 @@ OPTIONS = -LERROR
19 19
20 20
21[arm] 21[arm]
22DEFAULTSERVICES = core consensus 22DEFAULTSERVICES = core consensus set
23
24[set]
25OPTIONS = -L INFO
26PREFIX = valgrind
23 27
24 28
25[testbed] 29[testbed]