aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-03-21 01:06:40 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-03-21 01:06:40 +0000
commitf69659c664077034fcf135acb49b6e680937af1d (patch)
tree325b219bce0adef70b603d17146ed28838f0f037 /src/consensus
parent9f0ab5f654c9d5bc1a04c3728ff65f74b5c1f741 (diff)
downloadgnunet-f69659c664077034fcf135acb49b6e680937af1d.tar.gz
gnunet-f69659c664077034fcf135acb49b6e680937af1d.zip
fixed consensus multi-peer communication, memory leaks, various bugs
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/consensus_flout.h60
-rw-r--r--src/consensus/consensus_protocol.h4
-rw-r--r--src/consensus/gnunet-consensus-ibf.c2
-rw-r--r--src/consensus/gnunet-consensus.c23
-rw-r--r--src/consensus/gnunet-service-consensus.c849
-rw-r--r--src/consensus/ibf.c16
-rw-r--r--src/consensus/ibf.h22
-rw-r--r--src/consensus/test_consensus.conf2
8 files changed, 669 insertions, 309 deletions
diff --git a/src/consensus/consensus_flout.h b/src/consensus/consensus_flout.h
new file mode 100644
index 000000000..99b4475b9
--- /dev/null
+++ b/src/consensus/consensus_flout.h
@@ -0,0 +1,60 @@
1/*
2 This file is part of GNUnet
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21/**
22 * @file consensus/consensus_flout.h
23 * @brief intentionally misbehave in certain ways for testing
24 * @author Florian Dold
25 */
26
27#ifndef GNUNET_CONSENSUS_FLOUT_H
28#define GNUNET_CONSENSUS_FLOUT_H
29
30#ifdef __cplusplus
31extern "C"
32{
33#if 0 /* keep Emacsens' auto-indent happy */
34}
35#endif
36#endif
37
38#include "platform.h"
39#include "gnunet_common.h"
40#include "gnunet_consensus_service.h"
41
42void
43GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus);
44
45void
46GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash);
47
48void
49GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash);
50
51
52
53#if 0 /* keep Emacsens' auto-indent happy */
54{
55#endif
56#ifdef __cplusplus
57}
58#endif
59
60#endif
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index c0420d55c..32c3d8b09 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -76,12 +76,10 @@ struct ConsensusHello
76 struct GNUNET_HashCode global_id; 76 struct GNUNET_HashCode global_id;
77}; 77};
78 78
79struct ConsensusRoundHeader 79struct ConsensusRoundMessage
80{ 80{
81 struct GNUNET_MessageHeader header; 81 struct GNUNET_MessageHeader header;
82 uint8_t round; 82 uint8_t round;
83 uint8_t exp_round;
84 uint8_t exp_subround;
85}; 83};
86 84
87 85
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c
index f4a233ece..73dc31b56 100644
--- a/src/consensus/gnunet-consensus-ibf.c
+++ b/src/consensus/gnunet-consensus-ibf.c
@@ -19,7 +19,7 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file consensus/gnunet-consensus-ibf 22 * @file consensus/gnunet-consensus-ibf.c
23 * @brief tool for reconciling data with invertible bloom filters 23 * @brief tool for reconciling data with invertible bloom filters
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index 7a951d35b..4525fc719 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -39,6 +39,8 @@ static struct GNUNET_TIME_Relative conclude_timeout;
39 39
40static struct GNUNET_CONSENSUS_Handle **consensus_handles; 40static struct GNUNET_CONSENSUS_Handle **consensus_handles;
41 41
42static struct GNUNET_TESTBED_Operation **testbed_operations;
43
42static unsigned int num_connected_handles; 44static unsigned int num_connected_handles;
43 45
44static struct GNUNET_TESTBED_Peer **peers; 46static struct GNUNET_TESTBED_Peer **peers;
@@ -49,6 +51,8 @@ static unsigned int num_retrieved_peer_ids;
49 51
50static struct GNUNET_HashCode session_id; 52static struct GNUNET_HashCode session_id;
51 53
54static unsigned int peers_done = 0;
55
52 56
53/** 57/**
54 * Signature of the event handler function called by the 58 * Signature of the event handler function called by the
@@ -64,7 +68,6 @@ controller_cb(void *cls,
64 GNUNET_assert (0); 68 GNUNET_assert (0);
65} 69}
66 70
67
68static void 71static void
69destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) 72destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx)
70{ 73{
@@ -72,14 +75,21 @@ destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx)
72 consensus = cls; 75 consensus = cls;
73 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n"); 76 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n");
74 GNUNET_CONSENSUS_destroy (consensus); 77 GNUNET_CONSENSUS_destroy (consensus);
78 peers_done++;
79 if (peers_done == num_peers)
80 {
81 int i;
82 for (i = 0; i < num_peers; i++)
83 GNUNET_TESTBED_operation_done (testbed_operations[i]);
84 GNUNET_SCHEDULER_shutdown ();
85 }
75} 86}
76 87
77 88
78/** 89/**
79 * Called when a conclusion was successful. 90 * Called when a conclusion was successful.
80 * 91 *
81 * @param cls 92 * @param cls closure, the consensus handle
82 * @param group
83 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not 93 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not
84 */ 94 */
85static void 95static void
@@ -255,8 +265,9 @@ peer_info_cb (void *cb_cls,
255 num_retrieved_peer_ids++; 265 num_retrieved_peer_ids++;
256 if (num_retrieved_peer_ids == num_peers) 266 if (num_retrieved_peer_ids == num_peers)
257 for (i = 0; i < num_peers; i++) 267 for (i = 0; i < num_peers; i++)
258 GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", connect_complete, &consensus_handles[i], 268 testbed_operations[i] =
259 connect_adapter, disconnect_adapter, NULL); 269 GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", connect_complete, &consensus_handles[i],
270 connect_adapter, disconnect_adapter, NULL);
260 } 271 }
261 else 272 else
262 { 273 {
@@ -272,7 +283,6 @@ test_master (void *cls,
272{ 283{
273 int i; 284 int i;
274 285
275
276 GNUNET_log_setup ("gnunet-consensus", "INFO", NULL); 286 GNUNET_log_setup ("gnunet-consensus", "INFO", NULL);
277 287
278 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); 288 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
@@ -283,6 +293,7 @@ test_master (void *cls,
283 peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); 293 peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
284 294
285 consensus_handles = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *)); 295 consensus_handles = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *));
296 testbed_operations = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *));
286 297
287 for (i = 0; i < num_peers; i++) 298 for (i = 0; i < num_peers; i++)
288 GNUNET_TESTBED_peer_get_information (peers[i], 299 GNUNET_TESTBED_peer_get_information (peers[i],
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index e300d9169..479e7ebdd 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -124,6 +124,42 @@ struct PendingElement
124}; 124};
125 125
126 126
127struct ElementList
128{
129 struct ElementList *next;
130 struct GNUNET_CONSENSUS_Element *element;
131 struct GNUNET_HashCode *element_hash;
132};
133
134
135/**
136 * Describes the current round a consensus session is in.
137 */
138enum ConsensusRound
139{
140 /**
141 * Not started the protocol yet.
142 */
143 CONSENSUS_ROUND_BEGIN=0,
144 /**
145 * Distribution of elements with the exponential scheme.
146 */
147 CONSENSUS_ROUND_EXCHANGE,
148 /**
149 * Exchange which elements each peer has, but not the elements.
150 */
151 CONSENSUS_ROUND_INVENTORY,
152 /**
153 * Collect and distribute missing values.
154 */
155 CONSENSUS_ROUND_STOCK,
156 /**
157 * Consensus concluded.
158 */
159 CONSENSUS_ROUND_FINISH
160};
161
162
127/** 163/**
128 * Information about a peer that is in a consensus session. 164 * Information about a peer that is in a consensus session.
129 */ 165 */
@@ -148,8 +184,6 @@ struct ConsensusPeerInformation
148 */ 184 */
149 int is_outgoing; 185 int is_outgoing;
150 186
151 int connected;
152
153 /** 187 /**
154 * Did we receive/send a consensus hello? 188 * Did we receive/send a consensus hello?
155 */ 189 */
@@ -246,6 +280,15 @@ struct ConsensusPeerInformation
246 */ 280 */
247 int exp_subround_finished; 281 int exp_subround_finished;
248 282
283 int inventory_synced;
284
285 /**
286 * Round this peer seems to be in, according to the last SE we got.
287 * Necessary to store this, as we sometimes need to respond to a request from an
288 * older round, while we are already in the next round.
289 */
290 enum ConsensusRound apparent_round;
291
249}; 292};
250 293
251typedef void (*QueuedMessageCallback) (void *msg); 294typedef void (*QueuedMessageCallback) (void *msg);
@@ -272,32 +315,6 @@ struct QueuedMessage
272 void *cls; 315 void *cls;
273}; 316};
274 317
275/**
276 * Describes the current round a consensus session is in.
277 */
278enum ConsensusRound
279{
280 /**
281 * Not started the protocol yet.
282 */
283 CONSENSUS_ROUND_BEGIN=0,
284 /**
285 * Distribution of elements with the exponential scheme.
286 */
287 CONSENSUS_ROUND_EXCHANGE,
288 /**
289 * Exchange which elements each peer has, but not the elements.
290 */
291 CONSENSUS_ROUND_INVENTORY,
292 /**
293 * Collect and distribute missing values.
294 */
295 CONSENSUS_ROUND_STOCK,
296 /**
297 * Consensus concluded.
298 */
299 CONSENSUS_ROUND_FINISH
300};
301 318
302struct StrataEstimator 319struct StrataEstimator
303{ 320{
@@ -342,7 +359,8 @@ struct ConsensusSession
342 /** 359 /**
343 * Elements in the consensus set of this session, 360 * Elements in the consensus set of this session,
344 * all of them either have been sent by or approved by the client. 361 * all of them either have been sent by or approved by the client.
345 * Contains GNUNET_CONSENSUS_Element. 362 * Contains ElementList.
363 * Used as a unique-key hashmap.
346 */ 364 */
347 struct GNUNET_CONTAINER_MultiHashMap *values; 365 struct GNUNET_CONTAINER_MultiHashMap *values;
348 366
@@ -544,6 +562,8 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea
544 * 562 *
545 * @param cpi peer 563 * @param cpi peer
546 * @param msg message we want to queue 564 * @param msg message we want to queue
565 * @param cb callback, called when the message is given to strem
566 * @param cls closure for cb
547 */ 567 */
548static void 568static void
549queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) 569queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
@@ -572,14 +592,14 @@ queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageH
572} 592}
573 593
574 594
575 595/*
576static void 596static void
577clear_peer_messages (struct ConsensusPeerInformation *cpi) 597clear_peer_messages (struct ConsensusPeerInformation *cpi)
578{ 598{
579 /* FIXME: deallocate */
580 cpi->messages_head = NULL; 599 cpi->messages_head = NULL;
581 cpi->messages_tail = NULL; 600 cpi->messages_tail = NULL;
582} 601}
602*/
583 603
584 604
585/** 605/**
@@ -592,8 +612,8 @@ clear_peer_messages (struct ConsensusPeerInformation *cpi)
592 * @return the estimated difference 612 * @return the estimated difference
593 */ 613 */
594static int 614static int
595estimate_difference (struct StrataEstimator *se1, 615estimate_difference (const struct StrataEstimator *se1,
596 struct StrataEstimator *se2) 616 const struct StrataEstimator *se2)
597{ 617{
598 int i; 618 int i;
599 int count; 619 int count;
@@ -701,42 +721,36 @@ incoming_stream_data_processor (void *cls,
701} 721}
702 722
703 723
704/** 724static void
705 * Iterator over hash map entries. 725send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head)
706 * Queue elements to be sent to the peer in cls.
707 *
708 * @param cls closure
709 * @param key current key code
710 * @param value value in the hash map
711 * @return GNUNET_YES if we should continue to
712 * iterate,
713 * GNUNET_NO if not.
714 */
715static int
716send_element_iter (void *cls,
717 const struct GNUNET_HashCode *key,
718 void *value)
719{ 726{
720 struct ConsensusPeerInformation *cpi;
721 struct GNUNET_CONSENSUS_Element *element; 727 struct GNUNET_CONSENSUS_Element *element;
722 struct GNUNET_MessageHeader *element_msg; 728 struct GNUNET_MessageHeader *element_msg;
723 size_t msize; 729 size_t msize;
724 730
725 cpi = cls; 731 while (NULL != head)
726 element = value; 732 {
727 msize = sizeof (struct GNUNET_MessageHeader) + element->size; 733 element = head->element;
728 element_msg = GNUNET_malloc (msize); 734 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
729 element_msg->size = htons (msize); 735 element_msg = GNUNET_malloc (msize);
730 if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) 736 element_msg->size = htons (msize);
731 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); 737 switch (cpi->apparent_round)
732 else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round) 738 {
733 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); 739 case CONSENSUS_ROUND_STOCK:
734 else 740 case CONSENSUS_ROUND_EXCHANGE:
735 GNUNET_assert (0); 741 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
736 GNUNET_assert (NULL != element->data); 742 break;
737 memcpy (&element_msg[1], element->data, element->size); 743 case CONSENSUS_ROUND_INVENTORY:
738 queue_peer_message (cpi, element_msg); 744 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
739 return GNUNET_YES; 745 break;
746 default:
747 GNUNET_break (0);
748 }
749 GNUNET_assert (NULL != element->data);
750 memcpy (&element_msg[1], element->data, element->size);
751 queue_peer_message (cpi, element_msg);
752 head = head->next;
753 }
740} 754}
741 755
742/** 756/**
@@ -755,8 +769,13 @@ ibf_values_iterator (void *cls,
755 void *value) 769 void *value)
756{ 770{
757 struct ConsensusPeerInformation *cpi; 771 struct ConsensusPeerInformation *cpi;
772 struct ElementList *head;
773 struct IBF_Key ibf_key;
758 cpi = cls; 774 cpi = cls;
759 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key)); 775 head = value;
776 ibf_key = ibf_key_from_hashcode (head->element_hash);
777 GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
778 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
760 return GNUNET_YES; 779 return GNUNET_YES;
761} 780}
762 781
@@ -764,7 +783,7 @@ ibf_values_iterator (void *cls,
764 * Create and populate an IBF for the specified peer, 783 * Create and populate an IBF for the specified peer,
765 * if it does not already exist. 784 * if it does not already exist.
766 * 785 *
767 * @param peer to create the ibf for 786 * @param cpi peer to create the ibf for
768 */ 787 */
769static void 788static void
770prepare_ibf (struct ConsensusPeerInformation *cpi) 789prepare_ibf (struct ConsensusPeerInformation *cpi)
@@ -791,22 +810,68 @@ handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GN
791 GNUNET_assert (0); 810 GNUNET_assert (0);
792} 811}
793 812
794static void 813
795fin_sent_cb (void *cls) 814static int
815exp_subround_finished (const struct ConsensusSession *session)
796{ 816{
797 struct ConsensusPeerInformation *cpi;
798 int not_finished; 817 int not_finished;
799 cpi = cls;
800 cpi->exp_subround_finished = GNUNET_YES;
801 /* the subround is only really over if *both* partners are done */
802 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
803 not_finished = 0; 818 not_finished = 0;
804 if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO)) 819 if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO))
805 not_finished++; 820 not_finished++;
806 if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO)) 821 if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO))
807 not_finished++; 822 not_finished++;
808 if (0 == not_finished) 823 if (0 == not_finished)
809 subround_over (cpi->session, NULL); 824 return GNUNET_YES;
825 return GNUNET_NO;
826}
827
828static int
829inventory_round_finished (struct ConsensusSession *session)
830{
831 int i;
832 int finished;
833 finished = 0;
834 for (i = 0; i < session->num_peers; i++)
835 if (GNUNET_YES == session->info[i].inventory_synced)
836 finished++;
837 if (finished >= (session->num_peers / 2))
838 return GNUNET_YES;
839 return GNUNET_NO;
840}
841
842
843
844static void
845fin_sent_cb (void *cls)
846{
847 struct ConsensusPeerInformation *cpi;
848 cpi = cls;
849 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx);
850 switch (cpi->session->current_round)
851 {
852 case CONSENSUS_ROUND_EXCHANGE:
853 case CONSENSUS_ROUND_STOCK:
854 /* the subround is only really over if *both* partners are done */
855 if (cpi->session->current_round != cpi->apparent_round)
856 {
857 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx);
858 break;
859 }
860 cpi->exp_subround_finished = GNUNET_YES;
861 if (GNUNET_YES == exp_subround_finished (cpi->session))
862 subround_over (cpi->session, NULL);
863 else
864 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx);
865 break;
866 case CONSENSUS_ROUND_INVENTORY:
867 cpi->inventory_synced = GNUNET_YES;
868 if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round)
869 round_over (cpi->session, NULL);
870 /* FIXME: maybe go to next round */
871 break;
872 default:
873 GNUNET_break (0);
874 }
810} 875}
811 876
812 877
@@ -817,17 +882,23 @@ fin_sent_cb (void *cls)
817static int 882static int
818handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) 883handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
819{ 884{
820 struct GNUNET_MessageHeader *fin_msg; 885 struct ConsensusRoundMessage *fin_msg;
886
821 switch (cpi->session->current_round) 887 switch (cpi->session->current_round)
822 { 888 {
889 case CONSENSUS_ROUND_INVENTORY:
890 cpi->inventory_synced = GNUNET_YES;
891 case CONSENSUS_ROUND_STOCK:
823 case CONSENSUS_ROUND_EXCHANGE: 892 case CONSENSUS_ROUND_EXCHANGE:
824 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 893 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
825 fin_msg = GNUNET_malloc (sizeof *fin_msg); 894 fin_msg = GNUNET_malloc (sizeof *fin_msg);
826 fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); 895 fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
827 fin_msg->size = htons (sizeof *fin_msg); 896 fin_msg->header.size = htons (sizeof *fin_msg);
897 fin_msg->round = cpi->apparent_round;
828 /* the subround os over once we kicked off sending the fin msg */ 898 /* the subround os over once we kicked off sending the fin msg */
829 /* FIXME: assert we are talking to the right peer! */ 899 /* FIXME: assert we are talking to the right peer! */
830 queue_peer_message_with_cls (cpi, fin_msg, fin_sent_cb, cpi); 900 queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi);
901 /* FIXME: mark peer as synced */
831 break; 902 break;
832 default: 903 default:
833 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); 904 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
@@ -836,30 +907,40 @@ handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_Mes
836 return GNUNET_YES; 907 return GNUNET_YES;
837} 908}
838 909
910
839/** 911/**
840 * The other peer wants us to inform that he sent us all the elements we requested. 912 * The other peer wants us to inform that he sent us all the elements we requested.
841 */ 913 */
842static int 914static int
843handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) 915handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
844{ 916{
917 struct ConsensusRoundMessage *round_msg;
918 round_msg = (struct ConsensusRoundMessage *) msg;
845 /* FIXME: only call subround_over if round is the current one! */ 919 /* FIXME: only call subround_over if round is the current one! */
846 switch (cpi->session->current_round) 920 switch (cpi->session->current_round)
847 { 921 {
848 case CONSENSUS_ROUND_EXCHANGE: 922 case CONSENSUS_ROUND_EXCHANGE:
849 { 923 case CONSENSUS_ROUND_STOCK:
850 int not_finished; 924 if (cpi->session->current_round != round_msg->round)
925 {
926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
927 cpi->ibf_state = IBF_STATE_NONE;
928 cpi->ibf_bucket_counter = 0;
929 break;
930 }
931 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
851 cpi->exp_subround_finished = GNUNET_YES; 932 cpi->exp_subround_finished = GNUNET_YES;
852 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 933 if (GNUNET_YES == exp_subround_finished (cpi->session))
853 /* the subround is only really over if *both* partners are done */
854 not_finished = 0;
855 if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
856 not_finished++;
857 if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
858 not_finished++;
859 if (0 == not_finished)
860 subround_over (cpi->session, NULL); 934 subround_over (cpi->session, NULL);
861 } 935 else
936 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx);
862 break; 937 break;
938 case CONSENSUS_ROUND_INVENTORY:
939 cpi->inventory_synced = GNUNET_YES;
940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
941 if (inventory_round_finished (cpi->session))
942 round_over (cpi->session, NULL);
943 break;
863 default: 944 default:
864 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); 945 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
865 break; 946 break;
@@ -884,6 +965,38 @@ strata_estimator_create ()
884 return se; 965 return se;
885} 966}
886 967
968static void
969strata_estimator_destroy (struct StrataEstimator *se)
970{
971 int i;
972 for (i = 0; i < STRATA_COUNT; i++)
973 ibf_destroy (se->strata[i]);
974 GNUNET_free (se->strata);
975 GNUNET_free (se);
976}
977
978
979static int
980is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg)
981{
982 switch (strata_msg->round)
983 {
984 case CONSENSUS_ROUND_STOCK:
985 case CONSENSUS_ROUND_EXCHANGE:
986 /* here, we also have to compare subrounds */
987 if ( (strata_msg->round != session->current_round) ||
988 (strata_msg->exp_round != session->exp_round) ||
989 (strata_msg->exp_subround != session->exp_subround))
990 return GNUNET_YES;
991 break;
992 default:
993 if (session->current_round != strata_msg->round)
994 return GNUNET_YES;
995 break;
996 }
997 return GNUNET_NO;
998}
999
887 1000
888/** 1001/**
889 * Called when a peer sends us its strata estimator. 1002 * Called when a peer sends us its strata estimator.
@@ -900,32 +1013,28 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
900 void *buf; 1013 void *buf;
901 size_t size; 1014 size_t size;
902 1015
903 1016 if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY))
904
905 switch (cpi->session->current_round)
906 { 1017 {
907 case CONSENSUS_ROUND_EXCHANGE: 1018 /* we still have to handle this request appropriately */
908 if ( (strata_msg->round != CONSENSUS_ROUND_EXCHANGE) || 1019 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n",
909 (strata_msg->exp_round != cpi->session->exp_round) || 1020 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
910 (strata_msg->exp_subround != cpi->session->exp_subround)) 1021 }
911 { 1022 else if (is_premature_strata_message (cpi->session, strata_msg))
912 if (GNUNET_NO == cpi->replaying_strata_message) 1023 {
913 { 1024 if (GNUNET_NO == cpi->replaying_strata_message)
914 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", 1025 {
915 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); 1026 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n",
916 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); 1027 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
917 } 1028 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
918 return GNUNET_YES; 1029 }
919 } 1030 return GNUNET_YES;
920 break;
921 default:
922 GNUNET_assert (0);
923 break;
924 } 1031 }
925 1032
926 if (NULL == cpi->se) 1033 if (NULL == cpi->se)
927 cpi->se = strata_estimator_create (); 1034 cpi->se = strata_estimator_create ();
928 1035
1036 cpi->apparent_round = strata_msg->round;
1037
929 size = ntohs (strata_msg->header.size); 1038 size = ntohs (strata_msg->header.size);
930 buf = (void *) &strata_msg[1]; 1039 buf = (void *) &strata_msg[1];
931 for (i = 0; i < STRATA_COUNT; i++) 1040 for (i = 0; i < STRATA_COUNT; i++)
@@ -937,25 +1046,32 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
937 1046
938 diff = estimate_difference (cpi->session->se, cpi->se); 1047 diff = estimate_difference (cpi->session->se, cpi->se);
939 1048
940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d\n", 1049 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
941 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 1050 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff);
942 1051
943 if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) || 1052 switch (cpi->session->current_round)
944 (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round))
945 { 1053 {
946 /* send IBF of the right size */ 1054 case CONSENSUS_ROUND_EXCHANGE:
947 cpi->ibf_order = 0; 1055 case CONSENSUS_ROUND_INVENTORY:
948 while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) 1056 case CONSENSUS_ROUND_STOCK:
949 cpi->ibf_order++; 1057 /* send IBF of the right size */
950 if (cpi->ibf_order > MAX_IBF_ORDER) 1058 cpi->ibf_order = 0;
951 cpi->ibf_order = MAX_IBF_ORDER; 1059 while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) )
952 cpi->ibf_order += 1; 1060 cpi->ibf_order++;
953 /* create ibf if not already pre-computed */ 1061 if (cpi->ibf_order > MAX_IBF_ORDER)
954 prepare_ibf (cpi); 1062 cpi->ibf_order = MAX_IBF_ORDER;
955 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); 1063 cpi->ibf_order += 1;
956 cpi->ibf_state = IBF_STATE_TRANSMITTING; 1064 /* create ibf if not already pre-computed */
957 cpi->ibf_bucket_counter = 0; 1065 prepare_ibf (cpi);
958 send_ibf (cpi); 1066 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1067 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1068 cpi->ibf_bucket_counter = 0;
1069 send_ibf (cpi);
1070 break;
1071 default:
1072 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
1073 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1074 break;
959 } 1075 }
960 return GNUNET_YES; 1076 return GNUNET_YES;
961} 1077}
@@ -973,7 +1089,7 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
973 switch (cpi->ibf_state) 1089 switch (cpi->ibf_state)
974 { 1090 {
975 case IBF_STATE_NONE: 1091 case IBF_STATE_NONE:
976 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", digest->order); 1092 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
977 cpi->ibf_state = IBF_STATE_RECEIVING; 1093 cpi->ibf_state = IBF_STATE_RECEIVING;
978 cpi->ibf_order = digest->order; 1094 cpi->ibf_order = digest->order;
979 cpi->ibf_bucket_counter = 0; 1095 cpi->ibf_bucket_counter = 0;
@@ -984,7 +1100,8 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
984 } 1100 }
985 break; 1101 break;
986 case IBF_STATE_ANTICIPATE_DIFF: 1102 case IBF_STATE_ANTICIPATE_DIFF:
987 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order %d\n", digest->order); 1103 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n",
1104 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
988 cpi->ibf_state = IBF_STATE_RECEIVING; 1105 cpi->ibf_state = IBF_STATE_RECEIVING;
989 cpi->ibf_order = digest->order; 1106 cpi->ibf_order = digest->order;
990 cpi->ibf_bucket_counter = 0; 1107 cpi->ibf_bucket_counter = 0;
@@ -997,18 +1114,16 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
997 case IBF_STATE_RECEIVING: 1114 case IBF_STATE_RECEIVING:
998 break; 1115 break;
999 default: 1116 default:
1000 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "received ibf unexpectedly in state %d\n", cpi->ibf_state); 1117 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1001 return GNUNET_YES; 1118 return GNUNET_YES;
1002 } 1119 }
1003 1120
1004 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) 1121 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
1005 { 1122 {
1006 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n"); 1123 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1007 return GNUNET_YES; 1124 return GNUNET_YES;
1008 } 1125 }
1009 1126
1010 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets,
1011 cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
1012 1127
1013 if (NULL == cpi->ibf) 1128 if (NULL == cpi->ibf)
1014 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 1129 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
@@ -1041,6 +1156,17 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
1041 struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; 1156 struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
1042 size_t size; 1157 size_t size;
1043 1158
1159 switch (cpi->session->current_round)
1160 {
1161 case CONSENSUS_ROUND_STOCK:
1162 /* FIXME: check if we really expect the element */
1163 case CONSENSUS_ROUND_EXCHANGE:
1164 break;
1165 default:
1166 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n");
1167 return GNUNET_YES;
1168 }
1169
1044 size = ntohs (element_msg->size) - sizeof *element_msg; 1170 size = ntohs (element_msg->size) - sizeof *element_msg;
1045 1171
1046 element = GNUNET_malloc (size + sizeof *element); 1172 element = GNUNET_malloc (size + sizeof *element);
@@ -1069,7 +1195,9 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
1069 1195
1070/** 1196/**
1071 * Handle a request for elements. 1197 * Handle a request for elements.
1072 * Only allowed in exchange-rounds. 1198 *
1199 * @param cpi peer that is requesting the element
1200 * @param msg the element request message
1073 */ 1201 */
1074static int 1202static int
1075handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) 1203handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
@@ -1078,14 +1206,18 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E
1078 struct IBF_Key *ibf_key; 1206 struct IBF_Key *ibf_key;
1079 unsigned int num; 1207 unsigned int num;
1080 1208
1209 /* element requests are allowed in every round */
1210
1081 num = ntohs (msg->header.size) / sizeof (struct IBF_Key); 1211 num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
1082 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); 1212 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num);
1083 1213
1084 ibf_key = (struct IBF_Key *) &msg[1]; 1214 ibf_key = (struct IBF_Key *) &msg[1];
1085 while (num--) 1215 while (num--)
1086 { 1216 {
1217 struct ElementList *head;
1087 ibf_hashcode_from_key (*ibf_key, &hashcode); 1218 ibf_hashcode_from_key (*ibf_key, &hashcode);
1088 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); 1219 head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1220 send_elements (cpi, head);
1089 ibf_key++; 1221 ibf_key++;
1090 } 1222 }
1091 return GNUNET_YES; 1223 return GNUNET_YES;
@@ -1162,10 +1294,12 @@ send_strata_estimator (struct ConsensusPeerInformation *cpi)
1162 size_t msize; 1294 size_t msize;
1163 int i; 1295 int i;
1164 1296
1297 cpi->apparent_round = cpi->session->current_round;
1298 cpi->ibf_state = IBF_STATE_NONE;
1299 cpi->ibf_bucket_counter = 0;
1165 1300
1166 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE to P%d\n", 1301 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n",
1167 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 1302 cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info));
1168
1169 1303
1170 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); 1304 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1171 1305
@@ -1185,21 +1319,20 @@ send_strata_estimator (struct ConsensusPeerInformation *cpi)
1185 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); 1319 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
1186} 1320}
1187 1321
1322
1188/** 1323/**
1189 * Send an IBF of the order specified in cpi 1324 * Send an IBF of the order specified in cpi.
1190 * 1325 *
1191 * @param cpi the peer 1326 * @param cpi the peer
1192 */ 1327 */
1193static void 1328static void
1194send_ibf (struct ConsensusPeerInformation *cpi) 1329send_ibf (struct ConsensusPeerInformation *cpi)
1195{ 1330{
1196 int sent_buckets;
1197
1198 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", 1331 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
1199 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); 1332 cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1200 1333
1201 sent_buckets = 0; 1334 cpi->ibf_bucket_counter = 0;
1202 while (sent_buckets < (1 << cpi->ibf_order)) 1335 while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
1203 { 1336 {
1204 int num_buckets; 1337 int num_buckets;
1205 void *buf; 1338 void *buf;
@@ -1223,12 +1356,18 @@ send_ibf (struct ConsensusPeerInformation *cpi)
1223 1356
1224 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); 1357 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
1225 1358
1226 sent_buckets += num_buckets; 1359 cpi->ibf_bucket_counter += num_buckets;
1227 } 1360 }
1361 cpi->ibf_bucket_counter = 0;
1228 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; 1362 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1229} 1363}
1230 1364
1231 1365
1366/**
1367 * Decode the current diff ibf, and send elements/requests/reports/
1368 *
1369 * @param cpi partner peer
1370 */
1232static void 1371static void
1233decode (struct ConsensusPeerInformation *cpi) 1372decode (struct ConsensusPeerInformation *cpi)
1234{ 1373{
@@ -1236,11 +1375,12 @@ decode (struct ConsensusPeerInformation *cpi)
1236 struct GNUNET_HashCode hashcode; 1375 struct GNUNET_HashCode hashcode;
1237 int side; 1376 int side;
1238 1377
1239 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n"); 1378 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
1240 1379
1241 for (;;) 1380 for (;;)
1242 { 1381 {
1243 int res; 1382 int res;
1383
1244 res = ibf_decode (cpi->ibf, &side, &key); 1384 res = ibf_decode (cpi->ibf, &side, &key);
1245 if (GNUNET_SYSERR == res) 1385 if (GNUNET_SYSERR == res)
1246 { 1386 {
@@ -1256,19 +1396,22 @@ decode (struct ConsensusPeerInformation *cpi)
1256 } 1396 }
1257 if (GNUNET_NO == res) 1397 if (GNUNET_NO == res)
1258 { 1398 {
1259 struct GNUNET_MessageHeader *msg; 1399 struct ConsensusRoundMessage *msg;
1260 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); 1400 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx);
1261 msg = GNUNET_malloc (sizeof *msg); 1401 msg = GNUNET_malloc (sizeof *msg);
1262 msg->size = htons (sizeof *msg); 1402 msg->header.size = htons (sizeof *msg);
1263 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); 1403 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1264 queue_peer_message (cpi, msg); 1404 msg->round = cpi->apparent_round;
1405 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
1265 return; 1406 return;
1266 } 1407 }
1267 if (-1 == side) 1408 if (-1 == side)
1268 { 1409 {
1410 struct ElementList *head;
1269 /* we have the element(s), send it to the other peer */ 1411 /* we have the element(s), send it to the other peer */
1270 ibf_hashcode_from_key (key, &hashcode); 1412 ibf_hashcode_from_key (key, &hashcode);
1271 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); 1413 head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1414 send_elements (cpi, head);
1272 } 1415 }
1273 else 1416 else
1274 { 1417 {
@@ -1278,17 +1421,18 @@ decode (struct ConsensusPeerInformation *cpi)
1278 1421
1279 msize = (sizeof *msg) + sizeof (struct IBF_Key); 1422 msize = (sizeof *msg) + sizeof (struct IBF_Key);
1280 msg = GNUNET_malloc (msize); 1423 msg = GNUNET_malloc (msize);
1281 if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) 1424 switch (cpi->apparent_round)
1282 {
1283 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1284 }
1285 else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
1286 {
1287 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1288 }
1289 else
1290 { 1425 {
1291 GNUNET_assert (0); 1426 case CONSENSUS_ROUND_STOCK:
1427 /* FIXME: check if we really want to request the element */
1428 case CONSENSUS_ROUND_EXCHANGE:
1429 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1430 break;
1431 case CONSENSUS_ROUND_INVENTORY:
1432 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
1433 break;
1434 default:
1435 GNUNET_assert (0);
1292 } 1436 }
1293 msg->header.size = htons (msize); 1437 msg->header.size = htons (msize);
1294 p = (struct IBF_Key *) &msg[1]; 1438 p = (struct IBF_Key *) &msg[1];
@@ -1308,7 +1452,6 @@ decode (struct ConsensusPeerInformation *cpi)
1308 * @param cls closure 1452 * @param cls closure
1309 * @param client identification of the client 1453 * @param client identification of the client
1310 * @param message the actual message 1454 * @param message the actual message
1311 *
1312 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing 1455 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1313 */ 1456 */
1314static int 1457static int
@@ -1403,6 +1546,36 @@ listen_cb (void *cls,
1403 1546
1404 1547
1405/** 1548/**
1549 * Iterator over hash map entries.
1550 *
1551 * @param cls closure
1552 * @param key current key code
1553 * @param value value in the hash map
1554 * @return GNUNET_YES if we should continue to
1555 * iterate,
1556 * GNUNET_NO if not.
1557 */
1558static int
1559destroy_element_list_iter (void *cls,
1560 const struct GNUNET_HashCode * key,
1561 void *value)
1562{
1563 struct ElementList *el;
1564 el = value;
1565 while (NULL != el)
1566 {
1567 struct ElementList *el_old;
1568 el_old = el;
1569 el = el->next;
1570 GNUNET_free (el_old->element_hash);
1571 GNUNET_free (el_old->element);
1572 GNUNET_free (el_old);
1573 }
1574 return GNUNET_YES;
1575}
1576
1577
1578/**
1406 * Destroy a session, free all resources associated with it. 1579 * Destroy a session, free all resources associated with it.
1407 * 1580 *
1408 * @param session the session to destroy 1581 * @param session the session to destroy
@@ -1410,9 +1583,80 @@ listen_cb (void *cls,
1410static void 1583static void
1411destroy_session (struct ConsensusSession *session) 1584destroy_session (struct ConsensusSession *session)
1412{ 1585{
1413 /* FIXME: more stuff to free! */ 1586 int i;
1587
1414 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); 1588 GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
1415 GNUNET_SERVER_client_drop (session->client); 1589 GNUNET_SERVER_client_drop (session->client);
1590 session->client = NULL;
1591 if (NULL != session->shuffle)
1592 {
1593 GNUNET_free (session->shuffle);
1594 session->shuffle = NULL;
1595 }
1596 if (NULL != session->se)
1597 {
1598 strata_estimator_destroy (session->se);
1599 session->se = NULL;
1600 }
1601 if (NULL != session->info)
1602 {
1603 for (i = 0; i < session->num_peers; i++)
1604 {
1605 struct ConsensusPeerInformation *cpi;
1606 cpi = &session->info[i];
1607 if ((NULL != cpi) && (NULL != cpi->socket))
1608 {
1609 if (NULL != cpi->rh)
1610 {
1611 GNUNET_STREAM_read_cancel (cpi->rh);
1612 cpi->rh = NULL;
1613 }
1614 if (NULL != cpi->wh)
1615 {
1616 GNUNET_STREAM_write_cancel (cpi->wh);
1617 cpi->wh = NULL;
1618 }
1619 GNUNET_STREAM_close (cpi->socket);
1620 cpi->socket = NULL;
1621 }
1622 if (NULL != cpi->se)
1623 {
1624 strata_estimator_destroy (cpi->se);
1625 cpi->se = NULL;
1626 }
1627 if (NULL != cpi->ibf)
1628 {
1629 ibf_destroy (cpi->ibf);
1630 cpi->ibf = NULL;
1631 }
1632 if (NULL != cpi->mst)
1633 {
1634 GNUNET_SERVER_mst_destroy (cpi->mst);
1635 cpi->mst = NULL;
1636 }
1637 }
1638 GNUNET_free (session->info);
1639 session->info = NULL;
1640 }
1641 if (NULL != session->ibfs)
1642 {
1643 for (i = 0; i <= MAX_IBF_ORDER; i++)
1644 {
1645 if (NULL != session->ibfs[i])
1646 {
1647 ibf_destroy (session->ibfs[i]);
1648 session->ibfs[i] = NULL;
1649 }
1650 }
1651 GNUNET_free (session->ibfs);
1652 session->ibfs = NULL;
1653 }
1654 if (NULL != session->values)
1655 {
1656 GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL);
1657 GNUNET_CONTAINER_multihashmap_destroy (session->values);
1658 session->values = NULL;
1659 }
1416 GNUNET_free (session); 1660 GNUNET_free (session);
1417} 1661}
1418 1662
@@ -1448,6 +1692,7 @@ disconnect_client (struct GNUNET_SERVER_Client *client)
1448 * Thus, if the local id of two consensus sessions coincide, but are not comprised of 1692 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
1449 * exactly the same peers, the global id will be different. 1693 * exactly the same peers, the global id will be different.
1450 * 1694 *
1695 * @param session session to generate the global id for
1451 * @param session_id local id of the consensus session 1696 * @param session_id local id of the consensus session
1452 */ 1697 */
1453static void 1698static void
@@ -1511,9 +1756,9 @@ transmit_queued (void *cls, size_t size,
1511 1756
1512 1757
1513/** 1758/**
1514 * Schedule transmitting the next queued message (if any) to a client. 1759 * Schedule transmitting the next queued message (if any) to the inhabiting client of a session.
1515 * 1760 *
1516 * @param cli the client to send the next message to 1761 * @param session the consensus session
1517 */ 1762 */
1518static void 1763static void
1519client_send_next (struct ConsensusSession *session) 1764client_send_next (struct ConsensusSession *session)
@@ -1545,9 +1790,9 @@ client_send_next (struct ConsensusSession *session)
1545 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. 1790 * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
1546 */ 1791 */
1547static int 1792static int
1548hash_cmp (const void *a, const void *b) 1793hash_cmp (const void *h1, const void *h2)
1549{ 1794{
1550 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b); 1795 return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2);
1551} 1796}
1552 1797
1553 1798
@@ -1607,6 +1852,7 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1607 cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); 1852 cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1608 cpi->wh = 1853 cpi->wh =
1609 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); 1854 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1855 GNUNET_free (hello);
1610 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1856 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1611 &session_stream_data_processor, cpi); 1857 &session_stream_data_processor, cpi);
1612} 1858}
@@ -1821,12 +2067,52 @@ hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
1821} 2067}
1822 2068
1823 2069
2070static void
2071insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element)
2072{
2073 struct GNUNET_HashCode hash;
2074 struct ElementList *head;
2075
2076 hash_for_ibf (element->data, element->size, &hash);
2077
2078 head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
2079
2080 if (NULL == head)
2081 {
2082 int i;
2083
2084 head = GNUNET_malloc (sizeof *head);
2085 head->element = element;
2086 head->next = NULL;
2087 head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2088 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head,
2089 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2090 strata_estimator_insert (session->se, &hash);
2091
2092 for (i = 0; i <= MAX_IBF_ORDER; i++)
2093 if (NULL != session->ibfs[i])
2094 ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
2095 }
2096 else
2097 {
2098 struct ElementList *el;
2099 el = GNUNET_malloc (sizeof *el);
2100 head->element = element;
2101 head->next = NULL;
2102 head->element_hash = GNUNET_memdup (&hash, sizeof hash);
2103 while (NULL != head->next)
2104 head = head->next;
2105 head->next = el;
2106 }
2107}
2108
2109
1824/** 2110/**
1825 * Called when a client performs an insert operation. 2111 * Called when a client performs an insert operation.
1826 * 2112 *
1827 * @param cls (unused) 2113 * @param cls (unused)
1828 * @param client client handle 2114 * @param client client handle
1829 * @param message message sent by the client 2115 * @param m message sent by the client
1830 */ 2116 */
1831void 2117void
1832client_insert (void *cls, 2118client_insert (void *cls,
@@ -1836,7 +2122,6 @@ client_insert (void *cls,
1836 struct ConsensusSession *session; 2122 struct ConsensusSession *session;
1837 struct GNUNET_CONSENSUS_ElementMessage *msg; 2123 struct GNUNET_CONSENSUS_ElementMessage *msg;
1838 struct GNUNET_CONSENSUS_Element *element; 2124 struct GNUNET_CONSENSUS_Element *element;
1839 struct GNUNET_HashCode hash;
1840 int element_size; 2125 int element_size;
1841 2126
1842 session = sessions_head; 2127 session = sessions_head;
@@ -1865,12 +2150,7 @@ client_insert (void *cls,
1865 2150
1866 GNUNET_assert (NULL != element->data); 2151 GNUNET_assert (NULL != element->data);
1867 2152
1868 hash_for_ibf (element->data, element_size, &hash); 2153 insert_element (session, element);
1869
1870 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
1871 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1872
1873 strata_estimator_insert (session->se, &hash);
1874 2154
1875 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2155 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1876 2156
@@ -1951,18 +2231,41 @@ find_partners (struct ConsensusSession *session)
1951 { 2231 {
1952 GNUNET_assert (NULL == session->partner_outgoing); 2232 GNUNET_assert (NULL == session->partner_outgoing);
1953 session->partner_outgoing = &session->info[session->shuffle[arc]]; 2233 session->partner_outgoing = &session->info[session->shuffle[arc]];
2234 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
1954 } 2235 }
1955 if (arc == session->local_peer_idx) 2236 if (arc == session->local_peer_idx)
1956 { 2237 {
1957 GNUNET_assert (NULL == session->partner_incoming); 2238 GNUNET_assert (NULL == session->partner_incoming);
1958 session->partner_incoming = &session->info[session->shuffle[i]]; 2239 session->partner_incoming = &session->info[session->shuffle[i]];
2240 session->partner_incoming->exp_subround_finished = GNUNET_NO;
1959 } 2241 }
1960 } 2242 }
1961} 2243}
1962 2244
1963 2245
2246static void
2247replay_premature_message (struct ConsensusPeerInformation *cpi)
2248{
2249 if (NULL != cpi->premature_strata_message)
2250 {
2251 struct StrataMessage *sm;
2252
2253 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
2254 sm = cpi->premature_strata_message;
2255 cpi->premature_strata_message = NULL;
2256
2257 cpi->replaying_strata_message = GNUNET_YES;
2258 handle_p2p_strata (cpi, sm);
2259 cpi->replaying_strata_message = GNUNET_NO;
2260
2261 GNUNET_free (sm);
2262 }
2263}
2264
2265
1964/** 2266/**
1965 * Do the next subround in the exp-scheme. 2267 * Do the next subround in the exp-scheme.
2268 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
1966 * 2269 *
1967 * @param cls the session 2270 * @param cls the session
1968 * @param tc task context, for when this task is invoked by the scheduler, 2271 * @param tc task context, for when this task is invoked by the scheduler,
@@ -1977,35 +2280,29 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1977 /* don't kick off next subround if we're shutting down */ 2280 /* don't kick off next subround if we're shutting down */
1978 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 2281 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1979 return; 2282 return;
1980
1981 session = cls; 2283 session = cls;
1982 2284 /* don't send any messages from the last round */
1983 2285 /*
2286 clear_peer_messages (session->partner_outgoing);
2287 clear_peer_messages (session->partner_incoming);
1984 for (i = 0; i < session->num_peers; i++) 2288 for (i = 0; i < session->num_peers; i++)
1985 clear_peer_messages (&session->info[i]); 2289 clear_peer_messages (&session->info[i]);
1986 2290 */
2291 /* cancel timeout */
1987 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) 2292 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1988 {
1989 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 2293 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1990 }
1991 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 2294 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1992 2295 /* check if we are done with the log phase, 2-peer consensus only does one log round */
1993 if ((session->num_peers == 2) && (session->exp_round == 1)) 2296 if ( (session->exp_round == NUM_EXP_ROUNDS) ||
2297 ((session->num_peers == 2) && (session->exp_round == 1)))
1994 { 2298 {
1995 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n"); 2299 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
1996 round_over (session, NULL); 2300 round_over (session, NULL);
1997 return; 2301 return;
1998 } 2302 }
1999
2000 if (session->exp_round == NUM_EXP_ROUNDS)
2001 {
2002 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
2003 round_over (session, NULL);
2004 return;
2005 }
2006
2007 if (session->exp_round == 0) 2303 if (session->exp_round == 0)
2008 { 2304 {
2305 /* initialize everything for the log-rounds */
2009 session->exp_round = 1; 2306 session->exp_round = 1;
2010 session->exp_subround = 0; 2307 session->exp_subround = 0;
2011 if (NULL == session->shuffle) 2308 if (NULL == session->shuffle)
@@ -2015,6 +2312,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2015 } 2312 }
2016 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) 2313 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
2017 { 2314 {
2315 /* subrounds done, start new log-round */
2018 session->exp_round++; 2316 session->exp_round++;
2019 session->exp_subround = 0; 2317 session->exp_subround = 0;
2020 shuffle (session); 2318 shuffle (session);
@@ -2026,6 +2324,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2026 2324
2027 find_partners (session); 2325 find_partners (session);
2028 2326
2327#ifdef GNUNET_EXTRA_LOGGING
2029 { 2328 {
2030 int in; 2329 int in;
2031 int out; 2330 int out;
@@ -2040,14 +2339,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2040 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, 2339 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx,
2041 session->exp_round, session->exp_subround, in, out); 2340 session->exp_round, session->exp_subround, in, out);
2042 } 2341 }
2043 2342#endif /* GNUNET_EXTRA_LOGGING */
2044
2045 if (NULL != session->partner_outgoing)
2046 {
2047 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
2048 session->partner_outgoing->ibf_bucket_counter = 0;
2049 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2050 }
2051 2343
2052 if (NULL != session->partner_incoming) 2344 if (NULL != session->partner_incoming)
2053 { 2345 {
@@ -2056,24 +2348,15 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2056 session->partner_incoming->ibf_bucket_counter = 0; 2348 session->partner_incoming->ibf_bucket_counter = 0;
2057 2349
2058 /* maybe there's an early strata estimator? */ 2350 /* maybe there's an early strata estimator? */
2059 if (NULL != session->partner_incoming->premature_strata_message) 2351 replay_premature_message (session->partner_incoming);
2060 {
2061 struct StrataMessage *sm;
2062
2063 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
2064 sm = session->partner_incoming->premature_strata_message;
2065 session->partner_incoming->premature_strata_message = NULL;
2066
2067 session->partner_incoming->replaying_strata_message = GNUNET_YES;
2068 handle_p2p_strata (session->partner_incoming, sm);
2069 session->partner_incoming->replaying_strata_message = GNUNET_NO;
2070
2071 GNUNET_free (sm);
2072 }
2073 } 2352 }
2074 2353
2075 if (NULL != session->partner_outgoing) 2354 if (NULL != session->partner_outgoing)
2076 { 2355 {
2356 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
2357 session->partner_outgoing->ibf_bucket_counter = 0;
2358 session->partner_outgoing->exp_subround_finished = GNUNET_NO;
2359
2077 if (NULL == session->partner_outgoing->socket) 2360 if (NULL == session->partner_outgoing->socket)
2078 { 2361 {
2079 session->partner_outgoing->socket = 2362 session->partner_outgoing->socket =
@@ -2092,7 +2375,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2092 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), 2375 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
2093 subround_over, session); 2376 subround_over, session);
2094 */ 2377 */
2095
2096} 2378}
2097 2379
2098static void 2380static void
@@ -2110,31 +2392,63 @@ contact_peer_a2a (struct ConsensusPeerInformation *cpi)
2110 } 2392 }
2111} 2393}
2112 2394
2395/**
2396 * Start the inventory round, contact all peers we are supposed to contact.
2397 *
2398 * @param session the current session
2399 */
2113static void 2400static void
2114start_inventory (struct ConsensusSession *session) 2401start_inventory (struct ConsensusSession *session)
2115{ 2402{
2116 int i; 2403 int i;
2117 int last; 2404 int last;
2118 2405
2406 for (i = 0; i < session->num_peers; i++)
2407 {
2408 session->info[i].ibf_bucket_counter = 0;
2409 session->info[i].ibf_state = IBF_STATE_NONE;
2410 session->info[i].is_outgoing = GNUNET_NO;
2411 }
2412
2119 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; 2413 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
2120 i = (session->local_peer_idx + 1) % session->num_peers; 2414 i = (session->local_peer_idx + 1) % session->num_peers;
2121 while (i != last) 2415 while (i != last)
2122 { 2416 {
2417 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
2123 contact_peer_a2a (&session->info[i]); 2418 contact_peer_a2a (&session->info[i]);
2419 session->info[i].is_outgoing = GNUNET_YES;
2124 i = (i + 1) % session->num_peers; 2420 i = (i + 1) % session->num_peers;
2125 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
2126 } 2421 }
2127 // tie-breaker for even number of peers 2422 // tie-breaker for even number of peers
2128 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) 2423 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
2129 { 2424 {
2425 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
2426 session->info[last].is_outgoing = GNUNET_YES;
2130 contact_peer_a2a (&session->info[last]); 2427 contact_peer_a2a (&session->info[last]);
2131 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); 2428 }
2429
2430 for (i = 0; i < session->num_peers; i++)
2431 {
2432 if (GNUNET_NO == session->info[i].is_outgoing)
2433 replay_premature_message (&session->info[i]);
2132 } 2434 }
2133} 2435}
2134 2436
2437static void
2438send_client_conclude_done (struct ConsensusSession *session)
2439{
2440 struct GNUNET_MessageHeader *msg;
2441 session->current_round = CONSENSUS_ROUND_FINISH;
2442 msg = GNUNET_malloc (sizeof *msg);
2443 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2444 msg->size = htons (sizeof *msg);
2445 queue_client_message (session, msg);
2446 client_send_next (session);
2447}
2135 2448
2136/** 2449/**
2137 * Select and kick off the next round, based on the current round. 2450 * Start the next round.
2451 * This function can be invoked as a timeout task, or called manually (tc will be NULL then).
2138 * 2452 *
2139 * @param cls the session 2453 * @param cls the session
2140 * @param tc task context, for when this task is invoked by the scheduler, 2454 * @param tc task context, for when this task is invoked by the scheduler,
@@ -2144,17 +2458,18 @@ static void
2144round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 2458round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2145{ 2459{
2146 struct ConsensusSession *session; 2460 struct ConsensusSession *session;
2147 int i;
2148 2461
2149 /* don't kick off next round if we're shutting down */ 2462 /* don't kick off next round if we're shutting down */
2150 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 2463 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2151 return; 2464 return;
2152 2465
2153 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
2154 session = cls; 2466 session = cls;
2467 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx);
2155 2468
2469 /*
2156 for (i = 0; i < session->num_peers; i++) 2470 for (i = 0; i < session->num_peers; i++)
2157 clear_peer_messages (&session->info[i]); 2471 clear_peer_messages (&session->info[i]);
2472 */
2158 2473
2159 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) 2474 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2160 { 2475 {
@@ -2162,8 +2477,6 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2162 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 2477 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2163 } 2478 }
2164 2479
2165 /* FIXME: cancel current round */
2166
2167 switch (session->current_round) 2480 switch (session->current_round)
2168 { 2481 {
2169 case CONSENSUS_ROUND_BEGIN: 2482 case CONSENSUS_ROUND_BEGIN:
@@ -2172,31 +2485,24 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2172 subround_over (session, NULL); 2485 subround_over (session, NULL);
2173 break; 2486 break;
2174 case CONSENSUS_ROUND_EXCHANGE: 2487 case CONSENSUS_ROUND_EXCHANGE:
2175 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); 2488 /* handle two peers specially */
2176 2489 if (session->num_peers <= 2)
2177 if (0)
2178 { 2490 {
2179 struct GNUNET_MessageHeader *msg; 2491 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx);
2180 msg = GNUNET_malloc (sizeof *msg); 2492 send_client_conclude_done (session);
2181 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); 2493 return;
2182 msg->size = htons (sizeof *msg);
2183 queue_client_message (session, msg);
2184 client_send_next (session);
2185 }
2186
2187 if (0)
2188 {
2189 session->current_round = CONSENSUS_ROUND_INVENTORY;
2190 start_inventory (session);
2191 } 2494 }
2495 session->current_round = CONSENSUS_ROUND_INVENTORY;
2496 start_inventory (session);
2192 break; 2497 break;
2193 case CONSENSUS_ROUND_INVENTORY: 2498 case CONSENSUS_ROUND_INVENTORY:
2194 session->current_round = CONSENSUS_ROUND_STOCK; 2499 session->current_round = CONSENSUS_ROUND_STOCK;
2195 /* FIXME: exchange stock */ 2500 session->exp_round = 0;
2501 subround_over (session, NULL);
2196 break; 2502 break;
2197 case CONSENSUS_ROUND_STOCK: 2503 case CONSENSUS_ROUND_STOCK:
2198 session->current_round = CONSENSUS_ROUND_FINISH; 2504 session->current_round = CONSENSUS_ROUND_FINISH;
2199 /* FIXME: send elements to client */ 2505 send_client_conclude_done (session);
2200 break; 2506 break;
2201 default: 2507 default:
2202 GNUNET_assert (0); 2508 GNUNET_assert (0);
@@ -2241,10 +2547,16 @@ client_conclude (void *cls,
2241 return; 2547 return;
2242 } 2548 }
2243 2549
2244 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); 2550 if (session->num_peers <= 1)
2245 2551 {
2246 /* the 'begin' round is over, start with the next, real round */ 2552 send_client_conclude_done (session);
2247 round_over (session, NULL); 2553 }
2554 else
2555 {
2556 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
2557 /* the 'begin' round is over, start with the next, real round */
2558 round_over (session, NULL);
2559 }
2248 2560
2249 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2561 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2250 client_send_next (session); 2562 client_send_next (session);
@@ -2267,7 +2579,6 @@ client_ack (void *cls,
2267 struct GNUNET_CONSENSUS_AckMessage *msg; 2579 struct GNUNET_CONSENSUS_AckMessage *msg;
2268 struct PendingElement *pending; 2580 struct PendingElement *pending;
2269 struct GNUNET_CONSENSUS_Element *element; 2581 struct GNUNET_CONSENSUS_Element *element;
2270 struct GNUNET_HashCode key;
2271 2582
2272 session = sessions_head; 2583 session = sessions_head;
2273 while (NULL != session) 2584 while (NULL != session)
@@ -2291,25 +2602,17 @@ client_ack (void *cls,
2291 2602
2292 if (msg->keep) 2603 if (msg->keep)
2293 { 2604 {
2294 int i;
2295 element = pending->element; 2605 element = pending->element;
2296 hash_for_ibf (element->data, element->size, &key); 2606 insert_element (session, element);
2297
2298 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
2299 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2300 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); 2607 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
2301 strata_estimator_insert (session->se, &key);
2302
2303 for (i = 0; i <= MAX_IBF_ORDER; i++)
2304 {
2305 if (NULL != session->ibfs[i])
2306 ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key));
2307 }
2308 } 2608 }
2309 2609
2610 GNUNET_free (pending);
2611
2310 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2612 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2311} 2613}
2312 2614
2615
2313/** 2616/**
2314 * Task that disconnects from core. 2617 * Task that disconnects from core.
2315 * 2618 *
@@ -2366,7 +2669,18 @@ shutdown_task (void *cls,
2366 socket = incoming_sockets_head; 2669 socket = incoming_sockets_head;
2367 if (NULL == socket->cpi) 2670 if (NULL == socket->cpi)
2368 { 2671 {
2672 if (NULL != socket->rh)
2673 {
2674 GNUNET_STREAM_read_cancel (socket->rh);
2675 socket->rh = NULL;
2676 }
2369 GNUNET_STREAM_close (socket->socket); 2677 GNUNET_STREAM_close (socket->socket);
2678 socket->socket = NULL;
2679 if (NULL != socket->mst)
2680 {
2681 GNUNET_SERVER_mst_destroy (socket->mst);
2682 socket->mst = NULL;
2683 }
2370 } 2684 }
2371 incoming_sockets_head = incoming_sockets_head->next; 2685 incoming_sockets_head = incoming_sockets_head->next;
2372 GNUNET_free (socket); 2686 GNUNET_free (socket);
@@ -2375,26 +2689,9 @@ shutdown_task (void *cls,
2375 while (NULL != sessions_head) 2689 while (NULL != sessions_head)
2376 { 2690 {
2377 struct ConsensusSession *session; 2691 struct ConsensusSession *session;
2378 int i; 2692 session = sessions_head->next;
2379 2693 destroy_session (sessions_head);
2380 session = sessions_head; 2694 sessions_head = session;
2381
2382 if (NULL != session->info)
2383 for (i = 0; i < session->num_peers; i++)
2384 {
2385 struct ConsensusPeerInformation *cpi;
2386 cpi = &session->info[i];
2387 if ((NULL != cpi) && (NULL != cpi->socket))
2388 {
2389 GNUNET_STREAM_close (cpi->socket);
2390 }
2391 }
2392
2393 if (NULL != session->client)
2394 GNUNET_SERVER_client_disconnect (session->client);
2395
2396 sessions_head = sessions_head->next;
2397 GNUNET_free (session);
2398 } 2695 }
2399 2696
2400 if (NULL != core) 2697 if (NULL != core)
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index 5246b63a6..a51137493 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -136,7 +136,7 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf,
136 * Insert an element into an IBF. 136 * Insert an element into an IBF.
137 * 137 *
138 * @param ibf the IBF 138 * @param ibf the IBF
139 * @param id the element's hash code 139 * @param key the element's hash code
140 */ 140 */
141void 141void
142ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) 142ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
@@ -171,10 +171,10 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf)
171 * Decode and remove an element from the IBF, if possible. 171 * Decode and remove an element from the IBF, if possible.
172 * 172 *
173 * @param ibf the invertible bloom filter to decode 173 * @param ibf the invertible bloom filter to decode
174 * @param side sign of the cell's count where the decoded element came from. 174 * @param ret_side sign of the cell's count where the decoded element came from.
175 * A negative sign indicates that the element was recovered 175 * A negative sign indicates that the element was recovered
176 * resides in an IBF that was previously subtracted from. 176 * resides in an IBF that was previously subtracted from.
177 * @param ret_id the hash code of the decoded element, if successful 177 * @param ret_key receives the hash code of the decoded element, if successful
178 * @return GNUNET_YES if decoding an element was successful, 178 * @return GNUNET_YES if decoding an element was successful,
179 * GNUNET_NO if the IBF is empty, 179 * GNUNET_NO if the IBF is empty,
180 * GNUNET_SYSERR if the decoding has failed 180 * GNUNET_SYSERR if the decoding has failed
@@ -284,7 +284,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32
284 * @param size size of the buffer, will be updated 284 * @param size size of the buffer, will be updated
285 * @param start which bucket to start at 285 * @param start which bucket to start at
286 * @param count how many buckets to read 286 * @param count how many buckets to read
287 * @param dst ibf to write buckets to 287 * @param ibf the ibf to read from
288 * @return GNUNET_OK on success 288 * @return GNUNET_OK on success
289 */ 289 */
290int 290int
@@ -325,8 +325,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct
325 * Write an ibf. 325 * Write an ibf.
326 * 326 *
327 * @param ibf the ibf to write 327 * @param ibf the ibf to write
328 * @param start with which bucket to start
329 * @param count how many buckets to write
330 * @param buf buffer to write the data to, will be updated to point to the 328 * @param buf buffer to write the data to, will be updated to point to the
331 * first byte after the written data 329 * first byte after the written data
332 * @param size pointer to the size of the buffer, will be updated, can be NULL 330 * @param size pointer to the size of the buffer, will be updated, can be NULL
@@ -344,8 +342,6 @@ ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size)
344 * @param buf pointer to the buffer to write to, will point to first 342 * @param buf pointer to the buffer to write to, will point to first
345 * byte after the written data 343 * byte after the written data
346 * @param size size of the buffer, will be updated 344 * @param size size of the buffer, will be updated
347 * @param start which bucket to start at
348 * @param count how many buckets to read
349 * @param dst ibf to write buckets to 345 * @param dst ibf to write buckets to
350 * @return GNUNET_OK on success 346 * @return GNUNET_OK on success
351 */ 347 */
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h
index cafe55c8d..d6c3874aa 100644
--- a/src/consensus/ibf.h
+++ b/src/consensus/ibf.h
@@ -128,7 +128,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32
128 * @param size size of the buffer, will be updated 128 * @param size size of the buffer, will be updated
129 * @param start which bucket to start at 129 * @param start which bucket to start at
130 * @param count how many buckets to read 130 * @param count how many buckets to read
131 * @param dst ibf to write buckets to 131 * @param ibf the ibf to read from
132 * @return GNUNET_OK on success 132 * @return GNUNET_OK on success
133 */ 133 */
134int 134int
@@ -139,8 +139,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct
139 * Write an ibf. 139 * Write an ibf.
140 * 140 *
141 * @param ibf the ibf to write 141 * @param ibf the ibf to write
142 * @param start with which bucket to start
143 * @param count how many buckets to write
144 * @param buf buffer to write the data to, will be updated to point to the 142 * @param buf buffer to write the data to, will be updated to point to the
145 * first byte after the written data 143 * first byte after the written data
146 * @param size pointer to the size of the buffer, will be updated, can be NULL 144 * @param size pointer to the size of the buffer, will be updated, can be NULL
@@ -149,14 +147,13 @@ void
149ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); 147ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size);
150 148
151 149
150
152/** 151/**
153 * Read an ibf. 152 * Read an ibf.
154 * 153 *
155 * @param buf pointer to the buffer to write to, will point to first 154 * @param buf pointer to the buffer to write to, will point to first
156 * byte after the written data 155 * byte after the written data
157 * @param size size of the buffer, will be updated 156 * @param size size of the buffer, will be updated
158 * @param start which bucket to start at
159 * @param count how many buckets to read
160 * @param dst ibf to write buckets to 157 * @param dst ibf to write buckets to
161 * @return GNUNET_OK on success 158 * @return GNUNET_OK on success
162 */ 159 */
@@ -223,15 +220,16 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi
223 * Decode and remove an element from the IBF, if possible. 220 * Decode and remove an element from the IBF, if possible.
224 * 221 *
225 * @param ibf the invertible bloom filter to decode 222 * @param ibf the invertible bloom filter to decode
226 * @param side sign of the cell's count where the decoded element came from. 223 * @param ret_side sign of the cell's count where the decoded element came from.
227 * A negative sign indicates that the element was recovered resides in an IBF 224 * A negative sign indicates that the element was recovered
228 * that was previously subtracted from. 225 * resides in an IBF that was previously subtracted from.
229 * @param ret_id the hash code of the decoded element, if successful 226 * @param ret_key receives the hash code of the decoded element, if successful
230 * @return GNUNET_YES if decoding an element was successful, GNUNET_NO if the IBF is empty, 227 * @return GNUNET_YES if decoding an element was successful,
231 * GNUNET_SYSERR if the decoding has faile 228 * GNUNET_NO if the IBF is empty,
229 * GNUNET_SYSERR if the decoding has failed
232 */ 230 */
233int 231int
234ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct IBF_Key *ret_key); 232ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_key);
235 233
236 234
237/** 235/**
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 8a8d5b89f..437636c99 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,7 +5,7 @@ HOSTNAME = localhost
5HOME = $SERVICEHOME 5HOME = $SERVICEHOME
6BINARY = gnunet-service-consensus 6BINARY = gnunet-service-consensus
7#PREFIX = gdbserver :12345 7#PREFIX = gdbserver :12345
8#PREFIX = valgrind 8PREFIX = valgrind --leak-check=full
9ACCEPT_FROM = 127.0.0.1; 9ACCEPT_FROM = 127.0.0.1;
10ACCEPT_FROM6 = ::1; 10ACCEPT_FROM6 = ::1;
11UNIXPATH = /tmp/gnunet-service-consensus.sock 11UNIXPATH = /tmp/gnunet-service-consensus.sock