diff options
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/Makefile.am | 6 | ||||
-rw-r--r-- | src/consensus/consensus-simulation.py | 103 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 20 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-ibf.c | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 3 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 2779 | ||||
-rw-r--r-- | src/consensus/ibf.c | 87 | ||||
-rw-r--r-- | src/consensus/ibf.h | 53 | ||||
-rw-r--r-- | src/consensus/strata_estimator.c | 145 | ||||
-rw-r--r-- | src/consensus/strata_estimator.h (renamed from src/consensus/consensus_flout.h) | 46 |
10 files changed, 1620 insertions, 1626 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index e469de057..82af29c87 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am | |||
@@ -61,7 +61,8 @@ gnunet_consensus_ibf_LDADD = \ | |||
61 | 61 | ||
62 | gnunet_service_consensus_SOURCES = \ | 62 | gnunet_service_consensus_SOURCES = \ |
63 | gnunet-service-consensus.c \ | 63 | gnunet-service-consensus.c \ |
64 | ibf.c | 64 | ibf.c \ |
65 | strata_estimator.c | ||
65 | gnunet_service_consensus_LDADD = \ | 66 | gnunet_service_consensus_LDADD = \ |
66 | $(top_builddir)/src/util/libgnunetutil.la \ | 67 | $(top_builddir)/src/util/libgnunetutil.la \ |
67 | $(top_builddir)/src/core/libgnunetcore.la \ | 68 | $(top_builddir)/src/core/libgnunetcore.la \ |
@@ -71,7 +72,8 @@ gnunet_service_consensus_LDADD = \ | |||
71 | 72 | ||
72 | gnunet_service_evil_consensus_SOURCES = \ | 73 | gnunet_service_evil_consensus_SOURCES = \ |
73 | gnunet-service-consensus.c \ | 74 | gnunet-service-consensus.c \ |
74 | ibf.c | 75 | ibf.c \ |
76 | strata_estimator.c | ||
75 | gnunet_service_evil_consensus_LDADD = \ | 77 | gnunet_service_evil_consensus_LDADD = \ |
76 | $(top_builddir)/src/util/libgnunetutil.la \ | 78 | $(top_builddir)/src/util/libgnunetutil.la \ |
77 | $(top_builddir)/src/core/libgnunetcore.la \ | 79 | $(top_builddir)/src/core/libgnunetcore.la \ |
diff --git a/src/consensus/consensus-simulation.py b/src/consensus/consensus-simulation.py new file mode 100644 index 000000000..930dfee62 --- /dev/null +++ b/src/consensus/consensus-simulation.py | |||
@@ -0,0 +1,103 @@ | |||
1 | #!/usr/bin/python | ||
2 | # This file is part of GNUnet | ||
3 | # (C) 2013 Christian Grothoff (and other contributing authors) | ||
4 | # | ||
5 | # GNUnet is free software; you can redistribute it and/or modify | ||
6 | # it under the terms of the GNU General Public License as published | ||
7 | # by the Free Software Foundation; either version 2, or (at your | ||
8 | # option) any later version. | ||
9 | # | ||
10 | # GNUnet is distributed in the hope that it will be useful, but | ||
11 | # WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | # General Public License for more details. | ||
14 | # | ||
15 | # You should have received a copy of the GNU General Public License | ||
16 | # along with GNUnet; see the file COPYING. If not, write to the | ||
17 | # Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | # Boston, MA 02111-1307, USA. | ||
19 | |||
20 | import argparse | ||
21 | import random | ||
22 | from math import ceil,log,floor | ||
23 | |||
24 | def bsc(n): | ||
25 | """ count the bits set in n""" | ||
26 | l = n.bit_length() | ||
27 | c = 0 | ||
28 | x = 1 | ||
29 | for _ in range(0, l): | ||
30 | if n & x: | ||
31 | c = c + 1 | ||
32 | x = x << 1 | ||
33 | return c | ||
34 | |||
35 | def simulate(k, n, verbose): | ||
36 | assert k < n | ||
37 | largest_arc = int(2**ceil(log(n, 2))) / 2 | ||
38 | num_ghosts = (2 * largest_arc) - n | ||
39 | if verbose: | ||
40 | print "we have", num_ghosts, "ghost peers" | ||
41 | # n.b. all peers with idx<k are evil | ||
42 | peers = range(n) | ||
43 | info = [1 << x for x in xrange(n)] | ||
44 | def done_p(): | ||
45 | for x in xrange(k, n): | ||
46 | if bsc(info[x]) < n-k: | ||
47 | return False | ||
48 | return True | ||
49 | rounds = 0 | ||
50 | while not done_p(): | ||
51 | if verbose: | ||
52 | print "-- round --" | ||
53 | arc = 1 | ||
54 | while arc <= largest_arc: | ||
55 | if verbose: | ||
56 | print "-- subround --" | ||
57 | new_info = [x for x in info] | ||
58 | for peer_physical in xrange(n): | ||
59 | peer_logical = peers[peer_physical] | ||
60 | peer_type = None | ||
61 | partner_logical = (peer_logical + arc) % n | ||
62 | partner_physical = peers.index(partner_logical) | ||
63 | if peer_physical < k or partner_physical < k: | ||
64 | if verbose: | ||
65 | print "bad peer in connection", peer_physical, "--", partner_physical | ||
66 | continue | ||
67 | if peer_logical & arc == 0: | ||
68 | # we are outgoing | ||
69 | if verbose: | ||
70 | print peer_physical, "connects to", partner_physical | ||
71 | peer_type = "outgoing" | ||
72 | if peer_logical < num_ghosts: | ||
73 | # we have a ghost, check if the peer who connects | ||
74 | # to our ghost is actually outgoing | ||
75 | ghost_partner_logical = (peer_logical - arc) % n | ||
76 | if ghost_partner_logical & arc == 0: | ||
77 | peer_type = peer_type + ", ghost incoming" | ||
78 | new_info[peer_physical] = new_info[peer_physical] | info[peer_physical] | info[partner_physical] | ||
79 | new_info[partner_physical] = new_info[partner_physical] | info[peer_physical] | info[partner_physical] | ||
80 | else: | ||
81 | peer_type = "incoming" | ||
82 | if verbose > 1: | ||
83 | print "type of", str(peer_physical) + ":", peer_type | ||
84 | info = new_info | ||
85 | arc = arc << 1; | ||
86 | rounds = rounds + 1 | ||
87 | random.shuffle(peers) | ||
88 | return rounds | ||
89 | |||
90 | if __name__ == "__main__": | ||
91 | parser = argparse.ArgumentParser() | ||
92 | parser.add_argument("k", metavar="k", type=int, help="#(bad peers)") | ||
93 | parser.add_argument("n", metavar="n", type=int, help="#(all peers)") | ||
94 | parser.add_argument("r", metavar="r", type=int, help="#(rounds)") | ||
95 | parser.add_argument('--verbose', '-v', action='count') | ||
96 | |||
97 | args = parser.parse_args() | ||
98 | sum = 0.0; | ||
99 | for n in xrange (0, args.r): | ||
100 | sum += simulate(args.k, args.n, args.verbose) | ||
101 | print sum / args.r; | ||
102 | |||
103 | |||
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 25ace3a4d..fd61d3712 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -231,15 +231,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) | |||
231 | } | 231 | } |
232 | } | 232 | } |
233 | 233 | ||
234 | static void | ||
235 | queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) | ||
236 | { | ||
237 | struct QueuedMessage *qm; | ||
238 | qm = GNUNET_malloc (sizeof *qm); | ||
239 | qm->msg = msg; | ||
240 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); | ||
241 | } | ||
242 | |||
243 | 234 | ||
244 | /** | 235 | /** |
245 | * Called when the server has sent is a new element | 236 | * Called when the server has sent is a new element |
@@ -252,8 +243,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
252 | struct GNUNET_CONSENSUS_ElementMessage *msg) | 243 | struct GNUNET_CONSENSUS_ElementMessage *msg) |
253 | { | 244 | { |
254 | struct GNUNET_CONSENSUS_Element element; | 245 | struct GNUNET_CONSENSUS_Element element; |
255 | struct GNUNET_CONSENSUS_AckMessage *ack_msg; | ||
256 | int ret; | ||
257 | 246 | ||
258 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); | 247 | LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); |
259 | 248 | ||
@@ -261,14 +250,7 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
261 | element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 250 | element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
262 | element.data = &msg[1]; | 251 | element.data = &msg[1]; |
263 | 252 | ||
264 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); | 253 | consensus->new_element_cb (consensus->new_element_cls, &element); |
265 | |||
266 | ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage); | ||
267 | ack_msg->header.size = htons (sizeof *ack_msg); | ||
268 | ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); | ||
269 | ack_msg->keep = ret; | ||
270 | |||
271 | queue_message (consensus, &ack_msg->header); | ||
272 | 254 | ||
273 | send_next (consensus); | 255 | send_next (consensus); |
274 | } | 256 | } |
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index 73dc31b56..d431795f1 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c | |||
@@ -160,8 +160,8 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
160 | i++; | 160 | i++; |
161 | } | 161 | } |
162 | 162 | ||
163 | ibf_a = ibf_create (ibf_size, hash_num, 0); | 163 | ibf_a = ibf_create (ibf_size, hash_num); |
164 | ibf_b = ibf_create (ibf_size, hash_num, 0); | 164 | ibf_b = ibf_create (ibf_size, hash_num); |
165 | 165 | ||
166 | printf ("generated sets\n"); | 166 | printf ("generated sets\n"); |
167 | 167 | ||
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index 9e9b89446..d8c1b14ee 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -192,12 +192,11 @@ connect_complete (void *cls, | |||
192 | } | 192 | } |
193 | 193 | ||
194 | 194 | ||
195 | static int | 195 | static void |
196 | new_element_cb (void *cls, | 196 | new_element_cb (void *cls, |
197 | const struct GNUNET_CONSENSUS_Element *element) | 197 | const struct GNUNET_CONSENSUS_Element *element) |
198 | { | 198 | { |
199 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); | 199 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); |
200 | return GNUNET_YES; | ||
201 | } | 200 | } |
202 | 201 | ||
203 | 202 | ||
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index ebd2d238b..179df0fb0 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -32,23 +32,32 @@ | |||
32 | #include "gnunet_consensus_service.h" | 32 | #include "gnunet_consensus_service.h" |
33 | #include "gnunet_core_service.h" | 33 | #include "gnunet_core_service.h" |
34 | #include "gnunet_stream_lib.h" | 34 | #include "gnunet_stream_lib.h" |
35 | |||
35 | #include "consensus_protocol.h" | 36 | #include "consensus_protocol.h" |
36 | #include "ibf.h" | ||
37 | #include "consensus.h" | 37 | #include "consensus.h" |
38 | #include "ibf.h" | ||
39 | #include "strata_estimator.h" | ||
40 | |||
41 | |||
42 | /* | ||
43 | * Log macro that prefixes the local peer and the peer we are in contact with. | ||
44 | */ | ||
45 | #define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ | ||
46 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__) | ||
38 | 47 | ||
39 | 48 | ||
40 | /** | 49 | /** |
41 | * Number of IBFs in a strata estimator. | 50 | * Number of IBFs in a strata estimator. |
42 | */ | 51 | */ |
43 | #define STRATA_COUNT 32 | 52 | #define SE_STRATA_COUNT 32 |
44 | /** | 53 | /** |
45 | * Number of buckets per IBF. | 54 | * Size of the IBFs in the strata estimator. |
46 | */ | 55 | */ |
47 | #define STRATA_IBF_BUCKETS 80 | 56 | #define SE_IBF_SIZE 80 |
48 | /** | 57 | /** |
49 | * hash num parameter for the difference digests and strata estimators | 58 | * hash num parameter for the difference digests and strata estimators |
50 | */ | 59 | */ |
51 | #define STRATA_HASH_NUM 3 | 60 | #define SE_IBF_HASH_NUM 3 |
52 | 61 | ||
53 | /** | 62 | /** |
54 | * Number of buckets that can be transmitted in one message. | 63 | * Number of buckets that can be transmitted in one message. |
@@ -63,72 +72,55 @@ | |||
63 | #define MAX_IBF_ORDER (16) | 72 | #define MAX_IBF_ORDER (16) |
64 | 73 | ||
65 | /** | 74 | /** |
66 | * Number exp-rounds. | 75 | * Number of exponential rounds, used in the inventory and completion round. |
67 | */ | 76 | */ |
68 | #define NUM_EXP_ROUNDS (4) | 77 | #define NUM_EXP_ROUNDS (4) |
69 | 78 | ||
70 | 79 | ||
71 | /* forward declarations */ | 80 | /* forward declarations */ |
72 | 81 | ||
73 | struct ConsensusSession; | 82 | /* mutual recursion with struct ConsensusSession */ |
74 | struct IncomingSocket; | ||
75 | struct ConsensusPeerInformation; | 83 | struct ConsensusPeerInformation; |
76 | 84 | ||
77 | static void | 85 | struct MessageQueue; |
78 | client_send_next (struct ConsensusSession *session); | ||
79 | |||
80 | static int | ||
81 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); | ||
82 | |||
83 | static void | ||
84 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | ||
85 | 86 | ||
87 | /* mutual recursion with round_over */ | ||
86 | static void | 88 | static void |
87 | send_ibf (struct ConsensusPeerInformation *cpi); | 89 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
88 | 90 | ||
91 | /* mutial recursion with transmit_queued */ | ||
89 | static void | 92 | static void |
90 | send_strata_estimator (struct ConsensusPeerInformation *cpi); | 93 | client_send_next (struct MessageQueue *mq); |
91 | 94 | ||
95 | /* mutual recursion with mst_session_callback */ | ||
92 | static void | 96 | static void |
93 | decode (struct ConsensusPeerInformation *cpi); | 97 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); |
94 | |||
95 | static void | ||
96 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size); | ||
97 | 98 | ||
98 | static void | 99 | static int |
99 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); | 100 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); |
100 | 101 | ||
101 | 102 | ||
102 | /** | 103 | /** |
103 | * An element that is waiting to be transmitted to the client. | 104 | * Additional information about a consensus element. |
104 | */ | 105 | */ |
105 | struct PendingElement | 106 | struct ElementInfo |
106 | { | 107 | { |
107 | /** | 108 | /** |
108 | * Pending elements are kept in a DLL. | 109 | * The element itself. |
109 | */ | 110 | */ |
110 | struct PendingElement *next; | 111 | struct GNUNET_CONSENSUS_Element *element; |
111 | |||
112 | /** | 112 | /** |
113 | * Pending elements are kept in a DLL. | 113 | * Hash of the element |
114 | */ | 114 | */ |
115 | struct PendingElement *prev; | 115 | struct GNUNET_HashCode *element_hash; |
116 | |||
117 | /** | 116 | /** |
118 | * The actual element | 117 | * Number of other peers that have the element in the inventory. |
119 | */ | 118 | */ |
120 | struct GNUNET_CONSENSUS_Element *element; | 119 | unsigned int inventory_count; |
121 | 120 | /** | |
122 | /* peer this element is coming from */ | 121 | * Bitmap of peers that have this element in their inventory |
123 | struct ConsensusPeerInformation *cpi; | 122 | */ |
124 | }; | 123 | uint8_t *inventory_bitmap; |
125 | |||
126 | |||
127 | struct ElementList | ||
128 | { | ||
129 | struct ElementList *next; | ||
130 | struct GNUNET_CONSENSUS_Element *element; | ||
131 | struct GNUNET_HashCode *element_hash; | ||
132 | }; | 124 | }; |
133 | 125 | ||
134 | 126 | ||
@@ -147,178 +139,93 @@ enum ConsensusRound | |||
147 | CONSENSUS_ROUND_EXCHANGE, | 139 | CONSENSUS_ROUND_EXCHANGE, |
148 | /** | 140 | /** |
149 | * Exchange which elements each peer has, but not the elements. | 141 | * Exchange which elements each peer has, but not the elements. |
142 | * This round uses the all-to-all scheme. | ||
150 | */ | 143 | */ |
151 | CONSENSUS_ROUND_INVENTORY, | 144 | CONSENSUS_ROUND_INVENTORY, |
152 | /** | 145 | /** |
153 | * Collect and distribute missing values. | 146 | * Collect and distribute missing values with the exponential scheme. |
154 | */ | 147 | */ |
155 | CONSENSUS_ROUND_STOCK, | 148 | CONSENSUS_ROUND_COMPLETION, |
156 | /** | 149 | /** |
157 | * Consensus concluded. | 150 | * Consensus concluded. After timeout and finished communication with client, |
151 | * consensus session will be destroyed. | ||
158 | */ | 152 | */ |
159 | CONSENSUS_ROUND_FINISH | 153 | CONSENSUS_ROUND_FINISH |
160 | }; | 154 | }; |
161 | 155 | ||
156 | /* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ | ||
162 | 157 | ||
163 | /** | 158 | /** |
164 | * Information about a peer that is in a consensus session. | 159 | * State of another peer with respect to the |
160 | * current ibf. | ||
165 | */ | 161 | */ |
166 | struct ConsensusPeerInformation | 162 | enum ConsensusIBFState { |
167 | { | ||
168 | struct GNUNET_PeerIdentity peer_id; | ||
169 | |||
170 | /** | ||
171 | * Socket for communicating with the peer, either created by the local peer, | ||
172 | * or the remote peer. | ||
173 | */ | ||
174 | struct GNUNET_STREAM_Socket *socket; | ||
175 | |||
176 | /** | ||
177 | * Message tokenizer, for the data received from this peer via the stream socket. | ||
178 | */ | ||
179 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
180 | |||
181 | /** | ||
182 | * Do we connect to the peer, or does the peer connect to us? | ||
183 | * Only valid for all-to-all phases | ||
184 | */ | ||
185 | int is_outgoing; | ||
186 | |||
187 | /** | ||
188 | * Did we receive/send a consensus hello? | ||
189 | */ | ||
190 | int hello; | ||
191 | |||
192 | /** | 163 | /** |
193 | * Handle for currently active read | 164 | * There is nothing going on with the IBF. |
194 | */ | 165 | */ |
195 | struct GNUNET_STREAM_ReadHandle *rh; | 166 | IBF_STATE_NONE=0, |
196 | |||
197 | /** | 167 | /** |
198 | * Handle for currently active read | 168 | * We currently receive an ibf. |
199 | */ | 169 | */ |
200 | struct GNUNET_STREAM_WriteHandle *wh; | 170 | IBF_STATE_RECEIVING, |
201 | 171 | /* | |
202 | enum { | 172 | * we decode a received ibf |
203 | /* beginning of round */ | 173 | */ |
204 | IBF_STATE_NONE=0, | 174 | IBF_STATE_DECODING, |
205 | /* we currently receive an ibf */ | ||
206 | IBF_STATE_RECEIVING, | ||
207 | /* we currently transmit an ibf */ | ||
208 | IBF_STATE_TRANSMITTING, | ||
209 | /* we decode a received ibf */ | ||
210 | IBF_STATE_DECODING, | ||
211 | /* wait for elements and element requests */ | ||
212 | IBF_STATE_ANTICIPATE_DIFF | ||
213 | } ibf_state ; | ||
214 | |||
215 | /** | ||
216 | * What is the order (=log2 size) of the ibf | ||
217 | * we're currently dealing with? | ||
218 | * Interpretation depends on ibf_state. | ||
219 | */ | ||
220 | int ibf_order; | ||
221 | |||
222 | /** | ||
223 | * The current IBF for this peer, | ||
224 | * purpose dependent on ibf_state | ||
225 | */ | ||
226 | struct InvertibleBloomFilter *ibf; | ||
227 | |||
228 | /** | ||
229 | * How many buckets have we transmitted/received? | ||
230 | * Interpretatin depends on ibf_state | ||
231 | */ | ||
232 | int ibf_bucket_counter; | ||
233 | |||
234 | /** | ||
235 | * Strata estimator of the peer, NULL if our peer | ||
236 | * initiated the reconciliation. | ||
237 | */ | ||
238 | struct StrataEstimator *se; | ||
239 | |||
240 | /** | ||
241 | * Element keys that this peer misses, but we have them. | ||
242 | */ | ||
243 | struct GNUNET_CONTAINER_MultiHashMap *requested_keys; | ||
244 | |||
245 | /** | ||
246 | * Element keys that this peer has, but we miss. | ||
247 | */ | ||
248 | struct GNUNET_CONTAINER_MultiHashMap *reported_keys; | ||
249 | |||
250 | /** | ||
251 | * Back-reference to the consensus session, | ||
252 | * to that ConsensusPeerInformation can be used as a closure | ||
253 | */ | ||
254 | struct ConsensusSession *session; | ||
255 | |||
256 | /** | 175 | /** |
257 | * Messages queued for the current round. | 176 | * wait for elements and element requests |
258 | */ | 177 | */ |
259 | struct QueuedMessage *messages_head; | 178 | IBF_STATE_ANTICIPATE_DIFF |
179 | }; | ||
260 | 180 | ||
261 | /** | ||
262 | * Messages queued for the current round. | ||
263 | */ | ||
264 | struct QueuedMessage *messages_tail; | ||
265 | 181 | ||
266 | /** | 182 | typedef void (*AddCallback) (struct MessageQueue *mq); |
267 | * True if we are actually replaying the strata message, | 183 | typedef void (*MessageSentCallback) (void *cls); |
268 | * e.g. currently handling the premature_strata_message. | ||
269 | */ | ||
270 | int replaying_strata_message; | ||
271 | 184 | ||
272 | /** | ||
273 | * A strata message that is not actually for the current round, | ||
274 | * used in the exp-scheme. | ||
275 | */ | ||
276 | struct StrataMessage *premature_strata_message; | ||
277 | 185 | ||
278 | /** | 186 | /** |
279 | * We have finishes the exp-subround with the peer. | 187 | * Collection of the state necessary to read and write gnunet messages |
280 | */ | 188 | * to a stream socket. Should be used as closure for stream_data_processor. |
281 | int exp_subround_finished; | 189 | */ |
190 | struct MessageStreamState | ||
191 | { | ||
192 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
193 | struct MessageQueue *mq; | ||
194 | void *mst_cls; | ||
195 | struct GNUNET_STREAM_Socket *socket; | ||
196 | struct GNUNET_STREAM_ReadHandle *rh; | ||
197 | struct GNUNET_STREAM_WriteHandle *wh; | ||
198 | }; | ||
282 | 199 | ||
283 | int inventory_synced; | ||
284 | |||
285 | /** | ||
286 | * Round this peer seems to be in, according to the last SE we got. | ||
287 | * Necessary to store this, as we sometimes need to respond to a request from an | ||
288 | * older round, while we are already in the next round. | ||
289 | */ | ||
290 | enum ConsensusRound apparent_round; | ||
291 | 200 | ||
201 | struct ServerClientSocketState | ||
202 | { | ||
203 | struct GNUNET_SERVER_Client *client; | ||
204 | struct GNUNET_SERVER_TransmitHandle* th; | ||
292 | }; | 205 | }; |
293 | 206 | ||
294 | typedef void (*QueuedMessageCallback) (void *msg); | ||
295 | 207 | ||
296 | /** | 208 | /** |
297 | * A doubly linked list of messages. | 209 | * Generic message queue, for queueing outgoing messages. |
298 | */ | 210 | */ |
299 | struct QueuedMessage | 211 | struct MessageQueue |
300 | { | 212 | { |
301 | struct GNUNET_MessageHeader *msg; | 213 | void *state; |
302 | 214 | AddCallback add_cb; | |
303 | /** | 215 | struct PendingMessage *pending_head; |
304 | * Queued messages are stored in a doubly linked list. | 216 | struct PendingMessage *pending_tail; |
305 | */ | 217 | struct PendingMessage *current_pm; |
306 | struct QueuedMessage *next; | ||
307 | |||
308 | /** | ||
309 | * Queued messages are stored in a doubly linked list. | ||
310 | */ | ||
311 | struct QueuedMessage *prev; | ||
312 | |||
313 | QueuedMessageCallback cb; | ||
314 | |||
315 | void *cls; | ||
316 | }; | 218 | }; |
317 | 219 | ||
318 | 220 | ||
319 | struct StrataEstimator | 221 | struct PendingMessage |
320 | { | 222 | { |
321 | struct InvertibleBloomFilter **strata; | 223 | struct GNUNET_MessageHeader *msg; |
224 | struct MessageQueue *parent_queue; | ||
225 | struct PendingMessage *next; | ||
226 | struct PendingMessage *prev; | ||
227 | MessageSentCallback sent_cb; | ||
228 | void *sent_cb_cls; | ||
322 | }; | 229 | }; |
323 | 230 | ||
324 | 231 | ||
@@ -351,38 +258,26 @@ struct ConsensusSession | |||
351 | struct GNUNET_HashCode global_id; | 258 | struct GNUNET_HashCode global_id; |
352 | 259 | ||
353 | /** | 260 | /** |
354 | * Local client in this consensus session. | 261 | * The server's client and associated local state |
355 | * There is only one client per consensus session. | ||
356 | */ | ||
357 | struct GNUNET_SERVER_Client *client; | ||
358 | |||
359 | /** | ||
360 | * Elements in the consensus set of this session, | ||
361 | * all of them either have been sent by or approved by the client. | ||
362 | * Contains ElementList. | ||
363 | * Used as a unique-key hashmap. | ||
364 | */ | 262 | */ |
365 | struct GNUNET_CONTAINER_MultiHashMap *values; | 263 | struct ServerClientSocketState scss; |
366 | |||
367 | /** | ||
368 | * Elements that have not been approved (or rejected) by the client yet. | ||
369 | */ | ||
370 | struct PendingElement *client_approval_head; | ||
371 | 264 | ||
372 | /** | 265 | /** |
373 | * Elements that have not been approved (or rejected) by the client yet. | 266 | * Queued messages to the client. |
374 | */ | 267 | */ |
375 | struct PendingElement *client_approval_tail; | 268 | struct MessageQueue *client_mq; |
376 | 269 | ||
377 | /** | 270 | /** |
378 | * Messages to be sent to the local client that owns this session | 271 | * IBF_Key -> 2^(HashCode*) |
272 | * FIXME: | ||
273 | * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. | ||
379 | */ | 274 | */ |
380 | struct QueuedMessage *client_messages_head; | 275 | struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; |
381 | 276 | ||
382 | /** | 277 | /** |
383 | * Messages to be sent to the local client that owns this session | 278 | * Maps HashCodes to ElementInfos |
384 | */ | 279 | */ |
385 | struct QueuedMessage *client_messages_tail; | 280 | struct GNUNET_CONTAINER_MultiHashMap *values; |
386 | 281 | ||
387 | /** | 282 | /** |
388 | * Currently active transmit handle for sending to the client | 283 | * Currently active transmit handle for sending to the client |
@@ -412,9 +307,14 @@ struct ConsensusSession | |||
412 | struct ConsensusPeerInformation *info; | 307 | struct ConsensusPeerInformation *info; |
413 | 308 | ||
414 | /** | 309 | /** |
310 | * GNUNET_YES if the client has called conclude. | ||
311 | * */ | ||
312 | int conclude; | ||
313 | |||
314 | /** | ||
415 | * Index of the local peer in the peers array | 315 | * Index of the local peer in the peers array |
416 | */ | 316 | */ |
417 | int local_peer_idx; | 317 | unsigned int local_peer_idx; |
418 | 318 | ||
419 | /** | 319 | /** |
420 | * Strata estimator, computed online | 320 | * Strata estimator, computed online |
@@ -431,16 +331,16 @@ struct ConsensusSession | |||
431 | */ | 331 | */ |
432 | enum ConsensusRound current_round; | 332 | enum ConsensusRound current_round; |
433 | 333 | ||
434 | int exp_round; | ||
435 | |||
436 | int exp_subround; | ||
437 | |||
438 | /** | 334 | /** |
439 | * Permutation of peers for the current round, | 335 | * Permutation of peers for the current round, |
440 | * maps logical index (for current round) to physical index (location in info array) | 336 | * maps logical index (for current round) to physical index (location in info array) |
441 | */ | 337 | */ |
442 | int *shuffle; | 338 | int *shuffle; |
443 | 339 | ||
340 | int exp_round; | ||
341 | |||
342 | int exp_subround; | ||
343 | |||
444 | /** | 344 | /** |
445 | * The partner for the current exp-round | 345 | * The partner for the current exp-round |
446 | */ | 346 | */ |
@@ -454,41 +354,121 @@ struct ConsensusSession | |||
454 | 354 | ||
455 | 355 | ||
456 | /** | 356 | /** |
457 | * Sockets from other peers who want to communicate with us. | 357 | * Information about a peer that is in a consensus session. |
458 | * It may not be known yet which consensus session they belong to. | ||
459 | * Also, the session might not exist yet locally. | ||
460 | */ | 358 | */ |
461 | struct IncomingSocket | 359 | struct ConsensusPeerInformation |
462 | { | 360 | { |
463 | /** | 361 | /** |
464 | * Incoming sockets are kept in a double linked list. | 362 | * Peer identitty of the peer in the consensus session |
465 | */ | 363 | */ |
466 | struct IncomingSocket *next; | 364 | struct GNUNET_PeerIdentity peer_id; |
467 | 365 | ||
468 | /** | 366 | /** |
469 | * Incoming sockets are kept in a double linked list. | 367 | * Do we connect to the peer, or does the peer connect to us? |
368 | * Only valid for all-to-all phases | ||
470 | */ | 369 | */ |
471 | struct IncomingSocket *prev; | 370 | int is_outgoing; |
472 | 371 | ||
473 | /** | 372 | /** |
474 | * The actual socket. | 373 | * Did we receive/send a consensus hello? |
475 | */ | 374 | */ |
476 | struct GNUNET_STREAM_Socket *socket; | 375 | int hello; |
376 | |||
377 | /* | ||
378 | * FIXME | ||
379 | */ | ||
380 | struct MessageStreamState mss; | ||
477 | 381 | ||
478 | /** | 382 | /** |
479 | * Handle for currently active read | 383 | * Current state |
480 | */ | 384 | */ |
481 | struct GNUNET_STREAM_ReadHandle *rh; | 385 | enum ConsensusIBFState ibf_state; |
482 | 386 | ||
483 | /** | 387 | /** |
484 | * Peer that connected to us with the socket. | 388 | * What is the order (=log2 size) of the ibf |
389 | * we're currently dealing with? | ||
390 | * Interpretation depends on ibf_state. | ||
485 | */ | 391 | */ |
486 | struct GNUNET_PeerIdentity peer_id; | 392 | int ibf_order; |
393 | |||
394 | /** | ||
395 | * The current IBF for this peer, | ||
396 | * purpose dependent on ibf_state | ||
397 | */ | ||
398 | struct InvertibleBloomFilter *ibf; | ||
487 | 399 | ||
488 | /** | 400 | /** |
489 | * Message stream tokenizer for this socket. | 401 | * How many buckets have we transmitted/received? |
402 | * Interpretatin depends on ibf_state | ||
490 | */ | 403 | */ |
491 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | 404 | int ibf_bucket_counter; |
405 | |||
406 | /** | ||
407 | * Strata estimator of the peer, NULL if our peer | ||
408 | * initiated the reconciliation. | ||
409 | */ | ||
410 | struct StrataEstimator *se; | ||
411 | |||
412 | /** | ||
413 | * Back-reference to the consensus session, | ||
414 | * to that ConsensusPeerInformation can be used as a closure | ||
415 | */ | ||
416 | struct ConsensusSession *session; | ||
417 | |||
418 | /** | ||
419 | * True if we are actually replaying the strata message, | ||
420 | * e.g. currently handling the premature_strata_message. | ||
421 | */ | ||
422 | int replaying_strata_message; | ||
423 | |||
424 | /** | ||
425 | * A strata message that is not actually for the current round, | ||
426 | * used in the exp-scheme. | ||
427 | */ | ||
428 | struct StrataMessage *premature_strata_message; | ||
429 | |||
430 | /** | ||
431 | * We have finishes the exp-subround with the peer. | ||
432 | */ | ||
433 | int exp_subround_finished; | ||
434 | |||
435 | /** | ||
436 | * GNUNET_YES if we synced inventory with this peer; | ||
437 | * GNUNET_NO otherwise. | ||
438 | */ | ||
439 | int inventory_synced; | ||
440 | |||
441 | /** | ||
442 | * Round this peer seems to be in, according to the last SE we got. | ||
443 | * Necessary to store this, as we sometimes need to respond to a request from an | ||
444 | * older round, while we are already in the next round. | ||
445 | */ | ||
446 | enum ConsensusRound apparent_round; | ||
447 | }; | ||
448 | |||
449 | |||
450 | /** | ||
451 | * Sockets from other peers who want to communicate with us. | ||
452 | * It may not be known yet which consensus session they belong to, we have to wait for the | ||
453 | * peer's hello. | ||
454 | * Also, the session might not exist yet locally, we have to wait for a local client to connect. | ||
455 | */ | ||
456 | struct IncomingSocket | ||
457 | { | ||
458 | /** | ||
459 | * Incoming sockets are kept in a double linked list. | ||
460 | */ | ||
461 | struct IncomingSocket *next; | ||
462 | |||
463 | /** | ||
464 | * Incoming sockets are kept in a double linked list. | ||
465 | */ | ||
466 | struct IncomingSocket *prev; | ||
467 | |||
468 | /** | ||
469 | * Peer that connected to us with the socket. | ||
470 | */ | ||
471 | struct GNUNET_PeerIdentity peer_id; | ||
492 | 472 | ||
493 | /** | 473 | /** |
494 | * Peer-in-session this socket belongs to, once known, otherwise NULL. | 474 | * Peer-in-session this socket belongs to, once known, otherwise NULL. |
@@ -500,19 +480,35 @@ struct IncomingSocket | |||
500 | * but the session does not exist yet. | 480 | * but the session does not exist yet. |
501 | */ | 481 | */ |
502 | struct GNUNET_HashCode *requested_gid; | 482 | struct GNUNET_HashCode *requested_gid; |
483 | |||
484 | /* | ||
485 | * Timeout, will disconnect the socket if not yet in a session. | ||
486 | * FIXME: implement | ||
487 | */ | ||
488 | GNUNET_SCHEDULER_TaskIdentifier timeout; | ||
489 | |||
490 | /* FIXME */ | ||
491 | struct MessageStreamState mss; | ||
503 | }; | 492 | }; |
504 | 493 | ||
505 | 494 | ||
495 | /** | ||
496 | * Linked list of incoming sockets. | ||
497 | */ | ||
506 | static struct IncomingSocket *incoming_sockets_head; | 498 | static struct IncomingSocket *incoming_sockets_head; |
499 | |||
500 | /** | ||
501 | * Linked list of incoming sockets. | ||
502 | */ | ||
507 | static struct IncomingSocket *incoming_sockets_tail; | 503 | static struct IncomingSocket *incoming_sockets_tail; |
508 | 504 | ||
509 | /** | 505 | /** |
510 | * Linked list of sesstions this peer participates in. | 506 | * Linked list of sessions this peer participates in. |
511 | */ | 507 | */ |
512 | static struct ConsensusSession *sessions_head; | 508 | static struct ConsensusSession *sessions_head; |
513 | 509 | ||
514 | /** | 510 | /** |
515 | * Linked list of sesstions this peer participates in. | 511 | * Linked list of sessions this peer participates in. |
516 | */ | 512 | */ |
517 | static struct ConsensusSession *sessions_tail; | 513 | static struct ConsensusSession *sessions_tail; |
518 | 514 | ||
@@ -543,151 +539,159 @@ static struct GNUNET_STREAM_ListenSocket *listener; | |||
543 | 539 | ||
544 | 540 | ||
545 | /** | 541 | /** |
546 | * Queue a message to be sent to the inhabiting client of a session. | 542 | * Transmit a queued message to the session's client. |
547 | * | 543 | * |
548 | * @param session session | 544 | * @param cls consensus session |
549 | * @param msg message we want to queue | 545 | * @param size number of bytes available in buf |
546 | * @param buf where the callee should write the message | ||
547 | * @return number of bytes written to buf | ||
550 | */ | 548 | */ |
551 | static void | 549 | static size_t |
552 | queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) | 550 | transmit_queued (void *cls, size_t size, |
551 | void *buf) | ||
553 | { | 552 | { |
554 | struct QueuedMessage *qm; | 553 | struct MessageQueue *mq = cls; |
555 | qm = GNUNET_malloc (sizeof *qm); | 554 | struct PendingMessage *pm = mq->pending_head; |
556 | qm->msg = msg; | 555 | struct ServerClientSocketState *state = mq->state; |
557 | GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); | 556 | size_t msg_size; |
557 | |||
558 | GNUNET_assert (NULL != pm); | ||
559 | GNUNET_assert (NULL != buf); | ||
560 | msg_size = ntohs (pm->msg->size); | ||
561 | GNUNET_assert (size >= msg_size); | ||
562 | memcpy (buf, pm->msg, msg_size); | ||
563 | GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); | ||
564 | state->th = NULL; | ||
565 | client_send_next (cls); | ||
566 | GNUNET_free (pm); | ||
567 | return msg_size; | ||
558 | } | 568 | } |
559 | 569 | ||
560 | /** | 570 | |
561 | * Queue a message to be sent to another peer | ||
562 | * | ||
563 | * @param cpi peer | ||
564 | * @param msg message we want to queue | ||
565 | * @param cb callback, called when the message is given to strem | ||
566 | * @param cls closure for cb | ||
567 | */ | ||
568 | static void | 571 | static void |
569 | queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) | 572 | client_send_next (struct MessageQueue *mq) |
570 | { | 573 | { |
571 | struct QueuedMessage *qm; | 574 | struct ServerClientSocketState *state = mq->state; |
572 | qm = GNUNET_malloc (sizeof *qm); | 575 | int msize; |
573 | qm->msg = msg; | 576 | |
574 | qm->cls = cls; | 577 | GNUNET_assert (NULL != state); |
575 | qm->cb = cb; | 578 | |
576 | GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm); | 579 | if ( (NULL != state->th) || |
577 | if (cpi->wh == NULL) | 580 | (NULL == mq->pending_head) ) |
578 | write_queued (cpi, GNUNET_STREAM_OK, 0); | 581 | return; |
582 | msize = ntohs (mq->pending_head->msg->size); | ||
583 | state->th = | ||
584 | GNUNET_SERVER_notify_transmit_ready (state->client, msize, | ||
585 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
586 | &transmit_queued, mq); | ||
587 | } | ||
588 | |||
589 | |||
590 | struct MessageQueue * | ||
591 | create_message_queue_for_server_client (struct ServerClientSocketState *scss) | ||
592 | { | ||
593 | struct MessageQueue *mq; | ||
594 | mq = GNUNET_new (struct MessageQueue); | ||
595 | mq->add_cb = client_send_next; | ||
596 | mq->state = scss; | ||
597 | return mq; | ||
579 | } | 598 | } |
580 | 599 | ||
581 | 600 | ||
582 | /** | 601 | /** |
583 | * Queue a message to be sent to another peer | 602 | * Functions of this signature are called whenever writing operations |
603 | * on a stream are executed | ||
584 | * | 604 | * |
585 | * @param cpi peer | 605 | * @param cls the closure from GNUNET_STREAM_write |
586 | * @param msg message we want to queue | 606 | * @param status the status of the stream at the time this function is called; |
607 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
608 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
609 | * (this doesn't mean that the data is never sent, the receiver may | ||
610 | * have read the data but its ACKs may have been lost); | ||
611 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
612 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
613 | * be processed. | ||
614 | * @param size the number of bytes written | ||
587 | */ | 615 | */ |
588 | static void | 616 | static void |
589 | queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg) | 617 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
590 | { | 618 | { |
591 | queue_peer_message_with_cls (cpi, msg, NULL, NULL); | 619 | struct MessageQueue *mq = cls; |
620 | struct MessageStreamState *mss = mq->state; | ||
621 | struct PendingMessage *pm; | ||
622 | |||
623 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
624 | |||
625 | /* call cb for message we finished sending */ | ||
626 | pm = mq->current_pm; | ||
627 | if (NULL != pm) | ||
628 | { | ||
629 | if (NULL != pm->sent_cb) | ||
630 | pm->sent_cb (pm->sent_cb_cls); | ||
631 | GNUNET_free (pm); | ||
632 | } | ||
633 | |||
634 | mss->wh = NULL; | ||
635 | |||
636 | pm = mq->pending_head; | ||
637 | mq->current_pm = pm; | ||
638 | if (NULL == pm) | ||
639 | return; | ||
640 | GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); | ||
641 | mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), | ||
642 | GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); | ||
643 | GNUNET_assert (NULL != mss->wh); | ||
592 | } | 644 | } |
593 | 645 | ||
594 | 646 | ||
595 | /* | ||
596 | static void | 647 | static void |
597 | clear_peer_messages (struct ConsensusPeerInformation *cpi) | 648 | stream_socket_add_cb (struct MessageQueue *mq) |
598 | { | 649 | { |
599 | cpi->messages_head = NULL; | 650 | if (NULL != mq->current_pm) |
600 | cpi->messages_tail = NULL; | 651 | return; |
652 | write_queued (mq, GNUNET_STREAM_OK, 0); | ||
601 | } | 653 | } |
602 | */ | ||
603 | 654 | ||
604 | 655 | ||
605 | /** | 656 | struct MessageQueue * |
606 | * Estimate set difference with two strata estimators, | 657 | create_message_queue_for_stream_socket (struct MessageStreamState *mss) |
607 | * i.e. arrays of IBFs. | ||
608 | * Does not not modify its arguments. | ||
609 | * | ||
610 | * @param se1 first strata estimator | ||
611 | * @param se2 second strata estimator | ||
612 | * @return the estimated difference | ||
613 | */ | ||
614 | static int | ||
615 | estimate_difference (const struct StrataEstimator *se1, | ||
616 | const struct StrataEstimator *se2) | ||
617 | { | 658 | { |
618 | int i; | 659 | struct MessageQueue *mq; |
619 | int count; | 660 | mq = GNUNET_new (struct MessageQueue); |
620 | count = 0; | 661 | mq->state = mss; |
621 | for (i = STRATA_COUNT - 1; i >= 0; i--) | 662 | mq->add_cb = stream_socket_add_cb; |
622 | { | 663 | return mq; |
623 | struct InvertibleBloomFilter *diff; | 664 | } |
624 | /* number of keys decoded from the ibf */ | 665 | |
625 | int ibf_count; | 666 | |
626 | int more; | 667 | struct PendingMessage * |
627 | ibf_count = 0; | 668 | new_pending_message (uint16_t size, uint16_t type) |
628 | /* FIXME: implement this without always allocating new IBFs */ | 669 | { |
629 | diff = ibf_dup (se1->strata[i]); | 670 | struct PendingMessage *pm; |
630 | ibf_subtract (diff, se2->strata[i]); | 671 | pm = GNUNET_malloc (sizeof *pm + size); |
631 | for (;;) | 672 | pm->msg = (void *) &pm[1]; |
632 | { | 673 | pm->msg->size = htons (size); |
633 | more = ibf_decode (diff, NULL, NULL); | 674 | pm->msg->type = htons (type); |
634 | if (GNUNET_NO == more) | 675 | return pm; |
635 | { | ||
636 | count += ibf_count; | ||
637 | break; | ||
638 | } | ||
639 | if (GNUNET_SYSERR == more) | ||
640 | { | ||
641 | ibf_destroy (diff); | ||
642 | return count * (1 << (i + 1)); | ||
643 | } | ||
644 | ibf_count++; | ||
645 | } | ||
646 | ibf_destroy (diff); | ||
647 | } | ||
648 | return count; | ||
649 | } | 676 | } |
650 | 677 | ||
651 | 678 | ||
652 | /** | 679 | /** |
653 | * Called when receiving data from a peer that is member of | 680 | * Queue a message in a message queue. |
654 | * an inhabited consensus session. | ||
655 | * | 681 | * |
656 | * @param cls the closure from GNUNET_STREAM_read | 682 | * @param queue the message queue |
657 | * @param status the status of the stream at the time this function is called | 683 | * @param pending message, message with additional information |
658 | * @param data traffic from the other side | ||
659 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
660 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
661 | * given to the next time the read processor is called). | ||
662 | */ | 684 | */ |
663 | static size_t | 685 | void |
664 | session_stream_data_processor (void *cls, | 686 | message_queue_add (struct MessageQueue *queue, struct PendingMessage *msg) |
665 | enum GNUNET_STREAM_Status status, | ||
666 | const void *data, | ||
667 | size_t size) | ||
668 | { | 687 | { |
669 | struct ConsensusPeerInformation *cpi; | 688 | GNUNET_CONTAINER_DLL_insert_tail (queue->pending_head, queue->pending_tail, msg); |
670 | int ret; | 689 | queue->add_cb (queue); |
671 | |||
672 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
673 | cpi = cls; | ||
674 | GNUNET_assert (NULL != cpi->mst); | ||
675 | ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); | ||
676 | if (GNUNET_SYSERR == ret) | ||
677 | { | ||
678 | /* FIXME: handle this correctly */ | ||
679 | GNUNET_assert (0); | ||
680 | } | ||
681 | /* read again */ | ||
682 | cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
683 | &session_stream_data_processor, cpi); | ||
684 | /* we always read all data */ | ||
685 | return size; | ||
686 | } | 690 | } |
687 | 691 | ||
692 | |||
688 | /** | 693 | /** |
689 | * Called when we receive data from a peer that is not member of | 694 | * Called when we receive data from a peer via stream. |
690 | * a session yet, or the session is not yet inhabited. | ||
691 | * | 695 | * |
692 | * @param cls the closure from GNUNET_STREAM_read | 696 | * @param cls the closure from GNUNET_STREAM_read |
693 | * @param status the status of the stream at the time this function is called | 697 | * @param status the status of the stream at the time this function is called |
@@ -697,62 +701,66 @@ session_stream_data_processor (void *cls, | |||
697 | * given to the next time the read processor is called). | 701 | * given to the next time the read processor is called). |
698 | */ | 702 | */ |
699 | static size_t | 703 | static size_t |
700 | incoming_stream_data_processor (void *cls, | 704 | stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) |
701 | enum GNUNET_STREAM_Status status, | ||
702 | const void *data, | ||
703 | size_t size) | ||
704 | { | 705 | { |
705 | struct IncomingSocket *incoming; | 706 | struct MessageStreamState *mss = cls; |
706 | int ret; | 707 | int ret; |
707 | 708 | ||
708 | GNUNET_assert (GNUNET_STREAM_OK == status); | 709 | mss->rh = NULL; |
709 | incoming = cls; | 710 | |
710 | ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); | 711 | if (GNUNET_STREAM_OK != status) |
712 | { | ||
713 | /* FIXME: handle this correctly */ | ||
714 | GNUNET_break (0); | ||
715 | return 0; | ||
716 | } | ||
717 | GNUNET_assert (NULL != mss->mst); | ||
718 | ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_YES); | ||
711 | if (GNUNET_SYSERR == ret) | 719 | if (GNUNET_SYSERR == ret) |
712 | { | 720 | { |
713 | /* FIXME: handle this correctly */ | 721 | /* FIXME: handle this correctly */ |
714 | GNUNET_assert (0); | 722 | GNUNET_break (0); |
723 | return 0; | ||
715 | } | 724 | } |
716 | /* read again */ | 725 | /* read again */ |
717 | incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 726 | mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, &stream_data_processor, mss); |
718 | &incoming_stream_data_processor, incoming); | ||
719 | /* we always read all data */ | 727 | /* we always read all data */ |
720 | return size; | 728 | return size; |
721 | } | 729 | } |
722 | 730 | ||
723 | 731 | ||
732 | /** | ||
733 | * Send element or element report to the peer specified in cpi. | ||
734 | * | ||
735 | * @param cpi peer to send the elements to | ||
736 | * @param head head of the element list | ||
737 | */ | ||
724 | static void | 738 | static void |
725 | send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head) | 739 | send_element_or_report (struct ConsensusPeerInformation *cpi, struct ElementInfo *e) |
726 | { | 740 | { |
727 | struct GNUNET_CONSENSUS_Element *element; | 741 | struct PendingMessage *pm; |
728 | struct GNUNET_MessageHeader *element_msg; | ||
729 | size_t msize; | ||
730 | 742 | ||
731 | while (NULL != head) | 743 | switch (cpi->apparent_round) |
732 | { | 744 | { |
733 | element = head->element; | 745 | case CONSENSUS_ROUND_COMPLETION: |
734 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | 746 | case CONSENSUS_ROUND_EXCHANGE: |
735 | element_msg = GNUNET_malloc (msize); | 747 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + e->element->size, |
736 | element_msg->size = htons (msize); | 748 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); |
737 | switch (cpi->apparent_round) | 749 | memcpy (&pm->msg[1], e->element->data, e->element->size); |
738 | { | 750 | message_queue_add (cpi->mss.mq, pm); |
739 | case CONSENSUS_ROUND_STOCK: | 751 | break; |
740 | case CONSENSUS_ROUND_EXCHANGE: | 752 | case CONSENSUS_ROUND_INVENTORY: |
741 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | 753 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct GNUNET_HashCode), |
742 | break; | 754 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); |
743 | case CONSENSUS_ROUND_INVENTORY: | 755 | memcpy (&pm->msg[1], e->element_hash, sizeof (struct GNUNET_HashCode)); |
744 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); | 756 | message_queue_add (cpi->mss.mq, pm); |
745 | break; | 757 | break; |
746 | default: | 758 | default: |
747 | GNUNET_break (0); | 759 | GNUNET_break (0); |
748 | } | ||
749 | GNUNET_assert (NULL != element->data); | ||
750 | memcpy (&element_msg[1], element->data, element->size); | ||
751 | queue_peer_message (cpi, element_msg); | ||
752 | head = head->next; | ||
753 | } | 760 | } |
754 | } | 761 | } |
755 | 762 | ||
763 | |||
756 | /** | 764 | /** |
757 | * Iterator to insert values into an ibf. | 765 | * Iterator to insert values into an ibf. |
758 | * | 766 | * |
@@ -768,12 +776,10 @@ ibf_values_iterator (void *cls, | |||
768 | const struct GNUNET_HashCode *key, | 776 | const struct GNUNET_HashCode *key, |
769 | void *value) | 777 | void *value) |
770 | { | 778 | { |
771 | struct ConsensusPeerInformation *cpi; | 779 | struct ConsensusPeerInformation *cpi = cls; |
772 | struct ElementList *head; | 780 | struct ElementInfo *e = value; |
773 | struct IBF_Key ibf_key; | 781 | struct IBF_Key ibf_key = ibf_key_from_hashcode (e->element_hash); |
774 | cpi = cls; | 782 | |
775 | head = value; | ||
776 | ibf_key = ibf_key_from_hashcode (head->element_hash); | ||
777 | GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); | 783 | GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val); |
778 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); | 784 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key); |
779 | return GNUNET_YES; | 785 | return GNUNET_YES; |
@@ -788,11 +794,10 @@ ibf_values_iterator (void *cls, | |||
788 | static void | 794 | static void |
789 | prepare_ibf (struct ConsensusPeerInformation *cpi) | 795 | prepare_ibf (struct ConsensusPeerInformation *cpi) |
790 | { | 796 | { |
791 | if (NULL == cpi->session->ibfs[cpi->ibf_order]) | 797 | if (NULL != cpi->session->ibfs[cpi->ibf_order]) |
792 | { | 798 | return; |
793 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 799 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); |
794 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); | 800 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); |
795 | } | ||
796 | } | 801 | } |
797 | 802 | ||
798 | 803 | ||
@@ -816,15 +821,18 @@ exp_subround_finished (const struct ConsensusSession *session) | |||
816 | { | 821 | { |
817 | int not_finished; | 822 | int not_finished; |
818 | not_finished = 0; | 823 | not_finished = 0; |
819 | if ((session->partner_outgoing != NULL) && (session->partner_outgoing->exp_subround_finished == GNUNET_NO)) | 824 | if ( (NULL != session->partner_outgoing) && |
820 | not_finished++; | 825 | (GNUNET_NO == session->partner_outgoing->exp_subround_finished) ) |
821 | if ((session->partner_incoming != NULL) && (session->partner_incoming->exp_subround_finished == GNUNET_NO)) | 826 | not_finished++; |
822 | not_finished++; | 827 | if ( (NULL != session->partner_incoming) && |
828 | (GNUNET_NO == session->partner_incoming->exp_subround_finished) ) | ||
829 | not_finished++; | ||
823 | if (0 == not_finished) | 830 | if (0 == not_finished) |
824 | return GNUNET_YES; | 831 | return GNUNET_YES; |
825 | return GNUNET_NO; | 832 | return GNUNET_NO; |
826 | } | 833 | } |
827 | 834 | ||
835 | |||
828 | static int | 836 | static int |
829 | inventory_round_finished (struct ConsensusSession *session) | 837 | inventory_round_finished (struct ConsensusSession *session) |
830 | { | 838 | { |
@@ -840,153 +848,170 @@ inventory_round_finished (struct ConsensusSession *session) | |||
840 | } | 848 | } |
841 | 849 | ||
842 | 850 | ||
843 | |||
844 | static void | 851 | static void |
845 | fin_sent_cb (void *cls) | 852 | clear_message_stream_state (struct MessageStreamState *mss) |
846 | { | 853 | { |
847 | struct ConsensusPeerInformation *cpi; | 854 | if (NULL != mss->mst) |
848 | cpi = cls; | ||
849 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
850 | switch (cpi->session->current_round) | ||
851 | { | 855 | { |
852 | case CONSENSUS_ROUND_EXCHANGE: | 856 | GNUNET_SERVER_mst_destroy (mss->mst); |
853 | case CONSENSUS_ROUND_STOCK: | 857 | mss->mst = NULL; |
854 | if (cpi->session->current_round != cpi->apparent_round) | 858 | } |
855 | { | 859 | if (NULL != mss->rh) |
856 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); | 860 | { |
857 | break; | 861 | GNUNET_STREAM_read_cancel (mss->rh); |
858 | } | 862 | mss->rh = NULL; |
859 | cpi->exp_subround_finished = GNUNET_YES; | 863 | } |
860 | /* the subround is only really over if *both* partners are done */ | 864 | if (NULL != mss->wh) |
861 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | 865 | { |
862 | subround_over (cpi->session, NULL); | 866 | GNUNET_STREAM_write_cancel (mss->wh); |
863 | else | 867 | mss->wh = NULL; |
864 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); | 868 | } |
865 | break; | 869 | if (NULL != mss->socket) |
866 | case CONSENSUS_ROUND_INVENTORY: | 870 | { |
867 | cpi->inventory_synced = GNUNET_YES; | 871 | GNUNET_STREAM_close (mss->socket); |
868 | if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) | 872 | mss->socket = NULL; |
869 | round_over (cpi->session, NULL); | 873 | } |
870 | /* FIXME: maybe go to next round */ | 874 | if (NULL != mss->mq) |
871 | break; | 875 | { |
872 | default: | 876 | GNUNET_free (mss->mq); |
873 | GNUNET_break (0); | 877 | mss->mq = NULL; |
874 | } | 878 | } |
875 | } | 879 | } |
876 | 880 | ||
877 | 881 | ||
878 | /** | 882 | /** |
879 | * Gets called when the other peer wants us to inform that | 883 | * Iterator over hash map entries. |
880 | * it has decoded our ibf and sent us all elements / requests | 884 | * |
885 | * @param cls closure | ||
886 | * @param key current key code | ||
887 | * @param value value in the hash map | ||
888 | * @return GNUNET_YES if we should continue to | ||
889 | * iterate, | ||
890 | * GNUNET_NO if not. | ||
881 | */ | 891 | */ |
882 | static int | 892 | static int |
883 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | 893 | destroy_element_info_iter (void *cls, |
894 | const struct GNUNET_HashCode * key, | ||
895 | void *value) | ||
884 | { | 896 | { |
885 | struct ConsensusRoundMessage *fin_msg; | 897 | struct ElementInfo *ei = value; |
886 | 898 | GNUNET_free (ei->element); | |
887 | switch (cpi->session->current_round) | 899 | GNUNET_free (ei->element_hash); |
888 | { | 900 | GNUNET_free (ei); |
889 | case CONSENSUS_ROUND_INVENTORY: | ||
890 | cpi->inventory_synced = GNUNET_YES; | ||
891 | case CONSENSUS_ROUND_STOCK: | ||
892 | case CONSENSUS_ROUND_EXCHANGE: | ||
893 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
894 | fin_msg = GNUNET_malloc (sizeof *fin_msg); | ||
895 | fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | ||
896 | fin_msg->header.size = htons (sizeof *fin_msg); | ||
897 | fin_msg->round = cpi->apparent_round; | ||
898 | /* the subround os over once we kicked off sending the fin msg */ | ||
899 | /* FIXME: assert we are talking to the right peer! */ | ||
900 | queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) fin_msg, fin_sent_cb, cpi); | ||
901 | /* FIXME: mark peer as synced */ | ||
902 | break; | ||
903 | default: | ||
904 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | ||
905 | break; | ||
906 | } | ||
907 | return GNUNET_YES; | 901 | return GNUNET_YES; |
908 | } | 902 | } |
909 | 903 | ||
910 | 904 | ||
911 | /** | 905 | /** |
912 | * The other peer wants us to inform that he sent us all the elements we requested. | 906 | * Destroy a session, free all resources associated with it. |
907 | * | ||
908 | * @param session the session to destroy | ||
913 | */ | 909 | */ |
914 | static int | 910 | static void |
915 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | 911 | destroy_session (struct ConsensusSession *session) |
916 | { | 912 | { |
917 | struct ConsensusRoundMessage *round_msg; | 913 | int i; |
918 | round_msg = (struct ConsensusRoundMessage *) msg; | 914 | |
919 | /* FIXME: only call subround_over if round is the current one! */ | 915 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); |
920 | switch (cpi->session->current_round) | 916 | GNUNET_SERVER_client_drop (session->scss.client); |
917 | session->scss.client = NULL; | ||
918 | if (NULL != session->client_mq) | ||
921 | { | 919 | { |
922 | case CONSENSUS_ROUND_EXCHANGE: | 920 | GNUNET_free (session->client_mq); |
923 | case CONSENSUS_ROUND_STOCK: | 921 | session->client_mq = NULL; |
924 | if (cpi->session->current_round != round_msg->round) | 922 | } |
923 | if (NULL != session->shuffle) | ||
924 | { | ||
925 | GNUNET_free (session->shuffle); | ||
926 | session->shuffle = NULL; | ||
927 | } | ||
928 | if (NULL != session->se) | ||
929 | { | ||
930 | strata_estimator_destroy (session->se); | ||
931 | session->se = NULL; | ||
932 | } | ||
933 | if (NULL != session->info) | ||
934 | { | ||
935 | for (i = 0; i < session->num_peers; i++) | ||
936 | { | ||
937 | struct ConsensusPeerInformation *cpi; | ||
938 | cpi = &session->info[i]; | ||
939 | clear_message_stream_state (&cpi->mss); | ||
940 | if (NULL != cpi->se) | ||
925 | { | 941 | { |
926 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 942 | strata_estimator_destroy (cpi->se); |
927 | cpi->ibf_state = IBF_STATE_NONE; | 943 | cpi->se = NULL; |
928 | cpi->ibf_bucket_counter = 0; | ||
929 | break; | ||
930 | } | 944 | } |
931 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 945 | if (NULL != cpi->ibf) |
932 | cpi->exp_subround_finished = GNUNET_YES; | 946 | { |
933 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | 947 | ibf_destroy (cpi->ibf); |
934 | subround_over (cpi->session, NULL); | 948 | cpi->ibf = NULL; |
935 | else | 949 | } |
936 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); | 950 | } |
937 | break; | 951 | GNUNET_free (session->info); |
938 | case CONSENSUS_ROUND_INVENTORY: | 952 | session->info = NULL; |
939 | cpi->inventory_synced = GNUNET_YES; | 953 | } |
940 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 954 | if (NULL != session->ibfs) |
941 | if (inventory_round_finished (cpi->session)) | 955 | { |
942 | round_over (cpi->session, NULL); | 956 | for (i = 0; i <= MAX_IBF_ORDER; i++) |
943 | break; | 957 | { |
944 | default: | 958 | if (NULL != session->ibfs[i]) |
945 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | 959 | { |
946 | break; | 960 | ibf_destroy (session->ibfs[i]); |
961 | session->ibfs[i] = NULL; | ||
962 | } | ||
963 | } | ||
964 | GNUNET_free (session->ibfs); | ||
965 | session->ibfs = NULL; | ||
966 | } | ||
967 | if (NULL != session->values) | ||
968 | { | ||
969 | GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_info_iter, NULL); | ||
970 | GNUNET_CONTAINER_multihashmap_destroy (session->values); | ||
971 | session->values = NULL; | ||
947 | } | 972 | } |
948 | return GNUNET_YES; | ||
949 | } | ||
950 | |||
951 | |||
952 | static struct StrataEstimator * | ||
953 | strata_estimator_create () | ||
954 | { | ||
955 | struct StrataEstimator *se; | ||
956 | int i; | ||
957 | |||
958 | /* fixme: allocate everything in one chunk */ | ||
959 | |||
960 | se = GNUNET_malloc (sizeof (struct StrataEstimator)); | ||
961 | se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT); | ||
962 | for (i = 0; i < STRATA_COUNT; i++) | ||
963 | se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | ||
964 | 973 | ||
965 | return se; | 974 | if (NULL != session->ibf_key_map) |
975 | { | ||
976 | GNUNET_CONTAINER_multihashmap_destroy (session->ibf_key_map); | ||
977 | session->ibf_key_map = NULL; | ||
978 | } | ||
979 | GNUNET_free (session); | ||
966 | } | 980 | } |
967 | 981 | ||
982 | |||
968 | static void | 983 | static void |
969 | strata_estimator_destroy (struct StrataEstimator *se) | 984 | send_client_conclude_done (struct ConsensusSession *session) |
970 | { | 985 | { |
971 | int i; | 986 | struct PendingMessage *pm; |
972 | for (i = 0; i < STRATA_COUNT; i++) | 987 | |
973 | ibf_destroy (se->strata[i]); | 988 | /* check if client is even there anymore */ |
974 | GNUNET_free (se->strata); | 989 | if (NULL == session->scss.client) |
975 | GNUNET_free (se); | 990 | return; |
991 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader), | ||
992 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
993 | message_queue_add (session->client_mq, pm); | ||
976 | } | 994 | } |
977 | 995 | ||
978 | 996 | ||
997 | /** | ||
998 | * Check if a strata message is for the current round or not | ||
999 | * | ||
1000 | * @param session session we are in | ||
1001 | * @param strata_msg the strata message to check | ||
1002 | * @return GNUNET_YES if the strata_msg is premature, GNUNET_NO otherwise | ||
1003 | */ | ||
979 | static int | 1004 | static int |
980 | is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) | 1005 | is_premature_strata_message (const struct ConsensusSession *session, const struct StrataMessage *strata_msg) |
981 | { | 1006 | { |
982 | switch (strata_msg->round) | 1007 | switch (strata_msg->round) |
983 | { | 1008 | { |
984 | case CONSENSUS_ROUND_STOCK: | 1009 | case CONSENSUS_ROUND_COMPLETION: |
985 | case CONSENSUS_ROUND_EXCHANGE: | 1010 | case CONSENSUS_ROUND_EXCHANGE: |
986 | /* here, we also have to compare subrounds */ | 1011 | /* here, we also have to compare subrounds */ |
987 | if ( (strata_msg->round != session->current_round) || | 1012 | if ( (strata_msg->round != session->current_round) || |
988 | (strata_msg->exp_round != session->exp_round) || | 1013 | (strata_msg->exp_round != session->exp_round) || |
989 | (strata_msg->exp_subround != session->exp_subround)) | 1014 | (strata_msg->exp_subround != session->exp_subround) ) |
990 | return GNUNET_YES; | 1015 | return GNUNET_YES; |
991 | break; | 1016 | break; |
992 | default: | 1017 | default: |
@@ -999,6 +1024,72 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc | |||
999 | 1024 | ||
1000 | 1025 | ||
1001 | /** | 1026 | /** |
1027 | * Send a strata estimator. | ||
1028 | * | ||
1029 | * @param cpi the peer | ||
1030 | */ | ||
1031 | static void | ||
1032 | send_strata_estimator (struct ConsensusPeerInformation *cpi) | ||
1033 | { | ||
1034 | struct PendingMessage *pm; | ||
1035 | struct StrataMessage *strata_msg; | ||
1036 | |||
1037 | /* FIXME: why is this correct? */ | ||
1038 | cpi->apparent_round = cpi->session->current_round; | ||
1039 | cpi->ibf_state = IBF_STATE_NONE; | ||
1040 | cpi->ibf_bucket_counter = 0; | ||
1041 | |||
1042 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending SE (in round: %d)\n", cpi->session->current_round); | ||
1043 | |||
1044 | pm = new_pending_message ((sizeof *strata_msg) + (SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE), | ||
1045 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | ||
1046 | strata_msg = (struct StrataMessage *) pm->msg; | ||
1047 | strata_msg->round = cpi->session->current_round; | ||
1048 | strata_msg->exp_round = cpi->session->exp_round; | ||
1049 | strata_msg->exp_subround = cpi->session->exp_subround; | ||
1050 | strata_estimator_write (cpi->session->se, &strata_msg[1]); | ||
1051 | message_queue_add (cpi->mss.mq, pm); | ||
1052 | } | ||
1053 | |||
1054 | |||
1055 | /** | ||
1056 | * Send an IBF of the order specified in cpi. | ||
1057 | * | ||
1058 | * @param cpi the peer | ||
1059 | */ | ||
1060 | static void | ||
1061 | send_ibf (struct ConsensusPeerInformation *cpi) | ||
1062 | { | ||
1063 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", | ||
1064 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1065 | |||
1066 | cpi->ibf_bucket_counter = 0; | ||
1067 | while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) | ||
1068 | { | ||
1069 | unsigned int num_buckets; | ||
1070 | struct PendingMessage *pm; | ||
1071 | struct DifferenceDigest *digest; | ||
1072 | |||
1073 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | ||
1074 | /* limit to maximum */ | ||
1075 | if (num_buckets > BUCKETS_PER_MESSAGE) | ||
1076 | num_buckets = BUCKETS_PER_MESSAGE; | ||
1077 | |||
1078 | pm = new_pending_message ((sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE), | ||
1079 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | ||
1080 | digest = (struct DifferenceDigest *) pm->msg; | ||
1081 | digest->order = cpi->ibf_order; | ||
1082 | digest->round = cpi->apparent_round; | ||
1083 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &digest[1]); | ||
1084 | cpi->ibf_bucket_counter += num_buckets; | ||
1085 | message_queue_add (cpi->mss.mq, pm); | ||
1086 | } | ||
1087 | cpi->ibf_bucket_counter = 0; | ||
1088 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | ||
1089 | } | ||
1090 | |||
1091 | |||
1092 | /** | ||
1002 | * Called when a peer sends us its strata estimator. | 1093 | * Called when a peer sends us its strata estimator. |
1003 | * In response, we sent out IBF of appropriate size back. | 1094 | * In response, we sent out IBF of appropriate size back. |
1004 | * | 1095 | * |
@@ -1008,12 +1099,10 @@ is_premature_strata_message (const struct ConsensusSession *session, const struc | |||
1008 | static int | 1099 | static int |
1009 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | 1100 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) |
1010 | { | 1101 | { |
1011 | int i; // unsigned? | ||
1012 | unsigned int diff; | 1102 | unsigned int diff; |
1013 | void *buf; | ||
1014 | size_t size; | ||
1015 | 1103 | ||
1016 | if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && (strata_msg->round == CONSENSUS_ROUND_INVENTORY)) | 1104 | if ( (cpi->session->current_round == CONSENSUS_ROUND_COMPLETION) && |
1105 | (strata_msg->round == CONSENSUS_ROUND_INVENTORY) ) | ||
1017 | { | 1106 | { |
1018 | /* we still have to handle this request appropriately */ | 1107 | /* we still have to handle this request appropriately */ |
1019 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", | 1108 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we are already further alog\n", |
@@ -1023,28 +1112,26 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
1023 | { | 1112 | { |
1024 | if (GNUNET_NO == cpi->replaying_strata_message) | 1113 | if (GNUNET_NO == cpi->replaying_strata_message) |
1025 | { | 1114 | { |
1026 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from P%d, (%d,%d)\n", | 1115 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got probably premature SE (%d,%d)\n", |
1027 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround); | 1116 | strata_msg->exp_round, strata_msg->exp_subround); |
1028 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); | 1117 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message (&strata_msg->header); |
1029 | } | 1118 | } |
1030 | return GNUNET_YES; | 1119 | return GNUNET_YES; |
1031 | } | 1120 | } |
1032 | 1121 | ||
1033 | if (NULL == cpi->se) | 1122 | if (NULL == cpi->se) |
1034 | cpi->se = strata_estimator_create (); | 1123 | cpi->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); |
1035 | 1124 | ||
1036 | cpi->apparent_round = strata_msg->round; | 1125 | cpi->apparent_round = strata_msg->round; |
1037 | 1126 | ||
1038 | size = ntohs (strata_msg->header.size); | 1127 | if (htons (strata_msg->header.size) != ((sizeof *strata_msg) + SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE)) |
1039 | buf = (void *) &strata_msg[1]; // FIXME: do NOT cast away 'const'! | ||
1040 | for (i = 0; i < STRATA_COUNT; i++) | ||
1041 | { | 1128 | { |
1042 | int res; | 1129 | LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "got SE of wrong size\n"); |
1043 | res = ibf_read (&buf, &size, cpi->se->strata[i]); | 1130 | return GNUNET_NO; |
1044 | GNUNET_assert (GNUNET_OK == res); | ||
1045 | } | 1131 | } |
1046 | 1132 | strata_estimator_read (&strata_msg[1], cpi->se); | |
1047 | diff = estimate_difference (cpi->session->se, cpi->se); | 1133 | GNUNET_assert (NULL != cpi->session->se); |
1134 | diff = strata_estimator_difference (cpi->session->se, cpi->se); | ||
1048 | 1135 | ||
1049 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", | 1136 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n", |
1050 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); | 1137 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), diff); |
@@ -1053,10 +1140,10 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
1053 | { | 1140 | { |
1054 | case CONSENSUS_ROUND_EXCHANGE: | 1141 | case CONSENSUS_ROUND_EXCHANGE: |
1055 | case CONSENSUS_ROUND_INVENTORY: | 1142 | case CONSENSUS_ROUND_INVENTORY: |
1056 | case CONSENSUS_ROUND_STOCK: | 1143 | case CONSENSUS_ROUND_COMPLETION: |
1057 | /* send IBF of the right size */ | 1144 | /* send IBF of the right size */ |
1058 | cpi->ibf_order = 0; | 1145 | cpi->ibf_order = 0; |
1059 | while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) | 1146 | while (((1 << cpi->ibf_order) < diff) || (SE_IBF_HASH_NUM > (1 << cpi->ibf_order)) ) |
1060 | cpi->ibf_order++; | 1147 | cpi->ibf_order++; |
1061 | if (cpi->ibf_order > MAX_IBF_ORDER) | 1148 | if (cpi->ibf_order > MAX_IBF_ORDER) |
1062 | cpi->ibf_order = MAX_IBF_ORDER; | 1149 | cpi->ibf_order = MAX_IBF_ORDER; |
@@ -1066,7 +1153,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
1066 | if (NULL != cpi->ibf) | 1153 | if (NULL != cpi->ibf) |
1067 | ibf_destroy (cpi->ibf); | 1154 | ibf_destroy (cpi->ibf); |
1068 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | 1155 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); |
1069 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
1070 | cpi->ibf_bucket_counter = 0; | 1156 | cpi->ibf_bucket_counter = 0; |
1071 | send_ibf (cpi); | 1157 | send_ibf (cpi); |
1072 | break; | 1158 | break; |
@@ -1079,11 +1165,104 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
1079 | } | 1165 | } |
1080 | 1166 | ||
1081 | 1167 | ||
1168 | |||
1169 | static int | ||
1170 | send_elements_iterator (void *cls, | ||
1171 | const struct GNUNET_HashCode * key, | ||
1172 | void *value) | ||
1173 | { | ||
1174 | struct ConsensusPeerInformation *cpi = cls; | ||
1175 | struct ElementInfo *ei; | ||
1176 | ei = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, value); | ||
1177 | if (NULL == ei) | ||
1178 | { | ||
1179 | LOG_PP (GNUNET_ERROR_TYPE_WARNING, cpi, "peer's ibf contained non-existing element %s\n", | ||
1180 | GNUNET_h2s((struct GNUNET_HashCode *) value)); | ||
1181 | return GNUNET_YES; | ||
1182 | } | ||
1183 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "sending element\n"); | ||
1184 | send_element_or_report (cpi, ei); | ||
1185 | return GNUNET_YES; | ||
1186 | } | ||
1187 | |||
1188 | |||
1189 | /** | ||
1190 | * Decode the current diff ibf, and send elements/requests/reports/ | ||
1191 | * | ||
1192 | * @param cpi partner peer | ||
1193 | */ | ||
1194 | static void | ||
1195 | decode (struct ConsensusPeerInformation *cpi) | ||
1196 | { | ||
1197 | struct IBF_Key key; | ||
1198 | int side; | ||
1199 | |||
1200 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1201 | |||
1202 | while (1) | ||
1203 | { | ||
1204 | int res; | ||
1205 | |||
1206 | res = ibf_decode (cpi->ibf, &side, &key); | ||
1207 | if (GNUNET_SYSERR == res) | ||
1208 | { | ||
1209 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | ||
1210 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | ||
1211 | cpi->ibf_order++; | ||
1212 | prepare_ibf (cpi); | ||
1213 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1214 | cpi->ibf_bucket_counter = 0; | ||
1215 | send_ibf (cpi); | ||
1216 | return; | ||
1217 | } | ||
1218 | if (GNUNET_NO == res) | ||
1219 | { | ||
1220 | struct PendingMessage *pm; | ||
1221 | struct ConsensusRoundMessage *rmsg; | ||
1222 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); | ||
1223 | |||
1224 | pm = new_pending_message (sizeof *rmsg, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | ||
1225 | rmsg = (struct ConsensusRoundMessage *) pm->msg; | ||
1226 | rmsg->round = cpi->apparent_round; | ||
1227 | message_queue_add (cpi->mss.mq, pm); | ||
1228 | return; | ||
1229 | } | ||
1230 | if (-1 == side) | ||
1231 | { | ||
1232 | struct GNUNET_HashCode hashcode; | ||
1233 | /* we have the element(s), send it to the other peer */ | ||
1234 | ibf_hashcode_from_key (key, &hashcode); | ||
1235 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); | ||
1236 | } | ||
1237 | else | ||
1238 | { | ||
1239 | struct PendingMessage *pm; | ||
1240 | uint16_t type; | ||
1241 | |||
1242 | switch (cpi->apparent_round) | ||
1243 | { | ||
1244 | case CONSENSUS_ROUND_COMPLETION: | ||
1245 | /* FIXME: check if we really want to request the element */ | ||
1246 | case CONSENSUS_ROUND_EXCHANGE: | ||
1247 | case CONSENSUS_ROUND_INVENTORY: | ||
1248 | type = GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST; | ||
1249 | break; | ||
1250 | default: | ||
1251 | GNUNET_assert (0); | ||
1252 | } | ||
1253 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + sizeof (struct IBF_Key), | ||
1254 | type); | ||
1255 | *(struct IBF_Key *) &pm->msg[1] = key; | ||
1256 | message_queue_add (cpi->mss.mq, pm); | ||
1257 | } | ||
1258 | } | ||
1259 | } | ||
1260 | |||
1261 | |||
1082 | static int | 1262 | static int |
1083 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) | 1263 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) |
1084 | { | 1264 | { |
1085 | int num_buckets; | 1265 | int num_buckets; |
1086 | void *buf; | ||
1087 | 1266 | ||
1088 | /* FIXME: find out if we're still expecting the same ibf! */ | 1267 | /* FIXME: find out if we're still expecting the same ibf! */ |
1089 | 1268 | ||
@@ -1128,13 +1307,10 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
1128 | return GNUNET_YES; | 1307 | return GNUNET_YES; |
1129 | } | 1308 | } |
1130 | 1309 | ||
1131 | |||
1132 | if (NULL == cpi->ibf) | 1310 | if (NULL == cpi->ibf) |
1133 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 1311 | cpi->ibf = ibf_create (1 << cpi->ibf_order, SE_IBF_HASH_NUM); |
1134 | |||
1135 | buf = (void *) &digest[1]; // FIXME: digest is supposed to be READ ONLY! | ||
1136 | ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf); | ||
1137 | 1312 | ||
1313 | ibf_read_slice (&digest[1], cpi->ibf_bucket_counter, num_buckets, cpi->ibf); | ||
1138 | cpi->ibf_bucket_counter += num_buckets; | 1314 | cpi->ibf_bucket_counter += num_buckets; |
1139 | 1315 | ||
1140 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | 1316 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
@@ -1150,19 +1326,53 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
1150 | 1326 | ||
1151 | 1327 | ||
1152 | /** | 1328 | /** |
1329 | * Insert an element into the consensus set of the specified session. | ||
1330 | * The element will not be copied, and freed when destroying the session. | ||
1331 | * | ||
1332 | * @param session session for new element | ||
1333 | * @param element element to insert | ||
1334 | */ | ||
1335 | static void | ||
1336 | insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) | ||
1337 | { | ||
1338 | struct GNUNET_HashCode hash; | ||
1339 | struct ElementInfo *e; | ||
1340 | struct IBF_Key ibf_key; | ||
1341 | int i; | ||
1342 | |||
1343 | e = GNUNET_new (struct ElementInfo); | ||
1344 | e->element = element; | ||
1345 | e->element_hash = GNUNET_new (struct GNUNET_HashCode); | ||
1346 | GNUNET_CRYPTO_hash (e->element->data, e->element->size, e->element_hash); | ||
1347 | ibf_key = ibf_key_from_hashcode (e->element_hash); | ||
1348 | ibf_hashcode_from_key (ibf_key, &hash); | ||
1349 | strata_estimator_insert (session->se, &hash); | ||
1350 | GNUNET_CONTAINER_multihashmap_put (session->values, e->element_hash, e, | ||
1351 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
1352 | GNUNET_CONTAINER_multihashmap_put (session->ibf_key_map, &hash, e->element_hash, | ||
1353 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1354 | |||
1355 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
1356 | { | ||
1357 | if (NULL == session->ibfs[i]) | ||
1358 | continue; | ||
1359 | ibf_insert (session->ibfs[i], ibf_key); | ||
1360 | } | ||
1361 | } | ||
1362 | |||
1363 | |||
1364 | /** | ||
1153 | * Handle an element that another peer sent us | 1365 | * Handle an element that another peer sent us |
1154 | */ | 1366 | */ |
1155 | static int | 1367 | static int |
1156 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) | 1368 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) |
1157 | { | 1369 | { |
1158 | struct PendingElement *pending_element; | ||
1159 | struct GNUNET_CONSENSUS_Element *element; | 1370 | struct GNUNET_CONSENSUS_Element *element; |
1160 | struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; | ||
1161 | size_t size; | 1371 | size_t size; |
1162 | 1372 | ||
1163 | switch (cpi->session->current_round) | 1373 | switch (cpi->session->current_round) |
1164 | { | 1374 | { |
1165 | case CONSENSUS_ROUND_STOCK: | 1375 | case CONSENSUS_ROUND_COMPLETION: |
1166 | /* FIXME: check if we really expect the element */ | 1376 | /* FIXME: check if we really expect the element */ |
1167 | case CONSENSUS_ROUND_EXCHANGE: | 1377 | case CONSENSUS_ROUND_EXCHANGE: |
1168 | break; | 1378 | break; |
@@ -1178,21 +1388,10 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
1178 | memcpy (&element[1], &element_msg[1], size); | 1388 | memcpy (&element[1], &element_msg[1], size); |
1179 | element->data = &element[1]; | 1389 | element->data = &element[1]; |
1180 | 1390 | ||
1181 | pending_element = GNUNET_malloc (sizeof *pending_element); | 1391 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "got element\n"); |
1182 | pending_element->element = element; | ||
1183 | GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element); | ||
1184 | |||
1185 | client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); | ||
1186 | client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | ||
1187 | client_element_msg->header.size = htons (size + sizeof *client_element_msg); | ||
1188 | memcpy (&client_element_msg[1], &element[1], size); | ||
1189 | 1392 | ||
1190 | queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); | 1393 | insert_element (cpi->session, element); |
1191 | 1394 | ||
1192 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size); | ||
1193 | |||
1194 | client_send_next (cpi->session); | ||
1195 | |||
1196 | return GNUNET_YES; | 1395 | return GNUNET_YES; |
1197 | } | 1396 | } |
1198 | 1397 | ||
@@ -1213,20 +1412,36 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E | |||
1213 | /* element requests are allowed in every round */ | 1412 | /* element requests are allowed in every round */ |
1214 | 1413 | ||
1215 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); | 1414 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); |
1216 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); | ||
1217 | 1415 | ||
1218 | ibf_key = (struct IBF_Key *) &msg[1]; | 1416 | ibf_key = (struct IBF_Key *) &msg[1]; |
1219 | while (num--) | 1417 | while (num--) |
1220 | { | 1418 | { |
1221 | struct ElementList *head; | ||
1222 | ibf_hashcode_from_key (*ibf_key, &hashcode); | 1419 | ibf_hashcode_from_key (*ibf_key, &hashcode); |
1223 | head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); | 1420 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->ibf_key_map, &hashcode, send_elements_iterator, cpi); |
1224 | send_elements (cpi, head); | ||
1225 | ibf_key++; | 1421 | ibf_key++; |
1226 | } | 1422 | } |
1227 | return GNUNET_YES; | 1423 | return GNUNET_YES; |
1228 | } | 1424 | } |
1229 | 1425 | ||
1426 | static int | ||
1427 | is_peer_connected (struct ConsensusPeerInformation *cpi) | ||
1428 | { | ||
1429 | if (NULL == cpi->mss.socket) | ||
1430 | return GNUNET_NO; | ||
1431 | return GNUNET_YES; | ||
1432 | } | ||
1433 | |||
1434 | |||
1435 | static void | ||
1436 | ensure_peer_connected (struct ConsensusPeerInformation *cpi) | ||
1437 | { | ||
1438 | if (NULL != cpi->mss.socket) | ||
1439 | return; | ||
1440 | cpi->mss.socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
1441 | open_cb, cpi, GNUNET_STREAM_OPTION_END); | ||
1442 | } | ||
1443 | |||
1444 | |||
1230 | /** | 1445 | /** |
1231 | * If necessary, send a message to the peer, depending on the current | 1446 | * If necessary, send a message to the peer, depending on the current |
1232 | * round. | 1447 | * round. |
@@ -1234,17 +1449,22 @@ handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct E | |||
1234 | static void | 1449 | static void |
1235 | embrace_peer (struct ConsensusPeerInformation *cpi) | 1450 | embrace_peer (struct ConsensusPeerInformation *cpi) |
1236 | { | 1451 | { |
1237 | GNUNET_assert (GNUNET_YES == cpi->hello); | 1452 | if (GNUNET_NO == is_peer_connected (cpi)) |
1453 | { | ||
1454 | ensure_peer_connected (cpi); | ||
1455 | return; | ||
1456 | } | ||
1457 | if (GNUNET_NO == cpi->hello) | ||
1458 | return; | ||
1459 | /* FIXME: correctness of switch */ | ||
1238 | switch (cpi->session->current_round) | 1460 | switch (cpi->session->current_round) |
1239 | { | 1461 | { |
1240 | case CONSENSUS_ROUND_EXCHANGE: | 1462 | case CONSENSUS_ROUND_EXCHANGE: |
1463 | case CONSENSUS_ROUND_INVENTORY: | ||
1241 | if (cpi->session->partner_outgoing != cpi) | 1464 | if (cpi->session->partner_outgoing != cpi) |
1242 | break; | 1465 | break; |
1243 | /* fallthrough */ | 1466 | /* fallthrough */ |
1244 | case CONSENSUS_ROUND_INVENTORY: | 1467 | case CONSENSUS_ROUND_COMPLETION: |
1245 | /* fallthrough */ | ||
1246 | case CONSENSUS_ROUND_STOCK: | ||
1247 | if (cpi == cpi->session->partner_outgoing) | ||
1248 | send_strata_estimator (cpi); | 1468 | send_strata_estimator (cpi); |
1249 | default: | 1469 | default: |
1250 | break; | 1470 | break; |
@@ -1253,195 +1473,313 @@ embrace_peer (struct ConsensusPeerInformation *cpi) | |||
1253 | 1473 | ||
1254 | 1474 | ||
1255 | /** | 1475 | /** |
1256 | * Handle a HELLO-message, send when another peer wants to join a session where | 1476 | * Called when stream has finishes writing the hello message |
1257 | * our peer is a member. The session may or may not be inhabited yet. | ||
1258 | */ | 1477 | */ |
1259 | static int | 1478 | static void |
1260 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | 1479 | hello_cont (void *cls) |
1261 | { | 1480 | { |
1262 | /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ | 1481 | struct ConsensusPeerInformation *cpi = cls; |
1263 | struct ConsensusSession *session; | ||
1264 | int idx; | ||
1265 | 1482 | ||
1266 | for (session = sessions_head; NULL != session; session = session->next) | 1483 | cpi->hello = GNUNET_YES; |
1267 | { | 1484 | embrace_peer (cpi); |
1268 | if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) | ||
1269 | continue; | ||
1270 | idx = get_peer_idx (&inc->peer_id, session); | ||
1271 | GNUNET_assert (-1 != idx); | ||
1272 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); | ||
1273 | inc->cpi = &session->info[idx]; | ||
1274 | inc->cpi->mst = inc->mst; | ||
1275 | inc->cpi->hello = GNUNET_YES; | ||
1276 | inc->cpi->socket = inc->socket; | ||
1277 | embrace_peer (inc->cpi); | ||
1278 | return GNUNET_YES; | ||
1279 | } | ||
1280 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); | ||
1281 | return GNUNET_NO; | ||
1282 | } | 1485 | } |
1283 | 1486 | ||
1284 | 1487 | ||
1285 | /** | 1488 | /** |
1286 | * Send a strata estimator. | 1489 | * Called when we established a stream connection to another peer |
1287 | * | 1490 | * |
1288 | * @param cpi the peer | 1491 | * @param cls cpi of the peer we just connected to |
1492 | * @param socket socket to use to communicate with the other side (read/write) | ||
1289 | */ | 1493 | */ |
1290 | static void | 1494 | static void |
1291 | send_strata_estimator (struct ConsensusPeerInformation *cpi) | 1495 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) |
1292 | { | 1496 | { |
1293 | struct StrataMessage *strata_msg; | 1497 | struct ConsensusPeerInformation *cpi = cls; |
1294 | void *buf; | 1498 | struct PendingMessage *pm; |
1295 | size_t msize; | 1499 | struct ConsensusHello *hello; |
1296 | int i; | ||
1297 | 1500 | ||
1298 | cpi->apparent_round = cpi->session->current_round; | 1501 | GNUNET_assert (NULL == cpi->mss.mst); |
1299 | cpi->ibf_state = IBF_STATE_NONE; | 1502 | GNUNET_assert (NULL == cpi->mss.mq); |
1300 | cpi->ibf_bucket_counter = 0; | ||
1301 | 1503 | ||
1302 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n", | 1504 | cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); |
1303 | cpi->session->local_peer_idx, cpi->session->current_round, (int) (cpi - cpi->session->info)); | 1505 | cpi->mss.mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); |
1506 | cpi->mss.mst_cls = cpi; | ||
1304 | 1507 | ||
1305 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | 1508 | pm = new_pending_message (sizeof *hello, GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); |
1509 | hello = (struct ConsensusHello *) pm->msg; | ||
1510 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | ||
1511 | pm->sent_cb = hello_cont; | ||
1512 | pm->sent_cb_cls = cpi; | ||
1513 | message_queue_add (cpi->mss.mq, pm); | ||
1514 | cpi->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1515 | &stream_data_processor, &cpi->mss); | ||
1516 | } | ||
1306 | 1517 | ||
1307 | strata_msg = GNUNET_malloc (msize); | 1518 | |
1308 | strata_msg->header.size = htons (msize); | 1519 | static void |
1309 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | 1520 | replay_premature_message (struct ConsensusPeerInformation *cpi) |
1310 | strata_msg->round = cpi->session->current_round; | 1521 | { |
1311 | strata_msg->exp_round = cpi->session->exp_round; | 1522 | if (NULL != cpi->premature_strata_message) |
1312 | strata_msg->exp_subround = cpi->session->exp_subround; | ||
1313 | |||
1314 | buf = &strata_msg[1]; | ||
1315 | for (i = 0; i < STRATA_COUNT; i++) | ||
1316 | { | 1523 | { |
1317 | ibf_write (cpi->session->se->strata[i], &buf, NULL); | 1524 | struct StrataMessage *sm; |
1318 | } | 1525 | |
1526 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
1527 | sm = cpi->premature_strata_message; | ||
1528 | cpi->premature_strata_message = NULL; | ||
1529 | |||
1530 | cpi->replaying_strata_message = GNUNET_YES; | ||
1531 | handle_p2p_strata (cpi, sm); | ||
1532 | cpi->replaying_strata_message = GNUNET_NO; | ||
1319 | 1533 | ||
1320 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); | 1534 | GNUNET_free (sm); |
1535 | } | ||
1321 | } | 1536 | } |
1322 | 1537 | ||
1323 | 1538 | ||
1324 | /** | 1539 | /** |
1325 | * Send an IBF of the order specified in cpi. | 1540 | * Start the inventory round, contact all peers we are supposed to contact. |
1326 | * | 1541 | * |
1327 | * @param cpi the peer | 1542 | * @param session the current session |
1328 | */ | 1543 | */ |
1329 | static void | 1544 | static void |
1330 | send_ibf (struct ConsensusPeerInformation *cpi) | 1545 | start_inventory (struct ConsensusSession *session) |
1331 | { | 1546 | { |
1332 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n", | 1547 | int i; |
1333 | cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 1548 | int last; |
1334 | 1549 | ||
1335 | cpi->ibf_bucket_counter = 0; | 1550 | for (i = 0; i < session->num_peers; i++) |
1336 | while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order)) | ||
1337 | { | 1551 | { |
1338 | int num_buckets; | 1552 | session->info[i].ibf_bucket_counter = 0; |
1339 | void *buf; | 1553 | session->info[i].ibf_state = IBF_STATE_NONE; |
1340 | struct DifferenceDigest *digest; | 1554 | session->info[i].is_outgoing = GNUNET_NO; |
1341 | int msize; | 1555 | } |
1342 | 1556 | ||
1343 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | 1557 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1344 | /* limit to maximum */ | 1558 | i = (session->local_peer_idx + 1) % session->num_peers; |
1345 | if (num_buckets > BUCKETS_PER_MESSAGE) | 1559 | while (i != last) |
1346 | num_buckets = BUCKETS_PER_MESSAGE; | 1560 | { |
1561 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | ||
1562 | session->info[i].is_outgoing = GNUNET_YES; | ||
1563 | embrace_peer (&session->info[i]); | ||
1564 | i = (i + 1) % session->num_peers; | ||
1565 | } | ||
1566 | // tie-breaker for even number of peers | ||
1567 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
1568 | { | ||
1569 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | ||
1570 | session->info[last].is_outgoing = GNUNET_YES; | ||
1571 | embrace_peer (&session->info[last]); | ||
1572 | } | ||
1347 | 1573 | ||
1348 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); | 1574 | for (i = 0; i < session->num_peers; i++) |
1575 | { | ||
1576 | if (GNUNET_NO == session->info[i].is_outgoing) | ||
1577 | replay_premature_message (&session->info[i]); | ||
1578 | } | ||
1579 | } | ||
1349 | 1580 | ||
1350 | digest = GNUNET_malloc (msize); | ||
1351 | digest->header.size = htons (msize); | ||
1352 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | ||
1353 | digest->order = cpi->ibf_order; | ||
1354 | digest->round = cpi->apparent_round; | ||
1355 | 1581 | ||
1356 | buf = &digest[1]; | 1582 | /** |
1357 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); | 1583 | * Iterator over hash map entries. |
1584 | * | ||
1585 | * @param cls closure | ||
1586 | * @param key current key code | ||
1587 | * @param value value in the hash map | ||
1588 | * @return GNUNET_YES if we should continue to | ||
1589 | * iterate, | ||
1590 | * GNUNET_NO if not. | ||
1591 | */ | ||
1592 | static int | ||
1593 | send_client_elements_iter (void *cls, | ||
1594 | const struct GNUNET_HashCode * key, | ||
1595 | void *value) | ||
1596 | { | ||
1597 | struct ConsensusSession *session = cls; | ||
1598 | struct ElementInfo *ei = value; | ||
1599 | struct PendingMessage *pm; | ||
1358 | 1600 | ||
1359 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); | 1601 | /* is the client still there? */ |
1602 | if (NULL == session->scss.client) | ||
1603 | return GNUNET_NO; | ||
1360 | 1604 | ||
1361 | cpi->ibf_bucket_counter += num_buckets; | 1605 | pm = new_pending_message (sizeof (struct GNUNET_MessageHeader) + ei->element->size, |
1362 | } | 1606 | GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); |
1363 | cpi->ibf_bucket_counter = 0; | 1607 | message_queue_add (session->client_mq, pm); |
1364 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | 1608 | return GNUNET_YES; |
1365 | } | 1609 | } |
1366 | 1610 | ||
1367 | 1611 | ||
1612 | |||
1368 | /** | 1613 | /** |
1369 | * Decode the current diff ibf, and send elements/requests/reports/ | 1614 | * Start the next round. |
1615 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
1370 | * | 1616 | * |
1371 | * @param cpi partner peer | 1617 | * @param cls the session |
1618 | * @param tc task context, for when this task is invoked by the scheduler, | ||
1619 | * NULL if invoked for another reason | ||
1372 | */ | 1620 | */ |
1373 | static void | 1621 | static void |
1374 | decode (struct ConsensusPeerInformation *cpi) | 1622 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
1375 | { | 1623 | { |
1376 | struct IBF_Key key; | 1624 | struct ConsensusSession *session; |
1377 | struct GNUNET_HashCode hashcode; | ||
1378 | int side; | ||
1379 | 1625 | ||
1380 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | 1626 | /* don't kick off next round if we're shutting down */ |
1627 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
1628 | return; | ||
1381 | 1629 | ||
1382 | while (1) | 1630 | session = cls; |
1631 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); | ||
1632 | |||
1633 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | ||
1383 | { | 1634 | { |
1384 | int res; | 1635 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1636 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | ||
1637 | } | ||
1385 | 1638 | ||
1386 | res = ibf_decode (cpi->ibf, &side, &key); | 1639 | switch (session->current_round) |
1387 | if (GNUNET_SYSERR == res) | 1640 | { |
1388 | { | 1641 | case CONSENSUS_ROUND_BEGIN: |
1389 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | 1642 | session->current_round = CONSENSUS_ROUND_EXCHANGE; |
1390 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | 1643 | session->exp_round = 0; |
1391 | cpi->ibf_order++; | 1644 | subround_over (session, NULL); |
1392 | prepare_ibf (cpi); | 1645 | break; |
1393 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | 1646 | case CONSENSUS_ROUND_EXCHANGE: |
1394 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | 1647 | /* handle two peers specially */ |
1395 | cpi->ibf_bucket_counter = 0; | 1648 | if (session->num_peers <= 2) |
1396 | send_ibf (cpi); | 1649 | { |
1397 | return; | 1650 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); |
1398 | } | 1651 | GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); |
1399 | if (GNUNET_NO == res) | 1652 | send_client_conclude_done (session); |
1400 | { | 1653 | session->current_round = CONSENSUS_ROUND_FINISH; |
1401 | struct ConsensusRoundMessage *msg; | 1654 | return; |
1402 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, sending SYNC\n", cpi->session->local_peer_idx); | 1655 | } |
1403 | msg = GNUNET_malloc (sizeof *msg); | 1656 | session->current_round = CONSENSUS_ROUND_INVENTORY; |
1404 | msg->header.size = htons (sizeof *msg); | 1657 | start_inventory (session); |
1405 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | 1658 | break; |
1406 | msg->round = cpi->apparent_round; | 1659 | case CONSENSUS_ROUND_INVENTORY: |
1407 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); | 1660 | session->current_round = CONSENSUS_ROUND_COMPLETION; |
1408 | return; | 1661 | session->exp_round = 0; |
1409 | } | 1662 | subround_over (session, NULL); |
1410 | if (-1 == side) | 1663 | break; |
1411 | { | 1664 | case CONSENSUS_ROUND_COMPLETION: |
1412 | struct ElementList *head; | 1665 | session->current_round = CONSENSUS_ROUND_FINISH; |
1413 | /* we have the element(s), send it to the other peer */ | 1666 | send_client_conclude_done (session); |
1414 | ibf_hashcode_from_key (key, &hashcode); | 1667 | break; |
1415 | head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); | 1668 | default: |
1416 | send_elements (cpi, head); | 1669 | GNUNET_assert (0); |
1417 | } | 1670 | } |
1418 | else | 1671 | } |
1419 | { | ||
1420 | struct ElementRequest *msg; | ||
1421 | size_t msize; | ||
1422 | struct IBF_Key *p; | ||
1423 | 1672 | ||
1424 | msize = (sizeof *msg) + sizeof (struct IBF_Key); | 1673 | |
1425 | msg = GNUNET_malloc (msize); | 1674 | static void |
1426 | switch (cpi->apparent_round) | 1675 | fin_sent_cb (void *cls) |
1676 | { | ||
1677 | struct ConsensusPeerInformation *cpi; | ||
1678 | cpi = cls; | ||
1679 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", cpi->session->local_peer_idx); | ||
1680 | switch (cpi->session->current_round) | ||
1681 | { | ||
1682 | case CONSENSUS_ROUND_EXCHANGE: | ||
1683 | case CONSENSUS_ROUND_COMPLETION: | ||
1684 | if (cpi->session->current_round != cpi->apparent_round) | ||
1427 | { | 1685 | { |
1428 | case CONSENSUS_ROUND_STOCK: | 1686 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the past\n", cpi->session->local_peer_idx); |
1429 | /* FIXME: check if we really want to request the element */ | 1687 | break; |
1430 | case CONSENSUS_ROUND_EXCHANGE: | ||
1431 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1432 | break; | ||
1433 | case CONSENSUS_ROUND_INVENTORY: | ||
1434 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); | ||
1435 | break; | ||
1436 | default: | ||
1437 | GNUNET_assert (0); | ||
1438 | } | 1688 | } |
1439 | msg->header.size = htons (msize); | 1689 | cpi->exp_subround_finished = GNUNET_YES; |
1440 | p = (struct IBF_Key *) &msg[1]; | 1690 | /* the subround is only really over if *both* partners are done */ |
1441 | *p = key; | 1691 | if (GNUNET_YES == exp_subround_finished (cpi->session)) |
1442 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); | 1692 | subround_over (cpi->session, NULL); |
1443 | } | 1693 | else |
1694 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after FIN sent\n", cpi->session->local_peer_idx); | ||
1695 | break; | ||
1696 | case CONSENSUS_ROUND_INVENTORY: | ||
1697 | cpi->inventory_synced = GNUNET_YES; | ||
1698 | if (inventory_round_finished (cpi->session) && cpi->session->current_round == cpi->apparent_round) | ||
1699 | round_over (cpi->session, NULL); | ||
1700 | /* FIXME: maybe go to next round */ | ||
1701 | break; | ||
1702 | default: | ||
1703 | GNUNET_break (0); | ||
1704 | } | ||
1705 | } | ||
1706 | |||
1707 | |||
1708 | /** | ||
1709 | * The other peer wants us to inform that he sent us all the elements we requested. | ||
1710 | */ | ||
1711 | static int | ||
1712 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
1713 | { | ||
1714 | struct ConsensusRoundMessage *round_msg; | ||
1715 | round_msg = (struct ConsensusRoundMessage *) msg; | ||
1716 | /* FIXME: only call subround_over if round is the current one! */ | ||
1717 | switch (cpi->session->current_round) | ||
1718 | { | ||
1719 | case CONSENSUS_ROUND_EXCHANGE: | ||
1720 | case CONSENSUS_ROUND_COMPLETION: | ||
1721 | if (cpi->session->current_round != round_msg->round) | ||
1722 | { | ||
1723 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1724 | cpi->ibf_state = IBF_STATE_NONE; | ||
1725 | cpi->ibf_bucket_counter = 0; | ||
1726 | break; | ||
1727 | } | ||
1728 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1729 | cpi->exp_subround_finished = GNUNET_YES; | ||
1730 | if (GNUNET_YES == exp_subround_finished (cpi->session)) | ||
1731 | subround_over (cpi->session, NULL); | ||
1732 | else | ||
1733 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after got FIN\n", cpi->session->local_peer_idx); | ||
1734 | break; | ||
1735 | case CONSENSUS_ROUND_INVENTORY: | ||
1736 | cpi->inventory_synced = GNUNET_YES; | ||
1737 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info)); | ||
1738 | if (inventory_round_finished (cpi->session)) | ||
1739 | round_over (cpi->session, NULL); | ||
1740 | break; | ||
1741 | default: | ||
1742 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | ||
1743 | break; | ||
1444 | } | 1744 | } |
1745 | return GNUNET_YES; | ||
1746 | } | ||
1747 | |||
1748 | |||
1749 | /** | ||
1750 | * Gets called when the other peer wants us to inform that | ||
1751 | * it has decoded our ibf and sent us all elements / requests | ||
1752 | */ | ||
1753 | static int | ||
1754 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
1755 | { | ||
1756 | struct PendingMessage *pm; | ||
1757 | struct ConsensusRoundMessage *fin_msg; | ||
1758 | |||
1759 | /* FIXME: why handle current round?? */ | ||
1760 | switch (cpi->session->current_round) | ||
1761 | { | ||
1762 | case CONSENSUS_ROUND_INVENTORY: | ||
1763 | cpi->inventory_synced = GNUNET_YES; | ||
1764 | case CONSENSUS_ROUND_COMPLETION: | ||
1765 | case CONSENSUS_ROUND_EXCHANGE: | ||
1766 | LOG_PP (GNUNET_ERROR_TYPE_INFO, cpi, "received SYNC\n"); | ||
1767 | pm = new_pending_message (sizeof *fin_msg, | ||
1768 | GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | ||
1769 | fin_msg = (struct ConsensusRoundMessage *) pm->msg; | ||
1770 | fin_msg->round = cpi->apparent_round; | ||
1771 | /* the subround is over once we kicked off sending the fin msg */ | ||
1772 | /* FIXME: assert we are talking to the right peer! */ | ||
1773 | /* FIXME: mark peer as synced */ | ||
1774 | pm->sent_cb = fin_sent_cb; | ||
1775 | pm->sent_cb_cls = cpi; | ||
1776 | message_queue_add (cpi->mss.mq, pm); | ||
1777 | break; | ||
1778 | default: | ||
1779 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | ||
1780 | break; | ||
1781 | } | ||
1782 | return GNUNET_YES; | ||
1445 | } | 1783 | } |
1446 | 1784 | ||
1447 | 1785 | ||
@@ -1459,8 +1797,9 @@ decode (struct ConsensusPeerInformation *cpi) | |||
1459 | static int | 1797 | static int |
1460 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | 1798 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) |
1461 | { | 1799 | { |
1462 | struct ConsensusPeerInformation *cpi; | 1800 | struct ConsensusPeerInformation *cpi = cls; |
1463 | cpi = cls; | 1801 | GNUNET_assert (NULL == client); |
1802 | GNUNET_assert (NULL != cls); | ||
1464 | switch (ntohs (message->type)) | 1803 | switch (ntohs (message->type)) |
1465 | { | 1804 | { |
1466 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: | 1805 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: |
@@ -1485,6 +1824,228 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader | |||
1485 | } | 1824 | } |
1486 | 1825 | ||
1487 | 1826 | ||
1827 | static void | ||
1828 | shuffle (struct ConsensusSession *session) | ||
1829 | { | ||
1830 | /* adapted from random_permute in util/crypto_random.c */ | ||
1831 | /* FIXME | ||
1832 | unsigned int *ret; | ||
1833 | unsigned int i; | ||
1834 | unsigned int tmp; | ||
1835 | uint32_t x; | ||
1836 | |||
1837 | GNUNET_assert (n > 0); | ||
1838 | ret = GNUNET_malloc (n * sizeof (unsigned int)); | ||
1839 | for (i = 0; i < n; i++) | ||
1840 | ret[i] = i; | ||
1841 | for (i = n - 1; i > 0; i--) | ||
1842 | { | ||
1843 | x = GNUNET_CRYPTO_random_u32 (mode, i + 1); | ||
1844 | tmp = ret[x]; | ||
1845 | ret[x] = ret[i]; | ||
1846 | ret[i] = tmp; | ||
1847 | } | ||
1848 | */ | ||
1849 | } | ||
1850 | |||
1851 | |||
1852 | /** | ||
1853 | * Find and set the partner_incoming and partner_outgoing of our peer, | ||
1854 | * one of them may not exist in most cases. | ||
1855 | * | ||
1856 | * @param session the consensus session | ||
1857 | */ | ||
1858 | static void | ||
1859 | find_partners (struct ConsensusSession *session) | ||
1860 | { | ||
1861 | int mark[session->num_peers]; | ||
1862 | int i; | ||
1863 | memset (mark, 0, session->num_peers * sizeof (int)); | ||
1864 | session->partner_incoming = session->partner_outgoing = NULL; | ||
1865 | for (i = 0; i < session->num_peers; i++) | ||
1866 | { | ||
1867 | int arc; | ||
1868 | if (0 != mark[i]) | ||
1869 | continue; | ||
1870 | arc = (i + (1 << session->exp_subround)) % session->num_peers; | ||
1871 | mark[i] = mark[arc] = 1; | ||
1872 | GNUNET_assert (i != arc); | ||
1873 | if (i == session->local_peer_idx) | ||
1874 | { | ||
1875 | GNUNET_assert (NULL == session->partner_outgoing); | ||
1876 | session->partner_outgoing = &session->info[session->shuffle[arc]]; | ||
1877 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
1878 | } | ||
1879 | if (arc == session->local_peer_idx) | ||
1880 | { | ||
1881 | GNUNET_assert (NULL == session->partner_incoming); | ||
1882 | session->partner_incoming = &session->info[session->shuffle[i]]; | ||
1883 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
1884 | } | ||
1885 | } | ||
1886 | } | ||
1887 | |||
1888 | |||
1889 | /** | ||
1890 | * Do the next subround in the exp-scheme. | ||
1891 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
1892 | * | ||
1893 | * @param cls the session | ||
1894 | * @param tc task context, for when this task is invoked by the scheduler, | ||
1895 | * NULL if invoked for another reason | ||
1896 | */ | ||
1897 | static void | ||
1898 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1899 | { | ||
1900 | struct ConsensusSession *session; | ||
1901 | int i; | ||
1902 | |||
1903 | /* don't kick off next subround if we're shutting down */ | ||
1904 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
1905 | return; | ||
1906 | session = cls; | ||
1907 | /* cancel timeout */ | ||
1908 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | ||
1909 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | ||
1910 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | ||
1911 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | ||
1912 | if ( (session->exp_round == NUM_EXP_ROUNDS) || | ||
1913 | ((session->num_peers == 2) && (session->exp_round == 1))) | ||
1914 | { | ||
1915 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); | ||
1916 | round_over (session, NULL); | ||
1917 | return; | ||
1918 | } | ||
1919 | if (session->exp_round == 0) | ||
1920 | { | ||
1921 | /* initialize everything for the log-rounds */ | ||
1922 | session->exp_round = 1; | ||
1923 | session->exp_subround = 0; | ||
1924 | if (NULL == session->shuffle) | ||
1925 | session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); | ||
1926 | for (i = 0; i < session->num_peers; i++) | ||
1927 | session->shuffle[i] = i; | ||
1928 | } | ||
1929 | else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) | ||
1930 | { | ||
1931 | /* subrounds done, start new log-round */ | ||
1932 | session->exp_round++; | ||
1933 | session->exp_subround = 0; | ||
1934 | shuffle (session); | ||
1935 | } | ||
1936 | else | ||
1937 | { | ||
1938 | session->exp_subround++; | ||
1939 | } | ||
1940 | |||
1941 | find_partners (session); | ||
1942 | |||
1943 | #ifdef GNUNET_EXTRA_LOGGING | ||
1944 | { | ||
1945 | int in; | ||
1946 | int out; | ||
1947 | if (session->partner_outgoing == NULL) | ||
1948 | out = -1; | ||
1949 | else | ||
1950 | out = (int) (session->partner_outgoing - session->info); | ||
1951 | if (session->partner_incoming == NULL) | ||
1952 | in = -1; | ||
1953 | else | ||
1954 | in = (int) (session->partner_incoming - session->info); | ||
1955 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, | ||
1956 | session->exp_round, session->exp_subround, in, out); | ||
1957 | } | ||
1958 | #endif /* GNUNET_EXTRA_LOGGING */ | ||
1959 | |||
1960 | if (NULL != session->partner_incoming) | ||
1961 | { | ||
1962 | session->partner_incoming->ibf_state = IBF_STATE_NONE; | ||
1963 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
1964 | session->partner_incoming->ibf_bucket_counter = 0; | ||
1965 | |||
1966 | /* maybe there's an early strata estimator? */ | ||
1967 | replay_premature_message (session->partner_incoming); | ||
1968 | } | ||
1969 | |||
1970 | if (NULL != session->partner_outgoing) | ||
1971 | { | ||
1972 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | ||
1973 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
1974 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
1975 | /* make sure peer is connected and send the SE */ | ||
1976 | embrace_peer (session->partner_outgoing); | ||
1977 | } | ||
1978 | |||
1979 | /* | ||
1980 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), | ||
1981 | subround_over, session); | ||
1982 | */ | ||
1983 | } | ||
1984 | |||
1985 | |||
1986 | /** | ||
1987 | * Search peer in the list of peers in session. | ||
1988 | * | ||
1989 | * @param peer peer to find | ||
1990 | * @param session session with peer | ||
1991 | * @return index of peer, -1 if peer is not in session | ||
1992 | */ | ||
1993 | static int | ||
1994 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) | ||
1995 | { | ||
1996 | int i; | ||
1997 | for (i = 0; i < session->num_peers; i++) | ||
1998 | if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) | ||
1999 | return i; | ||
2000 | return -1; | ||
2001 | } | ||
2002 | |||
2003 | |||
2004 | /** | ||
2005 | * Handle a HELLO-message, send when another peer wants to join a session where | ||
2006 | * our peer is a member. The session may or may not be inhabited yet. | ||
2007 | */ | ||
2008 | static int | ||
2009 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | ||
2010 | { | ||
2011 | struct ConsensusSession *session; | ||
2012 | |||
2013 | if (NULL != inc->requested_gid) | ||
2014 | { | ||
2015 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session more than once, ignoring\n"); | ||
2016 | return GNUNET_YES; | ||
2017 | } | ||
2018 | if (NULL != inc->cpi) | ||
2019 | { | ||
2020 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer with active session sent HELLO again, ignoring\n"); | ||
2021 | return GNUNET_YES; | ||
2022 | } | ||
2023 | |||
2024 | for (session = sessions_head; NULL != session; session = session->next) | ||
2025 | { | ||
2026 | int idx; | ||
2027 | struct ConsensusPeerInformation *cpi; | ||
2028 | if (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) | ||
2029 | continue; | ||
2030 | idx = get_peer_idx (&inc->peer_id, session); | ||
2031 | GNUNET_assert (-1 != idx); | ||
2032 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); | ||
2033 | cpi = &session->info[idx]; | ||
2034 | inc->cpi = cpi; | ||
2035 | cpi->mss = inc->mss; | ||
2036 | cpi = &session->info[idx]; | ||
2037 | cpi->hello = GNUNET_YES; | ||
2038 | cpi->mss.mq = create_message_queue_for_stream_socket (&cpi->mss); | ||
2039 | embrace_peer (cpi); | ||
2040 | return GNUNET_YES; | ||
2041 | } | ||
2042 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); | ||
2043 | inc->requested_gid = GNUNET_memdup (&hello->global_id, sizeof (struct GNUNET_HashCode)); | ||
2044 | return GNUNET_YES; | ||
2045 | } | ||
2046 | |||
2047 | |||
2048 | |||
1488 | /** | 2049 | /** |
1489 | * Handle tokenized messages from stream sockets. | 2050 | * Handle tokenized messages from stream sockets. |
1490 | * Delegate them if the socket belongs to a session, | 2051 | * Delegate them if the socket belongs to a session, |
@@ -1502,7 +2063,9 @@ static int | |||
1502 | mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | 2063 | mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) |
1503 | { | 2064 | { |
1504 | struct IncomingSocket *inc; | 2065 | struct IncomingSocket *inc; |
1505 | inc = (struct IncomingSocket *) client; | 2066 | GNUNET_assert (NULL == client); |
2067 | GNUNET_assert (NULL != cls); | ||
2068 | inc = (struct IncomingSocket *) cls; | ||
1506 | switch (ntohs( message->type)) | 2069 | switch (ntohs( message->type)) |
1507 | { | 2070 | { |
1508 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: | 2071 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO: |
@@ -1542,133 +2105,18 @@ listen_cb (void *cls, | |||
1542 | return GNUNET_SYSERR; | 2105 | return GNUNET_SYSERR; |
1543 | } | 2106 | } |
1544 | incoming = GNUNET_malloc (sizeof *incoming); | 2107 | incoming = GNUNET_malloc (sizeof *incoming); |
1545 | incoming->socket = socket; | ||
1546 | incoming->peer_id = *initiator; | 2108 | incoming->peer_id = *initiator; |
1547 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 2109 | incoming->mss.socket = socket; |
1548 | &incoming_stream_data_processor, incoming); | 2110 | incoming->mss.rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
1549 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | 2111 | &stream_data_processor, &incoming->mss); |
2112 | incoming->mss.mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | ||
2113 | incoming->mss.mst_cls = incoming; | ||
1550 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); | 2114 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); |
1551 | return GNUNET_OK; | 2115 | return GNUNET_OK; |
1552 | } | 2116 | } |
1553 | 2117 | ||
1554 | 2118 | ||
1555 | /** | 2119 | /** |
1556 | * Iterator over hash map entries. | ||
1557 | * | ||
1558 | * @param cls closure | ||
1559 | * @param key current key code | ||
1560 | * @param value value in the hash map | ||
1561 | * @return GNUNET_YES if we should continue to | ||
1562 | * iterate, | ||
1563 | * GNUNET_NO if not. | ||
1564 | */ | ||
1565 | static int | ||
1566 | destroy_element_list_iter (void *cls, | ||
1567 | const struct GNUNET_HashCode * key, | ||
1568 | void *value) | ||
1569 | { | ||
1570 | struct ElementList *el; | ||
1571 | el = value; | ||
1572 | while (NULL != el) | ||
1573 | { | ||
1574 | struct ElementList *el_old; | ||
1575 | el_old = el; | ||
1576 | el = el->next; | ||
1577 | GNUNET_free (el_old->element_hash); | ||
1578 | GNUNET_free (el_old->element); | ||
1579 | GNUNET_free (el_old); | ||
1580 | } | ||
1581 | return GNUNET_YES; | ||
1582 | } | ||
1583 | |||
1584 | |||
1585 | /** | ||
1586 | * Destroy a session, free all resources associated with it. | ||
1587 | * | ||
1588 | * @param session the session to destroy | ||
1589 | */ | ||
1590 | static void | ||
1591 | destroy_session (struct ConsensusSession *session) | ||
1592 | { | ||
1593 | int i; | ||
1594 | |||
1595 | GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); | ||
1596 | GNUNET_SERVER_client_drop (session->client); | ||
1597 | session->client = NULL; | ||
1598 | if (NULL != session->shuffle) | ||
1599 | { | ||
1600 | GNUNET_free (session->shuffle); | ||
1601 | session->shuffle = NULL; | ||
1602 | } | ||
1603 | if (NULL != session->se) | ||
1604 | { | ||
1605 | strata_estimator_destroy (session->se); | ||
1606 | session->se = NULL; | ||
1607 | } | ||
1608 | if (NULL != session->info) | ||
1609 | { | ||
1610 | for (i = 0; i < session->num_peers; i++) | ||
1611 | { | ||
1612 | struct ConsensusPeerInformation *cpi; | ||
1613 | cpi = &session->info[i]; | ||
1614 | if ((NULL != cpi) && (NULL != cpi->socket)) | ||
1615 | { | ||
1616 | if (NULL != cpi->rh) | ||
1617 | { | ||
1618 | GNUNET_STREAM_read_cancel (cpi->rh); | ||
1619 | cpi->rh = NULL; | ||
1620 | } | ||
1621 | if (NULL != cpi->wh) | ||
1622 | { | ||
1623 | GNUNET_STREAM_write_cancel (cpi->wh); | ||
1624 | cpi->wh = NULL; | ||
1625 | } | ||
1626 | GNUNET_STREAM_close (cpi->socket); | ||
1627 | cpi->socket = NULL; | ||
1628 | } | ||
1629 | if (NULL != cpi->se) | ||
1630 | { | ||
1631 | strata_estimator_destroy (cpi->se); | ||
1632 | cpi->se = NULL; | ||
1633 | } | ||
1634 | if (NULL != cpi->ibf) | ||
1635 | { | ||
1636 | ibf_destroy (cpi->ibf); | ||
1637 | cpi->ibf = NULL; | ||
1638 | } | ||
1639 | if (NULL != cpi->mst) | ||
1640 | { | ||
1641 | GNUNET_SERVER_mst_destroy (cpi->mst); | ||
1642 | cpi->mst = NULL; | ||
1643 | } | ||
1644 | } | ||
1645 | GNUNET_free (session->info); | ||
1646 | session->info = NULL; | ||
1647 | } | ||
1648 | if (NULL != session->ibfs) | ||
1649 | { | ||
1650 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
1651 | { | ||
1652 | if (NULL != session->ibfs[i]) | ||
1653 | { | ||
1654 | ibf_destroy (session->ibfs[i]); | ||
1655 | session->ibfs[i] = NULL; | ||
1656 | } | ||
1657 | } | ||
1658 | GNUNET_free (session->ibfs); | ||
1659 | session->ibfs = NULL; | ||
1660 | } | ||
1661 | if (NULL != session->values) | ||
1662 | { | ||
1663 | GNUNET_CONTAINER_multihashmap_iterate (session->values, destroy_element_list_iter, NULL); | ||
1664 | GNUNET_CONTAINER_multihashmap_destroy (session->values); | ||
1665 | session->values = NULL; | ||
1666 | } | ||
1667 | GNUNET_free (session); | ||
1668 | } | ||
1669 | |||
1670 | |||
1671 | /** | ||
1672 | * Disconnect a client, and destroy all sessions associated with it. | 2120 | * Disconnect a client, and destroy all sessions associated with it. |
1673 | * | 2121 | * |
1674 | * @param client the client to disconnect | 2122 | * @param client the client to disconnect |
@@ -1683,7 +2131,7 @@ disconnect_client (struct GNUNET_SERVER_Client *client) | |||
1683 | session = sessions_head; | 2131 | session = sessions_head; |
1684 | while (NULL != session) | 2132 | while (NULL != session) |
1685 | { | 2133 | { |
1686 | if (client == session->client) | 2134 | if (client == session->scss.client) |
1687 | { | 2135 | { |
1688 | destroy_session (session); | 2136 | destroy_session (session); |
1689 | break; | 2137 | break; |
@@ -1720,74 +2168,6 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod | |||
1720 | 2168 | ||
1721 | 2169 | ||
1722 | /** | 2170 | /** |
1723 | * Transmit a queued message to the session's client. | ||
1724 | * | ||
1725 | * @param cls consensus session | ||
1726 | * @param size number of bytes available in buf | ||
1727 | * @param buf where the callee should write the message | ||
1728 | * @return number of bytes written to buf | ||
1729 | */ | ||
1730 | static size_t | ||
1731 | transmit_queued (void *cls, size_t size, | ||
1732 | void *buf) | ||
1733 | { | ||
1734 | struct ConsensusSession *session; | ||
1735 | struct QueuedMessage *qmsg; | ||
1736 | size_t msg_size; | ||
1737 | |||
1738 | session = cls; | ||
1739 | session->client_th = NULL; | ||
1740 | |||
1741 | qmsg = session->client_messages_head; | ||
1742 | GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); | ||
1743 | GNUNET_assert (qmsg); | ||
1744 | |||
1745 | if (NULL == buf) | ||
1746 | { | ||
1747 | destroy_session (session); | ||
1748 | return 0; | ||
1749 | } | ||
1750 | |||
1751 | msg_size = ntohs (qmsg->msg->size); | ||
1752 | |||
1753 | GNUNET_assert (size >= msg_size); | ||
1754 | |||
1755 | memcpy (buf, qmsg->msg, msg_size); | ||
1756 | GNUNET_free (qmsg->msg); | ||
1757 | GNUNET_free (qmsg); | ||
1758 | |||
1759 | client_send_next (session); | ||
1760 | |||
1761 | return msg_size; | ||
1762 | } | ||
1763 | |||
1764 | |||
1765 | /** | ||
1766 | * Schedule transmitting the next queued message (if any) to the inhabiting client of a session. | ||
1767 | * | ||
1768 | * @param session the consensus session | ||
1769 | */ | ||
1770 | static void | ||
1771 | client_send_next (struct ConsensusSession *session) | ||
1772 | { | ||
1773 | |||
1774 | GNUNET_assert (NULL != session); | ||
1775 | |||
1776 | if (NULL != session->client_th) | ||
1777 | return; | ||
1778 | |||
1779 | if (NULL != session->client_messages_head) | ||
1780 | { | ||
1781 | int msize; | ||
1782 | msize = ntohs (session->client_messages_head->msg->size); | ||
1783 | session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, | ||
1784 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1785 | &transmit_queued, session); | ||
1786 | } | ||
1787 | } | ||
1788 | |||
1789 | |||
1790 | /** | ||
1791 | * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have | 2171 | * Although GNUNET_CRYPTO_hash_cmp exisits, it does not have |
1792 | * the correct signature to be used with e.g. qsort. | 2172 | * the correct signature to be used with e.g. qsort. |
1793 | * We use this function instead. | 2173 | * We use this function instead. |
@@ -1804,67 +2184,6 @@ hash_cmp (const void *h1, const void *h2) | |||
1804 | 2184 | ||
1805 | 2185 | ||
1806 | /** | 2186 | /** |
1807 | * Search peer in the list of peers in session. | ||
1808 | * | ||
1809 | * @param peer peer to find | ||
1810 | * @param session session with peer | ||
1811 | * @return index of peer, -1 if peer is not in session | ||
1812 | */ | ||
1813 | static int | ||
1814 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) | ||
1815 | { | ||
1816 | int i; | ||
1817 | for (i = 0; i < session->num_peers; i++) | ||
1818 | if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) | ||
1819 | return i; | ||
1820 | return -1; | ||
1821 | } | ||
1822 | |||
1823 | |||
1824 | /** | ||
1825 | * Called when stream has finishes writing the hello message | ||
1826 | */ | ||
1827 | static void | ||
1828 | hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1829 | { | ||
1830 | struct ConsensusPeerInformation *cpi; | ||
1831 | |||
1832 | cpi = cls; | ||
1833 | cpi->wh = NULL; | ||
1834 | cpi->hello = GNUNET_YES; | ||
1835 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1836 | embrace_peer (cpi); | ||
1837 | } | ||
1838 | |||
1839 | |||
1840 | /** | ||
1841 | * Called when we established a stream connection to another peer | ||
1842 | * | ||
1843 | * @param cls cpi of the peer we just connected to | ||
1844 | * @param socket socket to use to communicate with the other side (read/write) | ||
1845 | */ | ||
1846 | static void | ||
1847 | open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | ||
1848 | { | ||
1849 | struct ConsensusPeerInformation *cpi; | ||
1850 | struct ConsensusHello *hello; | ||
1851 | |||
1852 | cpi = cls; | ||
1853 | hello = GNUNET_malloc (sizeof *hello); | ||
1854 | hello->header.size = htons (sizeof *hello); | ||
1855 | hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); | ||
1856 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | ||
1857 | GNUNET_assert (NULL == cpi->mst); | ||
1858 | cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); | ||
1859 | cpi->wh = | ||
1860 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); | ||
1861 | GNUNET_free (hello); | ||
1862 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1863 | &session_stream_data_processor, cpi); | ||
1864 | } | ||
1865 | |||
1866 | |||
1867 | /** | ||
1868 | * Create the sorted list of peers for the session, | 2187 | * Create the sorted list of peers for the session, |
1869 | * add the local peer if not in the join message. | 2188 | * add the local peer if not in the join message. |
1870 | */ | 2189 | */ |
@@ -1921,19 +2240,6 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
1921 | } | 2240 | } |
1922 | 2241 | ||
1923 | 2242 | ||
1924 | static void | ||
1925 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) | ||
1926 | { | ||
1927 | uint32_t v; | ||
1928 | int i; | ||
1929 | v = key->bits[0]; | ||
1930 | /* count trailing '1'-bits of v */ | ||
1931 | for (i = 0; v & 1; v>>=1, i++) | ||
1932 | /* empty */; | ||
1933 | ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); | ||
1934 | } | ||
1935 | |||
1936 | |||
1937 | /** | 2243 | /** |
1938 | * Add incoming peer connections to the session, | 2244 | * Add incoming peer connections to the session, |
1939 | * for peers who have connected to us before the local session has been established | 2245 | * for peers who have connected to us before the local session has been established |
@@ -1944,28 +2250,23 @@ static void | |||
1944 | add_incoming_peers (struct ConsensusSession *session) | 2250 | add_incoming_peers (struct ConsensusSession *session) |
1945 | { | 2251 | { |
1946 | struct IncomingSocket *inc; | 2252 | struct IncomingSocket *inc; |
1947 | inc = incoming_sockets_head; | 2253 | int i; |
2254 | struct ConsensusPeerInformation *cpi; | ||
1948 | 2255 | ||
1949 | while (NULL != inc) | 2256 | for (inc = incoming_sockets_head; NULL != inc; inc = inc->next) |
1950 | { | 2257 | { |
1951 | if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) | 2258 | if ( (NULL == inc->requested_gid) || |
2259 | (0 != GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) ) | ||
2260 | continue; | ||
2261 | for (i = 0; i < session->num_peers; i++) | ||
1952 | { | 2262 | { |
1953 | int i; | 2263 | cpi = &session->info[i]; |
1954 | for (i = 0; i < session->num_peers; i++) | 2264 | cpi->peer_id = inc->peer_id; |
1955 | { | 2265 | cpi->mss = inc->mss; |
1956 | struct ConsensusPeerInformation *cpi; | 2266 | cpi->hello = GNUNET_YES; |
1957 | cpi = &session->info[i]; | 2267 | inc->cpi = cpi; |
1958 | if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity))) | 2268 | break; |
1959 | { | ||
1960 | cpi->socket = inc->socket; | ||
1961 | inc->cpi = cpi; | ||
1962 | inc->cpi->mst = inc->mst; | ||
1963 | inc->cpi->hello = GNUNET_YES; | ||
1964 | break; | ||
1965 | } | ||
1966 | } | ||
1967 | } | 2269 | } |
1968 | inc = inc->next; | ||
1969 | } | 2270 | } |
1970 | } | 2271 | } |
1971 | 2272 | ||
@@ -1982,7 +2283,6 @@ initialize_session (struct ConsensusSession *session) | |||
1982 | 2283 | ||
1983 | GNUNET_assert (NULL != session->join_msg); | 2284 | GNUNET_assert (NULL != session->join_msg); |
1984 | initialize_session_peer_list (session); | 2285 | initialize_session_peer_list (session); |
1985 | session->current_round = CONSENSUS_ROUND_BEGIN; | ||
1986 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | 2286 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); |
1987 | compute_global_id (session, &session->join_msg->session_id); | 2287 | compute_global_id (session, &session->join_msg->session_id); |
1988 | 2288 | ||
@@ -1993,24 +2293,30 @@ initialize_session (struct ConsensusSession *session) | |||
1993 | if ((other_session != session) && | 2293 | if ((other_session != session) && |
1994 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) | 2294 | (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) |
1995 | { | 2295 | { |
1996 | /* session already owned by another client */ | 2296 | if (GNUNET_NO == other_session->conclude) |
1997 | GNUNET_break (0); | 2297 | { |
1998 | disconnect_client (session->client); | 2298 | /* session already owned by another client */ |
1999 | return; | 2299 | GNUNET_break (0); |
2300 | disconnect_client (session->scss.client); | ||
2301 | return; | ||
2302 | } | ||
2303 | else | ||
2304 | { | ||
2305 | GNUNET_SERVER_client_drop (session->scss.client); | ||
2306 | session->scss.client = NULL; | ||
2307 | break; | ||
2308 | } | ||
2000 | } | 2309 | } |
2001 | other_session = other_session->next; | 2310 | other_session = other_session->next; |
2002 | } | 2311 | } |
2003 | 2312 | ||
2004 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2005 | session->local_peer_idx = get_peer_idx (my_peer, session); | 2313 | session->local_peer_idx = get_peer_idx (my_peer, session); |
2006 | GNUNET_assert (-1 != session->local_peer_idx); | 2314 | GNUNET_assert (-1 != session->local_peer_idx); |
2007 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); | 2315 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); |
2008 | session->se = strata_estimator_create (); | ||
2009 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); | ||
2010 | GNUNET_free (session->join_msg); | 2316 | GNUNET_free (session->join_msg); |
2011 | session->join_msg = NULL; | 2317 | session->join_msg = NULL; |
2012 | add_incoming_peers (session); | 2318 | add_incoming_peers (session); |
2013 | GNUNET_SERVER_receive_done (session->client, GNUNET_OK); | 2319 | GNUNET_SERVER_receive_done (session->scss.client, GNUNET_OK); |
2014 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | 2320 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); |
2015 | } | 2321 | } |
2016 | 2322 | ||
@@ -2033,7 +2339,7 @@ client_join (void *cls, | |||
2033 | session = sessions_head; | 2339 | session = sessions_head; |
2034 | while (NULL != session) | 2340 | while (NULL != session) |
2035 | { | 2341 | { |
2036 | if (session->client == client) | 2342 | if (session->scss.client == client) |
2037 | { | 2343 | { |
2038 | GNUNET_break (0); | 2344 | GNUNET_break (0); |
2039 | disconnect_client (client); | 2345 | disconnect_client (client); |
@@ -2042,9 +2348,15 @@ client_join (void *cls, | |||
2042 | session = session->next; | 2348 | session = session->next; |
2043 | } | 2349 | } |
2044 | 2350 | ||
2045 | session = GNUNET_malloc (sizeof (struct ConsensusSession)); | 2351 | session = GNUNET_new (struct ConsensusSession); |
2046 | session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); | 2352 | session->join_msg = (struct GNUNET_CONSENSUS_JoinMessage *) GNUNET_copy_message (m); |
2047 | session->client = client; | 2353 | /* these have to be initialized here, as the client can already start to give us values */ |
2354 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); | ||
2355 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2356 | session->ibf_key_map = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | ||
2357 | session->se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, SE_IBF_HASH_NUM); | ||
2358 | session->scss.client = client; | ||
2359 | session->client_mq = create_message_queue_for_server_client (&session->scss); | ||
2048 | GNUNET_SERVER_client_keep (client); | 2360 | GNUNET_SERVER_client_keep (client); |
2049 | 2361 | ||
2050 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); | 2362 | GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); |
@@ -2060,57 +2372,6 @@ client_join (void *cls, | |||
2060 | } | 2372 | } |
2061 | 2373 | ||
2062 | 2374 | ||
2063 | /** | ||
2064 | * Hash a block of data, producing a replicated ibf hash. | ||
2065 | */ | ||
2066 | static void | ||
2067 | hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret) | ||
2068 | { | ||
2069 | struct IBF_Key ibf_key; | ||
2070 | GNUNET_CRYPTO_hash (block, size, ret); | ||
2071 | ibf_key = ibf_key_from_hashcode (ret); | ||
2072 | ibf_hashcode_from_key (ibf_key, ret); | ||
2073 | } | ||
2074 | |||
2075 | |||
2076 | static void | ||
2077 | insert_element (struct ConsensusSession *session, struct GNUNET_CONSENSUS_Element *element) | ||
2078 | { | ||
2079 | struct GNUNET_HashCode hash; | ||
2080 | struct ElementList *head; | ||
2081 | |||
2082 | hash_for_ibf (element->data, element->size, &hash); | ||
2083 | |||
2084 | head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash); | ||
2085 | |||
2086 | if (NULL == head) | ||
2087 | { | ||
2088 | int i; | ||
2089 | |||
2090 | head = GNUNET_malloc (sizeof *head); | ||
2091 | head->element = element; | ||
2092 | head->next = NULL; | ||
2093 | head->element_hash = GNUNET_memdup (&hash, sizeof hash); | ||
2094 | GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head, | ||
2095 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); | ||
2096 | strata_estimator_insert (session->se, &hash); | ||
2097 | |||
2098 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
2099 | if (NULL != session->ibfs[i]) | ||
2100 | ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash)); | ||
2101 | } | ||
2102 | else | ||
2103 | { | ||
2104 | struct ElementList *el; | ||
2105 | el = GNUNET_malloc (sizeof *el); | ||
2106 | head->element = element; | ||
2107 | head->next = NULL; | ||
2108 | head->element_hash = GNUNET_memdup (&hash, sizeof hash); | ||
2109 | while (NULL != head->next) | ||
2110 | head = head->next; | ||
2111 | head->next = el; | ||
2112 | } | ||
2113 | } | ||
2114 | 2375 | ||
2115 | 2376 | ||
2116 | /** | 2377 | /** |
@@ -2133,7 +2394,7 @@ client_insert (void *cls, | |||
2133 | session = sessions_head; | 2394 | session = sessions_head; |
2134 | while (NULL != session) | 2395 | while (NULL != session) |
2135 | { | 2396 | { |
2136 | if (session->client == client) | 2397 | if (session->scss.client == client) |
2137 | break; | 2398 | break; |
2138 | } | 2399 | } |
2139 | 2400 | ||
@@ -2146,373 +2407,15 @@ client_insert (void *cls, | |||
2146 | 2407 | ||
2147 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; | 2408 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; |
2148 | element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 2409 | element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
2149 | |||
2150 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); | 2410 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); |
2151 | |||
2152 | element->type = msg->element_type; | 2411 | element->type = msg->element_type; |
2153 | element->size = element_size; | 2412 | element->size = element_size; |
2154 | memcpy (&element[1], &msg[1], element_size); | 2413 | memcpy (&element[1], &msg[1], element_size); |
2155 | element->data = &element[1]; | 2414 | element->data = &element[1]; |
2156 | |||
2157 | GNUNET_assert (NULL != element->data); | 2415 | GNUNET_assert (NULL != element->data); |
2158 | |||
2159 | insert_element (session, element); | 2416 | insert_element (session, element); |
2160 | 2417 | ||
2161 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2418 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2162 | |||
2163 | client_send_next (session); | ||
2164 | } | ||
2165 | |||
2166 | |||
2167 | |||
2168 | /** | ||
2169 | * Functions of this signature are called whenever writing operations | ||
2170 | * on a stream are executed | ||
2171 | * | ||
2172 | * @param cls the closure from GNUNET_STREAM_write | ||
2173 | * @param status the status of the stream at the time this function is called; | ||
2174 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
2175 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
2176 | * (this doesn't mean that the data is never sent, the receiver may | ||
2177 | * have read the data but its ACKs may have been lost); | ||
2178 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
2179 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
2180 | * be processed. | ||
2181 | * @param size the number of bytes written | ||
2182 | */ | ||
2183 | static void | ||
2184 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
2185 | { | ||
2186 | struct ConsensusPeerInformation *cpi; | ||
2187 | |||
2188 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
2189 | cpi = cls; | ||
2190 | cpi->wh = NULL; | ||
2191 | if (NULL != cpi->messages_head) | ||
2192 | { | ||
2193 | struct QueuedMessage *qm; | ||
2194 | qm = cpi->messages_head; | ||
2195 | GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm); | ||
2196 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | ||
2197 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
2198 | write_queued, cpi); | ||
2199 | if (NULL != qm->cb) | ||
2200 | qm->cb (qm->cls); | ||
2201 | GNUNET_free (qm->msg); | ||
2202 | GNUNET_free (qm); | ||
2203 | GNUNET_assert (NULL != cpi->wh); | ||
2204 | } | ||
2205 | } | ||
2206 | |||
2207 | |||
2208 | static void | ||
2209 | shuffle (struct ConsensusSession *session) | ||
2210 | { | ||
2211 | /* FIXME: implement */ | ||
2212 | } | ||
2213 | |||
2214 | |||
2215 | /** | ||
2216 | * Find and set the partner_incoming and partner_outgoing of our peer, | ||
2217 | * one of them may not exist in most cases. | ||
2218 | * | ||
2219 | * @param session the consensus session | ||
2220 | */ | ||
2221 | static void | ||
2222 | find_partners (struct ConsensusSession *session) | ||
2223 | { | ||
2224 | int mark[session->num_peers]; | ||
2225 | int i; | ||
2226 | memset (mark, 0, session->num_peers * sizeof (int)); | ||
2227 | session->partner_incoming = session->partner_outgoing = NULL; | ||
2228 | for (i = 0; i < session->num_peers; i++) | ||
2229 | { | ||
2230 | int arc; | ||
2231 | if (0 != mark[i]) | ||
2232 | continue; | ||
2233 | arc = (i + (1 << session->exp_subround)) % session->num_peers; | ||
2234 | mark[i] = mark[arc] = 1; | ||
2235 | GNUNET_assert (i != arc); | ||
2236 | if (i == session->local_peer_idx) | ||
2237 | { | ||
2238 | GNUNET_assert (NULL == session->partner_outgoing); | ||
2239 | session->partner_outgoing = &session->info[session->shuffle[arc]]; | ||
2240 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
2241 | } | ||
2242 | if (arc == session->local_peer_idx) | ||
2243 | { | ||
2244 | GNUNET_assert (NULL == session->partner_incoming); | ||
2245 | session->partner_incoming = &session->info[session->shuffle[i]]; | ||
2246 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
2247 | } | ||
2248 | } | ||
2249 | } | ||
2250 | |||
2251 | |||
2252 | static void | ||
2253 | replay_premature_message (struct ConsensusPeerInformation *cpi) | ||
2254 | { | ||
2255 | if (NULL != cpi->premature_strata_message) | ||
2256 | { | ||
2257 | struct StrataMessage *sm; | ||
2258 | |||
2259 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n"); | ||
2260 | sm = cpi->premature_strata_message; | ||
2261 | cpi->premature_strata_message = NULL; | ||
2262 | |||
2263 | cpi->replaying_strata_message = GNUNET_YES; | ||
2264 | handle_p2p_strata (cpi, sm); | ||
2265 | cpi->replaying_strata_message = GNUNET_NO; | ||
2266 | |||
2267 | GNUNET_free (sm); | ||
2268 | } | ||
2269 | } | ||
2270 | |||
2271 | |||
2272 | /** | ||
2273 | * Do the next subround in the exp-scheme. | ||
2274 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
2275 | * | ||
2276 | * @param cls the session | ||
2277 | * @param tc task context, for when this task is invoked by the scheduler, | ||
2278 | * NULL if invoked for another reason | ||
2279 | */ | ||
2280 | static void | ||
2281 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
2282 | { | ||
2283 | struct ConsensusSession *session; | ||
2284 | int i; | ||
2285 | |||
2286 | /* don't kick off next subround if we're shutting down */ | ||
2287 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
2288 | return; | ||
2289 | session = cls; | ||
2290 | /* don't send any messages from the last round */ | ||
2291 | /* | ||
2292 | clear_peer_messages (session->partner_outgoing); | ||
2293 | clear_peer_messages (session->partner_incoming); | ||
2294 | for (i = 0; i < session->num_peers; i++) | ||
2295 | clear_peer_messages (&session->info[i]); | ||
2296 | */ | ||
2297 | /* cancel timeout */ | ||
2298 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | ||
2299 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | ||
2300 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | ||
2301 | /* check if we are done with the log phase, 2-peer consensus only does one log round */ | ||
2302 | if ( (session->exp_round == NUM_EXP_ROUNDS) || | ||
2303 | ((session->num_peers == 2) && (session->exp_round == 1))) | ||
2304 | { | ||
2305 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); | ||
2306 | round_over (session, NULL); | ||
2307 | return; | ||
2308 | } | ||
2309 | if (session->exp_round == 0) | ||
2310 | { | ||
2311 | /* initialize everything for the log-rounds */ | ||
2312 | session->exp_round = 1; | ||
2313 | session->exp_subround = 0; | ||
2314 | if (NULL == session->shuffle) | ||
2315 | session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); | ||
2316 | for (i = 0; i < session->num_peers; i++) | ||
2317 | session->shuffle[i] = i; | ||
2318 | } | ||
2319 | else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) | ||
2320 | { | ||
2321 | /* subrounds done, start new log-round */ | ||
2322 | session->exp_round++; | ||
2323 | session->exp_subround = 0; | ||
2324 | shuffle (session); | ||
2325 | } | ||
2326 | else | ||
2327 | { | ||
2328 | session->exp_subround++; | ||
2329 | } | ||
2330 | |||
2331 | find_partners (session); | ||
2332 | |||
2333 | #ifdef GNUNET_EXTRA_LOGGING | ||
2334 | { | ||
2335 | int in; | ||
2336 | int out; | ||
2337 | if (session->partner_outgoing == NULL) | ||
2338 | out = -1; | ||
2339 | else | ||
2340 | out = (int) (session->partner_outgoing - session->info); | ||
2341 | if (session->partner_incoming == NULL) | ||
2342 | in = -1; | ||
2343 | else | ||
2344 | in = (int) (session->partner_incoming - session->info); | ||
2345 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, in: %d, out: %d\n", session->local_peer_idx, | ||
2346 | session->exp_round, session->exp_subround, in, out); | ||
2347 | } | ||
2348 | #endif /* GNUNET_EXTRA_LOGGING */ | ||
2349 | |||
2350 | if (NULL != session->partner_incoming) | ||
2351 | { | ||
2352 | session->partner_incoming->ibf_state = IBF_STATE_NONE; | ||
2353 | session->partner_incoming->exp_subround_finished = GNUNET_NO; | ||
2354 | session->partner_incoming->ibf_bucket_counter = 0; | ||
2355 | |||
2356 | /* maybe there's an early strata estimator? */ | ||
2357 | replay_premature_message (session->partner_incoming); | ||
2358 | } | ||
2359 | |||
2360 | if (NULL != session->partner_outgoing) | ||
2361 | { | ||
2362 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; | ||
2363 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
2364 | session->partner_outgoing->exp_subround_finished = GNUNET_NO; | ||
2365 | |||
2366 | if (NULL == session->partner_outgoing->socket) | ||
2367 | { | ||
2368 | session->partner_outgoing->socket = | ||
2369 | GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
2370 | open_cb, session->partner_outgoing, | ||
2371 | GNUNET_STREAM_OPTION_END); | ||
2372 | } | ||
2373 | else if (GNUNET_YES == session->partner_outgoing->hello) | ||
2374 | { | ||
2375 | send_strata_estimator (session->partner_outgoing); | ||
2376 | } | ||
2377 | /* else: do nothing, the send hello cb will handle this */ | ||
2378 | } | ||
2379 | |||
2380 | /* | ||
2381 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), | ||
2382 | subround_over, session); | ||
2383 | */ | ||
2384 | } | ||
2385 | |||
2386 | static void | ||
2387 | contact_peer_a2a (struct ConsensusPeerInformation *cpi) | ||
2388 | { | ||
2389 | cpi->is_outgoing = GNUNET_YES; | ||
2390 | if (NULL == cpi->socket) | ||
2391 | { | ||
2392 | cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
2393 | open_cb, cpi, GNUNET_STREAM_OPTION_END); | ||
2394 | } | ||
2395 | else if (GNUNET_YES == cpi->hello) | ||
2396 | { | ||
2397 | send_strata_estimator (cpi); | ||
2398 | } | ||
2399 | } | ||
2400 | |||
2401 | /** | ||
2402 | * Start the inventory round, contact all peers we are supposed to contact. | ||
2403 | * | ||
2404 | * @param session the current session | ||
2405 | */ | ||
2406 | static void | ||
2407 | start_inventory (struct ConsensusSession *session) | ||
2408 | { | ||
2409 | int i; | ||
2410 | int last; | ||
2411 | |||
2412 | for (i = 0; i < session->num_peers; i++) | ||
2413 | { | ||
2414 | session->info[i].ibf_bucket_counter = 0; | ||
2415 | session->info[i].ibf_state = IBF_STATE_NONE; | ||
2416 | session->info[i].is_outgoing = GNUNET_NO; | ||
2417 | } | ||
2418 | |||
2419 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | ||
2420 | i = (session->local_peer_idx + 1) % session->num_peers; | ||
2421 | while (i != last) | ||
2422 | { | ||
2423 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); | ||
2424 | contact_peer_a2a (&session->info[i]); | ||
2425 | session->info[i].is_outgoing = GNUNET_YES; | ||
2426 | i = (i + 1) % session->num_peers; | ||
2427 | } | ||
2428 | // tie-breaker for even number of peers | ||
2429 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
2430 | { | ||
2431 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); | ||
2432 | session->info[last].is_outgoing = GNUNET_YES; | ||
2433 | contact_peer_a2a (&session->info[last]); | ||
2434 | } | ||
2435 | |||
2436 | for (i = 0; i < session->num_peers; i++) | ||
2437 | { | ||
2438 | if (GNUNET_NO == session->info[i].is_outgoing) | ||
2439 | replay_premature_message (&session->info[i]); | ||
2440 | } | ||
2441 | } | ||
2442 | |||
2443 | static void | ||
2444 | send_client_conclude_done (struct ConsensusSession *session) | ||
2445 | { | ||
2446 | struct GNUNET_MessageHeader *msg; | ||
2447 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
2448 | msg = GNUNET_malloc (sizeof *msg); | ||
2449 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
2450 | msg->size = htons (sizeof *msg); | ||
2451 | queue_client_message (session, msg); | ||
2452 | client_send_next (session); | ||
2453 | } | ||
2454 | |||
2455 | /** | ||
2456 | * Start the next round. | ||
2457 | * This function can be invoked as a timeout task, or called manually (tc will be NULL then). | ||
2458 | * | ||
2459 | * @param cls the session | ||
2460 | * @param tc task context, for when this task is invoked by the scheduler, | ||
2461 | * NULL if invoked for another reason | ||
2462 | */ | ||
2463 | static void | ||
2464 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
2465 | { | ||
2466 | struct ConsensusSession *session; | ||
2467 | |||
2468 | /* don't kick off next round if we're shutting down */ | ||
2469 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
2470 | return; | ||
2471 | |||
2472 | session = cls; | ||
2473 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", session->local_peer_idx); | ||
2474 | |||
2475 | /* | ||
2476 | for (i = 0; i < session->num_peers; i++) | ||
2477 | clear_peer_messages (&session->info[i]); | ||
2478 | */ | ||
2479 | |||
2480 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | ||
2481 | { | ||
2482 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | ||
2483 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | ||
2484 | } | ||
2485 | |||
2486 | switch (session->current_round) | ||
2487 | { | ||
2488 | case CONSENSUS_ROUND_BEGIN: | ||
2489 | session->current_round = CONSENSUS_ROUND_EXCHANGE; | ||
2490 | session->exp_round = 0; | ||
2491 | subround_over (session, NULL); | ||
2492 | break; | ||
2493 | case CONSENSUS_ROUND_EXCHANGE: | ||
2494 | /* handle two peers specially */ | ||
2495 | if (session->num_peers <= 2) | ||
2496 | { | ||
2497 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", session->local_peer_idx); | ||
2498 | send_client_conclude_done (session); | ||
2499 | return; | ||
2500 | } | ||
2501 | session->current_round = CONSENSUS_ROUND_INVENTORY; | ||
2502 | start_inventory (session); | ||
2503 | break; | ||
2504 | case CONSENSUS_ROUND_INVENTORY: | ||
2505 | session->current_round = CONSENSUS_ROUND_STOCK; | ||
2506 | session->exp_round = 0; | ||
2507 | subround_over (session, NULL); | ||
2508 | break; | ||
2509 | case CONSENSUS_ROUND_STOCK: | ||
2510 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
2511 | send_client_conclude_done (session); | ||
2512 | break; | ||
2513 | default: | ||
2514 | GNUNET_assert (0); | ||
2515 | } | ||
2516 | } | 2419 | } |
2517 | 2420 | ||
2518 | 2421 | ||
@@ -2534,7 +2437,7 @@ client_conclude (void *cls, | |||
2534 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; | 2437 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; |
2535 | 2438 | ||
2536 | session = sessions_head; | 2439 | session = sessions_head; |
2537 | while ((session != NULL) && (session->client != client)) | 2440 | while ((session != NULL) && (session->scss.client != client)) |
2538 | session = session->next; | 2441 | session = session->next; |
2539 | if (NULL == session) | 2442 | if (NULL == session) |
2540 | { | 2443 | { |
@@ -2553,6 +2456,8 @@ client_conclude (void *cls, | |||
2553 | return; | 2456 | return; |
2554 | } | 2457 | } |
2555 | 2458 | ||
2459 | session->conclude = GNUNET_YES; | ||
2460 | |||
2556 | if (session->num_peers <= 1) | 2461 | if (session->num_peers <= 1) |
2557 | { | 2462 | { |
2558 | send_client_conclude_done (session); | 2463 | send_client_conclude_done (session); |
@@ -2565,57 +2470,6 @@ client_conclude (void *cls, | |||
2565 | } | 2470 | } |
2566 | 2471 | ||
2567 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2472 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2568 | client_send_next (session); | ||
2569 | } | ||
2570 | |||
2571 | |||
2572 | /** | ||
2573 | * Called when a client sends an ack | ||
2574 | * | ||
2575 | * @param cls (unused) | ||
2576 | * @param client client handle | ||
2577 | * @param message message sent by the client | ||
2578 | */ | ||
2579 | static void | ||
2580 | client_ack (void *cls, | ||
2581 | struct GNUNET_SERVER_Client *client, | ||
2582 | const struct GNUNET_MessageHeader *message) | ||
2583 | { | ||
2584 | struct ConsensusSession *session; | ||
2585 | struct GNUNET_CONSENSUS_AckMessage *msg; | ||
2586 | struct PendingElement *pending; | ||
2587 | struct GNUNET_CONSENSUS_Element *element; | ||
2588 | |||
2589 | session = sessions_head; | ||
2590 | while (NULL != session) | ||
2591 | { | ||
2592 | if (session->client == client) | ||
2593 | break; | ||
2594 | } | ||
2595 | |||
2596 | if (NULL == session) | ||
2597 | { | ||
2598 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n"); | ||
2599 | GNUNET_SERVER_client_disconnect (client); | ||
2600 | return; | ||
2601 | } | ||
2602 | |||
2603 | pending = session->client_approval_head; | ||
2604 | |||
2605 | GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending); | ||
2606 | |||
2607 | msg = (struct GNUNET_CONSENSUS_AckMessage *) message; | ||
2608 | |||
2609 | if (msg->keep) | ||
2610 | { | ||
2611 | element = pending->element; | ||
2612 | insert_element (session, element); | ||
2613 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); | ||
2614 | } | ||
2615 | |||
2616 | GNUNET_free (pending); | ||
2617 | |||
2618 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
2619 | } | 2473 | } |
2620 | 2474 | ||
2621 | 2475 | ||
@@ -2649,14 +2503,10 @@ core_startup (void *cls, | |||
2649 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ | 2503 | /* core can't be disconnected directly in the core startup callback, schedule a task to do it! */ |
2650 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); | 2504 | GNUNET_SCHEDULER_add_now (&disconnect_core, core); |
2651 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); | 2505 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "connected to core\n"); |
2652 | 2506 | /* initialize sessions that are waiting for the local peer identity */ | |
2653 | session = sessions_head; | 2507 | for (session = sessions_head; NULL != session; session = session->next) |
2654 | while (NULL != session) | ||
2655 | { | ||
2656 | if (NULL != session->join_msg) | 2508 | if (NULL != session->join_msg) |
2657 | initialize_session (session); | 2509 | initialize_session (session); |
2658 | session = session->next; | ||
2659 | } | ||
2660 | } | 2510 | } |
2661 | 2511 | ||
2662 | 2512 | ||
@@ -2670,27 +2520,12 @@ static void | |||
2670 | shutdown_task (void *cls, | 2520 | shutdown_task (void *cls, |
2671 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 2521 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
2672 | { | 2522 | { |
2673 | /* FIXME: complete; write separate destructors for different data types */ | ||
2674 | |||
2675 | while (NULL != incoming_sockets_head) | 2523 | while (NULL != incoming_sockets_head) |
2676 | { | 2524 | { |
2677 | struct IncomingSocket *socket; | 2525 | struct IncomingSocket *socket; |
2678 | socket = incoming_sockets_head; | 2526 | socket = incoming_sockets_head; |
2679 | if (NULL != socket->rh) | ||
2680 | { | ||
2681 | GNUNET_STREAM_read_cancel (socket->rh); | ||
2682 | socket->rh = NULL; | ||
2683 | } | ||
2684 | if (NULL == socket->cpi) | 2527 | if (NULL == socket->cpi) |
2685 | { | 2528 | clear_message_stream_state (&socket->mss); |
2686 | GNUNET_STREAM_close (socket->socket); | ||
2687 | socket->socket = NULL; | ||
2688 | if (NULL != socket->mst) | ||
2689 | { | ||
2690 | GNUNET_SERVER_mst_destroy (socket->mst); | ||
2691 | socket->mst = NULL; | ||
2692 | } | ||
2693 | } | ||
2694 | incoming_sockets_head = incoming_sockets_head->next; | 2529 | incoming_sockets_head = incoming_sockets_head->next; |
2695 | GNUNET_free (socket); | 2530 | GNUNET_free (socket); |
2696 | } | 2531 | } |
@@ -2738,8 +2573,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
2738 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, | 2573 | {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, |
2739 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, | 2574 | {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, |
2740 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, | 2575 | sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, |
2741 | {&client_ack, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK, | ||
2742 | sizeof (struct GNUNET_CONSENSUS_AckMessage)}, | ||
2743 | {NULL, NULL, 0, 0} | 2576 | {NULL, NULL, 0, 0} |
2744 | }; | 2577 | }; |
2745 | 2578 | ||
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 87dbdd696..739b97339 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -63,12 +63,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) | |||
63 | * | 63 | * |
64 | * @param size number of IBF buckets | 64 | * @param size number of IBF buckets |
65 | * @param hash_num number of buckets one element is hashed in | 65 | * @param hash_num number of buckets one element is hashed in |
66 | * @param salt salt for mingling hashes, different salt may | ||
67 | * result in less (or more) collisions | ||
68 | * @return the newly created invertible bloom filter | 66 | * @return the newly created invertible bloom filter |
69 | */ | 67 | */ |
70 | struct InvertibleBloomFilter * | 68 | struct InvertibleBloomFilter * |
71 | ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt) | 69 | ibf_create (uint32_t size, uint8_t hash_num) |
72 | { | 70 | { |
73 | struct InvertibleBloomFilter *ibf; | 71 | struct InvertibleBloomFilter *ibf; |
74 | 72 | ||
@@ -235,32 +233,25 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
235 | 233 | ||
236 | 234 | ||
237 | /** | 235 | /** |
238 | * Write an ibf. | 236 | * Write buckets from an ibf to a buffer. |
237 | * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. | ||
239 | * | 238 | * |
240 | * @param ibf the ibf to write | 239 | * @param ibf the ibf to write |
241 | * @param start with which bucket to start | 240 | * @param start with which bucket to start |
242 | * @param count how many buckets to write | 241 | * @param count how many buckets to write |
243 | * @param buf buffer to write the data to, will be updated to point to the | 242 | * @param buf buffer to write the data to |
244 | * first byte after the written data | ||
245 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
246 | */ | 243 | */ |
247 | void | 244 | void |
248 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size) | 245 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf) |
249 | { | 246 | { |
250 | struct IBF_Key *key_dst; | 247 | struct IBF_Key *key_dst; |
251 | struct IBF_KeyHash *key_hash_dst; | 248 | struct IBF_KeyHash *key_hash_dst; |
252 | struct IBF_Count *count_dst; | 249 | struct IBF_Count *count_dst; |
253 | 250 | ||
254 | /* update size and check for overflow */ | 251 | GNUNET_assert (start + count <= ibf->size); |
255 | if (NULL != size) | 252 | |
256 | { | ||
257 | size_t old_size; | ||
258 | old_size = *size; | ||
259 | *size = *size - count * IBF_BUCKET_SIZE; | ||
260 | GNUNET_assert (*size < old_size); | ||
261 | } | ||
262 | /* copy keys */ | 253 | /* copy keys */ |
263 | key_dst = (struct IBF_Key *) *buf; | 254 | key_dst = (struct IBF_Key *) buf; |
264 | memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); | 255 | memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); |
265 | key_dst += count; | 256 | key_dst += count; |
266 | /* copy key hashes */ | 257 | /* copy key hashes */ |
@@ -271,40 +262,28 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32 | |||
271 | count_dst = (struct IBF_Count *) key_hash_dst; | 262 | count_dst = (struct IBF_Count *) key_hash_dst; |
272 | memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); | 263 | memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); |
273 | count_dst += count; | 264 | count_dst += count; |
274 | /* returned buffer is at the end of written data*/ | ||
275 | *buf = (void *) count_dst; | ||
276 | } | 265 | } |
277 | 266 | ||
278 | 267 | ||
279 | /** | 268 | /** |
280 | * Read an ibf. | 269 | * Read buckets from a buffer into an ibf. |
281 | * | 270 | * |
282 | * @param buf pointer to the buffer to write to, will point to first | 271 | * @param buf pointer to the buffer to read from |
283 | * byte after the written data // FIXME: take 'const void *buf' for input, return number of bytes READ | ||
284 | * @param size size of the buffer, will be updated | ||
285 | * @param start which bucket to start at | 272 | * @param start which bucket to start at |
286 | * @param count how many buckets to read | 273 | * @param count how many buckets to read |
287 | * @param ibf the ibf to read from | 274 | * @param ibf the ibf to read from |
288 | * @return GNUNET_OK on success // FIXME: return 0 on error (or -1/ssize_t), number of bytes read otherwise | ||
289 | */ | 275 | */ |
290 | int | 276 | void |
291 | ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) | 277 | ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) |
292 | { | 278 | { |
293 | struct IBF_Key *key_src; | 279 | struct IBF_Key *key_src; |
294 | struct IBF_KeyHash *key_hash_src; | 280 | struct IBF_KeyHash *key_hash_src; |
295 | struct IBF_Count *count_src; | 281 | struct IBF_Count *count_src; |
296 | 282 | ||
297 | /* update size and check for overflow */ | 283 | GNUNET_assert (start + count <= ibf->size); |
298 | if (NULL != size) | 284 | |
299 | { | ||
300 | size_t old_size; | ||
301 | old_size = *size; | ||
302 | *size = *size - count * IBF_BUCKET_SIZE; | ||
303 | if (*size > old_size) | ||
304 | return GNUNET_SYSERR; | ||
305 | } | ||
306 | /* copy keys */ | 285 | /* copy keys */ |
307 | key_src = (struct IBF_Key *) *buf; | 286 | key_src = (struct IBF_Key *) buf; |
308 | memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); | 287 | memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); |
309 | key_src += count; | 288 | key_src += count; |
310 | /* copy key hashes */ | 289 | /* copy key hashes */ |
@@ -315,40 +294,6 @@ ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct | |||
315 | count_src = (struct IBF_Count *) key_hash_src; | 294 | count_src = (struct IBF_Count *) key_hash_src; |
316 | memcpy (ibf->count + start, count_src, count * sizeof *count_src); | 295 | memcpy (ibf->count + start, count_src, count * sizeof *count_src); |
317 | count_src += count; | 296 | count_src += count; |
318 | /* returned buffer is at the end of written data*/ | ||
319 | *buf = (void *) count_src; | ||
320 | return GNUNET_OK; | ||
321 | } | ||
322 | |||
323 | |||
324 | /** | ||
325 | * Write an ibf. | ||
326 | * | ||
327 | * @param ibf the ibf to write | ||
328 | * @param buf buffer to write the data to, will be updated to point to the | ||
329 | * first byte after the written data | ||
330 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
331 | */ | ||
332 | void | ||
333 | ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size) | ||
334 | { | ||
335 | ibf_write_slice (ibf, 0, ibf->size, buf, size); | ||
336 | } | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Read an ibf. | ||
341 | * | ||
342 | * @param buf pointer to the buffer to write to, will point to first | ||
343 | * byte after the written data | ||
344 | * @param size size of the buffer, will be updated | ||
345 | * @param dst ibf to write buckets to | ||
346 | * @return GNUNET_OK on success | ||
347 | */ | ||
348 | int | ||
349 | ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst) | ||
350 | { | ||
351 | return ibf_read_slice (buf, size, 0, dst->size, dst); | ||
352 | } | 297 | } |
353 | 298 | ||
354 | 299 | ||
@@ -366,7 +311,6 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi | |||
366 | 311 | ||
367 | GNUNET_assert (ibf1->size == ibf2->size); | 312 | GNUNET_assert (ibf1->size == ibf2->size); |
368 | GNUNET_assert (ibf1->hash_num == ibf2->hash_num); | 313 | GNUNET_assert (ibf1->hash_num == ibf2->hash_num); |
369 | GNUNET_assert (ibf1->salt == ibf2->salt); | ||
370 | 314 | ||
371 | for (i = 0; i < ibf1->size; i++) | 315 | for (i = 0; i < ibf1->size; i++) |
372 | { | 316 | { |
@@ -388,7 +332,6 @@ ibf_dup (const struct InvertibleBloomFilter *ibf) | |||
388 | struct InvertibleBloomFilter *copy; | 332 | struct InvertibleBloomFilter *copy; |
389 | copy = GNUNET_malloc (sizeof *copy); | 333 | copy = GNUNET_malloc (sizeof *copy); |
390 | copy->hash_num = ibf->hash_num; | 334 | copy->hash_num = ibf->hash_num; |
391 | copy->salt = ibf->salt; | ||
392 | copy->size = ibf->size; | 335 | copy->size = ibf->size; |
393 | copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); | 336 | copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); |
394 | copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); | 337 | copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); |
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index 609653889..2bf3ef7c7 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h | |||
@@ -81,11 +81,6 @@ struct InvertibleBloomFilter | |||
81 | uint8_t hash_num; | 81 | uint8_t hash_num; |
82 | 82 | ||
83 | /** | 83 | /** |
84 | * Salt for mingling hashes | ||
85 | */ | ||
86 | uint32_t salt; | ||
87 | |||
88 | /** | ||
89 | * Xor sums of the elements' keys, used to identify the elements. | 84 | * Xor sums of the elements' keys, used to identify the elements. |
90 | * Array of 'size' elements. | 85 | * Array of 'size' elements. |
91 | */ | 86 | */ |
@@ -107,58 +102,28 @@ struct InvertibleBloomFilter | |||
107 | 102 | ||
108 | 103 | ||
109 | /** | 104 | /** |
110 | * Write an ibf. | 105 | * Write buckets from an ibf to a buffer. |
106 | * Exactly (IBF_BUCKET_SIZE*ibf->size) bytes are written to buf. | ||
111 | * | 107 | * |
112 | * @param ibf the ibf to write | 108 | * @param ibf the ibf to write |
113 | * @param start with which bucket to start | 109 | * @param start with which bucket to start |
114 | * @param count how many buckets to write | 110 | * @param count how many buckets to write |
115 | * @param buf buffer to write the data to, will be updated to point to the | 111 | * @param buf buffer to write the data to |
116 | * first byte after the written data | ||
117 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
118 | */ | 112 | */ |
119 | void | 113 | void |
120 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size); | 114 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf); |
121 | 115 | ||
122 | 116 | ||
123 | /** | 117 | /** |
124 | * Read an ibf. | 118 | * Read buckets from a buffer into an ibf. |
125 | * | 119 | * |
126 | * @param buf pointer to the buffer to write to, will point to first | 120 | * @param buf pointer to the buffer to read from |
127 | * byte after the written data | ||
128 | * @param size size of the buffer, will be updated | ||
129 | * @param start which bucket to start at | 121 | * @param start which bucket to start at |
130 | * @param count how many buckets to read | 122 | * @param count how many buckets to read |
131 | * @param ibf the ibf to read from | 123 | * @param ibf the ibf to read from |
132 | * @return GNUNET_OK on success | ||
133 | */ | ||
134 | int | ||
135 | ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *dst); | ||
136 | |||
137 | |||
138 | /** | ||
139 | * Write an ibf. | ||
140 | * | ||
141 | * @param ibf the ibf to write | ||
142 | * @param buf buffer to write the data to, will be updated to point to the | ||
143 | * first byte after the written data | ||
144 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
145 | */ | 124 | */ |
146 | void | 125 | void |
147 | ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); | 126 | ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf); |
148 | |||
149 | |||
150 | |||
151 | /** | ||
152 | * Read an ibf. | ||
153 | * | ||
154 | * @param buf pointer to the buffer to write to, will point to first | ||
155 | * byte after the written data | ||
156 | * @param size size of the buffer, will be updated | ||
157 | * @param dst ibf to write buckets to | ||
158 | * @return GNUNET_OK on success | ||
159 | */ | ||
160 | int | ||
161 | ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst); | ||
162 | 127 | ||
163 | 128 | ||
164 | /** | 129 | /** |
@@ -187,12 +152,10 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst); | |||
187 | * | 152 | * |
188 | * @param size number of IBF buckets | 153 | * @param size number of IBF buckets |
189 | * @param hash_num number of buckets one element is hashed in, usually 3 or 4 | 154 | * @param hash_num number of buckets one element is hashed in, usually 3 or 4 |
190 | * @param salt salt for mingling hashes, different salt may | ||
191 | * result in less (or more) collisions | ||
192 | * @return the newly created invertible bloom filter | 155 | * @return the newly created invertible bloom filter |
193 | */ | 156 | */ |
194 | struct InvertibleBloomFilter * | 157 | struct InvertibleBloomFilter * |
195 | ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt); | 158 | ibf_create (uint32_t size, uint8_t hash_num); |
196 | 159 | ||
197 | 160 | ||
198 | /** | 161 | /** |
diff --git a/src/consensus/strata_estimator.c b/src/consensus/strata_estimator.c new file mode 100644 index 000000000..685c50f0f --- /dev/null +++ b/src/consensus/strata_estimator.c | |||
@@ -0,0 +1,145 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet | ||
3 | (C) 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 2, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file consensus/ibf.h | ||
23 | * @brief invertible bloom filter | ||
24 | * @author Florian Dold | ||
25 | */ | ||
26 | |||
27 | #include "platform.h" | ||
28 | #include "gnunet_common.h" | ||
29 | #include "ibf.h" | ||
30 | #include "strata_estimator.h" | ||
31 | |||
32 | void | ||
33 | strata_estimator_write (const struct StrataEstimator *se, void *buf) | ||
34 | { | ||
35 | int i; | ||
36 | for (i = 0; i < se->strata_count; i++) | ||
37 | { | ||
38 | ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); | ||
39 | buf += se->ibf_size * IBF_BUCKET_SIZE; | ||
40 | } | ||
41 | } | ||
42 | |||
43 | void | ||
44 | strata_estimator_read (const void *buf, struct StrataEstimator *se) | ||
45 | { | ||
46 | int i; | ||
47 | for (i = 0; i < se->strata_count; i++) | ||
48 | { | ||
49 | ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); | ||
50 | buf += se->ibf_size * IBF_BUCKET_SIZE; | ||
51 | } | ||
52 | } | ||
53 | |||
54 | |||
55 | void | ||
56 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) | ||
57 | { | ||
58 | uint32_t v; | ||
59 | int i; | ||
60 | v = key->bits[0]; | ||
61 | /* count trailing '1'-bits of v */ | ||
62 | for (i = 0; v & 1; v>>=1, i++) | ||
63 | /* empty */; | ||
64 | ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); | ||
65 | } | ||
66 | |||
67 | |||
68 | |||
69 | struct StrataEstimator * | ||
70 | strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum) | ||
71 | { | ||
72 | struct StrataEstimator *se; | ||
73 | int i; | ||
74 | |||
75 | /* fixme: allocate everything in one chunk */ | ||
76 | |||
77 | se = GNUNET_malloc (sizeof (struct StrataEstimator)); | ||
78 | se->strata_count = strata_count; | ||
79 | se->ibf_size = ibf_size; | ||
80 | se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter *) * strata_count); | ||
81 | for (i = 0; i < strata_count; i++) | ||
82 | se->strata[i] = ibf_create (ibf_size, ibf_hashnum); | ||
83 | return se; | ||
84 | } | ||
85 | |||
86 | |||
87 | /** | ||
88 | * Estimate set difference with two strata estimators, | ||
89 | * i.e. arrays of IBFs. | ||
90 | * Does not not modify its arguments. | ||
91 | * | ||
92 | * @param se1 first strata estimator | ||
93 | * @param se2 second strata estimator | ||
94 | * @return the estimated difference | ||
95 | */ | ||
96 | unsigned int | ||
97 | strata_estimator_difference (const struct StrataEstimator *se1, | ||
98 | const struct StrataEstimator *se2) | ||
99 | { | ||
100 | int i; | ||
101 | int count; | ||
102 | |||
103 | GNUNET_assert (se1->strata_count == se2->strata_count); | ||
104 | count = 0; | ||
105 | for (i = se1->strata_count - 1; i >= 0; i--) | ||
106 | { | ||
107 | struct InvertibleBloomFilter *diff; | ||
108 | /* number of keys decoded from the ibf */ | ||
109 | int ibf_count; | ||
110 | int more; | ||
111 | ibf_count = 0; | ||
112 | /* FIXME: implement this without always allocating new IBFs */ | ||
113 | diff = ibf_dup (se1->strata[i]); | ||
114 | ibf_subtract (diff, se2->strata[i]); | ||
115 | for (;;) | ||
116 | { | ||
117 | more = ibf_decode (diff, NULL, NULL); | ||
118 | if (GNUNET_NO == more) | ||
119 | { | ||
120 | count += ibf_count; | ||
121 | break; | ||
122 | } | ||
123 | if (GNUNET_SYSERR == more) | ||
124 | { | ||
125 | ibf_destroy (diff); | ||
126 | return count * (1 << (i + 1)); | ||
127 | } | ||
128 | ibf_count++; | ||
129 | } | ||
130 | ibf_destroy (diff); | ||
131 | } | ||
132 | return count; | ||
133 | } | ||
134 | |||
135 | |||
136 | void | ||
137 | strata_estimator_destroy (struct StrataEstimator *se) | ||
138 | { | ||
139 | int i; | ||
140 | for (i = 0; i < se->strata_count; i++) | ||
141 | ibf_destroy (se->strata[i]); | ||
142 | GNUNET_free (se->strata); | ||
143 | GNUNET_free (se); | ||
144 | } | ||
145 | |||
diff --git a/src/consensus/consensus_flout.h b/src/consensus/strata_estimator.h index 6c97813a5..cb5bd3d0a 100644 --- a/src/consensus/consensus_flout.h +++ b/src/consensus/strata_estimator.h | |||
@@ -16,16 +16,20 @@ | |||
16 | along with GNUnet; see the file COPYING. If not, write to the | 16 | along with GNUnet; see the file COPYING. If not, write to the |
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | 17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/consensus_flout.h | 22 | * @file consensus/strata_estimator.h |
23 | * @brief intentionally misbehave in certain ways for testing | 23 | * @brief estimator of set difference |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
26 | 26 | ||
27 | #ifndef GNUNET_CONSENSUS_FLOUT_H | 27 | #ifndef GNUNET_CONSENSUS_STRATA_ESTIMATOR_H |
28 | #define GNUNET_CONSENSUS_FLOUT_H | 28 | #define GNUNET_CONSENSUS_STRATA_ESTIMATOR_H |
29 | |||
30 | #include "platform.h" | ||
31 | #include "gnunet_common.h" | ||
32 | #include "gnunet_util_lib.h" | ||
29 | 33 | ||
30 | #ifdef __cplusplus | 34 | #ifdef __cplusplus |
31 | extern "C" | 35 | extern "C" |
@@ -35,19 +39,38 @@ extern "C" | |||
35 | #endif | 39 | #endif |
36 | #endif | 40 | #endif |
37 | 41 | ||
38 | #include "platform.h" | 42 | |
39 | #include "gnunet_common.h" | 43 | struct StrataEstimator |
40 | #include "gnunet_consensus_service.h" | 44 | { |
45 | struct InvertibleBloomFilter **strata; | ||
46 | unsigned int strata_count; | ||
47 | unsigned int ibf_size; | ||
48 | }; | ||
49 | |||
41 | 50 | ||
42 | void | 51 | void |
43 | GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle *consensus); | 52 | strata_estimator_write (const struct StrataEstimator *se, void *buf); |
53 | |||
44 | 54 | ||
45 | void | 55 | void |
46 | GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_HashCode *element_hash); | 56 | strata_estimator_read (const void *buf, struct StrataEstimator *se); |
57 | |||
58 | |||
59 | struct StrataEstimator * | ||
60 | strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); | ||
61 | |||
62 | |||
63 | unsigned int | ||
64 | strata_estimator_difference (const struct StrataEstimator *se1, | ||
65 | const struct StrataEstimator *se2); | ||
66 | |||
47 | 67 | ||
48 | void | 68 | void |
49 | GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus, ...); | 69 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key); |
70 | |||
50 | 71 | ||
72 | void | ||
73 | strata_estimator_destroy (struct StrataEstimator *se); | ||
51 | 74 | ||
52 | 75 | ||
53 | #if 0 /* keep Emacsens' auto-indent happy */ | 76 | #if 0 /* keep Emacsens' auto-indent happy */ |
@@ -58,3 +81,4 @@ GNUNET_CONSENSUS_flout_send_bogos_ibf (struct GNUNET_CONSENSUS_Handle *consensus | |||
58 | #endif | 81 | #endif |
59 | 82 | ||
60 | #endif | 83 | #endif |
84 | |||