diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-03-19 02:09:10 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-03-19 02:09:10 +0000 |
commit | de88db7fecd4651732563450070095a2309079b7 (patch) | |
tree | e732c2a10aa5fe1bcf149597394975c1ea4a0857 /src/consensus | |
parent | ca39427d26e560d3ca74580825e1635b1dae4166 (diff) | |
download | gnunet-de88db7fecd4651732563450070095a2309079b7.tar.gz gnunet-de88db7fecd4651732563450070095a2309079b7.zip |
fixed consensus for two peers, added log-rounds, started implementing freeze, multiple peers still buggy
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/consensus.h | 23 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 8 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 22 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 6 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 1306 | ||||
-rw-r--r-- | src/consensus/ibf.c | 8 |
6 files changed, 695 insertions, 678 deletions
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h index 2c68849b9..ebbadb926 100644 --- a/src/consensus/consensus.h +++ b/src/consensus/consensus.h | |||
@@ -52,12 +52,6 @@ struct GNUNET_CONSENSUS_ConcludeMessage | |||
52 | */ | 52 | */ |
53 | struct GNUNET_MessageHeader header; | 53 | struct GNUNET_MessageHeader header; |
54 | 54 | ||
55 | |||
56 | /** | ||
57 | * Minimum group size required for a consensus group. | ||
58 | */ | ||
59 | uint32_t min_group_size GNUNET_PACKED; | ||
60 | |||
61 | /** | 55 | /** |
62 | * Timeout for conclude | 56 | * Timeout for conclude |
63 | */ | 57 | */ |
@@ -65,23 +59,6 @@ struct GNUNET_CONSENSUS_ConcludeMessage | |||
65 | }; | 59 | }; |
66 | 60 | ||
67 | 61 | ||
68 | struct GNUNET_CONSENSUS_ConcludeDoneMessage | ||
69 | { | ||
70 | /** | ||
71 | * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE | ||
72 | */ | ||
73 | struct GNUNET_MessageHeader header; | ||
74 | |||
75 | uint32_t group_id GNUNET_PACKED; | ||
76 | |||
77 | uint32_t num_elements GNUNET_PACKED; | ||
78 | |||
79 | uint32_t num_peers GNUNET_PACKED; | ||
80 | |||
81 | /** PeerIdentity[num_peers] */ | ||
82 | }; | ||
83 | |||
84 | |||
85 | /** | 62 | /** |
86 | * Message with an element | 63 | * Message with an element |
87 | */ | 64 | */ |
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 19bf81c86..be5117730 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -282,11 +282,11 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
282 | */ | 282 | */ |
283 | static void | 283 | static void |
284 | handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | 284 | handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, |
285 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) | 285 | const struct GNUNET_MessageHeader *msg) |
286 | { | 286 | { |
287 | GNUNET_assert (NULL != consensus->conclude_cb); | 287 | GNUNET_assert (NULL != consensus->conclude_cb); |
288 | consensus->may_not_destroy = GNUNET_YES; | 288 | consensus->may_not_destroy = GNUNET_YES; |
289 | consensus->conclude_cb (consensus->conclude_cls, NULL); | 289 | consensus->conclude_cb (consensus->conclude_cls); |
290 | consensus->may_not_destroy = GNUNET_NO; | 290 | consensus->may_not_destroy = GNUNET_NO; |
291 | consensus->conclude_cb = NULL; | 291 | consensus->conclude_cb = NULL; |
292 | } | 292 | } |
@@ -323,7 +323,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) | |||
323 | handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); | 323 | handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); |
324 | break; | 324 | break; |
325 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: | 325 | case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: |
326 | handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); | 326 | handle_conclude_done (consensus, msg); |
327 | break; | 327 | break; |
328 | default: | 328 | default: |
329 | GNUNET_break (0); | 329 | GNUNET_break (0); |
@@ -491,7 +491,6 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, | |||
491 | void | 491 | void |
492 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | 492 | GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, |
493 | struct GNUNET_TIME_Relative timeout, | 493 | struct GNUNET_TIME_Relative timeout, |
494 | unsigned int min_group_size_in_consensus, | ||
495 | GNUNET_CONSENSUS_ConcludeCallback conclude, | 494 | GNUNET_CONSENSUS_ConcludeCallback conclude, |
496 | void *conclude_cls) | 495 | void *conclude_cls) |
497 | { | 496 | { |
@@ -508,7 +507,6 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
508 | conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); | 507 | conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); |
509 | conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); | 508 | conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); |
510 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); | 509 | conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); |
511 | conclude_msg->min_group_size = min_group_size_in_consensus; | ||
512 | 510 | ||
513 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); | 511 | qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); |
514 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; | 512 | qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; |
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index e8b2c8a34..c0420d55c 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -38,14 +38,9 @@ GNUNET_NETWORK_STRUCT_BEGIN | |||
38 | struct StrataMessage | 38 | struct StrataMessage |
39 | { | 39 | { |
40 | struct GNUNET_MessageHeader header; | 40 | struct GNUNET_MessageHeader header; |
41 | /** | 41 | uint8_t round; |
42 | * Number of elements the sender currently has. | 42 | uint8_t exp_round; |
43 | */ | 43 | uint8_t exp_subround; |
44 | uint16_t num_elements; | ||
45 | /** | ||
46 | * Number of strata in this estimator. | ||
47 | */ | ||
48 | uint16_t num_strata; | ||
49 | /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */ | 44 | /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */ |
50 | /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */ | 45 | /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */ |
51 | /* uint8_t count_buckets[ibf_size*num_strata] */ | 46 | /* uint8_t count_buckets[ibf_size*num_strata] */ |
@@ -56,8 +51,12 @@ struct DifferenceDigest | |||
56 | struct GNUNET_MessageHeader header; | 51 | struct GNUNET_MessageHeader header; |
57 | uint8_t order; | 52 | uint8_t order; |
58 | uint8_t round; | 53 | uint8_t round; |
54 | uint8_t exp_round; | ||
55 | uint8_t exp_subround; | ||
56 | /* rest: IBF */ | ||
59 | }; | 57 | }; |
60 | 58 | ||
59 | |||
61 | struct Element | 60 | struct Element |
62 | { | 61 | { |
63 | struct GNUNET_MessageHeader header; | 62 | struct GNUNET_MessageHeader header; |
@@ -75,7 +74,14 @@ struct ConsensusHello | |||
75 | { | 74 | { |
76 | struct GNUNET_MessageHeader header; | 75 | struct GNUNET_MessageHeader header; |
77 | struct GNUNET_HashCode global_id; | 76 | struct GNUNET_HashCode global_id; |
77 | }; | ||
78 | |||
79 | struct ConsensusRoundHeader | ||
80 | { | ||
81 | struct GNUNET_MessageHeader header; | ||
78 | uint8_t round; | 82 | uint8_t round; |
83 | uint8_t exp_round; | ||
84 | uint8_t exp_subround; | ||
79 | }; | 85 | }; |
80 | 86 | ||
81 | 87 | ||
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index fd6019c20..7a951d35b 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -83,7 +83,7 @@ destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) | |||
83 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not | 83 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not |
84 | */ | 84 | */ |
85 | static void | 85 | static void |
86 | conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group) | 86 | conclude_cb (void *cls) |
87 | { | 87 | { |
88 | GNUNET_SCHEDULER_add_now (destroy, cls); | 88 | GNUNET_SCHEDULER_add_now (destroy, cls); |
89 | } | 89 | } |
@@ -142,7 +142,7 @@ do_consensus () | |||
142 | } | 142 | } |
143 | 143 | ||
144 | for (i = 0; i < num_peers; i++) | 144 | for (i = 0; i < num_peers; i++) |
145 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 0, conclude_cb, consensus_handles[i]); | 145 | GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); |
146 | } | 146 | } |
147 | 147 | ||
148 | 148 | ||
@@ -184,7 +184,7 @@ connect_complete (void *cls, | |||
184 | 184 | ||
185 | static int | 185 | static int |
186 | new_element_cb (void *cls, | 186 | new_element_cb (void *cls, |
187 | struct GNUNET_CONSENSUS_Element *element) | 187 | const struct GNUNET_CONSENSUS_Element *element) |
188 | { | 188 | { |
189 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); | 189 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); |
190 | return GNUNET_YES; | 190 | return GNUNET_YES; |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 494271e45..b1323f902 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -62,6 +62,11 @@ | |||
62 | */ | 62 | */ |
63 | #define MAX_IBF_ORDER (16) | 63 | #define MAX_IBF_ORDER (16) |
64 | 64 | ||
65 | /** | ||
66 | * Number exp-rounds. | ||
67 | */ | ||
68 | #define NUM_EXP_ROUNDS (4) | ||
69 | |||
65 | 70 | ||
66 | /* forward declarations */ | 71 | /* forward declarations */ |
67 | 72 | ||
@@ -70,23 +75,32 @@ struct IncomingSocket; | |||
70 | struct ConsensusPeerInformation; | 75 | struct ConsensusPeerInformation; |
71 | 76 | ||
72 | static void | 77 | static void |
73 | send_next (struct ConsensusSession *session); | 78 | client_send_next (struct ConsensusSession *session); |
74 | 79 | ||
75 | static void | 80 | static int |
76 | write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 81 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); |
77 | 82 | ||
78 | static void | 83 | static void |
79 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 84 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
85 | |||
86 | static void | ||
87 | send_ibf (struct ConsensusPeerInformation *cpi); | ||
88 | |||
89 | static void | ||
90 | send_strata_estimator (struct ConsensusPeerInformation *cpi); | ||
91 | |||
92 | static void | ||
93 | decode (struct ConsensusPeerInformation *cpi); | ||
80 | 94 | ||
81 | static void | 95 | static void |
82 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 96 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size); |
83 | 97 | ||
84 | static int | 98 | static void |
85 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); | 99 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); |
86 | 100 | ||
87 | 101 | ||
88 | /** | 102 | /** |
89 | * An element that is waiting to be transmitted. | 103 | * An element that is waiting to be transmitted to the client. |
90 | */ | 104 | */ |
91 | struct PendingElement | 105 | struct PendingElement |
92 | { | 106 | { |
@@ -109,11 +123,14 @@ struct PendingElement | |||
109 | struct ConsensusPeerInformation *cpi; | 123 | struct ConsensusPeerInformation *cpi; |
110 | }; | 124 | }; |
111 | 125 | ||
126 | |||
112 | /** | 127 | /** |
113 | * Information about a peer that is in a consensus session. | 128 | * Information about a peer that is in a consensus session. |
114 | */ | 129 | */ |
115 | struct ConsensusPeerInformation | 130 | struct ConsensusPeerInformation |
116 | { | 131 | { |
132 | struct GNUNET_PeerIdentity peer_id; | ||
133 | |||
117 | /** | 134 | /** |
118 | * Socket for communicating with the peer, either created by the local peer, | 135 | * Socket for communicating with the peer, either created by the local peer, |
119 | * or the remote peer. | 136 | * or the remote peer. |
@@ -126,22 +143,12 @@ struct ConsensusPeerInformation | |||
126 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | 143 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; |
127 | 144 | ||
128 | /** | 145 | /** |
129 | * Is socket's connection established, i.e. can we write to it? | 146 | * Do we connect to the peer, or does the peer connect to us? |
130 | * Only relevent to outgoing cpi. | 147 | * Only valid for all-to-all phases |
131 | */ | ||
132 | int is_connected; | ||
133 | |||
134 | /** | ||
135 | * Type of the peer in the all-to-all rounds, | ||
136 | * GNUNET_YES if we initiate reconciliation. | ||
137 | */ | 148 | */ |
138 | int is_outgoing; | 149 | int is_outgoing; |
139 | 150 | ||
140 | /** | 151 | int connected; |
141 | * if the peer did something wrong, and was disconnected, | ||
142 | * never interact with this peer again. | ||
143 | */ | ||
144 | int is_bad; | ||
145 | 152 | ||
146 | /** | 153 | /** |
147 | * Did we receive/send a consensus hello? | 154 | * Did we receive/send a consensus hello? |
@@ -194,43 +201,50 @@ struct ConsensusPeerInformation | |||
194 | * Strata estimator of the peer, NULL if our peer | 201 | * Strata estimator of the peer, NULL if our peer |
195 | * initiated the reconciliation. | 202 | * initiated the reconciliation. |
196 | */ | 203 | */ |
197 | struct InvertibleBloomFilter **strata; | 204 | struct StrataEstimator *se; |
198 | 205 | ||
199 | /** | 206 | /** |
200 | * Elements that the peer is missing from us. | 207 | * Element keys that this peer misses, but we have them. |
201 | */ | 208 | */ |
202 | uint64_t *missing_local; | 209 | struct GNUNET_CONTAINER_MultiHashMap *requested_keys; |
203 | 210 | ||
204 | /** | 211 | /** |
205 | * Number of elements in missing_local | 212 | * Element keys that this peer has, but we miss. |
206 | */ | 213 | */ |
207 | unsigned int num_missing_local; | 214 | struct GNUNET_CONTAINER_MultiHashMap *reported_keys; |
208 | 215 | ||
209 | /** | 216 | /** |
210 | * Elements that this peer told us *we* don't have, | 217 | * Back-reference to the consensus session, |
211 | * i.e. we are the remote peer that has some values missing. | 218 | * to that ConsensusPeerInformation can be used as a closure |
212 | */ | 219 | */ |
213 | uint64_t *missing_remote; | 220 | struct ConsensusSession *session; |
214 | 221 | ||
215 | /** | 222 | /** |
216 | * Number of elements in missing_local | 223 | * Messages queued for the current round. |
217 | */ | 224 | */ |
218 | unsigned int num_missing_remote; | 225 | struct QueuedMessage *messages_head; |
219 | 226 | ||
220 | /** | 227 | /** |
221 | * Back-reference to the consensus session, | 228 | * Messages queued for the current round. |
222 | * to that ConsensusPeerInformation can be used as a closure | ||
223 | */ | 229 | */ |
224 | struct ConsensusSession *session; | 230 | struct QueuedMessage *messages_tail; |
225 | 231 | ||
226 | /** | 232 | /** |
227 | * When decoding the IBF, requests for elements and outgoing elements | 233 | * True if we are actually replaying the strata message, |
228 | * have to be queued, to ensure that messages actually fit in the stream buffer. | 234 | * e.g. currently handling the premature_strata_message. |
229 | */ | 235 | */ |
230 | struct QueuedMessage *requests_and_elements_head; | 236 | int replaying_strata_message; |
231 | struct QueuedMessage *requests_and_elements_tail; | 237 | |
238 | /** | ||
239 | * A strata message that is not actually for the current round, | ||
240 | * used in the exp-scheme. | ||
241 | */ | ||
242 | struct StrataMessage *premature_strata_message; | ||
243 | |||
232 | }; | 244 | }; |
233 | 245 | ||
246 | typedef void (*QueuedMessageCallback) (void *msg); | ||
247 | |||
234 | /** | 248 | /** |
235 | * A doubly linked list of messages. | 249 | * A doubly linked list of messages. |
236 | */ | 250 | */ |
@@ -247,6 +261,10 @@ struct QueuedMessage | |||
247 | * Queued messages are stored in a doubly linked list. | 261 | * Queued messages are stored in a doubly linked list. |
248 | */ | 262 | */ |
249 | struct QueuedMessage *prev; | 263 | struct QueuedMessage *prev; |
264 | |||
265 | QueuedMessageCallback cb; | ||
266 | |||
267 | void *cls; | ||
250 | }; | 268 | }; |
251 | 269 | ||
252 | /** | 270 | /** |
@@ -255,31 +273,32 @@ struct QueuedMessage | |||
255 | enum ConsensusRound | 273 | enum ConsensusRound |
256 | { | 274 | { |
257 | /** | 275 | /** |
258 | * Not started the protocl yet | 276 | * Not started the protocol yet. |
259 | */ | 277 | */ |
260 | CONSENSUS_ROUND_BEGIN=0, | 278 | CONSENSUS_ROUND_BEGIN=0, |
261 | /** | 279 | /** |
262 | * distribution of information with the exponential scheme. | 280 | * Distribution of elements with the exponential scheme. |
263 | */ | ||
264 | CONSENSUS_ROUND_EXP_EXCHANGE, | ||
265 | /** | ||
266 | * All-to-all, exchange missing values. | ||
267 | */ | 281 | */ |
268 | CONSENSUS_ROUND_A2A_EXCHANGE, | 282 | CONSENSUS_ROUND_EXCHANGE, |
269 | /** | 283 | /** |
270 | * All-to-all, check what values are missing, don't exchange anything. | 284 | * Exchange which elements each peer has, but not the elements. |
271 | */ | 285 | */ |
272 | CONSENSUS_ROUND_A2A_INVENTORY, | 286 | CONSENSUS_ROUND_INVENTORY, |
273 | /** | 287 | /** |
274 | * All-to-all round to exchange information for byzantine fault detection. | 288 | * Collect and distribute missing values. |
275 | */ | 289 | */ |
276 | CONSENSUS_ROUND_A2A_INVENTORY_AGREEMENT, | 290 | CONSENSUS_ROUND_STOCK, |
277 | /** | 291 | /** |
278 | * Rounds are over | 292 | * Consensus concluded. |
279 | */ | 293 | */ |
280 | CONSENSUS_ROUND_FINISH | 294 | CONSENSUS_ROUND_FINISH |
281 | }; | 295 | }; |
282 | 296 | ||
297 | struct StrataEstimator | ||
298 | { | ||
299 | struct InvertibleBloomFilter **strata; | ||
300 | }; | ||
301 | |||
283 | 302 | ||
284 | /** | 303 | /** |
285 | * A consensus session consists of one local client and the remote authorities. | 304 | * A consensus session consists of one local client and the remote authorities. |
@@ -305,7 +324,7 @@ struct ConsensusSession | |||
305 | 324 | ||
306 | /** | 325 | /** |
307 | * Global consensus identification, computed | 326 | * Global consensus identification, computed |
308 | * from the local id and participating authorities. | 327 | * from the session id and participating authorities. |
309 | */ | 328 | */ |
310 | struct GNUNET_HashCode global_id; | 329 | struct GNUNET_HashCode global_id; |
311 | 330 | ||
@@ -316,7 +335,7 @@ struct ConsensusSession | |||
316 | struct GNUNET_SERVER_Client *client; | 335 | struct GNUNET_SERVER_Client *client; |
317 | 336 | ||
318 | /** | 337 | /** |
319 | * Values in the consensus set of this session, | 338 | * Elements in the consensus set of this session, |
320 | * all of them either have been sent by or approved by the client. | 339 | * all of them either have been sent by or approved by the client. |
321 | * Contains GNUNET_CONSENSUS_Element. | 340 | * Contains GNUNET_CONSENSUS_Element. |
322 | */ | 341 | */ |
@@ -325,12 +344,12 @@ struct ConsensusSession | |||
325 | /** | 344 | /** |
326 | * Elements that have not been approved (or rejected) by the client yet. | 345 | * Elements that have not been approved (or rejected) by the client yet. |
327 | */ | 346 | */ |
328 | struct PendingElement *approval_pending_head; | 347 | struct PendingElement *client_approval_head; |
329 | 348 | ||
330 | /** | 349 | /** |
331 | * Elements that have not been approved (or rejected) by the client yet. | 350 | * Elements that have not been approved (or rejected) by the client yet. |
332 | */ | 351 | */ |
333 | struct PendingElement *approval_pending_tail; | 352 | struct PendingElement *client_approval_tail; |
334 | 353 | ||
335 | /** | 354 | /** |
336 | * Messages to be sent to the local client that owns this session | 355 | * Messages to be sent to the local client that owns this session |
@@ -345,7 +364,7 @@ struct ConsensusSession | |||
345 | /** | 364 | /** |
346 | * Currently active transmit handle for sending to the client | 365 | * Currently active transmit handle for sending to the client |
347 | */ | 366 | */ |
348 | struct GNUNET_SERVER_TransmitHandle *th; | 367 | struct GNUNET_SERVER_TransmitHandle *client_th; |
349 | 368 | ||
350 | /** | 369 | /** |
351 | * Timeout for all rounds together, single rounds will schedule a timeout task | 370 | * Timeout for all rounds together, single rounds will schedule a timeout task |
@@ -370,12 +389,6 @@ struct ConsensusSession | |||
370 | struct ConsensusPeerInformation *info; | 389 | struct ConsensusPeerInformation *info; |
371 | 390 | ||
372 | /** | 391 | /** |
373 | * Sorted array of peer identities in this consensus session, | ||
374 | * includes the local peer. | ||
375 | */ | ||
376 | struct GNUNET_PeerIdentity *peers; | ||
377 | |||
378 | /** | ||
379 | * Index of the local peer in the peers array | 392 | * Index of the local peer in the peers array |
380 | */ | 393 | */ |
381 | int local_peer_idx; | 394 | int local_peer_idx; |
@@ -383,7 +396,7 @@ struct ConsensusSession | |||
383 | /** | 396 | /** |
384 | * Strata estimator, computed online | 397 | * Strata estimator, computed online |
385 | */ | 398 | */ |
386 | struct InvertibleBloomFilter **strata; | 399 | struct StrataEstimator *se; |
387 | 400 | ||
388 | /** | 401 | /** |
389 | * Pre-computed IBFs | 402 | * Pre-computed IBFs |
@@ -394,6 +407,26 @@ struct ConsensusSession | |||
394 | * Current round | 407 | * Current round |
395 | */ | 408 | */ |
396 | enum ConsensusRound current_round; | 409 | enum ConsensusRound current_round; |
410 | |||
411 | int exp_round; | ||
412 | |||
413 | int exp_subround; | ||
414 | |||
415 | /** | ||
416 | * Permutation of peers for the current round, | ||
417 | * maps logical index (for current round) to physical index (location in info array) | ||
418 | */ | ||
419 | int *shuffle; | ||
420 | |||
421 | /** | ||
422 | * The partner for the current exp-round | ||
423 | */ | ||
424 | struct ConsensusPeerInformation* partner_outgoing; | ||
425 | |||
426 | /** | ||
427 | * The partner for the current exp-round | ||
428 | */ | ||
429 | struct ConsensusPeerInformation* partner_incoming; | ||
397 | }; | 430 | }; |
398 | 431 | ||
399 | 432 | ||
@@ -427,7 +460,7 @@ struct IncomingSocket | |||
427 | /** | 460 | /** |
428 | * Peer that connected to us with the socket. | 461 | * Peer that connected to us with the socket. |
429 | */ | 462 | */ |
430 | struct GNUNET_PeerIdentity *peer; | 463 | struct GNUNET_PeerIdentity peer_id; |
431 | 464 | ||
432 | /** | 465 | /** |
433 | * Message stream tokenizer for this socket. | 466 | * Message stream tokenizer for this socket. |
@@ -442,8 +475,6 @@ struct IncomingSocket | |||
442 | /** | 475 | /** |
443 | * Set to the global session id, if the peer sent us a hello-message, | 476 | * Set to the global session id, if the peer sent us a hello-message, |
444 | * but the session does not exist yet. | 477 | * but the session does not exist yet. |
445 | * | ||
446 | * FIXME: not implemented yet | ||
447 | */ | 478 | */ |
448 | struct GNUNET_HashCode *requested_gid; | 479 | struct GNUNET_HashCode *requested_gid; |
449 | }; | 480 | }; |
@@ -504,26 +535,45 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea | |||
504 | } | 535 | } |
505 | 536 | ||
506 | /** | 537 | /** |
507 | * Get peer index associated with the peer information, | 538 | * Queue a message to be sent to another peer |
508 | * unique for every session among all peers. | 539 | * |
540 | * @param cpi peer | ||
541 | * @param msg message we want to queue | ||
509 | */ | 542 | */ |
510 | static int | 543 | static void |
511 | get_cpi_index (struct ConsensusPeerInformation *cpi) | 544 | queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) |
512 | { | 545 | { |
513 | return cpi - cpi->session->info; | 546 | struct QueuedMessage *qm; |
547 | qm = GNUNET_malloc (sizeof *qm); | ||
548 | qm->msg = msg; | ||
549 | qm->cls = cls; | ||
550 | qm->cb = cb; | ||
551 | GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm); | ||
552 | if (cpi->wh == NULL) | ||
553 | write_queued (cpi, GNUNET_STREAM_OK, 0); | ||
514 | } | 554 | } |
515 | 555 | ||
556 | |||
516 | /** | 557 | /** |
517 | * Mark the peer as bad, free state we don't need anymore. | 558 | * Queue a message to be sent to another peer |
518 | * | 559 | * |
519 | * @param cpi consensus peer information of the bad peer | 560 | * @param cpi peer |
561 | * @param msg message we want to queue | ||
520 | */ | 562 | */ |
521 | static void | 563 | static void |
522 | mark_peer_bad (struct ConsensusPeerInformation *cpi) | 564 | queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg) |
523 | { | 565 | { |
524 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi)); | 566 | queue_peer_message_with_cls (cpi, msg, NULL, NULL); |
525 | cpi->is_bad = GNUNET_YES; | 567 | } |
526 | /* FIXME: free ibfs etc. */ | 568 | |
569 | |||
570 | |||
571 | static void | ||
572 | clear_peer_messages (struct ConsensusPeerInformation *cpi) | ||
573 | { | ||
574 | /* FIXME: deallocate */ | ||
575 | cpi->messages_head = NULL; | ||
576 | cpi->messages_tail = NULL; | ||
527 | } | 577 | } |
528 | 578 | ||
529 | 579 | ||
@@ -532,13 +582,13 @@ mark_peer_bad (struct ConsensusPeerInformation *cpi) | |||
532 | * i.e. arrays of IBFs. | 582 | * i.e. arrays of IBFs. |
533 | * Does not not modify its arguments. | 583 | * Does not not modify its arguments. |
534 | * | 584 | * |
535 | * @param strata1 first strata estimator | 585 | * @param se1 first strata estimator |
536 | * @param strata2 second strata estimator | 586 | * @param se2 second strata estimator |
537 | * @return the estimated difference | 587 | * @return the estimated difference |
538 | */ | 588 | */ |
539 | static int | 589 | static int |
540 | estimate_difference (struct InvertibleBloomFilter** strata1, | 590 | estimate_difference (struct StrataEstimator *se1, |
541 | struct InvertibleBloomFilter** strata2) | 591 | struct StrataEstimator *se2) |
542 | { | 592 | { |
543 | int i; | 593 | int i; |
544 | int count; | 594 | int count; |
@@ -551,8 +601,8 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
551 | int more; | 601 | int more; |
552 | ibf_count = 0; | 602 | ibf_count = 0; |
553 | /* FIXME: implement this without always allocating new IBFs */ | 603 | /* FIXME: implement this without always allocating new IBFs */ |
554 | diff = ibf_dup (strata1[i]); | 604 | diff = ibf_dup (se1->strata[i]); |
555 | ibf_subtract (diff, strata2[i]); | 605 | ibf_subtract (diff, se2->strata[i]); |
556 | for (;;) | 606 | for (;;) |
557 | { | 607 | { |
558 | more = ibf_decode (diff, NULL, NULL); | 608 | more = ibf_decode (diff, NULL, NULL); |
@@ -664,25 +714,23 @@ send_element_iter (void *cls, | |||
664 | { | 714 | { |
665 | struct ConsensusPeerInformation *cpi; | 715 | struct ConsensusPeerInformation *cpi; |
666 | struct GNUNET_CONSENSUS_Element *element; | 716 | struct GNUNET_CONSENSUS_Element *element; |
667 | struct QueuedMessage *qm; | ||
668 | struct GNUNET_MessageHeader *element_msg; | 717 | struct GNUNET_MessageHeader *element_msg; |
669 | size_t msize; | 718 | size_t msize; |
719 | |||
670 | cpi = cls; | 720 | cpi = cls; |
671 | element = value; | 721 | element = value; |
672 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | 722 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; |
673 | element_msg = GNUNET_malloc (msize); | 723 | element_msg = GNUNET_malloc (msize); |
674 | element_msg->size = htons (msize); | 724 | element_msg->size = htons (msize); |
675 | if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) | 725 | if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) |
676 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | 726 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); |
677 | else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round) | 727 | else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round) |
678 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE); | 728 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT); |
679 | else | 729 | else |
680 | GNUNET_assert (0); | 730 | GNUNET_assert (0); |
681 | GNUNET_assert (NULL != element->data); | 731 | GNUNET_assert (NULL != element->data); |
682 | memcpy (&element_msg[1], element->data, element->size); | 732 | memcpy (&element_msg[1], element->data, element->size); |
683 | qm = GNUNET_malloc (sizeof *qm); | 733 | queue_peer_message (cpi, element_msg); |
684 | qm->msg = element_msg; | ||
685 | GNUNET_CONTAINER_DLL_insert (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
686 | return GNUNET_YES; | 734 | return GNUNET_YES; |
687 | } | 735 | } |
688 | 736 | ||
@@ -733,30 +781,79 @@ prepare_ibf (struct ConsensusPeerInformation *cpi) | |||
733 | * @param msg message | 781 | * @param msg message |
734 | */ | 782 | */ |
735 | static int | 783 | static int |
736 | handle_p2p_missing_local (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | 784 | handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) |
737 | { | 785 | { |
738 | uint64_t key; | 786 | GNUNET_assert (0); |
739 | key = *(uint64_t *) &msg[1]; | 787 | } |
740 | GNUNET_array_append (cpi->missing_remote, cpi->num_missing_remote, key); | 788 | |
741 | return GNUNET_OK; | 789 | static void |
790 | queue_cont_subround_over (void *cls) | ||
791 | { | ||
792 | struct ConsensusSession *session; | ||
793 | session = cls; | ||
794 | subround_over (session, NULL); | ||
742 | } | 795 | } |
743 | 796 | ||
744 | 797 | ||
745 | /** | 798 | /** |
746 | * Called when a remote peer wants to inform the local peer | 799 | * Gets called when the other peer wants us to inform that |
747 | * that the local peer misses elements. | 800 | * it has decoded our ibf and sent us all elements / requests |
748 | * Elements are not reconciled. | ||
749 | * | ||
750 | * @param cpi session | ||
751 | * @param msg message | ||
752 | */ | 801 | */ |
753 | static int | 802 | static int |
754 | handle_p2p_missing_remote (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | 803 | handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) |
755 | { | 804 | { |
756 | uint64_t key; | 805 | struct GNUNET_MessageHeader *fin_msg; |
757 | key = *(uint64_t *) &msg[1]; | 806 | switch (cpi->session->current_round) |
758 | GNUNET_array_append (cpi->missing_local, cpi->num_missing_local, key); | 807 | { |
759 | return GNUNET_OK; | 808 | case CONSENSUS_ROUND_EXCHANGE: |
809 | fin_msg = GNUNET_malloc (sizeof *fin_msg); | ||
810 | fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN); | ||
811 | fin_msg->size = htons (sizeof *fin_msg); | ||
812 | /* the subround os over once we kicked off sending the fin msg */ | ||
813 | /* FIXME: assert we are talking to the right peer! */ | ||
814 | queue_peer_message_with_cls (cpi, fin_msg, queue_cont_subround_over, cpi->session); | ||
815 | break; | ||
816 | default: | ||
817 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n"); | ||
818 | break; | ||
819 | } | ||
820 | return GNUNET_YES; | ||
821 | } | ||
822 | |||
823 | /** | ||
824 | * The other peer wants us to inform that he sent us all the elements we requested. | ||
825 | */ | ||
826 | static int | ||
827 | handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
828 | { | ||
829 | /* FIXME: only call subround_over if round is the current one! */ | ||
830 | switch (cpi->session->current_round) | ||
831 | { | ||
832 | case CONSENSUS_ROUND_EXCHANGE: | ||
833 | subround_over (cpi->session, NULL); | ||
834 | break; | ||
835 | default: | ||
836 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n"); | ||
837 | break; | ||
838 | } | ||
839 | return GNUNET_YES; | ||
840 | } | ||
841 | |||
842 | |||
843 | static struct StrataEstimator * | ||
844 | strata_estimator_create () | ||
845 | { | ||
846 | struct StrataEstimator *se; | ||
847 | int i; | ||
848 | |||
849 | /* fixme: allocate everything in one chunk */ | ||
850 | |||
851 | se = GNUNET_malloc (sizeof (struct StrataEstimator)); | ||
852 | se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT); | ||
853 | for (i = 0; i < STRATA_COUNT; i++) | ||
854 | se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | ||
855 | |||
856 | return se; | ||
760 | } | 857 | } |
761 | 858 | ||
762 | 859 | ||
@@ -775,33 +872,47 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
775 | void *buf; | 872 | void *buf; |
776 | size_t size; | 873 | size_t size; |
777 | 874 | ||
778 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); | 875 | switch (cpi->session->current_round) |
779 | |||
780 | if (NULL == cpi->strata) | ||
781 | { | 876 | { |
782 | cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); | 877 | case CONSENSUS_ROUND_EXCHANGE: |
783 | for (i = 0; i < STRATA_COUNT; i++) | 878 | if ( (strata_msg->round != CONSENSUS_ROUND_EXCHANGE) || |
784 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | 879 | (strata_msg->exp_round != cpi->session->exp_round) || |
880 | (strata_msg->exp_subround != cpi->session->exp_subround)) | ||
881 | { | ||
882 | if (GNUNET_NO == cpi->replaying_strata_message) | ||
883 | { | ||
884 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got probably premature message\n"); | ||
885 | cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg); | ||
886 | } | ||
887 | return GNUNET_YES; | ||
888 | } | ||
889 | break; | ||
890 | default: | ||
891 | GNUNET_assert (0); | ||
892 | break; | ||
785 | } | 893 | } |
786 | 894 | ||
895 | if (NULL == cpi->se) | ||
896 | cpi->se = strata_estimator_create (); | ||
897 | |||
787 | size = ntohs (strata_msg->header.size); | 898 | size = ntohs (strata_msg->header.size); |
788 | buf = (void *) &strata_msg[1]; | 899 | buf = (void *) &strata_msg[1]; |
789 | for (i = 0; i < STRATA_COUNT; i++) | 900 | for (i = 0; i < STRATA_COUNT; i++) |
790 | { | 901 | { |
791 | int res; | 902 | int res; |
792 | res = ibf_read (&buf, &size, cpi->strata[i]); | 903 | res = ibf_read (&buf, &size, cpi->se->strata[i]); |
793 | GNUNET_assert (GNUNET_OK == res); | 904 | GNUNET_assert (GNUNET_OK == res); |
794 | } | 905 | } |
795 | 906 | ||
796 | diff = estimate_difference (cpi->session->strata, cpi->strata); | 907 | diff = estimate_difference (cpi->session->se, cpi->se); |
797 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff); | 908 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff); |
798 | 909 | ||
799 | if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) || | 910 | if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) || |
800 | (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)) | 911 | (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)) |
801 | { | 912 | { |
802 | /* send IBF of the right size */ | 913 | /* send IBF of the right size */ |
803 | cpi->ibf_order = 0; | 914 | cpi->ibf_order = 0; |
804 | while ((1 << cpi->ibf_order) < diff) | 915 | while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) ) |
805 | cpi->ibf_order++; | 916 | cpi->ibf_order++; |
806 | if (cpi->ibf_order > MAX_IBF_ORDER) | 917 | if (cpi->ibf_order > MAX_IBF_ORDER) |
807 | cpi->ibf_order = MAX_IBF_ORDER; | 918 | cpi->ibf_order = MAX_IBF_ORDER; |
@@ -811,9 +922,8 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
811 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | 922 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); |
812 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | 923 | cpi->ibf_state = IBF_STATE_TRANSMITTING; |
813 | cpi->ibf_bucket_counter = 0; | 924 | cpi->ibf_bucket_counter = 0; |
814 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | 925 | send_ibf (cpi); |
815 | } | 926 | } |
816 | |||
817 | return GNUNET_YES; | 927 | return GNUNET_YES; |
818 | } | 928 | } |
819 | 929 | ||
@@ -824,6 +934,8 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
824 | int num_buckets; | 934 | int num_buckets; |
825 | void *buf; | 935 | void *buf; |
826 | 936 | ||
937 | /* FIXME: find out if we're still expecting the same ibf! */ | ||
938 | |||
827 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | 939 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; |
828 | switch (cpi->ibf_state) | 940 | switch (cpi->ibf_state) |
829 | { | 941 | { |
@@ -852,16 +964,14 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
852 | case IBF_STATE_RECEIVING: | 964 | case IBF_STATE_RECEIVING: |
853 | break; | 965 | break; |
854 | default: | 966 | default: |
855 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state); | 967 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "received ibf unexpectedly in state %d\n", cpi->ibf_state); |
856 | mark_peer_bad (cpi); | 968 | return GNUNET_YES; |
857 | return GNUNET_NO; | ||
858 | } | 969 | } |
859 | 970 | ||
860 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | 971 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) |
861 | { | 972 | { |
862 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n"); | 973 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n"); |
863 | mark_peer_bad (cpi); | 974 | return GNUNET_YES; |
864 | return GNUNET_NO; | ||
865 | } | 975 | } |
866 | 976 | ||
867 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, | 977 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, |
@@ -877,16 +987,19 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig | |||
877 | 987 | ||
878 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | 988 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
879 | { | 989 | { |
880 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); | ||
881 | cpi->ibf_state = IBF_STATE_DECODING; | 990 | cpi->ibf_state = IBF_STATE_DECODING; |
882 | prepare_ibf (cpi); | 991 | prepare_ibf (cpi); |
883 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | 992 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); |
884 | write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0); | 993 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "about to decode\n"); |
994 | decode (cpi); | ||
885 | } | 995 | } |
886 | return GNUNET_YES; | 996 | return GNUNET_YES; |
887 | } | 997 | } |
888 | 998 | ||
889 | 999 | ||
1000 | /** | ||
1001 | * Handle an element that another peer sent us | ||
1002 | */ | ||
890 | static int | 1003 | static int |
891 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) | 1004 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) |
892 | { | 1005 | { |
@@ -897,8 +1010,6 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
897 | 1010 | ||
898 | size = ntohs (element_msg->size) - sizeof *element_msg; | 1011 | size = ntohs (element_msg->size) - sizeof *element_msg; |
899 | 1012 | ||
900 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size); | ||
901 | |||
902 | element = GNUNET_malloc (size + sizeof *element); | 1013 | element = GNUNET_malloc (size + sizeof *element); |
903 | element->size = size; | 1014 | element->size = size; |
904 | memcpy (&element[1], &element_msg[1], size); | 1015 | memcpy (&element[1], &element_msg[1], size); |
@@ -906,7 +1017,7 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
906 | 1017 | ||
907 | pending_element = GNUNET_malloc (sizeof *pending_element); | 1018 | pending_element = GNUNET_malloc (sizeof *pending_element); |
908 | pending_element->element = element; | 1019 | pending_element->element = element; |
909 | GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element); | 1020 | GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element); |
910 | 1021 | ||
911 | client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); | 1022 | client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); |
912 | client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | 1023 | client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); |
@@ -915,74 +1026,61 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
915 | 1026 | ||
916 | queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); | 1027 | queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); |
917 | 1028 | ||
918 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n"); | 1029 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size); |
919 | 1030 | ||
920 | send_next (cpi->session); | 1031 | client_send_next (cpi->session); |
921 | 1032 | ||
922 | return GNUNET_YES; | 1033 | return GNUNET_YES; |
923 | } | 1034 | } |
924 | 1035 | ||
925 | 1036 | ||
926 | /** | 1037 | /** |
927 | * Functions of this signature are called whenever writing operations | ||
928 | * on a stream are executed | ||
929 | * | ||
930 | * @param cls the closure from GNUNET_STREAM_write | ||
931 | * @param status the status of the stream at the time this function is called; | ||
932 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
933 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
934 | * (this doesn't mean that the data is never sent, the receiver may | ||
935 | * have read the data but its ACKs may have been lost); | ||
936 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
937 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
938 | * be processed. | ||
939 | * @param size the number of bytes written | ||
940 | */ | ||
941 | static void | ||
942 | write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
943 | { | ||
944 | struct ConsensusPeerInformation *cpi; | ||
945 | cpi = cls; | ||
946 | GNUNET_assert (NULL == cpi->wh); | ||
947 | cpi->wh = NULL; | ||
948 | if (NULL != cpi->requests_and_elements_head) | ||
949 | { | ||
950 | struct QueuedMessage *qm; | ||
951 | qm = cpi->requests_and_elements_head; | ||
952 | GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
953 | |||
954 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | ||
955 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
956 | write_requested_elements, cpi); | ||
957 | GNUNET_assert (NULL != cpi->wh); | ||
958 | } | ||
959 | } | ||
960 | |||
961 | |||
962 | /** | ||
963 | * Handle a request for elements. | 1038 | * Handle a request for elements. |
964 | * Only allowed in exchange-rounds. | 1039 | * Only allowed in exchange-rounds. |
965 | */ | 1040 | */ |
966 | static int | 1041 | static int |
967 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | 1042 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) |
968 | { | 1043 | { |
969 | struct GNUNET_HashCode *hashcode; | 1044 | struct GNUNET_HashCode hashcode; |
1045 | struct IBF_Key *ibf_key; | ||
970 | unsigned int num; | 1046 | unsigned int num; |
971 | 1047 | ||
972 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n"); | 1048 | num = ntohs (msg->header.size) / sizeof (struct IBF_Key); |
973 | num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode); | 1049 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num); |
974 | hashcode = (struct GNUNET_HashCode *) &msg[1]; | 1050 | |
1051 | ibf_key = (struct IBF_Key *) &msg[1]; | ||
975 | while (num--) | 1052 | while (num--) |
976 | { | 1053 | { |
977 | GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state); | 1054 | ibf_hashcode_from_key (*ibf_key, &hashcode); |
978 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi); | 1055 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); |
979 | if (NULL == cpi->wh) | 1056 | ibf_key++; |
980 | write_requested_elements (cpi, GNUNET_STREAM_OK, 0); | ||
981 | hashcode++; | ||
982 | } | 1057 | } |
983 | return GNUNET_YES; | 1058 | return GNUNET_YES; |
984 | } | 1059 | } |
985 | 1060 | ||
1061 | /** | ||
1062 | * If necessary, send a message to the peer, depending on the current | ||
1063 | * round. | ||
1064 | */ | ||
1065 | static void | ||
1066 | embrace_peer (struct ConsensusPeerInformation *cpi) | ||
1067 | { | ||
1068 | GNUNET_assert (GNUNET_YES == cpi->hello); | ||
1069 | switch (cpi->session->current_round) | ||
1070 | { | ||
1071 | case CONSENSUS_ROUND_EXCHANGE: | ||
1072 | if (cpi->session->partner_outgoing != cpi) | ||
1073 | break; | ||
1074 | /* fallthrough */ | ||
1075 | case CONSENSUS_ROUND_INVENTORY: | ||
1076 | /* fallthrough */ | ||
1077 | case CONSENSUS_ROUND_STOCK: | ||
1078 | send_strata_estimator (cpi); | ||
1079 | default: | ||
1080 | break; | ||
1081 | } | ||
1082 | } | ||
1083 | |||
986 | 1084 | ||
987 | /** | 1085 | /** |
988 | * Handle a HELLO-message, send when another peer wants to join a session where | 1086 | * Handle a HELLO-message, send when another peer wants to join a session where |
@@ -993,38 +1091,172 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello | |||
993 | { | 1091 | { |
994 | /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ | 1092 | /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ |
995 | struct ConsensusSession *session; | 1093 | struct ConsensusSession *session; |
1094 | |||
996 | session = sessions_head; | 1095 | session = sessions_head; |
997 | while (NULL != session) | 1096 | while (NULL != session) |
998 | { | 1097 | { |
999 | if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) | 1098 | if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) |
1000 | { | 1099 | { |
1001 | int idx; | 1100 | int idx; |
1002 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n"); | 1101 | idx = get_peer_idx (&inc->peer_id, session); |
1003 | idx = get_peer_idx (inc->peer, session); | ||
1004 | GNUNET_assert (-1 != idx); | 1102 | GNUNET_assert (-1 != idx); |
1005 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx); | 1103 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx); |
1006 | inc->cpi = &session->info[idx]; | 1104 | inc->cpi = &session->info[idx]; |
1007 | GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing); | ||
1008 | inc->cpi->mst = inc->mst; | 1105 | inc->cpi->mst = inc->mst; |
1009 | inc->cpi->hello = GNUNET_YES; | 1106 | inc->cpi->hello = GNUNET_YES; |
1010 | inc->cpi->socket = inc->socket; | 1107 | inc->cpi->socket = inc->socket; |
1011 | 1108 | embrace_peer (inc->cpi); | |
1012 | if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) && | ||
1013 | (GNUNET_YES == inc->cpi->is_outgoing)) | ||
1014 | { | ||
1015 | write_strata (&session->info[idx], GNUNET_STREAM_OK, 0); | ||
1016 | } | ||
1017 | return GNUNET_YES; | 1109 | return GNUNET_YES; |
1018 | } | 1110 | } |
1019 | session = session->next; | 1111 | session = session->next; |
1020 | } | 1112 | } |
1021 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n"); | 1113 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n"); |
1022 | GNUNET_break (0); | ||
1023 | return GNUNET_NO; | 1114 | return GNUNET_NO; |
1024 | } | 1115 | } |
1025 | 1116 | ||
1026 | 1117 | ||
1027 | /** | 1118 | /** |
1119 | * Send a strata estimator. | ||
1120 | * | ||
1121 | * @param cpi the peer | ||
1122 | */ | ||
1123 | static void | ||
1124 | send_strata_estimator (struct ConsensusPeerInformation *cpi) | ||
1125 | { | ||
1126 | struct StrataMessage *strata_msg; | ||
1127 | void *buf; | ||
1128 | size_t msize; | ||
1129 | int i; | ||
1130 | |||
1131 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | ||
1132 | |||
1133 | strata_msg = GNUNET_malloc (msize); | ||
1134 | strata_msg->header.size = htons (msize); | ||
1135 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | ||
1136 | strata_msg->round = cpi->session->current_round; | ||
1137 | strata_msg->exp_round = cpi->session->exp_round; | ||
1138 | strata_msg->exp_subround = cpi->session->exp_subround; | ||
1139 | |||
1140 | buf = &strata_msg[1]; | ||
1141 | for (i = 0; i < STRATA_COUNT; i++) | ||
1142 | { | ||
1143 | ibf_write (cpi->session->se->strata[i], &buf, NULL); | ||
1144 | } | ||
1145 | |||
1146 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg); | ||
1147 | } | ||
1148 | |||
1149 | /** | ||
1150 | * Send an IBF of the order specified in cpi | ||
1151 | * | ||
1152 | * @param cpi the peer | ||
1153 | */ | ||
1154 | static void | ||
1155 | send_ibf (struct ConsensusPeerInformation *cpi) | ||
1156 | { | ||
1157 | int sent_buckets; | ||
1158 | sent_buckets = 0; | ||
1159 | while (sent_buckets < (1 << cpi->ibf_order)) | ||
1160 | { | ||
1161 | int num_buckets; | ||
1162 | void *buf; | ||
1163 | struct DifferenceDigest *digest; | ||
1164 | int msize; | ||
1165 | |||
1166 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | ||
1167 | /* limit to maximum */ | ||
1168 | if (num_buckets > BUCKETS_PER_MESSAGE) | ||
1169 | num_buckets = BUCKETS_PER_MESSAGE; | ||
1170 | |||
1171 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); | ||
1172 | |||
1173 | digest = GNUNET_malloc (msize); | ||
1174 | digest->header.size = htons (msize); | ||
1175 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | ||
1176 | digest->order = cpi->ibf_order; | ||
1177 | |||
1178 | buf = &digest[1]; | ||
1179 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); | ||
1180 | |||
1181 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest); | ||
1182 | |||
1183 | sent_buckets += num_buckets; | ||
1184 | } | ||
1185 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | ||
1186 | } | ||
1187 | |||
1188 | |||
1189 | static void | ||
1190 | decode (struct ConsensusPeerInformation *cpi) | ||
1191 | { | ||
1192 | struct IBF_Key key; | ||
1193 | struct GNUNET_HashCode hashcode; | ||
1194 | int side; | ||
1195 | |||
1196 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n"); | ||
1197 | |||
1198 | for (;;) | ||
1199 | { | ||
1200 | int res; | ||
1201 | res = ibf_decode (cpi->ibf, &side, &key); | ||
1202 | if (GNUNET_SYSERR == res) | ||
1203 | { | ||
1204 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | ||
1205 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | ||
1206 | cpi->ibf_order++; | ||
1207 | prepare_ibf (cpi); | ||
1208 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1209 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
1210 | cpi->ibf_bucket_counter = 0; | ||
1211 | send_ibf (cpi); | ||
1212 | return; | ||
1213 | } | ||
1214 | if (GNUNET_NO == res) | ||
1215 | { | ||
1216 | struct GNUNET_MessageHeader *msg; | ||
1217 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); | ||
1218 | msg = GNUNET_malloc (sizeof *msg); | ||
1219 | msg->size = htons (sizeof *msg); | ||
1220 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED); | ||
1221 | queue_peer_message (cpi, msg); | ||
1222 | return; | ||
1223 | } | ||
1224 | if (-1 == side) | ||
1225 | { | ||
1226 | /* we have the element(s), send it to the other peer */ | ||
1227 | ibf_hashcode_from_key (key, &hashcode); | ||
1228 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); | ||
1229 | } | ||
1230 | else | ||
1231 | { | ||
1232 | struct ElementRequest *msg; | ||
1233 | size_t msize; | ||
1234 | struct IBF_Key *p; | ||
1235 | |||
1236 | msize = (sizeof *msg) + sizeof (struct IBF_Key); | ||
1237 | msg = GNUNET_malloc (msize); | ||
1238 | if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) | ||
1239 | { | ||
1240 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1241 | } | ||
1242 | else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round) | ||
1243 | { | ||
1244 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1245 | } | ||
1246 | else | ||
1247 | { | ||
1248 | GNUNET_assert (0); | ||
1249 | } | ||
1250 | msg->header.size = htons (msize); | ||
1251 | p = (struct IBF_Key *) &msg[1]; | ||
1252 | *p = key; | ||
1253 | queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg); | ||
1254 | } | ||
1255 | } | ||
1256 | } | ||
1257 | |||
1258 | |||
1259 | /** | ||
1028 | * Functions with this signature are called whenever a | 1260 | * Functions with this signature are called whenever a |
1029 | * complete message is received by the tokenizer. | 1261 | * complete message is received by the tokenizer. |
1030 | * | 1262 | * |
@@ -1049,16 +1281,17 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader | |||
1049 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | 1281 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); |
1050 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | 1282 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: |
1051 | return handle_p2p_element (cpi, message); | 1283 | return handle_p2p_element (cpi, message); |
1052 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL: | 1284 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT: |
1053 | return handle_p2p_missing_local (cpi, message); | 1285 | return handle_p2p_element_report (cpi, message); |
1054 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE: | ||
1055 | return handle_p2p_missing_remote (cpi, message); | ||
1056 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: | 1286 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: |
1057 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); | 1287 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); |
1288 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED: | ||
1289 | return handle_p2p_synced (cpi, message); | ||
1290 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN: | ||
1291 | return handle_p2p_fin (cpi, message); | ||
1058 | default: | 1292 | default: |
1059 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); | 1293 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n", |
1060 | /* FIXME: handle correctly */ | 1294 | ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey)); |
1061 | GNUNET_assert (0); | ||
1062 | } | 1295 | } |
1063 | return GNUNET_OK; | 1296 | return GNUNET_OK; |
1064 | } | 1297 | } |
@@ -1089,8 +1322,8 @@ mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeade | |||
1089 | default: | 1322 | default: |
1090 | if (NULL != inc->cpi) | 1323 | if (NULL != inc->cpi) |
1091 | return mst_session_callback (inc->cpi, client, message); | 1324 | return mst_session_callback (inc->cpi, client, message); |
1092 | /* FIXME: disconnect peer properly */ | 1325 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n", |
1093 | GNUNET_assert (0); | 1326 | ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey)); |
1094 | } | 1327 | } |
1095 | return GNUNET_OK; | 1328 | return GNUNET_OK; |
1096 | } | 1329 | } |
@@ -1114,21 +1347,14 @@ listen_cb (void *cls, | |||
1114 | const struct GNUNET_PeerIdentity *initiator) | 1347 | const struct GNUNET_PeerIdentity *initiator) |
1115 | { | 1348 | { |
1116 | struct IncomingSocket *incoming; | 1349 | struct IncomingSocket *incoming; |
1117 | |||
1118 | GNUNET_assert (NULL != socket); | 1350 | GNUNET_assert (NULL != socket); |
1119 | |||
1120 | incoming = GNUNET_malloc (sizeof *incoming); | 1351 | incoming = GNUNET_malloc (sizeof *incoming); |
1121 | |||
1122 | incoming->socket = socket; | 1352 | incoming->socket = socket; |
1123 | incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); | 1353 | incoming->peer_id = *initiator; |
1124 | |||
1125 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 1354 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
1126 | &incoming_stream_data_processor, incoming); | 1355 | &incoming_stream_data_processor, incoming); |
1127 | |||
1128 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | 1356 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); |
1129 | |||
1130 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); | 1357 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); |
1131 | |||
1132 | return GNUNET_OK; | 1358 | return GNUNET_OK; |
1133 | } | 1359 | } |
1134 | 1360 | ||
@@ -1179,26 +1405,21 @@ disconnect_client (struct GNUNET_SERVER_Client *client) | |||
1179 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of | 1405 | * Thus, if the local id of two consensus sessions coincide, but are not comprised of |
1180 | * exactly the same peers, the global id will be different. | 1406 | * exactly the same peers, the global id will be different. |
1181 | * | 1407 | * |
1182 | * @param local_id local id of the consensus session | 1408 | * @param session_id local id of the consensus session |
1183 | * @param peers array of all peers participating in the consensus session | ||
1184 | * @param num_peers number of elements in the peers array | ||
1185 | * @param dst where the result is stored, may not be NULL | ||
1186 | */ | 1409 | */ |
1187 | static void | 1410 | static void |
1188 | compute_global_id (const struct GNUNET_HashCode *local_id, | 1411 | compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id) |
1189 | const struct GNUNET_PeerIdentity *peers, int num_peers, | ||
1190 | struct GNUNET_HashCode *dst) | ||
1191 | { | 1412 | { |
1192 | int i; | 1413 | int i; |
1193 | struct GNUNET_HashCode tmp; | 1414 | struct GNUNET_HashCode tmp; |
1194 | 1415 | ||
1195 | *dst = *local_id; | 1416 | session->global_id = *session_id; |
1196 | for (i = 0; i < num_peers; ++i) | 1417 | for (i = 0; i < session->num_peers; ++i) |
1197 | { | 1418 | { |
1198 | GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); | 1419 | GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp); |
1199 | *dst = tmp; | 1420 | session->global_id = tmp; |
1200 | GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); | 1421 | GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp); |
1201 | *dst = tmp; | 1422 | session->global_id = tmp; |
1202 | } | 1423 | } |
1203 | } | 1424 | } |
1204 | 1425 | ||
@@ -1220,7 +1441,7 @@ transmit_queued (void *cls, size_t size, | |||
1220 | size_t msg_size; | 1441 | size_t msg_size; |
1221 | 1442 | ||
1222 | session = cls; | 1443 | session = cls; |
1223 | session->th = NULL; | 1444 | session->client_th = NULL; |
1224 | 1445 | ||
1225 | qmsg = session->client_messages_head; | 1446 | qmsg = session->client_messages_head; |
1226 | GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); | 1447 | GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); |
@@ -1240,7 +1461,7 @@ transmit_queued (void *cls, size_t size, | |||
1240 | GNUNET_free (qmsg->msg); | 1461 | GNUNET_free (qmsg->msg); |
1241 | GNUNET_free (qmsg); | 1462 | GNUNET_free (qmsg); |
1242 | 1463 | ||
1243 | send_next (session); | 1464 | client_send_next (session); |
1244 | 1465 | ||
1245 | return msg_size; | 1466 | return msg_size; |
1246 | } | 1467 | } |
@@ -1252,21 +1473,21 @@ transmit_queued (void *cls, size_t size, | |||
1252 | * @param cli the client to send the next message to | 1473 | * @param cli the client to send the next message to |
1253 | */ | 1474 | */ |
1254 | static void | 1475 | static void |
1255 | send_next (struct ConsensusSession *session) | 1476 | client_send_next (struct ConsensusSession *session) |
1256 | { | 1477 | { |
1257 | 1478 | ||
1258 | GNUNET_assert (NULL != session); | 1479 | GNUNET_assert (NULL != session); |
1259 | 1480 | ||
1260 | if (NULL != session->th) | 1481 | if (NULL != session->client_th) |
1261 | return; | 1482 | return; |
1262 | 1483 | ||
1263 | if (NULL != session->client_messages_head) | 1484 | if (NULL != session->client_messages_head) |
1264 | { | 1485 | { |
1265 | int msize; | 1486 | int msize; |
1266 | msize = ntohs (session->client_messages_head->msg->size); | 1487 | msize = ntohs (session->client_messages_head->msg->size); |
1267 | session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, | 1488 | session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, |
1268 | GNUNET_TIME_UNIT_FOREVER_REL, | 1489 | GNUNET_TIME_UNIT_FOREVER_REL, |
1269 | &transmit_queued, session); | 1490 | &transmit_queued, session); |
1270 | } | 1491 | } |
1271 | } | 1492 | } |
1272 | 1493 | ||
@@ -1297,11 +1518,11 @@ hash_cmp (const void *a, const void *b) | |||
1297 | static int | 1518 | static int |
1298 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) | 1519 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) |
1299 | { | 1520 | { |
1300 | const struct GNUNET_PeerIdentity *needle; | 1521 | int i; |
1301 | needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | 1522 | for (i = 0; i < session->num_peers; i++) |
1302 | if (NULL == needle) | 1523 | if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer)) |
1303 | return -1; | 1524 | return i; |
1304 | return needle - session->peers; | 1525 | return -1; |
1305 | } | 1526 | } |
1306 | 1527 | ||
1307 | 1528 | ||
@@ -1316,16 +1537,8 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1316 | cpi = cls; | 1537 | cpi = cls; |
1317 | cpi->wh = NULL; | 1538 | cpi->wh = NULL; |
1318 | cpi->hello = GNUNET_YES; | 1539 | cpi->hello = GNUNET_YES; |
1319 | |||
1320 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1540 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1321 | 1541 | embrace_peer (cpi); | |
1322 | /* FIXME: other rounds */ | ||
1323 | |||
1324 | if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) && | ||
1325 | (GNUNET_YES == cpi->is_outgoing)) | ||
1326 | { | ||
1327 | write_strata (cpi, GNUNET_STREAM_OK, 0); | ||
1328 | } | ||
1329 | } | 1542 | } |
1330 | 1543 | ||
1331 | 1544 | ||
@@ -1342,62 +1555,20 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | |||
1342 | struct ConsensusHello *hello; | 1555 | struct ConsensusHello *hello; |
1343 | 1556 | ||
1344 | cpi = cls; | 1557 | cpi = cls; |
1345 | cpi->is_connected = GNUNET_YES; | ||
1346 | cpi->wh = NULL; | 1558 | cpi->wh = NULL; |
1347 | |||
1348 | hello = GNUNET_malloc (sizeof *hello); | 1559 | hello = GNUNET_malloc (sizeof *hello); |
1349 | hello->header.size = htons (sizeof *hello); | 1560 | hello->header.size = htons (sizeof *hello); |
1350 | hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); | 1561 | hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); |
1351 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | 1562 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); |
1352 | 1563 | GNUNET_assert (NULL == cpi->mst); | |
1564 | cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi); | ||
1353 | cpi->wh = | 1565 | cpi->wh = |
1354 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); | 1566 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); |
1355 | |||
1356 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 1567 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
1357 | &session_stream_data_processor, cpi); | 1568 | &session_stream_data_processor, cpi); |
1358 | } | 1569 | } |
1359 | 1570 | ||
1360 | 1571 | ||
1361 | static void | ||
1362 | initialize_session_info (struct ConsensusSession *session) | ||
1363 | { | ||
1364 | int i; | ||
1365 | int last; | ||
1366 | |||
1367 | for (i = 0; i < session->num_peers; ++i) | ||
1368 | { | ||
1369 | /* initialize back-references, so consensus peer information can | ||
1370 | * be used as closure */ | ||
1371 | session->info[i].session = session; | ||
1372 | } | ||
1373 | |||
1374 | session->current_round = CONSENSUS_ROUND_BEGIN; | ||
1375 | |||
1376 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | ||
1377 | i = (session->local_peer_idx + 1) % session->num_peers; | ||
1378 | while (i != last) | ||
1379 | { | ||
1380 | session->info[i].is_outgoing = GNUNET_YES; | ||
1381 | session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
1382 | open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); | ||
1383 | session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]); | ||
1384 | i = (i + 1) % session->num_peers; | ||
1385 | |||
1386 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i); | ||
1387 | } | ||
1388 | // tie-breaker for even number of peers | ||
1389 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
1390 | { | ||
1391 | session->info[last].is_outgoing = GNUNET_YES; | ||
1392 | session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
1393 | open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); | ||
1394 | session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]); | ||
1395 | |||
1396 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); | ||
1397 | } | ||
1398 | } | ||
1399 | |||
1400 | |||
1401 | /** | 1572 | /** |
1402 | * Create the sorted list of peers for the session, | 1573 | * Create the sorted list of peers for the session, |
1403 | * add the local peer if not in the join message. | 1574 | * add the local peer if not in the join message. |
@@ -1408,6 +1579,7 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
1408 | unsigned int local_peer_in_list; | 1579 | unsigned int local_peer_in_list; |
1409 | uint32_t listed_peers; | 1580 | uint32_t listed_peers; |
1410 | const struct GNUNET_PeerIdentity *msg_peers; | 1581 | const struct GNUNET_PeerIdentity *msg_peers; |
1582 | struct GNUNET_PeerIdentity *peers; | ||
1411 | unsigned int i; | 1583 | unsigned int i; |
1412 | 1584 | ||
1413 | GNUNET_assert (NULL != session->join_msg); | 1585 | GNUNET_assert (NULL != session->join_msg); |
@@ -1432,18 +1604,30 @@ initialize_session_peer_list (struct ConsensusSession *session) | |||
1432 | if (GNUNET_NO == local_peer_in_list) | 1604 | if (GNUNET_NO == local_peer_in_list) |
1433 | session->num_peers++; | 1605 | session->num_peers++; |
1434 | 1606 | ||
1435 | session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); | 1607 | peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); |
1436 | 1608 | ||
1437 | if (GNUNET_NO == local_peer_in_list) | 1609 | if (GNUNET_NO == local_peer_in_list) |
1438 | session->peers[session->num_peers - 1] = *my_peer; | 1610 | peers[session->num_peers - 1] = *my_peer; |
1611 | |||
1612 | memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | ||
1613 | qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | ||
1614 | |||
1615 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | ||
1439 | 1616 | ||
1440 | memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); | 1617 | for (i = 0; i < session->num_peers; ++i) |
1441 | qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); | 1618 | { |
1619 | /* initialize back-references, so consensus peer information can | ||
1620 | * be used as closure */ | ||
1621 | session->info[i].session = session; | ||
1622 | session->info[i].peer_id = peers[i]; | ||
1623 | } | ||
1624 | |||
1625 | free (peers); | ||
1442 | } | 1626 | } |
1443 | 1627 | ||
1444 | 1628 | ||
1445 | static void | 1629 | static void |
1446 | strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key) | 1630 | strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key) |
1447 | { | 1631 | { |
1448 | uint32_t v; | 1632 | uint32_t v; |
1449 | int i; | 1633 | int i; |
@@ -1451,7 +1635,7 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke | |||
1451 | /* count trailing '1'-bits of v */ | 1635 | /* count trailing '1'-bits of v */ |
1452 | for (i = 0; v & 1; v>>=1, i++) | 1636 | for (i = 0; v & 1; v>>=1, i++) |
1453 | /* empty */; | 1637 | /* empty */; |
1454 | ibf_insert (strata[i], ibf_key_from_hashcode (key)); | 1638 | ibf_insert (se->strata[i], ibf_key_from_hashcode (key)); |
1455 | } | 1639 | } |
1456 | 1640 | ||
1457 | 1641 | ||
@@ -1476,13 +1660,8 @@ add_incoming_peers (struct ConsensusSession *session) | |||
1476 | { | 1660 | { |
1477 | struct ConsensusPeerInformation *cpi; | 1661 | struct ConsensusPeerInformation *cpi; |
1478 | cpi = &session->info[i]; | 1662 | cpi = &session->info[i]; |
1479 | if (0 == memcmp (inc->peer, &cpi->session->peers[i], sizeof (struct GNUNET_PeerIdentity))) | 1663 | if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity))) |
1480 | { | 1664 | { |
1481 | if (GNUNET_YES == cpi->is_outgoing) | ||
1482 | { | ||
1483 | /* FIXME: disconnect */ | ||
1484 | continue; | ||
1485 | } | ||
1486 | cpi->socket = inc->socket; | 1665 | cpi->socket = inc->socket; |
1487 | inc->cpi = cpi; | 1666 | inc->cpi = cpi; |
1488 | inc->cpi->mst = inc->mst; | 1667 | inc->cpi->mst = inc->mst; |
@@ -1505,15 +1684,12 @@ static void | |||
1505 | initialize_session (struct ConsensusSession *session) | 1684 | initialize_session (struct ConsensusSession *session) |
1506 | { | 1685 | { |
1507 | const struct ConsensusSession *other_session; | 1686 | const struct ConsensusSession *other_session; |
1508 | int i; | ||
1509 | 1687 | ||
1510 | GNUNET_assert (NULL != session->join_msg); | 1688 | GNUNET_assert (NULL != session->join_msg); |
1511 | |||
1512 | initialize_session_peer_list (session); | 1689 | initialize_session_peer_list (session); |
1513 | 1690 | session->current_round = CONSENSUS_ROUND_BEGIN; | |
1514 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); | 1691 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); |
1515 | 1692 | compute_global_id (session, &session->join_msg->session_id); | |
1516 | compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id); | ||
1517 | 1693 | ||
1518 | /* Check if some local client already owns the session. */ | 1694 | /* Check if some local client already owns the session. */ |
1519 | other_session = sessions_head; | 1695 | other_session = sessions_head; |
@@ -1531,26 +1707,14 @@ initialize_session (struct ConsensusSession *session) | |||
1531 | } | 1707 | } |
1532 | 1708 | ||
1533 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); | 1709 | session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); |
1534 | |||
1535 | session->local_peer_idx = get_peer_idx (my_peer, session); | 1710 | session->local_peer_idx = get_peer_idx (my_peer, session); |
1536 | GNUNET_assert (-1 != session->local_peer_idx); | 1711 | GNUNET_assert (-1 != session->local_peer_idx); |
1537 | |||
1538 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); | 1712 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); |
1539 | 1713 | session->se = strata_estimator_create (); | |
1540 | session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); | ||
1541 | for (i = 0; i < STRATA_COUNT; i++) | ||
1542 | session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | ||
1543 | |||
1544 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); | 1714 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); |
1545 | |||
1546 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | ||
1547 | initialize_session_info (session); | ||
1548 | |||
1549 | GNUNET_free (session->join_msg); | 1715 | GNUNET_free (session->join_msg); |
1550 | session->join_msg = NULL; | 1716 | session->join_msg = NULL; |
1551 | |||
1552 | add_incoming_peers (session); | 1717 | add_incoming_peers (session); |
1553 | |||
1554 | GNUNET_SERVER_receive_done (session->client, GNUNET_OK); | 1718 | GNUNET_SERVER_receive_done (session->client, GNUNET_OK); |
1555 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | 1719 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); |
1556 | } | 1720 | } |
@@ -1658,46 +1822,19 @@ client_insert (void *cls, | |||
1658 | 1822 | ||
1659 | GNUNET_assert (NULL != element->data); | 1823 | GNUNET_assert (NULL != element->data); |
1660 | 1824 | ||
1661 | hash_for_ibf (element, element_size, &hash); | 1825 | hash_for_ibf (element->data, element_size, &hash); |
1662 | |||
1663 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting with hash_for_ibf %s\n", GNUNET_h2s (&hash)); | ||
1664 | 1826 | ||
1665 | GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element, | 1827 | GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element, |
1666 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1828 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1667 | 1829 | ||
1668 | strata_insert (session->strata, &hash); | 1830 | strata_estimator_insert (session->se, &hash); |
1669 | 1831 | ||
1670 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1832 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1671 | 1833 | ||
1672 | send_next (session); | 1834 | client_send_next (session); |
1673 | } | 1835 | } |
1674 | 1836 | ||
1675 | 1837 | ||
1676 | /** | ||
1677 | * Functions of this signature are called whenever writing operations | ||
1678 | * on a stream are executed | ||
1679 | * | ||
1680 | * @param cls the closure from GNUNET_STREAM_write | ||
1681 | * @param status the status of the stream at the time this function is called; | ||
1682 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1683 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1684 | * (this doesn't mean that the data is never sent, the receiver may | ||
1685 | * have read the data but its ACKs may have been lost); | ||
1686 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1687 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1688 | * be processed. | ||
1689 | * @param size the number of bytes written | ||
1690 | */ | ||
1691 | static void | ||
1692 | write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1693 | { | ||
1694 | struct ConsensusPeerInformation *cpi; | ||
1695 | cpi = cls; | ||
1696 | cpi->wh = NULL; | ||
1697 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1698 | /* just wait for the ibf */ | ||
1699 | } | ||
1700 | |||
1701 | 1838 | ||
1702 | /** | 1839 | /** |
1703 | * Functions of this signature are called whenever writing operations | 1840 | * Functions of this signature are called whenever writing operations |
@@ -1715,319 +1852,226 @@ write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1715 | * @param size the number of bytes written | 1852 | * @param size the number of bytes written |
1716 | */ | 1853 | */ |
1717 | static void | 1854 | static void |
1718 | write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1855 | write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1719 | { | 1856 | { |
1720 | struct ConsensusPeerInformation *cpi; | 1857 | struct ConsensusPeerInformation *cpi; |
1721 | struct StrataMessage *strata_msg; | ||
1722 | void *buf; | ||
1723 | size_t msize; | ||
1724 | int i; | ||
1725 | 1858 | ||
1859 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1726 | cpi = cls; | 1860 | cpi = cls; |
1727 | cpi->wh = NULL; | 1861 | cpi->wh = NULL; |
1728 | 1862 | if (NULL != cpi->messages_head) | |
1729 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1730 | |||
1731 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); | ||
1732 | |||
1733 | /* FIXME: handle this */ | ||
1734 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1735 | |||
1736 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | ||
1737 | |||
1738 | strata_msg = GNUNET_malloc (msize); | ||
1739 | strata_msg->header.size = htons (msize); | ||
1740 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | ||
1741 | |||
1742 | buf = &strata_msg[1]; | ||
1743 | for (i = 0; i < STRATA_COUNT; i++) | ||
1744 | { | 1863 | { |
1745 | ibf_write (cpi->session->strata[i], &buf, NULL); | 1864 | struct QueuedMessage *qm; |
1865 | qm = cpi->messages_head; | ||
1866 | GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm); | ||
1867 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | ||
1868 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1869 | write_queued, cpi); | ||
1870 | if (NULL != qm->cb) | ||
1871 | qm->cb (qm->cls); | ||
1872 | GNUNET_free (qm->msg); | ||
1873 | GNUNET_free (qm); | ||
1874 | GNUNET_assert (NULL != cpi->wh); | ||
1746 | } | 1875 | } |
1747 | |||
1748 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1749 | write_strata_done, cpi); | ||
1750 | |||
1751 | GNUNET_assert (NULL != cpi->wh); | ||
1752 | |||
1753 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n"); | ||
1754 | } | 1876 | } |
1755 | 1877 | ||
1756 | 1878 | ||
1757 | static void | 1879 | static void |
1758 | write_ibf_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1880 | shuffle (struct ConsensusSession *session) |
1759 | { | 1881 | { |
1760 | struct ConsensusPeerInformation *cpi; | 1882 | /* FIXME: implement */ |
1761 | cpi = cls; | ||
1762 | cpi->wh = NULL; | ||
1763 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1764 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "write ibf done callback\n"); | ||
1765 | } | 1883 | } |
1766 | 1884 | ||
1767 | 1885 | ||
1768 | /** | 1886 | /** |
1769 | * Functions of this signature are called whenever writing operations | 1887 | * Find and set the partner_incoming and partner_outgoing of our peer, |
1770 | * on a stream are executed | 1888 | * one of them may not exist in most cases. |
1771 | * | 1889 | * |
1772 | * @param cls the closure from GNUNET_STREAM_write | 1890 | * @param session the consensus session |
1773 | * @param status the status of the stream at the time this function is called; | ||
1774 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1775 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1776 | * (this doesn't mean that the data is never sent, the receiver may | ||
1777 | * have read the data but its ACKs may have been lost); | ||
1778 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1779 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1780 | * be processed. | ||
1781 | * @param size the number of bytes written | ||
1782 | */ | 1891 | */ |
1783 | static void | 1892 | static void |
1784 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1893 | find_partners (struct ConsensusSession *session) |
1785 | { | 1894 | { |
1786 | struct ConsensusPeerInformation *cpi; | 1895 | int mark[session->num_peers]; |
1787 | struct DifferenceDigest *digest; | 1896 | int i; |
1788 | int msize; | 1897 | memset (mark, 0, session->num_peers * sizeof (int)); |
1789 | int num_buckets; | 1898 | session->partner_incoming = session->partner_outgoing = NULL; |
1790 | void *buf; | 1899 | for (i = 0; i < session->num_peers; i++) |
1791 | 1900 | { | |
1792 | cpi = cls; | 1901 | int arc; |
1793 | cpi->wh = NULL; | 1902 | if (0 != mark[i]) |
1903 | continue; | ||
1904 | arc = (i + (1 << session->exp_subround)) % session->num_peers; | ||
1905 | mark[i] = mark[arc] = 1; | ||
1906 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d talks to %d\n", i, arc); | ||
1907 | GNUNET_assert (i != arc); | ||
1908 | if (i == session->local_peer_idx) | ||
1909 | { | ||
1910 | GNUNET_assert (NULL == session->partner_outgoing); | ||
1911 | session->partner_outgoing = &session->info[session->shuffle[arc]]; | ||
1912 | } | ||
1913 | if (arc == session->local_peer_idx) | ||
1914 | { | ||
1915 | GNUNET_assert (NULL == session->partner_incoming); | ||
1916 | session->partner_incoming = &session->info[session->shuffle[i]]; | ||
1917 | } | ||
1918 | if (0 != mark[session->local_peer_idx]) | ||
1919 | { | ||
1920 | return; | ||
1921 | } | ||
1922 | } | ||
1923 | } | ||
1794 | 1924 | ||
1795 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1796 | GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); | ||
1797 | 1925 | ||
1798 | /* we should not be done here! */ | 1926 | /** |
1799 | GNUNET_assert (cpi->ibf_bucket_counter != (1 << cpi->ibf_order)); | 1927 | * Do the next subround in the exp-scheme. |
1928 | * | ||
1929 | * @param cls the session | ||
1930 | * @param tc task context, for when this task is invoked by the scheduler, | ||
1931 | * NULL if invoked for another reason | ||
1932 | */ | ||
1933 | static void | ||
1934 | subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
1935 | { | ||
1936 | struct ConsensusSession *session; | ||
1937 | int i; | ||
1800 | 1938 | ||
1801 | /* remaining buckets */ | 1939 | /* don't kick off next subround if we're shutting down */ |
1802 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | 1940 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) |
1941 | return; | ||
1803 | 1942 | ||
1804 | /* limit to maximum */ | 1943 | session = cls; |
1805 | if (num_buckets > BUCKETS_PER_MESSAGE) | ||
1806 | num_buckets = BUCKETS_PER_MESSAGE; | ||
1807 | 1944 | ||
1808 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order)); | 1945 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "subround over, exp_round=%d, exp_subround=%d\n", session->exp_round, session->exp_subround); |
1809 | 1946 | ||
1810 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); | 1947 | for (i = 0; i < session->num_peers; i++) |
1948 | clear_peer_messages (&session->info[i]); | ||
1811 | 1949 | ||
1812 | digest = GNUNET_malloc (msize); | 1950 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) |
1813 | digest->header.size = htons (msize); | 1951 | { |
1814 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | 1952 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
1815 | digest->order = cpi->ibf_order; | 1953 | } |
1954 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | ||
1816 | 1955 | ||
1817 | buf = &digest[1]; | 1956 | if ((session->num_peers == 2) && (session->exp_round == 1)) |
1818 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); | 1957 | { |
1958 | round_over (session, NULL); | ||
1959 | return; | ||
1960 | } | ||
1819 | 1961 | ||
1820 | cpi->ibf_bucket_counter += num_buckets; | 1962 | if (session->exp_round == NUM_EXP_ROUNDS) |
1963 | { | ||
1964 | round_over (session, NULL); | ||
1965 | return; | ||
1966 | } | ||
1821 | 1967 | ||
1822 | /* we have to set the new state here, because of non-deterministic schedulung */ | 1968 | if (session->exp_round == 0) |
1823 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | ||
1824 | { | 1969 | { |
1825 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); | 1970 | session->exp_round = 1; |
1826 | /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ | 1971 | session->exp_subround = 0; |
1827 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | 1972 | if (NULL == session->shuffle) |
1828 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf_done, cpi); | 1973 | session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers); |
1974 | for (i = 0; i < session->num_peers; i++) | ||
1975 | session->shuffle[i] = i; | ||
1829 | } | 1976 | } |
1830 | else | 1977 | else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers))) |
1831 | { | 1978 | { |
1832 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf, cpi); | 1979 | session->exp_round++; |
1980 | session->exp_subround = 0; | ||
1981 | shuffle (session); | ||
1982 | } | ||
1983 | else | ||
1984 | { | ||
1985 | session->exp_subround++; | ||
1833 | } | 1986 | } |
1834 | 1987 | ||
1835 | GNUNET_assert (NULL != cpi->wh); | 1988 | find_partners (session); |
1836 | } | ||
1837 | |||
1838 | |||
1839 | /** | ||
1840 | * Functions of this signature are called whenever writing operations | ||
1841 | * on a stream are executed | ||
1842 | * | ||
1843 | * @param cls the closure from GNUNET_STREAM_write | ||
1844 | * @param status the status of the stream at the time this function is called; | ||
1845 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1846 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1847 | * (this doesn't mean that the data is never sent, the receiver may | ||
1848 | * have read the data but its ACKs may have been lost); | ||
1849 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1850 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1851 | * be processed. | ||
1852 | * @param size the number of bytes written | ||
1853 | */ | ||
1854 | static void | ||
1855 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1856 | { | ||
1857 | struct ConsensusPeerInformation *cpi; | ||
1858 | struct IBF_Key key; | ||
1859 | struct GNUNET_HashCode hashcode; | ||
1860 | int side; | ||
1861 | |||
1862 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1863 | |||
1864 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n"); | ||
1865 | 1989 | ||
1866 | cpi = cls; | 1990 | if (NULL != session->partner_outgoing) |
1867 | GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); | 1991 | { |
1868 | cpi->wh = NULL; | 1992 | session->partner_outgoing->ibf_state = IBF_STATE_NONE; |
1993 | session->partner_outgoing->ibf_bucket_counter = 0; | ||
1994 | } | ||
1869 | 1995 | ||
1870 | if (NULL != cpi->requests_and_elements_head) | 1996 | if (NULL != session->partner_incoming) |
1871 | { | 1997 | { |
1872 | struct QueuedMessage *qm; | 1998 | session->partner_incoming->ibf_state = IBF_STATE_NONE; |
1873 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending queued element\n"); | 1999 | session->partner_incoming->ibf_bucket_counter = 0; |
1874 | qm = cpi->requests_and_elements_head; | ||
1875 | GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
1876 | 2000 | ||
1877 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | 2001 | /* maybe there's an early strata estimator? */ |
1878 | GNUNET_TIME_UNIT_FOREVER_REL, | 2002 | if (NULL != session->partner_incoming->premature_strata_message) |
1879 | write_requests_and_elements, cpi); | 2003 | { |
1880 | GNUNET_assert (NULL != cpi->wh); | 2004 | session->partner_incoming->replaying_strata_message = GNUNET_YES; |
1881 | /* some elements / requests have queued up, we have to transmit them first */ | 2005 | handle_p2p_strata (session->partner_incoming, session->partner_incoming->premature_strata_message); |
1882 | return; | 2006 | GNUNET_free (session->partner_incoming->premature_strata_message); |
2007 | session->partner_incoming->replaying_strata_message = GNUNET_NO; | ||
2008 | } | ||
1883 | } | 2009 | } |
1884 | 2010 | ||
1885 | for (;;) | 2011 | if (NULL != session->partner_outgoing) |
1886 | { | 2012 | { |
1887 | int res; | 2013 | if (NULL == session->partner_outgoing->socket) |
1888 | res = ibf_decode (cpi->ibf, &side, &key); | ||
1889 | if (GNUNET_SYSERR == res) | ||
1890 | { | 2014 | { |
1891 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | 2015 | session->partner_outgoing->socket = |
1892 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | 2016 | GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, |
1893 | cpi->ibf_order++; | 2017 | open_cb, session->partner_outgoing, |
1894 | prepare_ibf (cpi); | 2018 | GNUNET_STREAM_OPTION_END); |
1895 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1896 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
1897 | cpi->ibf_bucket_counter = 0; | ||
1898 | write_ibf (cls, status, size); | ||
1899 | return; | ||
1900 | } | 2019 | } |
1901 | if (GNUNET_NO == res) | 2020 | else if (GNUNET_YES == session->partner_outgoing->hello) |
1902 | { | 2021 | { |
1903 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); | 2022 | send_strata_estimator (session->partner_outgoing); |
1904 | return; | ||
1905 | } | 2023 | } |
1906 | if (-1 == side) | 2024 | /* else: do nothing, the send hello cb will handle this */ |
1907 | { | 2025 | } |
1908 | /* we have the element, send it to the other peer */ | ||
1909 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element\n"); | ||
1910 | ibf_hashcode_from_key (key, &hashcode); | ||
1911 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); | ||
1912 | /* send the first message, because we can! */ | ||
1913 | if (NULL != cpi->requests_and_elements_head) | ||
1914 | { | ||
1915 | struct QueuedMessage *qm; | ||
1916 | qm = cpi->requests_and_elements_head; | ||
1917 | GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
1918 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing element\n"); | ||
1919 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | ||
1920 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1921 | write_requests_and_elements, cpi); | ||
1922 | GNUNET_assert (NULL != cpi->wh); | ||
1923 | } | ||
1924 | else | ||
1925 | { | ||
1926 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "no element found for decoded hash %s\n", GNUNET_h2s (&hashcode)); | ||
1927 | } | ||
1928 | return; | ||
1929 | } | ||
1930 | else | ||
1931 | { | ||
1932 | struct ElementRequest *msg; | ||
1933 | size_t msize; | ||
1934 | struct IBF_Key *p; | ||
1935 | 2026 | ||
1936 | msize = (sizeof *msg) + sizeof (uint64_t); | 2027 | /* |
1937 | msg = GNUNET_malloc (msize); | 2028 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS), |
1938 | if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) | 2029 | subround_over, session); |
1939 | { | 2030 | */ |
1940 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request for element\n"); | ||
1941 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1942 | } | ||
1943 | else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round) | ||
1944 | { | ||
1945 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending locally missing element\n"); | ||
1946 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL); | ||
1947 | } | ||
1948 | else | ||
1949 | { | ||
1950 | GNUNET_assert (0); | ||
1951 | } | ||
1952 | msg->header.size = htons (msize); | ||
1953 | p = (struct IBF_Key *) &msg[1]; | ||
1954 | *p = key; | ||
1955 | 2031 | ||
1956 | cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1957 | write_requests_and_elements, cpi); | ||
1958 | GNUNET_assert (NULL != cpi->wh); | ||
1959 | GNUNET_free (msg); | ||
1960 | return; | ||
1961 | } | ||
1962 | } | ||
1963 | } | 2032 | } |
1964 | 2033 | ||
1965 | 2034 | static void | |
1966 | static double | 2035 | contact_peer_a2a (struct ConsensusPeerInformation *cpi) |
1967 | compute_similarity (struct ConsensusSession *session, int p1, int p2) | ||
1968 | { | 2036 | { |
1969 | /* FIXME: simplistic dummy implementation, use real set union/intersecion */ | 2037 | cpi->is_outgoing = GNUNET_YES; |
1970 | return (session->info[p1].num_missing_local + session->info[p2].num_missing_local) / | 2038 | if (NULL == cpi->socket) |
1971 | ((double) (session->info[p1].num_missing_remote + session->info[p2].num_missing_remote + 1)); | 2039 | { |
2040 | cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS, | ||
2041 | open_cb, cpi, GNUNET_STREAM_OPTION_END); | ||
2042 | } | ||
2043 | else if (GNUNET_YES == cpi->hello) | ||
2044 | { | ||
2045 | send_strata_estimator (cpi); | ||
2046 | } | ||
1972 | } | 2047 | } |
1973 | 2048 | ||
1974 | |||
1975 | static void | 2049 | static void |
1976 | select_fittest_group (struct ConsensusSession *session) | 2050 | start_inventory (struct ConsensusSession *session) |
1977 | { | 2051 | { |
1978 | /* simplistic implementation: compute the similarity with the latest strata estimator, | ||
1979 | * rank the results once */ | ||
1980 | struct GNUNET_PeerIdentity *group; | ||
1981 | double rating[session->num_peers]; | ||
1982 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *done_msg; | ||
1983 | size_t msize; | ||
1984 | int i; | 2052 | int i; |
1985 | int j; | 2053 | int last; |
1986 | /* number of peers in the consensus group */ | ||
1987 | int k; | ||
1988 | |||
1989 | k = ceil(session->num_peers / 3.0) * 2; | ||
1990 | group = GNUNET_malloc (k * sizeof *group); | ||
1991 | 2054 | ||
1992 | /* do strata subtraction */ | 2055 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1993 | /* FIXME: we know the real sets, subtract them! */ | 2056 | i = (session->local_peer_idx + 1) % session->num_peers; |
1994 | for (i = 0; i < session->num_peers; i++) | 2057 | while (i != last) |
1995 | { | 2058 | { |
1996 | rating[i] = 0; | 2059 | contact_peer_a2a (&session->info[i]); |
1997 | for (j = 0; j < i; j++) | 2060 | i = (i + 1) % session->num_peers; |
1998 | { | 2061 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i); |
1999 | double sim; | ||
2000 | sim = compute_similarity (session, i, j); | ||
2001 | rating[i] += sim; | ||
2002 | rating[j] += sim; | ||
2003 | } | ||
2004 | } | 2062 | } |
2005 | for (i = 0; i < k; i++) | 2063 | // tie-breaker for even number of peers |
2064 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | ||
2006 | { | 2065 | { |
2007 | int best_idx = 0; | 2066 | contact_peer_a2a (&session->info[last]); |
2008 | for (j = 1; j < session->num_peers; j++) | 2067 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); |
2009 | if (rating[j] > rating[best_idx]) | ||
2010 | best_idx = j; | ||
2011 | rating[best_idx] = -1; | ||
2012 | group[i] = session->peers[best_idx]; | ||
2013 | } | 2068 | } |
2014 | |||
2015 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got group!\n"); | ||
2016 | |||
2017 | msize = sizeof *done_msg + k * sizeof *group; | ||
2018 | |||
2019 | done_msg = GNUNET_malloc (msize); | ||
2020 | done_msg->header.size = htons (msize); | ||
2021 | done_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
2022 | memcpy (&done_msg[1], group, k * sizeof *group); | ||
2023 | |||
2024 | queue_client_message (session, (struct GNUNET_MessageHeader *) done_msg); | ||
2025 | send_next (session); | ||
2026 | } | 2069 | } |
2027 | 2070 | ||
2028 | 2071 | ||
2029 | /** | 2072 | /** |
2030 | * Select and kick off the next round, based on the current round. | 2073 | * Select and kick off the next round, based on the current round. |
2074 | * | ||
2031 | * @param cls the session | 2075 | * @param cls the session |
2032 | * @param tc task context, for when this task is invoked by the scheduler, | 2076 | * @param tc task context, for when this task is invoked by the scheduler, |
2033 | * NULL if invoked for another reason | 2077 | * NULL if invoked for another reason |
@@ -2045,61 +2089,50 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
2045 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n"); | 2089 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n"); |
2046 | session = cls; | 2090 | session = cls; |
2047 | 2091 | ||
2092 | for (i = 0; i < session->num_peers; i++) | ||
2093 | clear_peer_messages (&session->info[i]); | ||
2094 | |||
2048 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | 2095 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) |
2049 | { | 2096 | { |
2050 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | 2097 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); |
2051 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | 2098 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; |
2052 | } | 2099 | } |
2053 | 2100 | ||
2054 | for (i = 0; i < session->num_peers; i++) | 2101 | /* FIXME: cancel current round */ |
2055 | { | ||
2056 | if ((NULL != session->info) && (NULL != session->info[i].wh)) | ||
2057 | GNUNET_STREAM_write_cancel (session->info[i].wh); | ||
2058 | } | ||
2059 | 2102 | ||
2060 | switch (session->current_round) | 2103 | switch (session->current_round) |
2061 | { | 2104 | { |
2062 | case CONSENSUS_ROUND_BEGIN: | 2105 | case CONSENSUS_ROUND_BEGIN: |
2063 | { | 2106 | session->current_round = CONSENSUS_ROUND_EXCHANGE; |
2064 | session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; | 2107 | session->exp_round = 0; |
2065 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4), | 2108 | subround_over (session, NULL); |
2066 | round_over, session); | 2109 | break; |
2067 | for (i = 0; i < session->num_peers; i++) | 2110 | case CONSENSUS_ROUND_EXCHANGE: |
2111 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "done for now\n"); | ||
2112 | |||
2113 | if (0) | ||
2068 | { | 2114 | { |
2069 | /* we can only talk to hello'ed peers */ | 2115 | struct GNUNET_MessageHeader *msg; |
2070 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | 2116 | msg = GNUNET_malloc (sizeof *msg); |
2071 | (GNUNET_YES == session->info[i].hello) ) | 2117 | msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); |
2072 | { | 2118 | msg->size = htons (sizeof *msg); |
2073 | /* kick off transmitting strata by calling the write continuation */ | 2119 | queue_client_message (session, msg); |
2074 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | 2120 | client_send_next (session); |
2075 | } | ||
2076 | } | 2121 | } |
2077 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude started, timeout=%llu\n", session->conclude_timeout.rel_value); | 2122 | |
2078 | break; | 2123 | if (0) |
2079 | } | ||
2080 | case CONSENSUS_ROUND_A2A_EXCHANGE: | ||
2081 | { | ||
2082 | session->current_round = CONSENSUS_ROUND_A2A_INVENTORY; | ||
2083 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting inventory round\n"); | ||
2084 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4), | ||
2085 | round_over, session); | ||
2086 | for (i = 0; i < session->num_peers; i++) | ||
2087 | { | 2124 | { |
2088 | session->info[i].ibf_state = IBF_STATE_NONE; | 2125 | session->current_round = CONSENSUS_ROUND_INVENTORY; |
2089 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | 2126 | start_inventory (session); |
2090 | (GNUNET_YES == session->info[i].hello) ) | ||
2091 | { | ||
2092 | /* kick off transmitting strata by calling the write continuation */ | ||
2093 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | ||
2094 | } | ||
2095 | } | 2127 | } |
2096 | break; | 2128 | break; |
2097 | } | 2129 | case CONSENSUS_ROUND_INVENTORY: |
2098 | case CONSENSUS_ROUND_A2A_INVENTORY: | 2130 | session->current_round = CONSENSUS_ROUND_STOCK; |
2099 | /* finally, we are done and select the most fitting group */ | 2131 | /* FIXME: exchange stock */ |
2100 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "protocol rounds done\n"); | 2132 | break; |
2133 | case CONSENSUS_ROUND_STOCK: | ||
2101 | session->current_round = CONSENSUS_ROUND_FINISH; | 2134 | session->current_round = CONSENSUS_ROUND_FINISH; |
2102 | select_fittest_group (session); | 2135 | /* FIXME: send elements to client */ |
2103 | break; | 2136 | break; |
2104 | default: | 2137 | default: |
2105 | GNUNET_assert (0); | 2138 | GNUNET_assert (0); |
@@ -2138,8 +2171,8 @@ client_conclude (void *cls, | |||
2138 | if (CONSENSUS_ROUND_BEGIN != session->current_round) | 2171 | if (CONSENSUS_ROUND_BEGIN != session->current_round) |
2139 | { | 2172 | { |
2140 | /* client requested conclude twice */ | 2173 | /* client requested conclude twice */ |
2141 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "unexpected round at conclude: %d\n", session->current_round); | ||
2142 | GNUNET_break (0); | 2174 | GNUNET_break (0); |
2175 | /* client may still own a session, destroy it */ | ||
2143 | disconnect_client (client); | 2176 | disconnect_client (client); |
2144 | return; | 2177 | return; |
2145 | } | 2178 | } |
@@ -2150,7 +2183,7 @@ client_conclude (void *cls, | |||
2150 | round_over (session, NULL); | 2183 | round_over (session, NULL); |
2151 | 2184 | ||
2152 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2185 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
2153 | send_next (session); | 2186 | client_send_next (session); |
2154 | } | 2187 | } |
2155 | 2188 | ||
2156 | 2189 | ||
@@ -2186,9 +2219,9 @@ client_ack (void *cls, | |||
2186 | return; | 2219 | return; |
2187 | } | 2220 | } |
2188 | 2221 | ||
2189 | pending = session->approval_pending_head; | 2222 | pending = session->client_approval_head; |
2190 | 2223 | ||
2191 | GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending); | 2224 | GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending); |
2192 | 2225 | ||
2193 | msg = (struct GNUNET_CONSENSUS_AckMessage *) message; | 2226 | msg = (struct GNUNET_CONSENSUS_AckMessage *) message; |
2194 | 2227 | ||
@@ -2196,11 +2229,12 @@ client_ack (void *cls, | |||
2196 | { | 2229 | { |
2197 | int i; | 2230 | int i; |
2198 | element = pending->element; | 2231 | element = pending->element; |
2199 | hash_for_ibf (element, element->size, &key); | 2232 | hash_for_ibf (element->data, element->size, &key); |
2200 | 2233 | ||
2201 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | 2234 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, |
2202 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 2235 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
2203 | strata_insert (session->strata, &key); | 2236 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n"); |
2237 | strata_estimator_insert (session->se, &key); | ||
2204 | 2238 | ||
2205 | for (i = 0; i <= MAX_IBF_ORDER; i++) | 2239 | for (i = 0; i <= MAX_IBF_ORDER; i++) |
2206 | { | 2240 | { |
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index d3218ff9b..5246b63a6 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -92,19 +92,20 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf, | |||
92 | struct IBF_Key key, int *dst) | 92 | struct IBF_Key key, int *dst) |
93 | { | 93 | { |
94 | struct GNUNET_HashCode bucket_indices; | 94 | struct GNUNET_HashCode bucket_indices; |
95 | unsigned int filled = 0; | 95 | unsigned int filled; |
96 | int i; | 96 | int i; |
97 | GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); | 97 | GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); |
98 | filled = 0; | ||
98 | for (i = 0; filled < ibf->hash_num; i++) | 99 | for (i = 0; filled < ibf->hash_num; i++) |
99 | { | 100 | { |
100 | unsigned int bucket; | 101 | unsigned int bucket; |
101 | unsigned int j; | 102 | unsigned int j; |
102 | if ( (0 != i) && (0 == (i % 16)) ) | 103 | if ( (0 != i) && (0 == (i % 16)) ) |
103 | GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); | 104 | GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); |
104 | bucket = bucket_indices.bits[i] % ibf->size; | 105 | bucket = bucket_indices.bits[i % 16] % ibf->size; |
105 | for (j = 0; j < filled; j++) | 106 | for (j = 0; j < filled; j++) |
106 | if (dst[j] == bucket) | 107 | if (dst[j] == bucket) |
107 | goto try_next;; | 108 | goto try_next; |
108 | dst[filled++] = bucket; | 109 | dst[filled++] = bucket; |
109 | try_next: ; | 110 | try_next: ; |
110 | } | 111 | } |
@@ -141,6 +142,7 @@ void | |||
141 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) | 142 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) |
142 | { | 143 | { |
143 | int buckets[ibf->hash_num]; | 144 | int buckets[ibf->hash_num]; |
145 | GNUNET_assert (ibf->hash_num <= ibf->size); | ||
144 | ibf_get_indices (ibf, key, buckets); | 146 | ibf_get_indices (ibf, key, buckets); |
145 | ibf_insert_into (ibf, key, buckets, 1); | 147 | ibf_insert_into (ibf, key, buckets, 1); |
146 | } | 148 | } |