diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-03-21 01:06:40 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-03-21 01:06:40 +0000 |
commit | f69659c664077034fcf135acb49b6e680937af1d (patch) | |
tree | 325b219bce0adef70b603d17146ed28838f0f037 /src/consensus | |
parent | 9f0ab5f654c9d5bc1a04c3728ff65f74b5c1f741 (diff) | |
download | gnunet-f69659c664077034fcf135acb49b6e680937af1d.tar.gz gnunet-f69659c664077034fcf135acb49b6e680937af1d.zip |
fixed consensus multi-peer communication, memory leaks, various bugs
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/consensus_flout.h | 60 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-ibf.c | 2 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 23 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 849 | ||||
-rw-r--r-- | src/consensus/ibf.c | 16 | ||||
-rw-r--r-- | src/consensus/ibf.h | 22 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 2 |
8 files changed, 669 insertions, 309 deletions
diff --git a/src/consensus/consensus_flout.h b/src/consensus/consensus_flout.h new file mode 100644 index 000000000..99b4475b9 --- /dev/null +++ b/src/consensus/consensus_flout.h | |||
@@ -0,0 +1,60 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file consensus/consensus_flout.h | ||
23 | * @brief intentionally misbehave in certain ways for testing | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | |||
27 | #ifndef GNUNET_CONSENSUS_FLOUT_H | ||
28 | #define GNUNET_CONSENSUS_FLOUT_H | ||
29 | |||
30 | #ifdef __cplusplus | ||
31 | extern "C" | ||
32 | { | ||
33 | #if 0 /* keep Emacsens' auto-indent happy */ | ||
34 | } | ||
35 | #endif | ||
36 | #endif | ||
37 | |||
38 | #include "platform.h" | ||
39 | #include "gnunet_common.h" | ||
40 | #include "gnunet_consensus_service.h" | ||
41 | |||
42 | void | ||
43 | GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus); | ||
44 | |||
45 | void | ||
46 | GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash); | ||
47 | |||
48 | void | ||
49 | GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash); | ||
50 | |||
51 | |||
52 | |||
53 | #if 0 /* keep Emacsens' auto-indent happy */ | ||
54 | { | ||
55 | #endif | ||
56 | #ifdef __cplusplus | ||
57 | } | ||
58 | #endif | ||
59 | |||
60 | #endif | ||
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index c0420d55c..32c3d8b09 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -76,12 +76,10 @@ struct ConsensusHello | |||
76 | struct GNUNET_HashCode global_id; | 76 | struct GNUNET_HashCode global_id; |
77 | }; | 77 | }; |
78 | 78 | ||
79 | struct ConsensusRoundHeader | 79 | struct ConsensusRoundMessage |
80 | { | 80 | { |
81 | struct GNUNET_MessageHeader header; | 81 | struct GNUNET_MessageHeader header; |
82 | uint8_t round; | 82 | uint8_t round; |
83 | uint8_t exp_round; | ||
84 | uint8_t exp_subround; | ||
85 | }; | 83 | }; |
86 | 84 | ||
87 | 85 | ||
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index f4a233ece..73dc31b56 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c | |||
@@ -19,7 +19,7 @@ | |||
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/gnunet-consensus-ibf | 22 | * @file consensus/gnunet-consensus-ibf.c |
23 | * @brief tool for reconciling data with invertible bloom filters | 23 | * @brief tool for reconciling data with invertible bloom filters |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index 7a951d35b..4525fc719 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -39,6 +39,8 @@ static struct GNUNET_TIME_Relative conclude_timeout; | |||
39 | 39 | ||
40 | static struct GNUNET_CONSENSUS_Handle **consensus_handles; | 40 | static struct GNUNET_CONSENSUS_Handle **consensus_handles; |
41 | 41 | ||
42 | static struct GNUNET_TESTBED_Operation **testbed_operations; | ||
43 | |||
42 | static unsigned int num_connected_handles; | 44 | static unsigned int num_connected_handles; |
43 | 45 | ||
44 | static struct GNUNET_TESTBED_Peer **peers; | 46 | static struct GNUNET_TESTBED_Peer **peers; |
@@ -49,6 +51,8 @@ static unsigned int num_retrieved_peer_ids; | |||
49 | 51 | ||
50 | static struct GNUNET_HashCode session_id; | 52 | static struct GNUNET_HashCode session_id; |
51 | 53 | ||
54 | static unsigned int peers_done = 0; | ||
55 | |||
52 | 56 | ||
53 | /** | 57 | /** |
54 | * Signature of the event handler function called by the | 58 | * Signature of the event handler function called by the |
@@ -64,7 +68,6 @@ controller_cb(void *cls, | |||
64 | GNUNET_assert (0); | 68 | GNUNET_assert (0); |
65 | } | 69 | } |
66 | 70 | ||
67 | |||
68 | static void | 71 | static void |
69 | destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) | 72 | destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) |
70 | { | 73 | { |
@@ -72,14 +75,21 @@ destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) | |||
72 | consensus = cls; | 75 | consensus = cls; |
73 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n"); | 76 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n"); |
74 | GNUNET_CONSENSUS_destroy (consensus); | 77 | GNUNET_CONSENSUS_destroy (consensus); |
78 | peers_done++; | ||
79 | if (peers_done == num_peers) | ||
80 | { | ||
81 | int i; | ||
82 | for (i = 0; i < num_peers; i++) | ||
83 | GNUNET_TESTBED_operation_done (testbed_operations[i]); | ||
84 | GNUNET_SCHEDULER_shutdown (); | ||
85 | } | ||
75 | } | 86 | } |
76 | 87 | ||
77 | 88 | ||
78 | /** | 89 | /** |
79 | * Called when a conclusion was successful. | 90 | * Called when a conclusion was successful. |
80 | * | 91 | * |
81 | * @param cls | 92 | * @param cls closure, the consensus handle |
82 | * @param group | ||
83 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not | 93 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not |
84 | */ | 94 | */ |
85 | static void | 95 | static void |
@@ -255,8 +265,9 @@ peer_info_cb (void *cb_cls, | |||
255 | num_retrieved_peer_ids++; | 265 | num_retrieved_peer_ids++; |
256 | if (num_retrieved_peer_ids == num_peers) | 266 | if (num_retrieved_peer_ids == num_peers) |
257 | for (i = 0; i < num_peers; i++) | 267 | for (i = 0; i < num_peers; i++) |
258 | GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", connect_complete, &consensus_handles[i], | 268 | testbed_operations[i] = |
259 | connect_adapter, disconnect_adapter, NULL); | 269 | GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", connect_complete, &consensus_handles[i], |
270 | connect_adapter, disconnect_adapter, NULL); | ||
260 | } | 271 | } |
261 | else | 272 | else |
262 | { | 273 | { |
@@ -272,7 +283,6 @@ test_master (void *cls, | |||
272 | { | 283 | { |
273 | int i; | 284 | int i; |
274 | 285 | ||
275 | |||
276 | GNUNET_log_setup ("gnunet-consensus", "INFO", NULL); | 286 | GNUNET_log_setup ("gnunet-consensus", "INFO", NULL); |
277 | 287 | ||
278 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); | 288 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); |
@@ -283,6 +293,7 @@ test_master (void *cls, | |||
283 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); | 293 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); |
284 | 294 | ||
285 | consensus_handles = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *)); | 295 | consensus_handles = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *)); |
296 | testbed_operations = GNUNET_malloc (num_peers * sizeof (struct ConsensusHandle *)); | ||
286 | 297 | ||
287 | for (i = 0; i < num_peers; i++) | 298 | for (i = 0; i < num_peers; i++) |
288 | GNUNET_TESTBED_peer_get_information (peers[i], | 299 | GNUNET_TESTBED_peer_get_information (peers[i], |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index e300d9169..479e7ebdd 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -124,6 +124,42 @@ struct PendingElement | |||
124 | }; | 124 | }; |
125 | 125 | ||
126 | 126 | ||
127 | struct ElementList | ||
128 | { | ||
129 | struct ElementList *next; | ||
130 | struct GNUNET_CONSENSUS_Element *element; | ||
131 | struct GNUNET_HashCode *element_hash; | ||
132 | }; | ||
133 | |||
134 | |||
135 | /** | ||
136 | * Describes the current round a consensus session is in. | ||
137 | */ | ||
138 | enum ConsensusRound | ||
139 | { | ||
140 | /** | ||
141 | * Not started the protocol yet. | ||
142 | */ | ||
143 | CONSENSUS_ROUND_BEGIN=0, | ||
144 | /** | ||
145 | * Distribution of elements with the exponential scheme. | ||
146 | */ | ||
147 | CONSENSUS_ROUND_EXCHANGE, | ||
148 | /** | ||
149 | * Exchange which elements each peer has, but not the elements. | ||
150 | */ | ||
151 | CONSENSUS_ROUND_INVENTORY, | ||
152 | /** | ||
153 | * Collect and distribute missing values. | ||
154 | */ | ||
155 | CONSENSUS_ROUND_STOCK, | ||
156 | /** | ||
157 | * Consensus concluded. | ||
158 | */ | ||
159 | CONSENSUS_ROUND_FINISH | ||
160 | }; | ||
161 | |||
162 | |||
127 | /** | 163 | /** |
128 | * Information about a peer that is in a consensus session. | 164 | * Information about a peer that is in a consensus session. |
129 | */ | 165 | */ |
@@ -148,8 +184,6 @@ struct ConsensusPeerInformation | |||
148 | */ | 184 | */ |
149 | int is_outgoing; | 185 | int is_outgoing; |
150 | 186 | ||
151 | int connected; | ||
152 | |||
153 | /** | 187 | /** |
154 | * Did we receive/send a consensus hello? | 188 | * Did we receive/send a consensus hello? |
155 | */ | 189 | */ |
@@ -246,6 +280,15 @@ struct ConsensusPeerInformation | |||
246 | */ | 280 | */ |
247 | int exp_subround_finished; | 281 | int exp_subround_finished; |
248 | 282 | ||
283 | int inventory_synced; | ||
284 | |||
285 | /** | ||
286 | * Round this peer seems to be in, according to the last SE we got. | ||
287 | * Necessary to store this, as we sometimes need to respond to a request from an | ||
288 | * older round, while we are already in the next round. | ||
289 | */ | ||
290 | enum ConsensusRound apparent_round; | ||
291 | |||
249 | }; | 292 | }; |
250 | 293 | ||
251 | typedef void (*QueuedMessageCallback) (void *msg); | 294 | typedef void (*QueuedMessageCallback) (void *msg); |
@@ -272,32 +315,6 @@ struct QueuedMessage | |||
272 | void *cls; | 315 | void *cls; |
273 | }; | 316 | }; |
274 | 317 | ||
275 | /** | ||
276 | * Describes the current round a consensus session is in. | ||
277 | */ | ||
278 | enum ConsensusRound | ||
279 | { | ||
280 | /** | ||
281 | * Not started the protocol yet. | ||
282 | */ | ||
283 | CONSENSUS_ROUND_BEGIN=0, | ||
284 | /** | ||
285 | * Distribution of elements with the exponential scheme. | ||
286 | */ | ||
287 | CONSENSUS_ROUND_EXCHANGE, | ||
288 | /** | ||
289 | * Exchange which elements each peer has, but not the elements. | ||
290 | */ | ||
291 | CONSENSUS_ROUND_INVENTORY, | ||
292 | /** | ||
293 | * Collect and distribute missing values. | ||
294 | */ | ||
295 | CONSENSUS_ROUND_STOCK, | ||
296 | /** | ||
297 | * Consensus concluded. | ||
298 | */ | ||
299 | CONSENSUS_ROUND_FINISH | ||
300 | }; | ||
301 | 318 | ||
302 | struct StrataEstimator | 319 | struct StrataEstimator |
303 | { | 320 | { |
@@ -342,7 +359,8 @@ struct ConsensusSession | |||
342 | /** | 359 | /** |
343 | * Elements in the consensus set of this session, | 360 | * Elements in the consensus set of this session, |
344 | * all of them either have been sent by or approved by the client. | 361 | * all of them either have been sent by or approved by the client. |
345 | * Contains GNUNET_CONSENSUS_Element. | 362 | * Contains ElementList. |
363 | * Used as a unique-key hashmap. | ||
346 | */ | 364 | */ |
347 | struct GNUNET_CONTAINER_MultiHashMap *values; | 365 | struct GNUNET_CONTAINER_MultiHashMap *values; |
348 | 366 | ||
@@ -544,6 +562,8 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea | |||
544 | * | 562 | * |
545 | * @param cpi peer | 563 | * @param cpi peer |
546 | * @param msg message we want to queue | 564 | * @param msg message we want to queue |
565 | * @param cb callback, called when the message is given to strem | ||
566 | * @param cls closure for cb | ||
547 | */ | 567 | */ |
548 | static void | 568 | static void |
549 | queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) | 569 | queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) |
@@ -572,14 +592,14 @@ queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageH | |||
572 | } | 592 | } |
573 | 593 | ||
574 | 594 | ||
575 | 595 | /* | |
576 | static void | 596 | static void |
577 | clear_peer_messages (struct ConsensusPeerInformation *cpi) | 597 | clear_peer_messages (struct ConsensusPeerInformation *cpi) |
578 | { | 598 | { |
579 | /* FIXME: deallocate */ | ||
580 | cpi->messages_head = NULL; | 599 | cpi->messages_head = NULL; |
581 | cpi->messages_tail = NULL; | 600 | cpi->messages_tail = NULL; |
582 | } | 601 | } |
602 | */ | ||
583 | 603 | ||
584 | 604 | ||
585 | /** | 605 | /** |
@@ -592,8 +612,8 @@ clear_peer_messages (struct ConsensusPeerInformation *cpi) | |||
592 | * @return the estimated difference | 612 | * @return the estimated difference |
593 | */ | 613 | */ |
594 | static int | 614 | static int |
595 | estimate_difference (struct StrataEstimator *se1, | 615 | estimate_difference (const struct StrataEstimator *se1, |
596 | struct StrataEstimator *se2) | 616 | const struct StrataEstimator *se2) |
597 | { | 617 | { |
598 | int i; | 618 | int i; |
599 | int count; | 619 | int count; |
@@ -701,42 +721,36 @@ incoming_stream_data_processor (void *cls, | |||
701 | } | 721 | } |
702 | 722 | ||
703 | 723 | ||
704 | /** | 724 | static void |
705 | * Iterator over hash map entries. | 725 | send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head) |
706 | * Queue elements to be sent to the peer in cls. | ||
707 | * | ||
708 | * @param cls closure | ||
709 | * @param key current key code | ||
710 | * @param value value in the hash map | ||
711 | * @return GNUNET_YES if we should continue to | ||
712 | * iterate, | ||
713 | * GNUNET_NO if not. | ||
714 | */ | ||
715 | static int | ||
716 | send_element_iter (void *cls, | ||
717 | const struct GNUNET_HashCode *key, | ||
718 | void *value) | ||
719 | { | 726 | { |
720 | struct ConsensusPeerInformation *cpi; | ||
721 | struct GNUNET_CONSENSUS_Element *element; | 727 | struct GNUNET_CONSENSUS_Element *element; |
722 | struct GNUNET_MessageHeader *element_msg; | 728 | struct GNUNET_MessageHeader *element_msg; |
723 | size_t msize; | 729 | size_t msize; |
724 | 730 | ||
725 | cpi = cls; | 731 | while (NULL != head) |
726 | element = value; | 732 | { |
727 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | 733 | element = head->element; |
728 | element_msg = GNUNET_malloc (msize); | 734 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; |
729 | element_msg->size = htons (msize); | 735 | element_msg = GNUNET_malloc (msize); |
730 | if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) | 736 | element_msg->size = htons (msize); |
731 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | 737 | switch (cpi->apparent_round) |
732 | else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round) | 738 | { |
733 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); | 739 | case CONSENSUS_ROUND_STOCK: |
734 | else | 740 | case CONSENSUS_ROUND_EXCHANGE: |
735 | GNUNET_assert (0); | 741 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); |
736 | GNUNET_assert (NULL != element->data); | 742 | break; |
737 | memcpy (&element_msg[1], element->data, element->size); | 743 | case CONSENSUS_ROUND_INVENTORY: |
738 | queue_peer_message (cpi, element_msg); | 744 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); |
739 | return GNUNET_YES; | 745 | break; |
746 | default: | ||
747 | GNUNET_break (0); | ||
748 | } | ||
749 | GNUNET_assert (NULL != element->data); | ||
750 | memcpy (&element_msg[1], element->data, element->size); | ||
751 | queue_peer_message (cpi, element_msg); | ||
752 | head = head->next; | ||
753 | } | ||
740 | } | 754 | } |
741 | 755 | ||
742 | /** | 756 | /** |
@@ -755,8 +769,13 @@ ibf_values_iterator (void *cls, | |||
755 | void *value) | 769 | void *value) |
756 | { | 770 | { |
757 | struct ConsensusPeerInformation *cpi; | 771 | struct ConsensusPeerInformation *cpi; |
772 | struct ElementList *head; | ||
773 | struct IBF_Key ibf_key; | ||
758 | cpi = cls; | 774 | cpi = cls; |
759 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key)); | 775 | head = value; |
776 | ibf_key = ibf_key_from_hashcode (head->element_hash); | ||
777 | GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); | ||
778 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); | ||
760 | return GNUNET_YES; | 779 | return GNUNET_YES; |
761 | } | 780 | } |
762 | 781 | ||
@@ -764,7 +783,7 @@ ibf_values_iterator (void *cls, | |||
764 | * Create and populate an IBF for the specified peer, | 783 | * Create and populate an IBF for the specified peer, |
765 | * if it does not already exist. | 784 | * if it does not already exist. |
766 | * | 785 | * |
767 | * @param peer to create the ibf for | 786 | * @param cpi peer to create the ibf for |
768 | */ | 787 | */ |
769 | static void | 788 | static void |
770 | prepare_ibf (struct ConsensusPeerInformation *cpi) | 789 | prepare_ibf (struct ConsensusPeerInformation *cpi) |
@@ -791,22 +810,68 @@ handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GN | |||
791 | GNUNET_assert (0); | 810 | GNUNET_assert (0); |
792 | } | 811 | } |
793 | 812 | ||
794 | static void | 813 | |
795 | fin_sent_cb (void *cls) | 814 | static int |
815 | exp_subround_finished (const struct ConsensusSession *session) | ||
796 | { | 816 | { |
797 | struct ConsensusPeerInformation *cpi; | ||
798 | int not_finished; | 817 | int not_finished; |
799 | cpi = cls; | ||
800 | cpi->exp_subround_finished = GNUNET_YES; | ||
801 | /* the subround is only really over if *both* partners are done */ | ||
802 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
803 | not_finished = 0; | 818 | not_finished = 0; |
804 | if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO)) | 819 | if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO)) |
805 | not_finished++; | 820 | not_finished++; |
806 | if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO)) | 821 | if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO)) |
807 | not_finished++; | 822 | not_finished++; |
808 | if (0 == not_finished) | 823 | if (0 == not_finished) |
809 | subround_over (cpi->session, NULL); | 824 | return GNUNET_YES; |
825 | return GNUNET_NO; | ||
826 | } | ||
827 | |||
828 | static int | ||
829 | inventory_round_finished (struct ConsensusSession *session) | ||
830 | { | ||
831 | int i; | ||
832 | int finished; | ||
833 | finished = 0; | ||
834 | for (i = 0; i < session->num_peers; i++) | ||
835 | if (GNUNET_YES == session->info[i].inventory_synced) | ||
836 | finished++; | ||
837 | if (finished >= (session->num_peers / 2)) | ||
838 | return GNUNET_YES; | ||
839 | return GNUNET_NO; | ||
840 | } | ||
841 | |||
842 | |||
843 | |||
844 | static void | ||
845 | fin_sent_cb (void *cls) | ||
846 | { | ||
847 | struct ConsensusPeerInformation *cpi; | ||
848 | cpi = cls; | ||
849 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
850 | switch (cpi->session->current_round) | ||
851 | { | ||
852 | case CONSENSUS_ROUND_EXCHANGE: | ||
853 | case CONSENSUS_ROUND_STOCK: | ||
854 | /* the subround is only really over if *both* partners are done */ | ||
855 | if (cpi->session->current_round != cpi->apparent_round) | ||
856 | { | ||
857 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); | ||
858 | break; | ||
859 | } | ||
860 | cpi->exp_subround_finished = GNUNET_YES; | ||
861 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | ||
862 | subround_over (cpi->session, NULL); | ||
863 | else | ||
864 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); | ||
865 | break; | ||
866 | case CONSENSUS_ROUND_INVENTORY: | ||
867 | cpi->inventory_synced = GNUNET_YES; | ||
868 | if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) | ||
869 | round_over (cpi->session, NULL); | ||
870 | /* FIXME: maybe go to next round */ | ||
871 | break; | ||
872 | default: | ||
873 | GNUNET_break (0); | ||
874 | } | ||
810 | } | 875 | } |
811 | 876 | ||
812 | 877 | ||
@@ -817,17 +882,23 @@ fin_sent_cb (void *cls) | |||
817 | static int | 882 | static int |
818 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | 883 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) |
819 | { | 884 | { |
820 | struct GNUNET_MessageHeader *fin_msg; | 885 | struct ConsensusRoundMessage *fin_msg; |
886 | |||
821 | switch (cpi->session->current_round) | 887 | switch (cpi->session->current_round) |
822 | { | 888 | { |
889 | case CONSENSUS_ROUND_INVENTORY: | ||
890 | cpi->inventory_synced = GNUNET_YES; | ||
891 | case CONSENSUS_ROUND_STOCK: | ||
823 | case CONSENSUS_ROUND_EXCHANGE: | 892 | case CONSENSUS_ROUND_EXCHANGE: |
824 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 893 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
825 | fin_msg = GNUNET_malloc (sizeof *fin_msg); | 894 | fin_msg = GNUNET_malloc (sizeof *fin_msg); |
826 | fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | 895 | fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); |
827 | fin_msg->size = htons (sizeof *fin_msg); | 896 | fin_msg->header.size = htons (sizeof *fin_msg); |
897 | fin_msg->round = cpi->apparent_round; | ||
828 | /* the subround os over once we kicked off sending the fin msg */ | 898 | /* the subround os over once we kicked off sending the fin msg */ |
829 | /* FIXME: assert we are talking to the right peer! */ | 899 | /* FIXME: assert we are talking to the right peer! */ |
830 | queue_peer_message_with_cls (cpi, fin_msg, fin_sent_cb, cpi); | 900 | queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi); |
901 | /* FIXME: mark peer as synced */ | ||
831 | break; | 902 | break; |
832 | default: | 903 | default: |
833 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | 904 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); |
@@ -836,30 +907,40 @@ handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_Mes | |||
836 | return GNUNET_YES; | 907 | return GNUNET_YES; |
837 | } | 908 | } |
838 | 909 | ||
910 | |||
839 | /** | 911 | /** |
840 | * The other peer wants us to inform that he sent us all the elements we requested. | 912 | * The other peer wants us to inform that he sent us all the elements we requested. |
841 | */ | 913 | */ |
842 | static int | 914 | static int |
843 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | 915 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) |
844 | { | 916 | { |
917 | struct ConsensusRoundMessage *round_msg; | ||
918 | round_msg = (struct ConsensusRoundMessage *) msg; | ||
845 | /* FIXME: only call subround_over if round is the current one! */ | 919 | /* FIXME: only call subround_over if round is the current one! */ |
846 | switch (cpi->session->current_round) | 920 | switch (cpi->session->current_round) |
847 | { | 921 | { |
848 | case CONSENSUS_ROUND_EXCHANGE: | 922 | case CONSENSUS_ROUND_EXCHANGE: |
849 | { | 923 | case CONSENSUS_ROUND_STOCK: |
850 | int not_finished; | 924 | if (cpi->session->current_round != round_msg->round) |
925 | { | ||
926 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
927 | cpi->ibf_state = IBF_STATE_NONE; | ||
928 | cpi->ibf_bucket_counter = 0; | ||
929 | break; | ||
930 | } | ||
931 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
851 | cpi->exp_subround_finished = GNUNET_YES; | 932 | cpi->exp_subround_finished = GNUNET_YES; |
852 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 933 | if (GNUNET_YES == exp_subround_finished (cpi->session)) |
853 | /* the subround is only really over if *both* partners are done */ | ||
854 | not_finished = 0; | ||
855 | if ((cpi->session->partner_outgoing != NULL) && (cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO)) | ||
856 | not_finished++; | ||
857 | if ((cpi->session->partner_incoming != NULL) && (cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO)) | ||
858 | not_finished++; | ||
859 | if (0 == not_finished) | ||
860 | subround_over (cpi->session, NULL); | 934 | subround_over (cpi->session, NULL); |
861 | } | 935 | else |
936 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); | ||
862 | break; | 937 | break; |
938 | case CONSENSUS_ROUND_INVENTORY: | ||
939 | cpi->inventory_synced = GNUNET_YES; | ||
940 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
941 | if (inventory_round_finished (cpi->session)) | ||
942 | round_over (cpi->session, NULL); | ||
943 | break; | ||
863 | default: | 944 | default: |
864 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | 945 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); |
865 | break; | 946 | break; |
@@ -884,6 +965,38 @@ strata_estimator_create () | |||
884 | return se; | 965 | return se; |
885 | } | 966 | } |
886 | 967 | ||
968 | static void | ||
969 | strata_estimator_destroy (struct StrataEstimator *se) | ||
970 | { | ||
971 | int i; | ||
972 | for (i = 0; i < STRATA_COUNT; i++) | ||
973 | ibf_destroy (se->strata[i]); | ||
974 | GNUNET_free (se->strata); | ||
975 | GNUNET_free (se); | ||
976 | } | ||
977 | |||
978 | |||
979 | static int | ||
980 | is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) | ||
981 | { | ||
982 | switch (strata_msg->round) | ||
983 | { | ||
984 | case CONSENSUS_ROUND_STOCK: | ||
985 | case CONSENSUS_ROUND_EXCHANGE: | ||
986 | /* here, we also have to compare subrounds */ | ||
987 | if ( (strata_msg->round != session->current_round) || | ||
988 | (strata_msg->exp_round != session->exp_round) || | ||
989 | (strata_msg->exp_subround != session->exp_subround)) | ||
990 | return GNUNET_YES; | ||
991 | break; | ||
992 | default: | ||
993 | if (session->current_round != strata_msg->round) | ||
994 | return GNUNET_YES; | ||
995 | break; | ||
996 | } | ||
997 | return GNUNET_NO; | ||
998 | } | ||
999 | |||
887 | 1000 | ||
888 | /** | 1001 | /** |
889 | * Called when a peer sends us its strata estimator. | 1002 | * Called when a peer sends us its strata estimator. |
@@ -900,32 +1013,28 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
900 | void *buf; | 1013 | void *buf; |
901 | size_t size; | 1014 | size_t size; |
902 | 1015 | ||
903 | 1016 | if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY)) | |
904 | |||
905 | switch (cpi->session->current_round) | ||
906 | { | 1017 | { |
907 | case CONSENSUS_ROUND_EXCHANGE: | 1018 | /* we still have to handle this request appropriately */ |
908 | if ( (strata_msg->round != CONSENSUS_ROUND_EXCHANGE) || | 1019 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", |
909 | (strata_msg->exp_round != cpi->session->exp_round) || | 1020 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
910 | (strata_msg->exp_subround != cpi->session->exp_subround)) | 1021 | } |
911 | { | 1022 | else if (is_premature_strata_message (cpi->session, strata_msg)) |
912 | if (GNUNET_NO == cpi->replaying_strata_message) | 1023 | { |
913 | { | 1024 | if (GNUNET_NO == cpi->replaying_strata_message) |
914 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", | 1025 | { |
915 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); | 1026 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", |
916 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); | 1027 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); |
917 | } | 1028 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); |
918 | return GNUNET_YES; | 1029 | } |
919 | } | 1030 | return GNUNET_YES; |
920 | break; | ||
921 | default: | ||
922 | GNUNET_assert (0); | ||
923 | break; | ||
924 | } | 1031 | } |
925 | 1032 | ||
926 | if (NULL == cpi->se) | 1033 | if (NULL == cpi->se) |
927 | cpi->se = strata_estimator_create (); | 1034 | cpi->se = strata_estimator_create (); |
928 | 1035 | ||
1036 | cpi->apparent_round = strata_msg->round; | ||
1037 | |||
929 | size = ntohs (strata_msg->header.size); | 1038 | size = ntohs (strata_msg->header.size); |
930 | buf = (void *) &strata_msg[1]; | 1039 | buf = (void *) &strata_msg[1]; |
931 | for (i = 0; i < STRATA_COUNT; i++) | 1040 | for (i = 0; i < STRATA_COUNT; i++) |
@@ -937,25 +1046,32 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
937 | 1046 | ||
938 | diff = estimate_difference (cpi->session->se, cpi->se); | 1047 | diff = estimate_difference (cpi->session->se, cpi->se); |
939 | 1048 | ||
940 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d\n", | 1049 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", |
941 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 1050 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); |
942 | 1051 | ||
943 | if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) || | 1052 | switch (cpi->session->current_round) |
944 | (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)) | ||
945 | { | 1053 | { |
946 | /* send IBF of the right size */ | 1054 | case CONSENSUS_ROUND_EXCHANGE: |
947 | cpi->ibf_order = 0; | 1055 | case CONSENSUS_ROUND_INVENTORY: |
948 | while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) | 1056 | case CONSENSUS_ROUND_STOCK: |
949 | cpi->ibf_order++; | 1057 | /* send IBF of the right size */ |
950 | if (cpi->ibf_order > MAX_IBF_ORDER) | 1058 | cpi->ibf_order = 0; |
951 | cpi->ibf_order = MAX_IBF_ORDER; | 1059 | while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) |
952 | cpi->ibf_order += 1; | 1060 | cpi->ibf_order++; |
953 | /* create ibf if not already pre-computed */ | 1061 | if (cpi->ibf_order > MAX_IBF_ORDER) |
954 | prepare_ibf (cpi); | 1062 | cpi->ibf_order = MAX_IBF_ORDER; |
955 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | 1063 | cpi->ibf_order += 1; |
956 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | 1064 | /* create ibf if not already pre-computed */ |
957 | cpi->ibf_bucket_counter = 0; | 1065 | prepare_ibf (cpi); |
958 | send_ibf (cpi); | 1066 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); |
1067 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
1068 | cpi->ibf_bucket_counter = 0; | ||
1069 | send_ibf (cpi); | ||
1070 | break; | ||
1071 | default: | ||
1072 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n", | ||
1073 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1074 | break; | ||
959 | } | 1075 | } |
960 | return GNUNET_YES; | 1076 | return GNUNET_YES; |
961 | } | 1077 | } |
@@ -973,7 +1089,7 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
973 | switch (cpi->ibf_state) | 1089 | switch (cpi->ibf_state) |
974 | { | 1090 | { |
975 | case IBF_STATE_NONE: | 1091 | case IBF_STATE_NONE: |
976 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", digest->order); | 1092 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
977 | cpi->ibf_state = IBF_STATE_RECEIVING; | 1093 | cpi->ibf_state = IBF_STATE_RECEIVING; |
978 | cpi->ibf_order = digest->order; | 1094 | cpi->ibf_order = digest->order; |
979 | cpi->ibf_bucket_counter = 0; | 1095 | cpi->ibf_bucket_counter = 0; |
@@ -984,7 +1100,8 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
984 | } | 1100 | } |
985 | break; | 1101 | break; |
986 | case IBF_STATE_ANTICIPATE_DIFF: | 1102 | case IBF_STATE_ANTICIPATE_DIFF: |
987 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order %d\n", digest->order); | 1103 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d (probably out IBF did not decode)\n", |
1104 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
988 | cpi->ibf_state = IBF_STATE_RECEIVING; | 1105 | cpi->ibf_state = IBF_STATE_RECEIVING; |
989 | cpi->ibf_order = digest->order; | 1106 | cpi->ibf_order = digest->order; |
990 | cpi->ibf_bucket_counter = 0; | 1107 | cpi->ibf_bucket_counter = 0; |
@@ -997,18 +1114,16 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
997 | case IBF_STATE_RECEIVING: | 1114 | case IBF_STATE_RECEIVING: |
998 | break; | 1115 | break; |
999 | default: | 1116 | default: |
1000 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "received ibf unexpectedly in state %d\n", cpi->ibf_state); | 1117 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
1001 | return GNUNET_YES; | 1118 | return GNUNET_YES; |
1002 | } | 1119 | } |
1003 | 1120 | ||
1004 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | 1121 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) |
1005 | { | 1122 | { |
1006 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n"); | 1123 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
1007 | return GNUNET_YES; | 1124 | return GNUNET_YES; |
1008 | } | 1125 | } |
1009 | 1126 | ||
1010 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, | ||
1011 | cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); | ||
1012 | 1127 | ||
1013 | if (NULL == cpi->ibf) | 1128 | if (NULL == cpi->ibf) |
1014 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 1129 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); |
@@ -1041,6 +1156,17 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
1041 | struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; | 1156 | struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; |
1042 | size_t size; | 1157 | size_t size; |
1043 | 1158 | ||
1159 | switch (cpi->session->current_round) | ||
1160 | { | ||
1161 | case CONSENSUS_ROUND_STOCK: | ||
1162 | /* FIXME: check if we really expect the element */ | ||
1163 | case CONSENSUS_ROUND_EXCHANGE: | ||
1164 | break; | ||
1165 | default: | ||
1166 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, ignoring\n"); | ||
1167 | return GNUNET_YES; | ||
1168 | } | ||
1169 | |||
1044 | size = ntohs (element_msg->size) - sizeof *element_msg; | 1170 | size = ntohs (element_msg->size) - sizeof *element_msg; |
1045 | 1171 | ||
1046 | element = GNUNET_malloc (size + sizeof *element); | 1172 | element = GNUNET_malloc (size + sizeof *element); |
@@ -1069,7 +1195,9 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
1069 | 1195 | ||
1070 | /** | 1196 | /** |
1071 | * Handle a request for elements. | 1197 | * Handle a request for elements. |
1072 | * Only allowed in exchange-rounds. | 1198 | * |
1199 | * @param cpi peer that is requesting the element | ||
1200 | * @param msg the element request message | ||
1073 | */ | 1201 | */ |
1074 | static int | 1202 | static int |
1075 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | 1203 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) |
@@ -1078,14 +1206,18 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E | |||
1078 | struct IBF_Key *ibf_key; | 1206 | struct IBF_Key *ibf_key; |
1079 | unsigned int num; | 1207 | unsigned int num; |
1080 | 1208 | ||
1209 | /* element requests are allowed in every round */ | ||
1210 | |||
1081 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); | 1211 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); |
1082 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); | 1212 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); |
1083 | 1213 | ||
1084 | ibf_key = (struct IBF_Key *) &msg[1]; | 1214 | ibf_key = (struct IBF_Key *) &msg[1]; |
1085 | while (num--) | 1215 | while (num--) |
1086 | { | 1216 | { |
1217 | struct ElementList *head; | ||
1087 | ibf_hashcode_from_key (*ibf_key, &hashcode); | 1218 | ibf_hashcode_from_key (*ibf_key, &hashcode); |
1088 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); | 1219 | head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); |
1220 | send_elements (cpi, head); | ||
1089 | ibf_key++; | 1221 | ibf_key++; |
1090 | } | 1222 | } |
1091 | return GNUNET_YES; | 1223 | return GNUNET_YES; |
@@ -1162,10 +1294,12 @@ send_strata_estimator (struct ConsensusPeerInformation *cpi) | |||
1162 | size_t msize; | 1294 | size_t msize; |
1163 | int i; | 1295 | int i; |
1164 | 1296 | ||
1297 | cpi->apparent_round = cpi->session->current_round; | ||
1298 | cpi->ibf_state = IBF_STATE_NONE; | ||
1299 | cpi->ibf_bucket_counter = 0; | ||
1165 | 1300 | ||
1166 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE to P%d\n", | 1301 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n", |
1167 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 1302 | cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info)); |
1168 | |||
1169 | 1303 | ||
1170 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | 1304 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); |
1171 | 1305 | ||
@@ -1185,21 +1319,20 @@ send_strata_estimator (struct ConsensusPeerInformation *cpi) | |||
1185 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); | 1319 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); |
1186 | } | 1320 | } |
1187 | 1321 | ||
1322 | |||
1188 | /** | 1323 | /** |
1189 | * Send an IBF of the order specified in cpi | 1324 | * Send an IBF of the order specified in cpi. |
1190 | * | 1325 | * |
1191 | * @param cpi the peer | 1326 | * @param cpi the peer |
1192 | */ | 1327 | */ |
1193 | static void | 1328 | static void |
1194 | send_ibf (struct ConsensusPeerInformation *cpi) | 1329 | send_ibf (struct ConsensusPeerInformation *cpi) |
1195 | { | 1330 | { |
1196 | int sent_buckets; | ||
1197 | |||
1198 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", | 1331 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", |
1199 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 1332 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
1200 | 1333 | ||
1201 | sent_buckets = 0; | 1334 | cpi->ibf_bucket_counter = 0; |
1202 | while (sent_buckets < (1 << cpi->ibf_order)) | 1335 | while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) |
1203 | { | 1336 | { |
1204 | int num_buckets; | 1337 | int num_buckets; |
1205 | void *buf; | 1338 | void *buf; |
@@ -1223,12 +1356,18 @@ send_ibf (struct ConsensusPeerInformation *cpi) | |||
1223 | 1356 | ||
1224 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); | 1357 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); |
1225 | 1358 | ||
1226 | sent_buckets += num_buckets; | 1359 | cpi->ibf_bucket_counter += num_buckets; |
1227 | } | 1360 | } |
1361 | cpi->ibf_bucket_counter = 0; | ||
1228 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | 1362 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; |
1229 | } | 1363 | } |
1230 | 1364 | ||
1231 | 1365 | ||
1366 | /** | ||
1367 | * Decode the current diff ibf, and send elements/requests/reports/ | ||
1368 | * | ||
1369 | * @param cpi partner peer | ||
1370 | */ | ||
1232 | static void | 1371 | static void |
1233 | decode (struct ConsensusPeerInformation *cpi) | 1372 | decode (struct ConsensusPeerInformation *cpi) |
1234 | { | 1373 | { |
@@ -1236,11 +1375,12 @@ decode (struct ConsensusPeerInformation *cpi) | |||
1236 | struct GNUNET_HashCode hashcode; | 1375 | struct GNUNET_HashCode hashcode; |
1237 | int side; | 1376 | int side; |
1238 | 1377 | ||
1239 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n"); | 1378 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); |
1240 | 1379 | ||
1241 | for (;;) | 1380 | for (;;) |
1242 | { | 1381 | { |
1243 | int res; | 1382 | int res; |
1383 | |||
1244 | res = ibf_decode (cpi->ibf, &side, &key); | 1384 | res = ibf_decode (cpi->ibf, &side, &key); |
1245 | if (GNUNET_SYSERR == res) | 1385 | if (GNUNET_SYSERR == res) |
1246 | { | 1386 | { |
@@ -1256,19 +1396,22 @@ decode (struct ConsensusPeerInformation *cpi) | |||
1256 | } | 1396 | } |
1257 | if (GNUNET_NO == res) | 1397 | if (GNUNET_NO == res) |
1258 | { | 1398 | { |
1259 | struct GNUNET_MessageHeader *msg; | 1399 | struct ConsensusRoundMessage *msg; |
1260 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); | 1400 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); |
1261 | msg = GNUNET_malloc (sizeof *msg); | 1401 | msg = GNUNET_malloc (sizeof *msg); |
1262 | msg->size = htons (sizeof *msg); | 1402 | msg->header.size = htons (sizeof *msg); |
1263 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | 1403 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); |
1264 | queue_peer_message (cpi, msg); | 1404 | msg->round = cpi->apparent_round; |
1405 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); | ||
1265 | return; | 1406 | return; |
1266 | } | 1407 | } |
1267 | if (-1 == side) | 1408 | if (-1 == side) |
1268 | { | 1409 | { |
1410 | struct ElementList *head; | ||
1269 | /* we have the element(s), send it to the other peer */ | 1411 | /* we have the element(s), send it to the other peer */ |
1270 | ibf_hashcode_from_key (key, &hashcode); | 1412 | ibf_hashcode_from_key (key, &hashcode); |
1271 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); | 1413 | head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); |
1414 | send_elements (cpi, head); | ||
1272 | } | 1415 | } |
1273 | else | 1416 | else |
1274 | { | 1417 | { |
@@ -1278,17 +1421,18 @@ decode (struct ConsensusPeerInformation *cpi) | |||
1278 | 1421 | ||
1279 | msize = (sizeof *msg) + sizeof (struct IBF_Key); | 1422 | msize = (sizeof *msg) + sizeof (struct IBF_Key); |
1280 | msg = GNUNET_malloc (msize); | 1423 | msg = GNUNET_malloc (msize); |
1281 | if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) | 1424 | switch (cpi->apparent_round) |
1282 | { | ||
1283 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1284 | } | ||
1285 | else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round) | ||
1286 | { | ||
1287 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1288 | } | ||
1289 | else | ||
1290 | { | 1425 | { |
1291 | GNUNET_assert (0); | 1426 | case CONSENSUS_ROUND_STOCK: |
1427 | /* FIXME: check if we really want to request the element */ | ||
1428 | case CONSENSUS_ROUND_EXCHANGE: | ||
1429 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1430 | break; | ||
1431 | case CONSENSUS_ROUND_INVENTORY: | ||
1432 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); | ||
1433 | break; | ||
1434 | default: | ||
1435 | GNUNET_assert (0); | ||
1292 | } | 1436 | } |
1293 | msg->header.size = htons (msize); | 1437 | msg->header.size = htons (msize); |
1294 | p = (struct IBF_Key *) &msg[1]; | 1438 | p = (struct IBF_Key *) &msg[1]; |
@@ -1308,7 +1452,6 @@ decode (struct ConsensusPeerInformation *cpi) | |||
1308 | * @param cls closure | 1452 | * @param cls closure |
1309 | * @param client identification of the client | 1453 | * @param client identification of the client |
1310 | * @param message the actual message | 1454 | * @param message the actual message |
1311 | * | ||
1312 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | 1455 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing |
1313 | */ | 1456 | */ |
1314 | static int | 1457 | static int |
@@ -1403,6 +1546,36 @@ listen_cb (void *cls, | |||
1403 | 1546 | ||
1404 | 1547 | ||
1405 | /** | 1548 | /** |
1549 | * Iterator over hash map entries. | ||
1550 | * | ||
1551 | * @param cls closure | ||
1552 | * @param key current key code | ||
1553 | * @param value value in the hash map | ||
1554 | * @return GNUNET_YES if we should continue to | ||
1555 | * iterate, | ||
1556 | * GNUNET_NO if not. | ||
1557 | */ | ||
1558 | static int | ||
1559 | destroy_element_list_iter (void *cls, | ||
1560 | const struct GNUNET_HashCode * key, | ||
1561 | void *value) | ||
1562 | { | ||
1563 | struct ElementList *el; | ||
1564 | el = value; | ||
1565 | while (NULL != el) | ||
1566 | { | ||
1567 | struct ElementList *el_old; | ||
1568 | el_old = el; | ||
1569 | el = el->next; | ||
1570 | GNUNET_free (el_old->element_hash); | ||
1571 | GNUNET_free (el_old->element); | ||
1572 | GNUNET_free (el_old); | ||
1573 | } | ||
1574 | return GNUNET_YES; | ||
1575 | } | ||
1576 | |||
1577 | |||
1578 | /** | ||
1406 | * Destroy a session, free all resources associated with it. | 1579 | * Destroy a session, free all resources associated with it. |
1407 | * | 1580 | * |
1408 | * @param session the session to destroy | 1581 | * @param session the session to destroy |
@@ -1410,9 +1583,80 @@ listen_cb (void *cls, | |||
1410 | static void | 1583 | static void |
1411 | destroy_session (struct ConsensusSession *session) | 1584 | destroy_session (struct ConsensusSession *session) |
1412 | { | 1585 | { |
1413 | /* FIXME: more stuff to free! */ | 1586 | int i; |
1587 | |||
1414 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | 1588 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); |
1415 | GNUNET_SERVER_client_drop (session->client); | 1589 | GNUNET_SERVER_client_drop (session->client); |
1590 | session->client = NULL; | ||
1591 | if (NULL != session->shuffle) | ||
1592 | { | ||
1593 | GNUNET_free (session->shuffle); | ||
1594 | session->shuffle = NULL; | ||
1595 | } | ||
1596 | if (NULL != session->se) | ||
1597 | { | ||
1598 | strata_estimator_destroy (session->se); | ||
1599 | session->se = NULL; | ||
1600 | } | ||
1601 | if (NULL != session->info) | ||
1602 | { | ||
1603 | for (i = 0; i < session->num_peers; i++) | ||
1604 | { | ||
1605 | struct ConsensusPeerInformation *cpi; | ||
1606 | cpi = &session->info[i]; | ||
1607 | if ((NULL != cpi) && (NULL != cpi->socket)) | ||
1608 | { | ||
1609 | if (NULL != cpi->rh) | ||
1610 | { | ||
1611 | GNUNET_STREAM_read_cancel (cpi->rh); | ||
1612 | cpi->rh = NULL; | ||
1613 | } | ||
1614 | if (NULL != cpi->wh) | ||
1615 | { | ||
1616 | GNUNET_STREAM_write_cancel (cpi->wh); | ||
1617 | cpi->wh = NULL; | ||
1618 | } | ||
1619 | GNUNET_STREAM_close (cpi->socket); | ||
1620 | cpi->socket = NULL; | ||
1621 | } | ||
1622 | if (NULL != cpi->se) | ||
1623 | { | ||
1624 | strata_estimator_destroy (cpi->se); | ||
1625 | cpi->se = NULL; | ||
1626 | } | ||
1627 | if (NULL != cpi->ibf) | ||
1628 | { | ||
1629 | ibf_destroy (cpi->ibf); | ||
1630 | cpi->ibf = NULL; | ||
1631 | } | ||
1632 | if (NULL != cpi->mst) | ||
1633 | { | ||
1634 | GNUNET_SERVER_mst_destroy (cpi->mst); | ||
1635 | cpi->mst = NULL; | ||
1636 | } | ||
1637 | } | ||
1638 | GNUNET_free (session->info); | ||
1639 | session->info = NULL; | ||
1640 | } | ||
1641 | if (NULL != session->ibfs) | ||
1642 | { | ||
1643 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
1644 | { | ||
1645 | if (NULL != session->ibfs[i]) | ||
1646 | { | ||
1647 | ibf_destroy (session->ibfs[i]); | ||
1648 | session->ibfs[i] = NULL; | ||
1649 | } | ||
1650 | } | ||
1651 | GNUNET_free (session->ibfs); | ||
1652 | session->ibfs = NULL; | ||
1653 | } | ||
1654 | if (NULL != session->values) | ||
1655 | { | ||
1656 | GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL); | ||
1657 | GNUNET_CONTAINER_multihashmap_destroy (session->values); | ||
1658 | session->values = NULL; | ||
1659 | } | ||
1416 | GNUNET_free (session); | 1660 | GNUNET_free (session); |
1417 | } | 1661 | } |
1418 | 1662 | ||
@@ -1448,6 +1692,7 @@ disconnect_client (struct GNUNET_SERVER_Client *client) | |||
1448 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of | 1692 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of |
1449 | * exactly the same peers, the global id will be different. | 1693 | * exactly the same peers, the global id will be different. |
1450 | * | 1694 | * |
1695 | * @param session session to generate the global id for | ||
1451 | * @param session_id local id of the consensus session | 1696 | * @param session_id local id of the consensus session |
1452 | */ | 1697 | */ |
1453 | static void | 1698 | static void |
@@ -1511,9 +1756,9 @@ transmit_queued (void *cls, size_t size, | |||
1511 | 1756 | ||
1512 | 1757 | ||
1513 | /** | 1758 | /** |
1514 | * Schedule transmitting the next queued message (if any) to a client. | 1759 | * Schedule transmitting the next queued message (if any) to the inhabiting client of a session. |
1515 | * | 1760 | * |
1516 | * @param cli the client to send the next message to | 1761 | * @param session the consensus session |
1517 | */ | 1762 | */ |
1518 | static void | 1763 | static void |
1519 | client_send_next (struct ConsensusSession *session) | 1764 | client_send_next (struct ConsensusSession *session) |
@@ -1545,9 +1790,9 @@ client_send_next (struct ConsensusSession *session) | |||
1545 | * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. | 1790 | * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2. |
1546 | */ | 1791 | */ |
1547 | static int | 1792 | static int |
1548 | hash_cmp (const void *a, const void *b) | 1793 | hash_cmp (const void *h1, const void *h2) |
1549 | { | 1794 | { |
1550 | return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct GNUNET_HashCode *) b); | 1795 | return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct GNUNET_HashCode *) h2); |
1551 | } | 1796 | } |
1552 | 1797 | ||
1553 | 1798 | ||
@@ -1607,6 +1852,7 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | |||
1607 | cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); | 1852 | cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); |
1608 | cpi->wh = | 1853 | cpi->wh = |
1609 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); | 1854 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); |
1855 | GNUNET_free (hello); | ||
1610 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 1856 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
1611 | &session_stream_data_processor, cpi); | 1857 | &session_stream_data_processor, cpi); |
1612 | } | 1858 | } |
@@ -1821,12 +2067,52 @@ hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret) | |||
1821 | } | 2067 | } |
1822 | 2068 | ||
1823 | 2069 | ||
2070 | static void | ||
2071 | insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) | ||
2072 | { | ||
2073 | struct GNUNET_HashCode hash; | ||
2074 | struct ElementList *head; | ||
2075 | |||
2076 | hash_for_ibf (element->data, element->size, &hash); | ||
2077 | |||
2078 | head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash); | ||
2079 | |||
2080 | if (NULL == head) | ||
2081 | { | ||
2082 | int i; | ||
2083 | |||
2084 | head = GNUNET_malloc (sizeof *head); | ||
2085 | head->element = element; | ||
2086 | head->next = NULL; | ||
2087 | head->element_hash = GNUNET_memdup (&hash, sizeof hash); | ||
2088 | GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head, | ||
2089 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
2090 | strata_estimator_insert (session->se, &hash); | ||
2091 | |||
2092 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
2093 | if (NULL != session->ibfs[i]) | ||
2094 | ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash)); | ||
2095 | } | ||
2096 | else | ||
2097 | { | ||
2098 | struct ElementList *el; | ||
2099 | el = GNUNET_malloc (sizeof *el); | ||
2100 | head->element = element; | ||
2101 | head->next = NULL; | ||
2102 | head->element_hash = GNUNET_memdup (&hash, sizeof hash); | ||
2103 | while (NULL != head->next) | ||
2104 | head = head->next; | ||
2105 | head->next = el; | ||
2106 | } | ||
2107 | } | ||
2108 | |||
2109 | |||
1824 | /** | 2110 | /** |
1825 | * Called when a client performs an insert operation. | 2111 | * Called when a client performs an insert operation. |
1826 | * | 2112 | * |
1827 | * @param cls (unused) | 2113 | * @param cls (unused) |
1828 | * @param client client handle | 2114 | * @param client client handle |
1829 | * @param message message sent by the client | 2115 | * @param m message sent by the client |
1830 | */ | 2116 | */ |
1831 | void | 2117 | void |
1832 | client_insert (void *cls, | 2118 | client_insert (void *cls, |
@@ -1836,7 +2122,6 @@ client_insert (void *cls, | |||
1836 | struct ConsensusSession *session; | 2122 | struct ConsensusSession *session; |
1837 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 2123 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
1838 | struct GNUNET_CONSENSUS_Element *element; | 2124 | struct GNUNET_CONSENSUS_Element *element; |
1839 | struct GNUNET_HashCode hash; | ||
1840 | int element_size; | 2125 | int element_size; |
1841 | 2126 | ||
1842 | session = sessions_head; | 2127 | session = sessions_head; |
@@ -1865,12 +2150,7 @@ client_insert (void *cls, | |||
1865 | 2150 | ||
1866 | GNUNET_assert (NULL != element->data); | 2151 | GNUNET_assert (NULL != element->data); |
1867 | 2152 | ||
1868 | hash_for_ibf (element->data, element_size, &hash); | 2153 | insert_element (session, element); |
1869 | |||
1870 | GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element, | ||
1871 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1872 | |||
1873 | strata_estimator_insert (session->se, &hash); | ||
1874 | 2154 | ||
1875 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2155 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1876 | 2156 | ||
@@ -1951,18 +2231,41 @@ find_partners (struct ConsensusSession *session) | |||
1951 | { | 2231 | { |
1952 | GNUNET_assert (NULL == session->partner_outgoing); | 2232 | GNUNET_assert (NULL == session->partner_outgoing); |
1953 | session->partner_outgoing = &session->info[session->shuffle[arc]]; | 2233 | session->partner_outgoing = &session->info[session->shuffle[arc]]; |
2234 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
1954 | } | 2235 | } |
1955 | if (arc == session->local_peer_idx) | 2236 | if (arc == session->local_peer_idx) |
1956 | { | 2237 | { |
1957 | GNUNET_assert (NULL == session->partner_incoming); | 2238 | GNUNET_assert (NULL == session->partner_incoming); |
1958 | session->partner_incoming = &session->info[session->shuffle[i]]; | 2239 | session->partner_incoming = &session->info[session->shuffle[i]]; |
2240 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
1959 | } | 2241 | } |
1960 | } | 2242 | } |
1961 | } | 2243 | } |
1962 | 2244 | ||
1963 | 2245 | ||
2246 | static void | ||
2247 | replay_premature_message (struct ConsensusPeerInformation *cpi) | ||
2248 | { | ||
2249 | if (NULL != cpi->premature_strata_message) | ||
2250 | { | ||
2251 | struct StrataMessage *sm; | ||
2252 | |||
2253 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
2254 | sm = cpi->premature_strata_message; | ||
2255 | cpi->premature_strata_message = NULL; | ||
2256 | |||
2257 | cpi->replaying_strata_message = GNUNET_YES; | ||
2258 | handle_p2p_strata (cpi, sm); | ||
2259 | cpi->replaying_strata_message = GNUNET_NO; | ||
2260 | |||
2261 | GNUNET_free (sm); | ||
2262 | } | ||
2263 | } | ||
2264 | |||
2265 | |||
1964 | /** | 2266 | /** |
1965 | * Do the next subround in the exp-scheme. | 2267 | * Do the next subround in the exp-scheme. |
2268 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
1966 | * | 2269 | * |
1967 | * @param cls the session | 2270 | * @param cls the session |
1968 | * @param tc task context, for when this task is invoked by the scheduler, | 2271 | * @param tc task context, for when this task is invoked by the scheduler, |
@@ -1977,35 +2280,29 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
1977 | /* don't kick off next subround if we're shutting down */ | 2280 | /* don't kick off next subround if we're shutting down */ |
1978 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 2281 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
1979 | return; | 2282 | return; |
1980 | |||
1981 | session = cls; | 2283 | session = cls; |
1982 | 2284 | /* don't send any messages from the last round */ | |
1983 | 2285 | /* | |
2286 | clear_peer_messages (session->partner_outgoing); | ||
2287 | clear_peer_messages (session->partner_incoming); | ||
1984 | for (i = 0; i < session->num_peers; i++) | 2288 | for (i = 0; i < session->num_peers; i++) |
1985 | clear_peer_messages (&session->info[i]); | 2289 | clear_peer_messages (&session->info[i]); |
1986 | 2290 | */ | |
2291 | /* cancel timeout */ | ||
1987 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 2292 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) |
1988 | { | ||
1989 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 2293 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1990 | } | ||
1991 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 2294 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
1992 | 2295 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | |
1993 | if ((session->num_peers == 2) && (session->exp_round == 1)) | 2296 | if ( (session->exp_round == NUM_EXP_ROUNDS) || |
2297 | ((session->num_peers == 2) && (session->exp_round == 1))) | ||
1994 | { | 2298 | { |
1995 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n"); | 2299 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); |
1996 | round_over (session, NULL); | 2300 | round_over (session, NULL); |
1997 | return; | 2301 | return; |
1998 | } | 2302 | } |
1999 | |||
2000 | if (session->exp_round == NUM_EXP_ROUNDS) | ||
2001 | { | ||
2002 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n"); | ||
2003 | round_over (session, NULL); | ||
2004 | return; | ||
2005 | } | ||
2006 | |||
2007 | if (session->exp_round == 0) | 2303 | if (session->exp_round == 0) |
2008 | { | 2304 | { |
2305 | /* initialize everything for the log-rounds */ | ||
2009 | session->exp_round = 1; | 2306 | session->exp_round = 1; |
2010 | session->exp_subround = 0; | 2307 | session->exp_subround = 0; |
2011 | if (NULL == session->shuffle) | 2308 | if (NULL == session->shuffle) |
@@ -2015,6 +2312,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2015 | } | 2312 | } |
2016 | else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) | 2313 | else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) |
2017 | { | 2314 | { |
2315 | /* subrounds done, start new log-round */ | ||
2018 | session->exp_round++; | 2316 | session->exp_round++; |
2019 | session->exp_subround = 0; | 2317 | session->exp_subround = 0; |
2020 | shuffle (session); | 2318 | shuffle (session); |
@@ -2026,6 +2324,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2026 | 2324 | ||
2027 | find_partners (session); | 2325 | find_partners (session); |
2028 | 2326 | ||
2327 | #ifdef GNUNET_EXTRA_LOGGING | ||
2029 | { | 2328 | { |
2030 | int in; | 2329 | int in; |
2031 | int out; | 2330 | int out; |
@@ -2040,14 +2339,7 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2040 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, | 2339 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, |
2041 | session->exp_round, session->exp_subround, in, out); | 2340 | session->exp_round, session->exp_subround, in, out); |
2042 | } | 2341 | } |
2043 | 2342 | #endif /* GNUNET_EXTRA_LOGGING */ | |
2044 | |||
2045 | if (NULL != session->partner_outgoing) | ||
2046 | { | ||
2047 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | ||
2048 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
2049 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
2050 | } | ||
2051 | 2343 | ||
2052 | if (NULL != session->partner_incoming) | 2344 | if (NULL != session->partner_incoming) |
2053 | { | 2345 | { |
@@ -2056,24 +2348,15 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2056 | session->partner_incoming->ibf_bucket_counter = 0; | 2348 | session->partner_incoming->ibf_bucket_counter = 0; |
2057 | 2349 | ||
2058 | /* maybe there's an early strata estimator? */ | 2350 | /* maybe there's an early strata estimator? */ |
2059 | if (NULL != session->partner_incoming->premature_strata_message) | 2351 | replay_premature_message (session->partner_incoming); |
2060 | { | ||
2061 | struct StrataMessage *sm; | ||
2062 | |||
2063 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
2064 | sm = session->partner_incoming->premature_strata_message; | ||
2065 | session->partner_incoming->premature_strata_message = NULL; | ||
2066 | |||
2067 | session->partner_incoming->replaying_strata_message = GNUNET_YES; | ||
2068 | handle_p2p_strata (session->partner_incoming, sm); | ||
2069 | session->partner_incoming->replaying_strata_message = GNUNET_NO; | ||
2070 | |||
2071 | GNUNET_free (sm); | ||
2072 | } | ||
2073 | } | 2352 | } |
2074 | 2353 | ||
2075 | if (NULL != session->partner_outgoing) | 2354 | if (NULL != session->partner_outgoing) |
2076 | { | 2355 | { |
2356 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | ||
2357 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
2358 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
2359 | |||
2077 | if (NULL == session->partner_outgoing->socket) | 2360 | if (NULL == session->partner_outgoing->socket) |
2078 | { | 2361 | { |
2079 | session->partner_outgoing->socket = | 2362 | session->partner_outgoing->socket = |
@@ -2092,7 +2375,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2092 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), | 2375 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), |
2093 | subround_over, session); | 2376 | subround_over, session); |
2094 | */ | 2377 | */ |
2095 | |||
2096 | } | 2378 | } |
2097 | 2379 | ||
2098 | static void | 2380 | static void |
@@ -2110,31 +2392,63 @@ contact_peer_a2a (struct ConsensusPeerInformation *cpi) | |||
2110 | } | 2392 | } |
2111 | } | 2393 | } |
2112 | 2394 | ||
2395 | /** | ||
2396 | * Start the inventory round, contact all peers we are supposed to contact. | ||
2397 | * | ||
2398 | * @param session the current session | ||
2399 | */ | ||
2113 | static void | 2400 | static void |
2114 | start_inventory (struct ConsensusSession *session) | 2401 | start_inventory (struct ConsensusSession *session) |
2115 | { | 2402 | { |
2116 | int i; | 2403 | int i; |
2117 | int last; | 2404 | int last; |
2118 | 2405 | ||
2406 | for (i = 0; i < session->num_peers; i++) | ||
2407 | { | ||
2408 | session->info[i].ibf_bucket_counter = 0; | ||
2409 | session->info[i].ibf_state = IBF_STATE_NONE; | ||
2410 | session->info[i].is_outgoing = GNUNET_NO; | ||
2411 | } | ||
2412 | |||
2119 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | 2413 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
2120 | i = (session->local_peer_idx + 1) % session->num_peers; | 2414 | i = (session->local_peer_idx + 1) % session->num_peers; |
2121 | while (i != last) | 2415 | while (i != last) |
2122 | { | 2416 | { |
2417 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | ||
2123 | contact_peer_a2a (&session->info[i]); | 2418 | contact_peer_a2a (&session->info[i]); |
2419 | session->info[i].is_outgoing = GNUNET_YES; | ||
2124 | i = (i + 1) % session->num_peers; | 2420 | i = (i + 1) % session->num_peers; |
2125 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i); | ||
2126 | } | 2421 | } |
2127 | // tie-breaker for even number of peers | 2422 | // tie-breaker for even number of peers |
2128 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | 2423 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) |
2129 | { | 2424 | { |
2425 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | ||
2426 | session->info[last].is_outgoing = GNUNET_YES; | ||
2130 | contact_peer_a2a (&session->info[last]); | 2427 | contact_peer_a2a (&session->info[last]); |
2131 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); | 2428 | } |
2429 | |||
2430 | for (i = 0; i < session->num_peers; i++) | ||
2431 | { | ||
2432 | if (GNUNET_NO == session->info[i].is_outgoing) | ||
2433 | replay_premature_message (&session->info[i]); | ||
2132 | } | 2434 | } |
2133 | } | 2435 | } |
2134 | 2436 | ||
2437 | static void | ||
2438 | send_client_conclude_done (struct ConsensusSession *session) | ||
2439 | { | ||
2440 | struct GNUNET_MessageHeader *msg; | ||
2441 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
2442 | msg = GNUNET_malloc (sizeof *msg); | ||
2443 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
2444 | msg->size = htons (sizeof *msg); | ||
2445 | queue_client_message (session, msg); | ||
2446 | client_send_next (session); | ||
2447 | } | ||
2135 | 2448 | ||
2136 | /** | 2449 | /** |
2137 | * Select and kick off the next round, based on the current round. | 2450 | * Start the next round. |
2451 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
2138 | * | 2452 | * |
2139 | * @param cls the session | 2453 | * @param cls the session |
2140 | * @param tc task context, for when this task is invoked by the scheduler, | 2454 | * @param tc task context, for when this task is invoked by the scheduler, |
@@ -2144,17 +2458,18 @@ static void | |||
2144 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 2458 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
2145 | { | 2459 | { |
2146 | struct ConsensusSession *session; | 2460 | struct ConsensusSession *session; |
2147 | int i; | ||
2148 | 2461 | ||
2149 | /* don't kick off next round if we're shutting down */ | 2462 | /* don't kick off next round if we're shutting down */ |
2150 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | 2463 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
2151 | return; | 2464 | return; |
2152 | 2465 | ||
2153 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n"); | ||
2154 | session = cls; | 2466 | session = cls; |
2467 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); | ||
2155 | 2468 | ||
2469 | /* | ||
2156 | for (i = 0; i < session->num_peers; i++) | 2470 | for (i = 0; i < session->num_peers; i++) |
2157 | clear_peer_messages (&session->info[i]); | 2471 | clear_peer_messages (&session->info[i]); |
2472 | */ | ||
2158 | 2473 | ||
2159 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 2474 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) |
2160 | { | 2475 | { |
@@ -2162,8 +2477,6 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2162 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 2477 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
2163 | } | 2478 | } |
2164 | 2479 | ||
2165 | /* FIXME: cancel current round */ | ||
2166 | |||
2167 | switch (session->current_round) | 2480 | switch (session->current_round) |
2168 | { | 2481 | { |
2169 | case CONSENSUS_ROUND_BEGIN: | 2482 | case CONSENSUS_ROUND_BEGIN: |
@@ -2172,31 +2485,24 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2172 | subround_over (session, NULL); | 2485 | subround_over (session, NULL); |
2173 | break; | 2486 | break; |
2174 | case CONSENSUS_ROUND_EXCHANGE: | 2487 | case CONSENSUS_ROUND_EXCHANGE: |
2175 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); | 2488 | /* handle two peers specially */ |
2176 | 2489 | if (session->num_peers <= 2) | |
2177 | if (0) | ||
2178 | { | 2490 | { |
2179 | struct GNUNET_MessageHeader *msg; | 2491 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); |
2180 | msg = GNUNET_malloc (sizeof *msg); | 2492 | send_client_conclude_done (session); |
2181 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | 2493 | return; |
2182 | msg->size = htons (sizeof *msg); | ||
2183 | queue_client_message (session, msg); | ||
2184 | client_send_next (session); | ||
2185 | } | ||
2186 | |||
2187 | if (0) | ||
2188 | { | ||
2189 | session->current_round = CONSENSUS_ROUND_INVENTORY; | ||
2190 | start_inventory (session); | ||
2191 | } | 2494 | } |
2495 | session->current_round = CONSENSUS_ROUND_INVENTORY; | ||
2496 | start_inventory (session); | ||
2192 | break; | 2497 | break; |
2193 | case CONSENSUS_ROUND_INVENTORY: | 2498 | case CONSENSUS_ROUND_INVENTORY: |
2194 | session->current_round = CONSENSUS_ROUND_STOCK; | 2499 | session->current_round = CONSENSUS_ROUND_STOCK; |
2195 | /* FIXME: exchange stock */ | 2500 | session->exp_round = 0; |
2501 | subround_over (session, NULL); | ||
2196 | break; | 2502 | break; |
2197 | case CONSENSUS_ROUND_STOCK: | 2503 | case CONSENSUS_ROUND_STOCK: |
2198 | session->current_round = CONSENSUS_ROUND_FINISH; | 2504 | session->current_round = CONSENSUS_ROUND_FINISH; |
2199 | /* FIXME: send elements to client */ | 2505 | send_client_conclude_done (session); |
2200 | break; | 2506 | break; |
2201 | default: | 2507 | default: |
2202 | GNUNET_assert (0); | 2508 | GNUNET_assert (0); |
@@ -2241,10 +2547,16 @@ client_conclude (void *cls, | |||
2241 | return; | 2547 | return; |
2242 | } | 2548 | } |
2243 | 2549 | ||
2244 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); | 2550 | if (session->num_peers <= 1) |
2245 | 2551 | { | |
2246 | /* the 'begin' round is over, start with the next, real round */ | 2552 | send_client_conclude_done (session); |
2247 | round_over (session, NULL); | 2553 | } |
2554 | else | ||
2555 | { | ||
2556 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); | ||
2557 | /* the 'begin' round is over, start with the next, real round */ | ||
2558 | round_over (session, NULL); | ||
2559 | } | ||
2248 | 2560 | ||
2249 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2561 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2250 | client_send_next (session); | 2562 | client_send_next (session); |
@@ -2267,7 +2579,6 @@ client_ack (void *cls, | |||
2267 | struct GNUNET_CONSENSUS_AckMessage *msg; | 2579 | struct GNUNET_CONSENSUS_AckMessage *msg; |
2268 | struct PendingElement *pending; | 2580 | struct PendingElement *pending; |
2269 | struct GNUNET_CONSENSUS_Element *element; | 2581 | struct GNUNET_CONSENSUS_Element *element; |
2270 | struct GNUNET_HashCode key; | ||
2271 | 2582 | ||
2272 | session = sessions_head; | 2583 | session = sessions_head; |
2273 | while (NULL != session) | 2584 | while (NULL != session) |
@@ -2291,25 +2602,17 @@ client_ack (void *cls, | |||
2291 | 2602 | ||
2292 | if (msg->keep) | 2603 | if (msg->keep) |
2293 | { | 2604 | { |
2294 | int i; | ||
2295 | element = pending->element; | 2605 | element = pending->element; |
2296 | hash_for_ibf (element->data, element->size, &key); | 2606 | insert_element (session, element); |
2297 | |||
2298 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | ||
2299 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
2300 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); | 2607 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); |
2301 | strata_estimator_insert (session->se, &key); | ||
2302 | |||
2303 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
2304 | { | ||
2305 | if (NULL != session->ibfs[i]) | ||
2306 | ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key)); | ||
2307 | } | ||
2308 | } | 2608 | } |
2309 | 2609 | ||
2610 | GNUNET_free (pending); | ||
2611 | |||
2310 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2612 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2311 | } | 2613 | } |
2312 | 2614 | ||
2615 | |||
2313 | /** | 2616 | /** |
2314 | * Task that disconnects from core. | 2617 | * Task that disconnects from core. |
2315 | * | 2618 | * |
@@ -2366,7 +2669,18 @@ shutdown_task (void *cls, | |||
2366 | socket = incoming_sockets_head; | 2669 | socket = incoming_sockets_head; |
2367 | if (NULL == socket->cpi) | 2670 | if (NULL == socket->cpi) |
2368 | { | 2671 | { |
2672 | if (NULL != socket->rh) | ||
2673 | { | ||
2674 | GNUNET_STREAM_read_cancel (socket->rh); | ||
2675 | socket->rh = NULL; | ||
2676 | } | ||
2369 | GNUNET_STREAM_close (socket->socket); | 2677 | GNUNET_STREAM_close (socket->socket); |
2678 | socket->socket = NULL; | ||
2679 | if (NULL != socket->mst) | ||
2680 | { | ||
2681 | GNUNET_SERVER_mst_destroy (socket->mst); | ||
2682 | socket->mst = NULL; | ||
2683 | } | ||
2370 | } | 2684 | } |
2371 | incoming_sockets_head = incoming_sockets_head->next; | 2685 | incoming_sockets_head = incoming_sockets_head->next; |
2372 | GNUNET_free (socket); | 2686 | GNUNET_free (socket); |
@@ -2375,26 +2689,9 @@ shutdown_task (void *cls, | |||
2375 | while (NULL != sessions_head) | 2689 | while (NULL != sessions_head) |
2376 | { | 2690 | { |
2377 | struct ConsensusSession *session; | 2691 | struct ConsensusSession *session; |
2378 | int i; | 2692 | session = sessions_head->next; |
2379 | 2693 | destroy_session (sessions_head); | |
2380 | session = sessions_head; | 2694 | sessions_head = session; |
2381 | |||
2382 | if (NULL != session->info) | ||
2383 | for (i = 0; i < session->num_peers; i++) | ||
2384 | { | ||
2385 | struct ConsensusPeerInformation *cpi; | ||
2386 | cpi = &session->info[i]; | ||
2387 | if ((NULL != cpi) && (NULL != cpi->socket)) | ||
2388 | { | ||
2389 | GNUNET_STREAM_close (cpi->socket); | ||
2390 | } | ||
2391 | } | ||
2392 | |||
2393 | if (NULL != session->client) | ||
2394 | GNUNET_SERVER_client_disconnect (session->client); | ||
2395 | |||
2396 | sessions_head = sessions_head->next; | ||
2397 | GNUNET_free (session); | ||
2398 | } | 2695 | } |
2399 | 2696 | ||
2400 | if (NULL != core) | 2697 | if (NULL != core) |
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 5246b63a6..a51137493 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -136,7 +136,7 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf, | |||
136 | * Insert an element into an IBF. | 136 | * Insert an element into an IBF. |
137 | * | 137 | * |
138 | * @param ibf the IBF | 138 | * @param ibf the IBF |
139 | * @param id the element's hash code | 139 | * @param key the element's hash code |
140 | */ | 140 | */ |
141 | void | 141 | void |
142 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) | 142 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) |
@@ -171,10 +171,10 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf) | |||
171 | * Decode and remove an element from the IBF, if possible. | 171 | * Decode and remove an element from the IBF, if possible. |
172 | * | 172 | * |
173 | * @param ibf the invertible bloom filter to decode | 173 | * @param ibf the invertible bloom filter to decode |
174 | * @param side sign of the cell's count where the decoded element came from. | 174 | * @param ret_side sign of the cell's count where the decoded element came from. |
175 | * A negative sign indicates that the element was recovered | 175 | * A negative sign indicates that the element was recovered |
176 | * resides in an IBF that was previously subtracted from. | 176 | * resides in an IBF that was previously subtracted from. |
177 | * @param ret_id the hash code of the decoded element, if successful | 177 | * @param ret_key receives the hash code of the decoded element, if successful |
178 | * @return GNUNET_YES if decoding an element was successful, | 178 | * @return GNUNET_YES if decoding an element was successful, |
179 | * GNUNET_NO if the IBF is empty, | 179 | * GNUNET_NO if the IBF is empty, |
180 | * GNUNET_SYSERR if the decoding has failed | 180 | * GNUNET_SYSERR if the decoding has failed |
@@ -284,7 +284,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32 | |||
284 | * @param size size of the buffer, will be updated | 284 | * @param size size of the buffer, will be updated |
285 | * @param start which bucket to start at | 285 | * @param start which bucket to start at |
286 | * @param count how many buckets to read | 286 | * @param count how many buckets to read |
287 | * @param dst ibf to write buckets to | 287 | * @param ibf the ibf to read from |
288 | * @return GNUNET_OK on success | 288 | * @return GNUNET_OK on success |
289 | */ | 289 | */ |
290 | int | 290 | int |
@@ -325,8 +325,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct | |||
325 | * Write an ibf. | 325 | * Write an ibf. |
326 | * | 326 | * |
327 | * @param ibf the ibf to write | 327 | * @param ibf the ibf to write |
328 | * @param start with which bucket to start | ||
329 | * @param count how many buckets to write | ||
330 | * @param buf buffer to write the data to, will be updated to point to the | 328 | * @param buf buffer to write the data to, will be updated to point to the |
331 | * first byte after the written data | 329 | * first byte after the written data |
332 | * @param size pointer to the size of the buffer, will be updated, can be NULL | 330 | * @param size pointer to the size of the buffer, will be updated, can be NULL |
@@ -344,8 +342,6 @@ ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size) | |||
344 | * @param buf pointer to the buffer to write to, will point to first | 342 | * @param buf pointer to the buffer to write to, will point to first |
345 | * byte after the written data | 343 | * byte after the written data |
346 | * @param size size of the buffer, will be updated | 344 | * @param size size of the buffer, will be updated |
347 | * @param start which bucket to start at | ||
348 | * @param count how many buckets to read | ||
349 | * @param dst ibf to write buckets to | 345 | * @param dst ibf to write buckets to |
350 | * @return GNUNET_OK on success | 346 | * @return GNUNET_OK on success |
351 | */ | 347 | */ |
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index cafe55c8d..d6c3874aa 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h | |||
@@ -128,7 +128,7 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32 | |||
128 | * @param size size of the buffer, will be updated | 128 | * @param size size of the buffer, will be updated |
129 | * @param start which bucket to start at | 129 | * @param start which bucket to start at |
130 | * @param count how many buckets to read | 130 | * @param count how many buckets to read |
131 | * @param dst ibf to write buckets to | 131 | * @param ibf the ibf to read from |
132 | * @return GNUNET_OK on success | 132 | * @return GNUNET_OK on success |
133 | */ | 133 | */ |
134 | int | 134 | int |
@@ -139,8 +139,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct | |||
139 | * Write an ibf. | 139 | * Write an ibf. |
140 | * | 140 | * |
141 | * @param ibf the ibf to write | 141 | * @param ibf the ibf to write |
142 | * @param start with which bucket to start | ||
143 | * @param count how many buckets to write | ||
144 | * @param buf buffer to write the data to, will be updated to point to the | 142 | * @param buf buffer to write the data to, will be updated to point to the |
145 | * first byte after the written data | 143 | * first byte after the written data |
146 | * @param size pointer to the size of the buffer, will be updated, can be NULL | 144 | * @param size pointer to the size of the buffer, will be updated, can be NULL |
@@ -149,14 +147,13 @@ void | |||
149 | ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); | 147 | ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); |
150 | 148 | ||
151 | 149 | ||
150 | |||
152 | /** | 151 | /** |
153 | * Read an ibf. | 152 | * Read an ibf. |
154 | * | 153 | * |
155 | * @param buf pointer to the buffer to write to, will point to first | 154 | * @param buf pointer to the buffer to write to, will point to first |
156 | * byte after the written data | 155 | * byte after the written data |
157 | * @param size size of the buffer, will be updated | 156 | * @param size size of the buffer, will be updated |
158 | * @param start which bucket to start at | ||
159 | * @param count how many buckets to read | ||
160 | * @param dst ibf to write buckets to | 157 | * @param dst ibf to write buckets to |
161 | * @return GNUNET_OK on success | 158 | * @return GNUNET_OK on success |
162 | */ | 159 | */ |
@@ -223,15 +220,16 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi | |||
223 | * Decode and remove an element from the IBF, if possible. | 220 | * Decode and remove an element from the IBF, if possible. |
224 | * | 221 | * |
225 | * @param ibf the invertible bloom filter to decode | 222 | * @param ibf the invertible bloom filter to decode |
226 | * @param side sign of the cell's count where the decoded element came from. | 223 | * @param ret_side sign of the cell's count where the decoded element came from. |
227 | * A negative sign indicates that the element was recovered resides in an IBF | 224 | * A negative sign indicates that the element was recovered |
228 | * that was previously subtracted from. | 225 | * resides in an IBF that was previously subtracted from. |
229 | * @param ret_id the hash code of the decoded element, if successful | 226 | * @param ret_key receives the hash code of the decoded element, if successful |
230 | * @return GNUNET_YES if decoding an element was successful, GNUNET_NO if the IBF is empty, | 227 | * @return GNUNET_YES if decoding an element was successful, |
231 | * GNUNET_SYSERR if the decoding has faile | 228 | * GNUNET_NO if the IBF is empty, |
229 | * GNUNET_SYSERR if the decoding has failed | ||
232 | */ | 230 | */ |
233 | int | 231 | int |
234 | ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct IBF_Key *ret_key); | 232 | ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_key); |
235 | 233 | ||
236 | 234 | ||
237 | /** | 235 | /** |
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 8a8d5b89f..437636c99 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -5,7 +5,7 @@ HOSTNAME = localhost | |||
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
7 | #PREFIX = gdbserver :12345 | 7 | #PREFIX = gdbserver :12345 |
8 | #PREFIX = valgrind | 8 | PREFIX = valgrind --leak-check=full |
9 | ACCEPT_FROM = 127.0.0.1; | 9 | ACCEPT_FROM = 127.0.0.1; |
10 | ACCEPT_FROM6 = ::1; | 10 | ACCEPT_FROM6 = ::1; |
11 | UNIXPATH = /tmp/gnunet-service-consensus.sock | 11 | UNIXPATH = /tmp/gnunet-service-consensus.sock |