diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-08-12 14:35:51 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-08-12 14:35:51 +0000 |
commit | d88cf34549d5aa0d8367ace1d5482289d4925525 (patch) | |
tree | 2b2fb0d4887b6f58534b3bd5252fe55a862a29a9 /src/consensus | |
parent | a784d46eb79547f251599075bece274f48bf3427 (diff) | |
download | gnunet-d88cf34549d5aa0d8367ace1d5482289d4925525.tar.gz gnunet-d88cf34549d5aa0d8367ace1d5482289d4925525.zip |
- fixed consensus for >2 peers
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/consensus_api.c | 1 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 237 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 4 |
4 files changed, 190 insertions, 56 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 7690dc059..873e72ca1 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -266,6 +266,7 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
266 | i->cls = idc_cls; | 266 | i->cls = idc_cls; |
267 | GNUNET_MQ_notify_sent (ev, idc_adapter, i); | 267 | GNUNET_MQ_notify_sent (ev, idc_adapter, i); |
268 | } | 268 | } |
269 | GNUNET_MQ_send (consensus->mq, ev); | ||
269 | } | 270 | } |
270 | 271 | ||
271 | 272 | ||
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index 60db8c61a..847c99a02 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -95,6 +95,7 @@ destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) | |||
95 | static void | 95 | static void |
96 | conclude_cb (void *cls) | 96 | conclude_cb (void *cls) |
97 | { | 97 | { |
98 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus done\n"); | ||
98 | GNUNET_SCHEDULER_add_now (destroy, cls); | 99 | GNUNET_SCHEDULER_add_now (destroy, cls); |
99 | } | 100 | } |
100 | 101 | ||
@@ -274,6 +275,8 @@ peer_info_cb (void *cb_cls, | |||
274 | { | 275 | { |
275 | GNUNET_assert (0); | 276 | GNUNET_assert (0); |
276 | } | 277 | } |
278 | |||
279 | GNUNET_TESTBED_operation_done (op); | ||
277 | } | 280 | } |
278 | 281 | ||
279 | 282 | ||
@@ -302,7 +305,6 @@ test_master (void *cls, | |||
302 | 305 | ||
303 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); | 306 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); |
304 | 307 | ||
305 | |||
306 | peers = started_peers; | 308 | peers = started_peers; |
307 | 309 | ||
308 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); | 310 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 567480e58..cb7ab2c68 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -87,6 +87,30 @@ enum ConsensusRound | |||
87 | CONSENSUS_ROUND_FINISH | 87 | CONSENSUS_ROUND_FINISH |
88 | }; | 88 | }; |
89 | 89 | ||
90 | |||
91 | /** | ||
92 | * Complete information about the current round and all | ||
93 | * subrounds. | ||
94 | */ | ||
95 | struct RoundInfo | ||
96 | { | ||
97 | /** | ||
98 | * The current main round. | ||
99 | */ | ||
100 | enum ConsensusRound round; | ||
101 | /** | ||
102 | * The current exp round, valid if | ||
103 | * the main round is an exp round. | ||
104 | */ | ||
105 | uint32_t exp_round; | ||
106 | /** | ||
107 | * The current exp subround, valid if | ||
108 | * the main round is an exp round. | ||
109 | */ | ||
110 | uint32_t exp_subround; | ||
111 | }; | ||
112 | |||
113 | |||
90 | /** | 114 | /** |
91 | * A consensus session consists of one local client and the remote authorities. | 115 | * A consensus session consists of one local client and the remote authorities. |
92 | */ | 116 | */ |
@@ -217,9 +241,14 @@ struct ConsensusPeerInformation | |||
217 | struct GNUNET_SET_OperationHandle *set_op; | 241 | struct GNUNET_SET_OperationHandle *set_op; |
218 | 242 | ||
219 | /** | 243 | /** |
220 | * Has commit been called on the set_op? | 244 | * Set operation we are planning on executing with this peer. |
221 | */ | 245 | */ |
222 | int set_op_commited; | 246 | struct GNUNET_SET_OperationHandle *delayed_set_op; |
247 | |||
248 | /** | ||
249 | * Info about the round of the delayed set operation. | ||
250 | */ | ||
251 | struct RoundInfo delayed_round_info; | ||
223 | }; | 252 | }; |
224 | 253 | ||
225 | 254 | ||
@@ -277,11 +306,26 @@ destroy_session (struct ConsensusSession *session) | |||
277 | int i; | 306 | int i; |
278 | 307 | ||
279 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | 308 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); |
309 | if (NULL != session->element_set) | ||
310 | { | ||
311 | GNUNET_SET_destroy (session->element_set); | ||
312 | session->element_set = NULL; | ||
313 | } | ||
314 | if (NULL != session->set_listener) | ||
315 | { | ||
316 | GNUNET_SET_listen_cancel (session->set_listener); | ||
317 | session->set_listener = NULL; | ||
318 | } | ||
280 | if (NULL != session->client_mq) | 319 | if (NULL != session->client_mq) |
281 | { | 320 | { |
282 | GNUNET_MQ_destroy (session->client_mq); | 321 | GNUNET_MQ_destroy (session->client_mq); |
283 | session->client_mq = NULL; | 322 | session->client_mq = NULL; |
284 | } | 323 | } |
324 | if (NULL != session->client) | ||
325 | { | ||
326 | GNUNET_SERVER_client_disconnect (session->client); | ||
327 | session->client = NULL; | ||
328 | } | ||
285 | if (NULL != session->shuffle) | 329 | if (NULL != session->shuffle) |
286 | { | 330 | { |
287 | GNUNET_free (session->shuffle); | 331 | GNUNET_free (session->shuffle); |
@@ -328,7 +372,7 @@ send_to_client_iter (void *cls, | |||
328 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got element for client\n", | 372 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got element for client\n", |
329 | session->local_peer_idx); | 373 | session->local_peer_idx); |
330 | 374 | ||
331 | ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | 375 | ev = GNUNET_MQ_msg_extra (m, element->size, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); |
332 | m->element_type = htons (element->type); | 376 | m->element_type = htons (element->type); |
333 | memcpy (&m[1], element->data, element->size); | 377 | memcpy (&m[1], element->data, element->size); |
334 | GNUNET_MQ_send (session->client_mq, ev); | 378 | GNUNET_MQ_send (session->client_mq, ev); |
@@ -435,17 +479,16 @@ shuffle (struct ConsensusSession *session) | |||
435 | static void | 479 | static void |
436 | find_partners (struct ConsensusSession *session) | 480 | find_partners (struct ConsensusSession *session) |
437 | { | 481 | { |
438 | int arc; | 482 | unsigned int arc; |
483 | unsigned int num_ghosts; | ||
484 | unsigned int largest_arc; | ||
439 | int partner_idx; | 485 | int partner_idx; |
440 | int largest_arc; | ||
441 | int num_ghosts; | ||
442 | 486 | ||
443 | /* shuffled local index */ | 487 | /* shuffled local index */ |
444 | int my_idx = session->shuffle[session->local_peer_idx]; | 488 | int my_idx = session->shuffle[session->local_peer_idx]; |
445 | 489 | ||
446 | /* distance to neighboring peer in current subround */ | 490 | /* distance to neighboring peer in current subround */ |
447 | arc = 1 << session->exp_subround; | 491 | arc = 1 << session->exp_subround; |
448 | partner_idx = (my_idx + arc) % session->num_peers; | ||
449 | largest_arc = 1; | 492 | largest_arc = 1; |
450 | while (largest_arc < session->num_peers) | 493 | while (largest_arc < session->num_peers) |
451 | largest_arc <<= 1; | 494 | largest_arc <<= 1; |
@@ -456,7 +499,9 @@ find_partners (struct ConsensusSession *session) | |||
456 | if (0 == (my_idx & arc)) | 499 | if (0 == (my_idx & arc)) |
457 | { | 500 | { |
458 | /* we are outgoing */ | 501 | /* we are outgoing */ |
502 | partner_idx = (my_idx + arc) % session->num_peers; | ||
459 | session->partner_outgoing = &session->info[session->shuffle[partner_idx]]; | 503 | session->partner_outgoing = &session->info[session->shuffle[partner_idx]]; |
504 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
460 | /* are we a 'ghost' of a peer that would exist if | 505 | /* are we a 'ghost' of a peer that would exist if |
461 | * the number of peers was a power of two, and thus have to partner | 506 | * the number of peers was a power of two, and thus have to partner |
462 | * with an additional peer? | 507 | * with an additional peer? |
@@ -464,22 +509,26 @@ find_partners (struct ConsensusSession *session) | |||
464 | if (my_idx < num_ghosts) | 509 | if (my_idx < num_ghosts) |
465 | { | 510 | { |
466 | int ghost_partner_idx; | 511 | int ghost_partner_idx; |
467 | ghost_partner_idx = (my_idx - arc) % session->num_peers; | 512 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "my index %d, arc %d, peers %u\n", my_idx, arc, session->num_peers); |
513 | ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers; | ||
514 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is before %d\n", ghost_partner_idx); | ||
468 | /* platform dependent; modulo sometimes returns negative values */ | 515 | /* platform dependent; modulo sometimes returns negative values */ |
469 | if (ghost_partner_idx < 0) | 516 | if (ghost_partner_idx < 0) |
470 | ghost_partner_idx += arc; | 517 | ghost_partner_idx += session->num_peers; |
518 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is after %d\n", ghost_partner_idx); | ||
471 | session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]]; | 519 | session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]]; |
520 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
521 | return; | ||
472 | } | 522 | } |
473 | else | 523 | session->partner_incoming = NULL; |
474 | { | 524 | return; |
475 | session->partner_incoming = NULL; | ||
476 | } | ||
477 | } | ||
478 | else | ||
479 | { | ||
480 | session->partner_outgoing = NULL; | ||
481 | session->partner_incoming = &session->info[session->shuffle[partner_idx]]; | ||
482 | } | 525 | } |
526 | partner_idx = (my_idx - (int) arc) % (int) session->num_peers; | ||
527 | if (partner_idx < 0) | ||
528 | partner_idx += session->num_peers; | ||
529 | session->partner_outgoing = NULL; | ||
530 | session->partner_incoming = &session->info[session->shuffle[partner_idx]]; | ||
531 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
483 | } | 532 | } |
484 | 533 | ||
485 | 534 | ||
@@ -497,23 +546,40 @@ set_result_cb (void *cls, | |||
497 | enum GNUNET_SET_Status status) | 546 | enum GNUNET_SET_Status status) |
498 | { | 547 | { |
499 | struct ConsensusPeerInformation *cpi = cls; | 548 | struct ConsensusPeerInformation *cpi = cls; |
549 | unsigned int remote_idx = cpi - cpi->session->info; | ||
550 | unsigned int local_idx = cpi->session->local_peer_idx; | ||
551 | |||
552 | GNUNET_assert ((cpi == cpi->session->partner_outgoing) || | ||
553 | (cpi == cpi->session->partner_incoming)); | ||
500 | 554 | ||
501 | switch (status) | 555 | switch (status) |
502 | { | 556 | { |
503 | case GNUNET_SET_STATUS_OK: | 557 | case GNUNET_SET_STATUS_OK: |
504 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n"); | 558 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: element\n", |
559 | local_idx, remote_idx); | ||
505 | break; | 560 | break; |
506 | case GNUNET_SET_STATUS_FAILURE: | 561 | case GNUNET_SET_STATUS_FAILURE: |
507 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n"); | 562 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: failure\n", |
563 | local_idx, remote_idx); | ||
508 | cpi->set_op = NULL; | 564 | cpi->set_op = NULL; |
509 | return; | 565 | return; |
510 | case GNUNET_SET_STATUS_HALF_DONE: | 566 | case GNUNET_SET_STATUS_HALF_DONE: |
511 | case GNUNET_SET_STATUS_DONE: | 567 | case GNUNET_SET_STATUS_DONE: |
512 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n"); | 568 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: done\n", |
569 | local_idx, remote_idx); | ||
513 | cpi->exp_subround_finished = GNUNET_YES; | 570 | cpi->exp_subround_finished = GNUNET_YES; |
514 | cpi->set_op = NULL; | 571 | cpi->set_op = NULL; |
515 | if (have_exp_subround_finished (cpi->session) == GNUNET_YES) | 572 | if (have_exp_subround_finished (cpi->session) == GNUNET_YES) |
573 | { | ||
574 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: all reconciliations of subround done\n", | ||
575 | local_idx); | ||
516 | subround_over (cpi->session, NULL); | 576 | subround_over (cpi->session, NULL); |
577 | } | ||
578 | else | ||
579 | { | ||
580 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: waiting for further set results\n", | ||
581 | local_idx); | ||
582 | } | ||
517 | return; | 583 | return; |
518 | default: | 584 | default: |
519 | GNUNET_break (0); | 585 | GNUNET_break (0); |
@@ -533,6 +599,39 @@ set_result_cb (void *cls, | |||
533 | 599 | ||
534 | 600 | ||
535 | /** | 601 | /** |
602 | * Compare the round the session is in with the round of the given context message. | ||
603 | * | ||
604 | * @param session a consensus session | ||
605 | * @param round a round context message | ||
606 | * @return 0 if it's the same round, -1 if the session is in an earlier round, | ||
607 | * 1 if the session is in a later round | ||
608 | */ | ||
609 | static int | ||
610 | rounds_compare (struct ConsensusSession *session, | ||
611 | struct RoundInfo* ri) | ||
612 | { | ||
613 | if (session->current_round < ri->round) | ||
614 | return -1; | ||
615 | if (session->current_round > ri->round) | ||
616 | return 1; | ||
617 | if (session->current_round == CONSENSUS_ROUND_EXCHANGE) | ||
618 | { | ||
619 | if (session->exp_round < ri->exp_round) | ||
620 | return -1; | ||
621 | if (session->exp_round > ri->exp_round) | ||
622 | return 1; | ||
623 | if (session->exp_subround < ri->exp_subround) | ||
624 | return -1; | ||
625 | if (session->exp_subround < ri->exp_subround) | ||
626 | return 1; | ||
627 | return 0; | ||
628 | } | ||
629 | /* comparing rounds when we are not in a exp round */ | ||
630 | GNUNET_assert (0); | ||
631 | } | ||
632 | |||
633 | |||
634 | /** | ||
536 | * Do the next subround in the exp-scheme. | 635 | * Do the next subround in the exp-scheme. |
537 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | 636 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). |
538 | * | 637 | * |
@@ -557,7 +656,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
557 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 656 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
558 | } | 657 | } |
559 | 658 | ||
560 | if (session->exp_round > NUM_EXP_ROUNDS) | 659 | if (session->exp_round >= NUM_EXP_ROUNDS) |
561 | { | 660 | { |
562 | round_over (session, NULL); | 661 | round_over (session, NULL); |
563 | return; | 662 | return; |
@@ -578,7 +677,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
578 | /* subrounds done, start new log-round */ | 677 | /* subrounds done, start new log-round */ |
579 | session->exp_round++; | 678 | session->exp_round++; |
580 | session->exp_subround = 0; | 679 | session->exp_subround = 0; |
581 | shuffle (session); | 680 | //shuffle (session); |
582 | } | 681 | } |
583 | else | 682 | else |
584 | { | 683 | { |
@@ -588,6 +687,10 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
588 | /* determine the incoming and outgoing partner */ | 687 | /* determine the incoming and outgoing partner */ |
589 | find_partners (session); | 688 | find_partners (session); |
590 | 689 | ||
690 | GNUNET_assert (session->partner_outgoing != &session->info[session->local_peer_idx]); | ||
691 | GNUNET_assert (session->partner_incoming != &session->info[session->local_peer_idx]); | ||
692 | |||
693 | /* initiate set operation with the outgoing partner */ | ||
591 | if (NULL != session->partner_outgoing) | 694 | if (NULL != session->partner_outgoing) |
592 | { | 695 | { |
593 | struct GNUNET_CONSENSUS_RoundContextMessage *msg; | 696 | struct GNUNET_CONSENSUS_RoundContextMessage *msg; |
@@ -606,11 +709,36 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
606 | GNUNET_SET_prepare (&session->partner_outgoing->peer_id, | 709 | GNUNET_SET_prepare (&session->partner_outgoing->peer_id, |
607 | &session->global_id, | 710 | &session->global_id, |
608 | (struct GNUNET_MessageHeader *) msg, | 711 | (struct GNUNET_MessageHeader *) msg, |
609 | 0, /* FIXME */ | 712 | 0, /* FIXME: salt */ |
610 | GNUNET_SET_RESULT_ADDED, | 713 | GNUNET_SET_RESULT_ADDED, |
611 | set_result_cb, session->partner_outgoing); | 714 | set_result_cb, session->partner_outgoing); |
715 | GNUNET_free (msg); | ||
612 | GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set); | 716 | GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set); |
613 | session->partner_outgoing->set_op_commited = GNUNET_YES; | 717 | } |
718 | |||
719 | /* commit to the delayed set operation */ | ||
720 | if ((NULL != session->partner_incoming) && (NULL != session->partner_incoming->delayed_set_op)) | ||
721 | { | ||
722 | int cmp = rounds_compare (session, &session->partner_incoming->delayed_round_info); | ||
723 | |||
724 | if (NULL != session->partner_incoming->set_op) | ||
725 | { | ||
726 | GNUNET_SET_operation_cancel (session->partner_incoming->set_op); | ||
727 | session->partner_incoming->set_op = NULL; | ||
728 | } | ||
729 | if (cmp == 0) | ||
730 | { | ||
731 | GNUNET_SET_commit (session->partner_incoming->delayed_set_op, session->element_set); | ||
732 | session->partner_incoming->set_op = session->partner_incoming->delayed_set_op; | ||
733 | session->partner_incoming->delayed_set_op = NULL; | ||
734 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d resumed delayed round with P%d\n", | ||
735 | session->local_peer_idx, (int) (session->partner_incoming - session->info)); | ||
736 | } | ||
737 | else | ||
738 | { | ||
739 | /* this should not happen -- a round has been skipped! */ | ||
740 | GNUNET_break_op (0); | ||
741 | } | ||
614 | } | 742 | } |
615 | 743 | ||
616 | #ifdef GNUNET_EXTRA_LOGGING | 744 | #ifdef GNUNET_EXTRA_LOGGING |
@@ -777,13 +905,10 @@ set_listen_cb (void *cls, | |||
777 | struct ConsensusSession *session = cls; | 905 | struct ConsensusSession *session = cls; |
778 | struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; | 906 | struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; |
779 | struct ConsensusPeerInformation *cpi; | 907 | struct ConsensusPeerInformation *cpi; |
908 | struct GNUNET_SET_OperationHandle *set_op; | ||
909 | struct RoundInfo round_info; | ||
780 | int index; | 910 | int index; |
781 | 911 | int cmp; | |
782 | /* FIXME: should this even happen? */ | ||
783 | /* | ||
784 | if (NULL == request) | ||
785 | return; | ||
786 | */ | ||
787 | 912 | ||
788 | if (NULL == context_msg) | 913 | if (NULL == context_msg) |
789 | { | 914 | { |
@@ -793,48 +918,53 @@ set_listen_cb (void *cls, | |||
793 | 918 | ||
794 | index = get_peer_idx (other_peer, session); | 919 | index = get_peer_idx (other_peer, session); |
795 | 920 | ||
796 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey)); | ||
797 | |||
798 | if (index < 0) | 921 | if (index < 0) |
799 | { | 922 | { |
800 | GNUNET_break_op (0); | 923 | GNUNET_break_op (0); |
801 | return; | 924 | return; |
802 | } | 925 | } |
803 | 926 | ||
927 | round_info.round = ntohl (msg->round); | ||
928 | round_info.exp_round = ntohl (msg->exp_round); | ||
929 | round_info.exp_subround = ntohl (msg->exp_subround); | ||
930 | |||
804 | cpi = &session->info[index]; | 931 | cpi = &session->info[index]; |
805 | 932 | ||
806 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index); | 933 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got set request from P%d\n", session->local_peer_idx, index); |
807 | 934 | ||
808 | switch (session->current_round) | 935 | switch (session->current_round) |
809 | { | 936 | { |
810 | case CONSENSUS_ROUND_EXCHANGE: | 937 | case CONSENSUS_ROUND_EXCHANGE: |
811 | if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE) | 938 | cmp = rounds_compare (session, &round_info); |
812 | { | 939 | if (cmp > 0) |
813 | GNUNET_break_op (0); | ||
814 | return; | ||
815 | } | ||
816 | if (ntohl (msg->exp_round) < session->exp_round) | ||
817 | { | ||
818 | GNUNET_break_op (0); | ||
819 | return; | ||
820 | } | ||
821 | if (ntohl (msg->exp_subround) < session->exp_subround) | ||
822 | { | 940 | { |
941 | /* the other peer is too late */ | ||
823 | GNUNET_break_op (0); | 942 | GNUNET_break_op (0); |
824 | return; | 943 | return; |
825 | } | 944 | } |
945 | /* kill old request, if any. this is legal, | ||
946 | * as the other peer would not make a new request if it would want to | ||
947 | * complete the old one! */ | ||
826 | if (NULL != cpi->set_op) | 948 | if (NULL != cpi->set_op) |
949 | { | ||
827 | GNUNET_SET_operation_cancel (cpi->set_op); | 950 | GNUNET_SET_operation_cancel (cpi->set_op); |
828 | cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | 951 | cpi->set_op = NULL; |
952 | } | ||
953 | set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, | ||
829 | set_result_cb, &session->info[index]); | 954 | set_result_cb, &session->info[index]); |
830 | if (ntohl (msg->exp_subround) == session->exp_subround) | 955 | if (cmp == 0) |
831 | { | 956 | { |
832 | cpi->set_op_commited = GNUNET_YES; | 957 | cpi->set_op = set_op; |
833 | GNUNET_SET_commit (cpi->set_op, session->element_set); | 958 | GNUNET_SET_commit (set_op, session->element_set); |
959 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d commited to set request from P%d\n", session->local_peer_idx, index); | ||
834 | } | 960 | } |
835 | else | 961 | else |
836 | { | 962 | { |
837 | cpi->set_op_commited = GNUNET_NO; | 963 | /* if there's a exp subround running, mark it as finished, as the set op has been canceled! */ |
964 | cpi->delayed_set_op = set_op; | ||
965 | cpi->delayed_round_info = round_info; | ||
966 | cpi->exp_subround_finished = GNUNET_YES; | ||
967 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d delaying set request from P%d\n", session->local_peer_idx, index); | ||
838 | } | 968 | } |
839 | break; | 969 | break; |
840 | default: | 970 | default: |
@@ -934,7 +1064,6 @@ client_join (void *cls, | |||
934 | session = GNUNET_new (struct ConsensusSession); | 1064 | session = GNUNET_new (struct ConsensusSession); |
935 | session->client = client; | 1065 | session->client = client; |
936 | session->client_mq = GNUNET_MQ_queue_for_server_client (client); | 1066 | session->client_mq = GNUNET_MQ_queue_for_server_client (client); |
937 | GNUNET_SERVER_client_keep (client); | ||
938 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 1067 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
939 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); | 1068 | initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); |
940 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1069 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
@@ -990,6 +1119,7 @@ client_insert (void *cls, | |||
990 | memcpy (&element[1], &msg[1], element_size); | 1119 | memcpy (&element[1], &msg[1], element_size); |
991 | element->data = &element[1]; | 1120 | element->data = &element[1]; |
992 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); | 1121 | GNUNET_SET_add_element (session->element_set, element, NULL, NULL); |
1122 | GNUNET_free (element); | ||
993 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1123 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
994 | 1124 | ||
995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx); | 1125 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx); |
@@ -1030,6 +1160,7 @@ client_conclude (void *cls, | |||
1030 | } | 1160 | } |
1031 | if (session->num_peers <= 1) | 1161 | if (session->num_peers <= 1) |
1032 | { | 1162 | { |
1163 | /* FIXME: what to do here? */ | ||
1033 | //send_client_conclude_done (session); | 1164 | //send_client_conclude_done (session); |
1034 | } | 1165 | } |
1035 | else | 1166 | else |
@@ -1080,7 +1211,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
1080 | (CONSENSUS_ROUND_FINISH == session->current_round)) | 1211 | (CONSENSUS_ROUND_FINISH == session->current_round)) |
1081 | destroy_session (session); | 1212 | destroy_session (session); |
1082 | else | 1213 | else |
1083 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n"); | 1214 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client disconnected, but waiting for consensus to finish\n"); |
1084 | } | 1215 | } |
1085 | 1216 | ||
1086 | 1217 | ||
@@ -1131,7 +1262,7 @@ main (int argc, char *const *argv) | |||
1131 | { | 1262 | { |
1132 | int ret; | 1263 | int ret; |
1133 | ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); | 1264 | ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); |
1134 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); | 1265 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); |
1135 | return (GNUNET_OK == ret) ? 0 : 1; | 1266 | return (GNUNET_OK == ret) ? 0 : 1; |
1136 | } | 1267 | } |
1137 | 1268 | ||
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index d0190f8ba..571d378fa 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -5,7 +5,7 @@ HOSTNAME = localhost | |||
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
7 | #PREFIX = gdbserver :12345 | 7 | #PREFIX = gdbserver :12345 |
8 | PREFIX = valgrind | 8 | PREFIX = valgrind --leak-check=full |
9 | ACCEPT_FROM = 127.0.0.1; | 9 | ACCEPT_FROM = 127.0.0.1; |
10 | ACCEPT_FROM6 = ::1; | 10 | ACCEPT_FROM6 = ::1; |
11 | UNIXPATH = /tmp/gnunet-service-consensus.sock | 11 | UNIXPATH = /tmp/gnunet-service-consensus.sock |
@@ -23,7 +23,7 @@ DEFAULTSERVICES = core consensus set | |||
23 | 23 | ||
24 | [set] | 24 | [set] |
25 | OPTIONS = -L INFO | 25 | OPTIONS = -L INFO |
26 | PREFIX = valgrind | 26 | PREFIX = valgrind --leak-check=full |
27 | 27 | ||
28 | 28 | ||
29 | [testbed] | 29 | [testbed] |