aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-03-19 02:09:10 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-03-19 02:09:10 +0000
commitde88db7fecd4651732563450070095a2309079b7 (patch)
treee732c2a10aa5fe1bcf149597394975c1ea4a0857 /src/consensus
parentca39427d26e560d3ca74580825e1635b1dae4166 (diff)
downloadgnunet-de88db7fecd4651732563450070095a2309079b7.tar.gz
gnunet-de88db7fecd4651732563450070095a2309079b7.zip
fixed consensus for two peers, added log-rounds, started implementing freeze, multiple peers still buggy
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/consensus.h23
-rw-r--r--src/consensus/consensus_api.c8
-rw-r--r--src/consensus/consensus_protocol.h22
-rw-r--r--src/consensus/gnunet-consensus.c6
-rw-r--r--src/consensus/gnunet-service-consensus.c1306
-rw-r--r--src/consensus/ibf.c8
6 files changed, 695 insertions, 678 deletions
diff --git a/src/consensus/consensus.h b/src/consensus/consensus.h
index 2c68849b9..ebbadb926 100644
--- a/src/consensus/consensus.h
+++ b/src/consensus/consensus.h
@@ -52,12 +52,6 @@ struct GNUNET_CONSENSUS_ConcludeMessage
52 */ 52 */
53 struct GNUNET_MessageHeader header; 53 struct GNUNET_MessageHeader header;
54 54
55
56 /**
57 * Minimum group size required for a consensus group.
58 */
59 uint32_t min_group_size GNUNET_PACKED;
60
61 /** 55 /**
62 * Timeout for conclude 56 * Timeout for conclude
63 */ 57 */
@@ -65,23 +59,6 @@ struct GNUNET_CONSENSUS_ConcludeMessage
65}; 59};
66 60
67 61
68struct GNUNET_CONSENSUS_ConcludeDoneMessage
69{
70 /**
71 * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE
72 */
73 struct GNUNET_MessageHeader header;
74
75 uint32_t group_id GNUNET_PACKED;
76
77 uint32_t num_elements GNUNET_PACKED;
78
79 uint32_t num_peers GNUNET_PACKED;
80
81 /** PeerIdentity[num_peers] */
82};
83
84
85/** 62/**
86 * Message with an element 63 * Message with an element
87 */ 64 */
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 19bf81c86..be5117730 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -282,11 +282,11 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
282 */ 282 */
283static void 283static void
284handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, 284handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
285 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 285 const struct GNUNET_MessageHeader *msg)
286{ 286{
287 GNUNET_assert (NULL != consensus->conclude_cb); 287 GNUNET_assert (NULL != consensus->conclude_cb);
288 consensus->may_not_destroy = GNUNET_YES; 288 consensus->may_not_destroy = GNUNET_YES;
289 consensus->conclude_cb (consensus->conclude_cls, NULL); 289 consensus->conclude_cb (consensus->conclude_cls);
290 consensus->may_not_destroy = GNUNET_NO; 290 consensus->may_not_destroy = GNUNET_NO;
291 consensus->conclude_cb = NULL; 291 consensus->conclude_cb = NULL;
292} 292}
@@ -323,7 +323,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
323 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); 323 handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg);
324 break; 324 break;
325 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: 325 case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE:
326 handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); 326 handle_conclude_done (consensus, msg);
327 break; 327 break;
328 default: 328 default:
329 GNUNET_break (0); 329 GNUNET_break (0);
@@ -491,7 +491,6 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
491void 491void
492GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, 492GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
493 struct GNUNET_TIME_Relative timeout, 493 struct GNUNET_TIME_Relative timeout,
494 unsigned int min_group_size_in_consensus,
495 GNUNET_CONSENSUS_ConcludeCallback conclude, 494 GNUNET_CONSENSUS_ConcludeCallback conclude,
496 void *conclude_cls) 495 void *conclude_cls)
497{ 496{
@@ -508,7 +507,6 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
508 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); 507 conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE);
509 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); 508 conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage));
510 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); 509 conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout);
511 conclude_msg->min_group_size = min_group_size_in_consensus;
512 510
513 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); 511 qmsg = GNUNET_malloc (sizeof (struct QueuedMessage));
514 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; 512 qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg;
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index e8b2c8a34..c0420d55c 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -38,14 +38,9 @@ GNUNET_NETWORK_STRUCT_BEGIN
38struct StrataMessage 38struct StrataMessage
39{ 39{
40 struct GNUNET_MessageHeader header; 40 struct GNUNET_MessageHeader header;
41 /** 41 uint8_t round;
42 * Number of elements the sender currently has. 42 uint8_t exp_round;
43 */ 43 uint8_t exp_subround;
44 uint16_t num_elements;
45 /**
46 * Number of strata in this estimator.
47 */
48 uint16_t num_strata;
49 /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */ 44 /* struct GNUNET_HashCode hash_buckets[ibf_size*num_strata] */
50 /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */ 45 /* struct GNUNET_HashCode id_buckets[ibf_size*num_strata] */
51 /* uint8_t count_buckets[ibf_size*num_strata] */ 46 /* uint8_t count_buckets[ibf_size*num_strata] */
@@ -56,8 +51,12 @@ struct DifferenceDigest
56 struct GNUNET_MessageHeader header; 51 struct GNUNET_MessageHeader header;
57 uint8_t order; 52 uint8_t order;
58 uint8_t round; 53 uint8_t round;
54 uint8_t exp_round;
55 uint8_t exp_subround;
56 /* rest: IBF */
59}; 57};
60 58
59
61struct Element 60struct Element
62{ 61{
63 struct GNUNET_MessageHeader header; 62 struct GNUNET_MessageHeader header;
@@ -75,7 +74,14 @@ struct ConsensusHello
75{ 74{
76 struct GNUNET_MessageHeader header; 75 struct GNUNET_MessageHeader header;
77 struct GNUNET_HashCode global_id; 76 struct GNUNET_HashCode global_id;
77};
78
79struct ConsensusRoundHeader
80{
81 struct GNUNET_MessageHeader header;
78 uint8_t round; 82 uint8_t round;
83 uint8_t exp_round;
84 uint8_t exp_subround;
79}; 85};
80 86
81 87
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index fd6019c20..7a951d35b 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -83,7 +83,7 @@ destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx)
83 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not 83 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not
84 */ 84 */
85static void 85static void
86conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group) 86conclude_cb (void *cls)
87{ 87{
88 GNUNET_SCHEDULER_add_now (destroy, cls); 88 GNUNET_SCHEDULER_add_now (destroy, cls);
89} 89}
@@ -142,7 +142,7 @@ do_consensus ()
142 } 142 }
143 143
144 for (i = 0; i < num_peers; i++) 144 for (i = 0; i < num_peers; i++)
145 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, 0, conclude_cb, consensus_handles[i]); 145 GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]);
146} 146}
147 147
148 148
@@ -184,7 +184,7 @@ connect_complete (void *cls,
184 184
185static int 185static int
186new_element_cb (void *cls, 186new_element_cb (void *cls,
187 struct GNUNET_CONSENSUS_Element *element) 187 const struct GNUNET_CONSENSUS_Element *element)
188{ 188{
189 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); 189 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
190 return GNUNET_YES; 190 return GNUNET_YES;
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 494271e45..b1323f902 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -62,6 +62,11 @@
62 */ 62 */
63#define MAX_IBF_ORDER (16) 63#define MAX_IBF_ORDER (16)
64 64
65/**
66 * Number exp-rounds.
67 */
68#define NUM_EXP_ROUNDS (4)
69
65 70
66/* forward declarations */ 71/* forward declarations */
67 72
@@ -70,23 +75,32 @@ struct IncomingSocket;
70struct ConsensusPeerInformation; 75struct ConsensusPeerInformation;
71 76
72static void 77static void
73send_next (struct ConsensusSession *session); 78client_send_next (struct ConsensusSession *session);
74 79
75static void 80static int
76write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); 81get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
77 82
78static void 83static void
79write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); 84round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
85
86static void
87send_ibf (struct ConsensusPeerInformation *cpi);
88
89static void
90send_strata_estimator (struct ConsensusPeerInformation *cpi);
91
92static void
93decode (struct ConsensusPeerInformation *cpi);
80 94
81static void 95static void
82write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size); 96write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size);
83 97
84static int 98static void
85get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); 99subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
86 100
87 101
88/** 102/**
89 * An element that is waiting to be transmitted. 103 * An element that is waiting to be transmitted to the client.
90 */ 104 */
91struct PendingElement 105struct PendingElement
92{ 106{
@@ -109,11 +123,14 @@ struct PendingElement
109 struct ConsensusPeerInformation *cpi; 123 struct ConsensusPeerInformation *cpi;
110}; 124};
111 125
126
112/** 127/**
113 * Information about a peer that is in a consensus session. 128 * Information about a peer that is in a consensus session.
114 */ 129 */
115struct ConsensusPeerInformation 130struct ConsensusPeerInformation
116{ 131{
132 struct GNUNET_PeerIdentity peer_id;
133
117 /** 134 /**
118 * Socket for communicating with the peer, either created by the local peer, 135 * Socket for communicating with the peer, either created by the local peer,
119 * or the remote peer. 136 * or the remote peer.
@@ -126,22 +143,12 @@ struct ConsensusPeerInformation
126 struct GNUNET_SERVER_MessageStreamTokenizer *mst; 143 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
127 144
128 /** 145 /**
129 * Is socket's connection established, i.e. can we write to it? 146 * Do we connect to the peer, or does the peer connect to us?
130 * Only relevent to outgoing cpi. 147 * Only valid for all-to-all phases
131 */
132 int is_connected;
133
134 /**
135 * Type of the peer in the all-to-all rounds,
136 * GNUNET_YES if we initiate reconciliation.
137 */ 148 */
138 int is_outgoing; 149 int is_outgoing;
139 150
140 /** 151 int connected;
141 * if the peer did something wrong, and was disconnected,
142 * never interact with this peer again.
143 */
144 int is_bad;
145 152
146 /** 153 /**
147 * Did we receive/send a consensus hello? 154 * Did we receive/send a consensus hello?
@@ -194,43 +201,50 @@ struct ConsensusPeerInformation
194 * Strata estimator of the peer, NULL if our peer 201 * Strata estimator of the peer, NULL if our peer
195 * initiated the reconciliation. 202 * initiated the reconciliation.
196 */ 203 */
197 struct InvertibleBloomFilter **strata; 204 struct StrataEstimator *se;
198 205
199 /** 206 /**
200 * Elements that the peer is missing from us. 207 * Element keys that this peer misses, but we have them.
201 */ 208 */
202 uint64_t *missing_local; 209 struct GNUNET_CONTAINER_MultiHashMap *requested_keys;
203 210
204 /** 211 /**
205 * Number of elements in missing_local 212 * Element keys that this peer has, but we miss.
206 */ 213 */
207 unsigned int num_missing_local; 214 struct GNUNET_CONTAINER_MultiHashMap *reported_keys;
208 215
209 /** 216 /**
210 * Elements that this peer told us *we* don't have, 217 * Back-reference to the consensus session,
211 * i.e. we are the remote peer that has some values missing. 218 * to that ConsensusPeerInformation can be used as a closure
212 */ 219 */
213 uint64_t *missing_remote; 220 struct ConsensusSession *session;
214 221
215 /** 222 /**
216 * Number of elements in missing_local 223 * Messages queued for the current round.
217 */ 224 */
218 unsigned int num_missing_remote; 225 struct QueuedMessage *messages_head;
219 226
220 /** 227 /**
221 * Back-reference to the consensus session, 228 * Messages queued for the current round.
222 * to that ConsensusPeerInformation can be used as a closure
223 */ 229 */
224 struct ConsensusSession *session; 230 struct QueuedMessage *messages_tail;
225 231
226 /** 232 /**
227 * When decoding the IBF, requests for elements and outgoing elements 233 * True if we are actually replaying the strata message,
228 * have to be queued, to ensure that messages actually fit in the stream buffer. 234 * e.g. currently handling the premature_strata_message.
229 */ 235 */
230 struct QueuedMessage *requests_and_elements_head; 236 int replaying_strata_message;
231 struct QueuedMessage *requests_and_elements_tail; 237
238 /**
239 * A strata message that is not actually for the current round,
240 * used in the exp-scheme.
241 */
242 struct StrataMessage *premature_strata_message;
243
232}; 244};
233 245
246typedef void (*QueuedMessageCallback) (void *msg);
247
234/** 248/**
235 * A doubly linked list of messages. 249 * A doubly linked list of messages.
236 */ 250 */
@@ -247,6 +261,10 @@ struct QueuedMessage
247 * Queued messages are stored in a doubly linked list. 261 * Queued messages are stored in a doubly linked list.
248 */ 262 */
249 struct QueuedMessage *prev; 263 struct QueuedMessage *prev;
264
265 QueuedMessageCallback cb;
266
267 void *cls;
250}; 268};
251 269
252/** 270/**
@@ -255,31 +273,32 @@ struct QueuedMessage
255enum ConsensusRound 273enum ConsensusRound
256{ 274{
257 /** 275 /**
258 * Not started the protocl yet 276 * Not started the protocol yet.
259 */ 277 */
260 CONSENSUS_ROUND_BEGIN=0, 278 CONSENSUS_ROUND_BEGIN=0,
261 /** 279 /**
262 * distribution of information with the exponential scheme. 280 * Distribution of elements with the exponential scheme.
263 */
264 CONSENSUS_ROUND_EXP_EXCHANGE,
265 /**
266 * All-to-all, exchange missing values.
267 */ 281 */
268 CONSENSUS_ROUND_A2A_EXCHANGE, 282 CONSENSUS_ROUND_EXCHANGE,
269 /** 283 /**
270 * All-to-all, check what values are missing, don't exchange anything. 284 * Exchange which elements each peer has, but not the elements.
271 */ 285 */
272 CONSENSUS_ROUND_A2A_INVENTORY, 286 CONSENSUS_ROUND_INVENTORY,
273 /** 287 /**
274 * All-to-all round to exchange information for byzantine fault detection. 288 * Collect and distribute missing values.
275 */ 289 */
276 CONSENSUS_ROUND_A2A_INVENTORY_AGREEMENT, 290 CONSENSUS_ROUND_STOCK,
277 /** 291 /**
278 * Rounds are over 292 * Consensus concluded.
279 */ 293 */
280 CONSENSUS_ROUND_FINISH 294 CONSENSUS_ROUND_FINISH
281}; 295};
282 296
297struct StrataEstimator
298{
299 struct InvertibleBloomFilter **strata;
300};
301
283 302
284/** 303/**
285 * A consensus session consists of one local client and the remote authorities. 304 * A consensus session consists of one local client and the remote authorities.
@@ -305,7 +324,7 @@ struct ConsensusSession
305 324
306 /** 325 /**
307 * Global consensus identification, computed 326 * Global consensus identification, computed
308 * from the local id and participating authorities. 327 * from the session id and participating authorities.
309 */ 328 */
310 struct GNUNET_HashCode global_id; 329 struct GNUNET_HashCode global_id;
311 330
@@ -316,7 +335,7 @@ struct ConsensusSession
316 struct GNUNET_SERVER_Client *client; 335 struct GNUNET_SERVER_Client *client;
317 336
318 /** 337 /**
319 * Values in the consensus set of this session, 338 * Elements in the consensus set of this session,
320 * all of them either have been sent by or approved by the client. 339 * all of them either have been sent by or approved by the client.
321 * Contains GNUNET_CONSENSUS_Element. 340 * Contains GNUNET_CONSENSUS_Element.
322 */ 341 */
@@ -325,12 +344,12 @@ struct ConsensusSession
325 /** 344 /**
326 * Elements that have not been approved (or rejected) by the client yet. 345 * Elements that have not been approved (or rejected) by the client yet.
327 */ 346 */
328 struct PendingElement *approval_pending_head; 347 struct PendingElement *client_approval_head;
329 348
330 /** 349 /**
331 * Elements that have not been approved (or rejected) by the client yet. 350 * Elements that have not been approved (or rejected) by the client yet.
332 */ 351 */
333 struct PendingElement *approval_pending_tail; 352 struct PendingElement *client_approval_tail;
334 353
335 /** 354 /**
336 * Messages to be sent to the local client that owns this session 355 * Messages to be sent to the local client that owns this session
@@ -345,7 +364,7 @@ struct ConsensusSession
345 /** 364 /**
346 * Currently active transmit handle for sending to the client 365 * Currently active transmit handle for sending to the client
347 */ 366 */
348 struct GNUNET_SERVER_TransmitHandle *th; 367 struct GNUNET_SERVER_TransmitHandle *client_th;
349 368
350 /** 369 /**
351 * Timeout for all rounds together, single rounds will schedule a timeout task 370 * Timeout for all rounds together, single rounds will schedule a timeout task
@@ -370,12 +389,6 @@ struct ConsensusSession
370 struct ConsensusPeerInformation *info; 389 struct ConsensusPeerInformation *info;
371 390
372 /** 391 /**
373 * Sorted array of peer identities in this consensus session,
374 * includes the local peer.
375 */
376 struct GNUNET_PeerIdentity *peers;
377
378 /**
379 * Index of the local peer in the peers array 392 * Index of the local peer in the peers array
380 */ 393 */
381 int local_peer_idx; 394 int local_peer_idx;
@@ -383,7 +396,7 @@ struct ConsensusSession
383 /** 396 /**
384 * Strata estimator, computed online 397 * Strata estimator, computed online
385 */ 398 */
386 struct InvertibleBloomFilter **strata; 399 struct StrataEstimator *se;
387 400
388 /** 401 /**
389 * Pre-computed IBFs 402 * Pre-computed IBFs
@@ -394,6 +407,26 @@ struct ConsensusSession
394 * Current round 407 * Current round
395 */ 408 */
396 enum ConsensusRound current_round; 409 enum ConsensusRound current_round;
410
411 int exp_round;
412
413 int exp_subround;
414
415 /**
416 * Permutation of peers for the current round,
417 * maps logical index (for current round) to physical index (location in info array)
418 */
419 int *shuffle;
420
421 /**
422 * The partner for the current exp-round
423 */
424 struct ConsensusPeerInformation* partner_outgoing;
425
426 /**
427 * The partner for the current exp-round
428 */
429 struct ConsensusPeerInformation* partner_incoming;
397}; 430};
398 431
399 432
@@ -427,7 +460,7 @@ struct IncomingSocket
427 /** 460 /**
428 * Peer that connected to us with the socket. 461 * Peer that connected to us with the socket.
429 */ 462 */
430 struct GNUNET_PeerIdentity *peer; 463 struct GNUNET_PeerIdentity peer_id;
431 464
432 /** 465 /**
433 * Message stream tokenizer for this socket. 466 * Message stream tokenizer for this socket.
@@ -442,8 +475,6 @@ struct IncomingSocket
442 /** 475 /**
443 * Set to the global session id, if the peer sent us a hello-message, 476 * Set to the global session id, if the peer sent us a hello-message,
444 * but the session does not exist yet. 477 * but the session does not exist yet.
445 *
446 * FIXME: not implemented yet
447 */ 478 */
448 struct GNUNET_HashCode *requested_gid; 479 struct GNUNET_HashCode *requested_gid;
449}; 480};
@@ -504,26 +535,45 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea
504} 535}
505 536
506/** 537/**
507 * Get peer index associated with the peer information, 538 * Queue a message to be sent to another peer
508 * unique for every session among all peers. 539 *
540 * @param cpi peer
541 * @param msg message we want to queue
509 */ 542 */
510static int 543static void
511get_cpi_index (struct ConsensusPeerInformation *cpi) 544queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
512{ 545{
513 return cpi - cpi->session->info; 546 struct QueuedMessage *qm;
547 qm = GNUNET_malloc (sizeof *qm);
548 qm->msg = msg;
549 qm->cls = cls;
550 qm->cb = cb;
551 GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm);
552 if (cpi->wh == NULL)
553 write_queued (cpi, GNUNET_STREAM_OK, 0);
514} 554}
515 555
556
516/** 557/**
517 * Mark the peer as bad, free state we don't need anymore. 558 * Queue a message to be sent to another peer
518 * 559 *
519 * @param cpi consensus peer information of the bad peer 560 * @param cpi peer
561 * @param msg message we want to queue
520 */ 562 */
521static void 563static void
522mark_peer_bad (struct ConsensusPeerInformation *cpi) 564queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg)
523{ 565{
524 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi)); 566 queue_peer_message_with_cls (cpi, msg, NULL, NULL);
525 cpi->is_bad = GNUNET_YES; 567}
526 /* FIXME: free ibfs etc. */ 568
569
570
571static void
572clear_peer_messages (struct ConsensusPeerInformation *cpi)
573{
574 /* FIXME: deallocate */
575 cpi->messages_head = NULL;
576 cpi->messages_tail = NULL;
527} 577}
528 578
529 579
@@ -532,13 +582,13 @@ mark_peer_bad (struct ConsensusPeerInformation *cpi)
532 * i.e. arrays of IBFs. 582 * i.e. arrays of IBFs.
533 * Does not not modify its arguments. 583 * Does not not modify its arguments.
534 * 584 *
535 * @param strata1 first strata estimator 585 * @param se1 first strata estimator
536 * @param strata2 second strata estimator 586 * @param se2 second strata estimator
537 * @return the estimated difference 587 * @return the estimated difference
538 */ 588 */
539static int 589static int
540estimate_difference (struct InvertibleBloomFilter** strata1, 590estimate_difference (struct StrataEstimator *se1,
541 struct InvertibleBloomFilter** strata2) 591 struct StrataEstimator *se2)
542{ 592{
543 int i; 593 int i;
544 int count; 594 int count;
@@ -551,8 +601,8 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
551 int more; 601 int more;
552 ibf_count = 0; 602 ibf_count = 0;
553 /* FIXME: implement this without always allocating new IBFs */ 603 /* FIXME: implement this without always allocating new IBFs */
554 diff = ibf_dup (strata1[i]); 604 diff = ibf_dup (se1->strata[i]);
555 ibf_subtract (diff, strata2[i]); 605 ibf_subtract (diff, se2->strata[i]);
556 for (;;) 606 for (;;)
557 { 607 {
558 more = ibf_decode (diff, NULL, NULL); 608 more = ibf_decode (diff, NULL, NULL);
@@ -664,25 +714,23 @@ send_element_iter (void *cls,
664{ 714{
665 struct ConsensusPeerInformation *cpi; 715 struct ConsensusPeerInformation *cpi;
666 struct GNUNET_CONSENSUS_Element *element; 716 struct GNUNET_CONSENSUS_Element *element;
667 struct QueuedMessage *qm;
668 struct GNUNET_MessageHeader *element_msg; 717 struct GNUNET_MessageHeader *element_msg;
669 size_t msize; 718 size_t msize;
719
670 cpi = cls; 720 cpi = cls;
671 element = value; 721 element = value;
672 msize = sizeof (struct GNUNET_MessageHeader) + element->size; 722 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
673 element_msg = GNUNET_malloc (msize); 723 element_msg = GNUNET_malloc (msize);
674 element_msg->size = htons (msize); 724 element_msg->size = htons (msize);
675 if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) 725 if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round)
676 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); 726 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
677 else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round) 727 else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
678 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE); 728 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
679 else 729 else
680 GNUNET_assert (0); 730 GNUNET_assert (0);
681 GNUNET_assert (NULL != element->data); 731 GNUNET_assert (NULL != element->data);
682 memcpy (&element_msg[1], element->data, element->size); 732 memcpy (&element_msg[1], element->data, element->size);
683 qm = GNUNET_malloc (sizeof *qm); 733 queue_peer_message (cpi, element_msg);
684 qm->msg = element_msg;
685 GNUNET_CONTAINER_DLL_insert (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
686 return GNUNET_YES; 734 return GNUNET_YES;
687} 735}
688 736
@@ -733,30 +781,79 @@ prepare_ibf (struct ConsensusPeerInformation *cpi)
733 * @param msg message 781 * @param msg message
734 */ 782 */
735static int 783static int
736handle_p2p_missing_local (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) 784handle_p2p_element_report (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
737{ 785{
738 uint64_t key; 786 GNUNET_assert (0);
739 key = *(uint64_t *) &msg[1]; 787}
740 GNUNET_array_append (cpi->missing_remote, cpi->num_missing_remote, key); 788
741 return GNUNET_OK; 789static void
790queue_cont_subround_over (void *cls)
791{
792 struct ConsensusSession *session;
793 session = cls;
794 subround_over (session, NULL);
742} 795}
743 796
744 797
745/** 798/**
746 * Called when a remote peer wants to inform the local peer 799 * Gets called when the other peer wants us to inform that
747 * that the local peer misses elements. 800 * it has decoded our ibf and sent us all elements / requests
748 * Elements are not reconciled.
749 *
750 * @param cpi session
751 * @param msg message
752 */ 801 */
753static int 802static int
754handle_p2p_missing_remote (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) 803handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
755{ 804{
756 uint64_t key; 805 struct GNUNET_MessageHeader *fin_msg;
757 key = *(uint64_t *) &msg[1]; 806 switch (cpi->session->current_round)
758 GNUNET_array_append (cpi->missing_local, cpi->num_missing_local, key); 807 {
759 return GNUNET_OK; 808 case CONSENSUS_ROUND_EXCHANGE:
809 fin_msg = GNUNET_malloc (sizeof *fin_msg);
810 fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
811 fin_msg->size = htons (sizeof *fin_msg);
812 /* the subround os over once we kicked off sending the fin msg */
813 /* FIXME: assert we are talking to the right peer! */
814 queue_peer_message_with_cls (cpi, fin_msg, queue_cont_subround_over, cpi->session);
815 break;
816 default:
817 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the current round\n");
818 break;
819 }
820 return GNUNET_YES;
821}
822
823/**
824 * The other peer wants us to inform that he sent us all the elements we requested.
825 */
826static int
827handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
828{
829 /* FIXME: only call subround_over if round is the current one! */
830 switch (cpi->session->current_round)
831 {
832 case CONSENSUS_ROUND_EXCHANGE:
833 subround_over (cpi->session, NULL);
834 break;
835 default:
836 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the current round\n");
837 break;
838 }
839 return GNUNET_YES;
840}
841
842
843static struct StrataEstimator *
844strata_estimator_create ()
845{
846 struct StrataEstimator *se;
847 int i;
848
849 /* fixme: allocate everything in one chunk */
850
851 se = GNUNET_malloc (sizeof (struct StrataEstimator));
852 se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter) * STRATA_COUNT);
853 for (i = 0; i < STRATA_COUNT; i++)
854 se->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
855
856 return se;
760} 857}
761 858
762 859
@@ -775,33 +872,47 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
775 void *buf; 872 void *buf;
776 size_t size; 873 size_t size;
777 874
778 GNUNET_assert (GNUNET_NO == cpi->is_outgoing); 875 switch (cpi->session->current_round)
779
780 if (NULL == cpi->strata)
781 { 876 {
782 cpi->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *)); 877 case CONSENSUS_ROUND_EXCHANGE:
783 for (i = 0; i < STRATA_COUNT; i++) 878 if ( (strata_msg->round != CONSENSUS_ROUND_EXCHANGE) ||
784 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 879 (strata_msg->exp_round != cpi->session->exp_round) ||
880 (strata_msg->exp_subround != cpi->session->exp_subround))
881 {
882 if (GNUNET_NO == cpi->replaying_strata_message)
883 {
884 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got probably premature message\n");
885 cpi->premature_strata_message = (struct StrataMessage *) GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
886 }
887 return GNUNET_YES;
888 }
889 break;
890 default:
891 GNUNET_assert (0);
892 break;
785 } 893 }
786 894
895 if (NULL == cpi->se)
896 cpi->se = strata_estimator_create ();
897
787 size = ntohs (strata_msg->header.size); 898 size = ntohs (strata_msg->header.size);
788 buf = (void *) &strata_msg[1]; 899 buf = (void *) &strata_msg[1];
789 for (i = 0; i < STRATA_COUNT; i++) 900 for (i = 0; i < STRATA_COUNT; i++)
790 { 901 {
791 int res; 902 int res;
792 res = ibf_read (&buf, &size, cpi->strata[i]); 903 res = ibf_read (&buf, &size, cpi->se->strata[i]);
793 GNUNET_assert (GNUNET_OK == res); 904 GNUNET_assert (GNUNET_OK == res);
794 } 905 }
795 906
796 diff = estimate_difference (cpi->session->strata, cpi->strata); 907 diff = estimate_difference (cpi->session->se, cpi->se);
797 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff); 908 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff);
798 909
799 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) || 910 if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) ||
800 (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)) 911 (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round))
801 { 912 {
802 /* send IBF of the right size */ 913 /* send IBF of the right size */
803 cpi->ibf_order = 0; 914 cpi->ibf_order = 0;
804 while ((1 << cpi->ibf_order) < diff) 915 while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << cpi->ibf_order) )
805 cpi->ibf_order++; 916 cpi->ibf_order++;
806 if (cpi->ibf_order > MAX_IBF_ORDER) 917 if (cpi->ibf_order > MAX_IBF_ORDER)
807 cpi->ibf_order = MAX_IBF_ORDER; 918 cpi->ibf_order = MAX_IBF_ORDER;
@@ -811,9 +922,8 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
811 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); 922 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
812 cpi->ibf_state = IBF_STATE_TRANSMITTING; 923 cpi->ibf_state = IBF_STATE_TRANSMITTING;
813 cpi->ibf_bucket_counter = 0; 924 cpi->ibf_bucket_counter = 0;
814 write_ibf (cpi, GNUNET_STREAM_OK, 0); 925 send_ibf (cpi);
815 } 926 }
816
817 return GNUNET_YES; 927 return GNUNET_YES;
818} 928}
819 929
@@ -824,6 +934,8 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
824 int num_buckets; 934 int num_buckets;
825 void *buf; 935 void *buf;
826 936
937 /* FIXME: find out if we're still expecting the same ibf! */
938
827 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; 939 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
828 switch (cpi->ibf_state) 940 switch (cpi->ibf_state)
829 { 941 {
@@ -852,16 +964,14 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
852 case IBF_STATE_RECEIVING: 964 case IBF_STATE_RECEIVING:
853 break; 965 break;
854 default: 966 default:
855 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state); 967 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "received ibf unexpectedly in state %d\n", cpi->ibf_state);
856 mark_peer_bad (cpi); 968 return GNUNET_YES;
857 return GNUNET_NO;
858 } 969 }
859 970
860 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) 971 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
861 { 972 {
862 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n"); 973 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
863 mark_peer_bad (cpi); 974 return GNUNET_YES;
864 return GNUNET_NO;
865 } 975 }
866 976
867 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, 977 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets,
@@ -877,16 +987,19 @@ handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDig
877 987
878 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 988 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
879 { 989 {
880 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
881 cpi->ibf_state = IBF_STATE_DECODING; 990 cpi->ibf_state = IBF_STATE_DECODING;
882 prepare_ibf (cpi); 991 prepare_ibf (cpi);
883 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); 992 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
884 write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0); 993 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "about to decode\n");
994 decode (cpi);
885 } 995 }
886 return GNUNET_YES; 996 return GNUNET_YES;
887} 997}
888 998
889 999
1000/**
1001 * Handle an element that another peer sent us
1002 */
890static int 1003static int
891handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) 1004handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
892{ 1005{
@@ -897,8 +1010,6 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
897 1010
898 size = ntohs (element_msg->size) - sizeof *element_msg; 1011 size = ntohs (element_msg->size) - sizeof *element_msg;
899 1012
900 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
901
902 element = GNUNET_malloc (size + sizeof *element); 1013 element = GNUNET_malloc (size + sizeof *element);
903 element->size = size; 1014 element->size = size;
904 memcpy (&element[1], &element_msg[1], size); 1015 memcpy (&element[1], &element_msg[1], size);
@@ -906,7 +1017,7 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
906 1017
907 pending_element = GNUNET_malloc (sizeof *pending_element); 1018 pending_element = GNUNET_malloc (sizeof *pending_element);
908 pending_element->element = element; 1019 pending_element->element = element;
909 GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element); 1020 GNUNET_CONTAINER_DLL_insert_tail (cpi->session->client_approval_head, cpi->session->client_approval_tail, pending_element);
910 1021
911 client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); 1022 client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
912 client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); 1023 client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
@@ -915,74 +1026,61 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
915 1026
916 queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); 1027 queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
917 1028
918 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n"); 1029 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element, size=%d\n", size);
919 1030
920 send_next (cpi->session); 1031 client_send_next (cpi->session);
921 1032
922 return GNUNET_YES; 1033 return GNUNET_YES;
923} 1034}
924 1035
925 1036
926/** 1037/**
927 * Functions of this signature are called whenever writing operations
928 * on a stream are executed
929 *
930 * @param cls the closure from GNUNET_STREAM_write
931 * @param status the status of the stream at the time this function is called;
932 * GNUNET_STREAM_OK if writing to stream was completed successfully;
933 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
934 * (this doesn't mean that the data is never sent, the receiver may
935 * have read the data but its ACKs may have been lost);
936 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
937 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
938 * be processed.
939 * @param size the number of bytes written
940 */
941static void
942write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
943{
944 struct ConsensusPeerInformation *cpi;
945 cpi = cls;
946 GNUNET_assert (NULL == cpi->wh);
947 cpi->wh = NULL;
948 if (NULL != cpi->requests_and_elements_head)
949 {
950 struct QueuedMessage *qm;
951 qm = cpi->requests_and_elements_head;
952 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
953
954 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
955 GNUNET_TIME_UNIT_FOREVER_REL,
956 write_requested_elements, cpi);
957 GNUNET_assert (NULL != cpi->wh);
958 }
959}
960
961
962/**
963 * Handle a request for elements. 1038 * Handle a request for elements.
964 * Only allowed in exchange-rounds. 1039 * Only allowed in exchange-rounds.
965 */ 1040 */
966static int 1041static int
967handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) 1042handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
968{ 1043{
969 struct GNUNET_HashCode *hashcode; 1044 struct GNUNET_HashCode hashcode;
1045 struct IBF_Key *ibf_key;
970 unsigned int num; 1046 unsigned int num;
971 1047
972 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n"); 1048 num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
973 num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode); 1049 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u elements\n", num);
974 hashcode = (struct GNUNET_HashCode *) &msg[1]; 1050
1051 ibf_key = (struct IBF_Key *) &msg[1];
975 while (num--) 1052 while (num--)
976 { 1053 {
977 GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state); 1054 ibf_hashcode_from_key (*ibf_key, &hashcode);
978 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi); 1055 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi);
979 if (NULL == cpi->wh) 1056 ibf_key++;
980 write_requested_elements (cpi, GNUNET_STREAM_OK, 0);
981 hashcode++;
982 } 1057 }
983 return GNUNET_YES; 1058 return GNUNET_YES;
984} 1059}
985 1060
1061/**
1062 * If necessary, send a message to the peer, depending on the current
1063 * round.
1064 */
1065static void
1066embrace_peer (struct ConsensusPeerInformation *cpi)
1067{
1068 GNUNET_assert (GNUNET_YES == cpi->hello);
1069 switch (cpi->session->current_round)
1070 {
1071 case CONSENSUS_ROUND_EXCHANGE:
1072 if (cpi->session->partner_outgoing != cpi)
1073 break;
1074 /* fallthrough */
1075 case CONSENSUS_ROUND_INVENTORY:
1076 /* fallthrough */
1077 case CONSENSUS_ROUND_STOCK:
1078 send_strata_estimator (cpi);
1079 default:
1080 break;
1081 }
1082}
1083
986 1084
987/** 1085/**
988 * Handle a HELLO-message, send when another peer wants to join a session where 1086 * Handle a HELLO-message, send when another peer wants to join a session where
@@ -993,38 +1091,172 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello
993{ 1091{
994 /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ 1092 /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
995 struct ConsensusSession *session; 1093 struct ConsensusSession *session;
1094
996 session = sessions_head; 1095 session = sessions_head;
997 while (NULL != session) 1096 while (NULL != session)
998 { 1097 {
999 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id)) 1098 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &hello->global_id))
1000 { 1099 {
1001 int idx; 1100 int idx;
1002 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer helloed session\n"); 1101 idx = get_peer_idx (&inc->peer_id, session);
1003 idx = get_peer_idx (inc->peer, session);
1004 GNUNET_assert (-1 != idx); 1102 GNUNET_assert (-1 != idx);
1005 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "idx is %d\n", idx); 1103 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d hello'ed session %d\n", idx);
1006 inc->cpi = &session->info[idx]; 1104 inc->cpi = &session->info[idx];
1007 GNUNET_assert (GNUNET_NO == inc->cpi->is_outgoing);
1008 inc->cpi->mst = inc->mst; 1105 inc->cpi->mst = inc->mst;
1009 inc->cpi->hello = GNUNET_YES; 1106 inc->cpi->hello = GNUNET_YES;
1010 inc->cpi->socket = inc->socket; 1107 inc->cpi->socket = inc->socket;
1011 1108 embrace_peer (inc->cpi);
1012 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) &&
1013 (GNUNET_YES == inc->cpi->is_outgoing))
1014 {
1015 write_strata (&session->info[idx], GNUNET_STREAM_OK, 0);
1016 }
1017 return GNUNET_YES; 1109 return GNUNET_YES;
1018 } 1110 }
1019 session = session->next; 1111 session = session->next;
1020 } 1112 }
1021 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n"); 1113 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "peer tried to HELLO uninhabited session\n");
1022 GNUNET_break (0);
1023 return GNUNET_NO; 1114 return GNUNET_NO;
1024} 1115}
1025 1116
1026 1117
1027/** 1118/**
1119 * Send a strata estimator.
1120 *
1121 * @param cpi the peer
1122 */
1123static void
1124send_strata_estimator (struct ConsensusPeerInformation *cpi)
1125{
1126 struct StrataMessage *strata_msg;
1127 void *buf;
1128 size_t msize;
1129 int i;
1130
1131 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1132
1133 strata_msg = GNUNET_malloc (msize);
1134 strata_msg->header.size = htons (msize);
1135 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1136 strata_msg->round = cpi->session->current_round;
1137 strata_msg->exp_round = cpi->session->exp_round;
1138 strata_msg->exp_subround = cpi->session->exp_subround;
1139
1140 buf = &strata_msg[1];
1141 for (i = 0; i < STRATA_COUNT; i++)
1142 {
1143 ibf_write (cpi->session->se->strata[i], &buf, NULL);
1144 }
1145
1146 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
1147}
1148
1149/**
1150 * Send an IBF of the order specified in cpi
1151 *
1152 * @param cpi the peer
1153 */
1154static void
1155send_ibf (struct ConsensusPeerInformation *cpi)
1156{
1157 int sent_buckets;
1158 sent_buckets = 0;
1159 while (sent_buckets < (1 << cpi->ibf_order))
1160 {
1161 int num_buckets;
1162 void *buf;
1163 struct DifferenceDigest *digest;
1164 int msize;
1165
1166 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1167 /* limit to maximum */
1168 if (num_buckets > BUCKETS_PER_MESSAGE)
1169 num_buckets = BUCKETS_PER_MESSAGE;
1170
1171 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1172
1173 digest = GNUNET_malloc (msize);
1174 digest->header.size = htons (msize);
1175 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1176 digest->order = cpi->ibf_order;
1177
1178 buf = &digest[1];
1179 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
1180
1181 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
1182
1183 sent_buckets += num_buckets;
1184 }
1185 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1186}
1187
1188
1189static void
1190decode (struct ConsensusPeerInformation *cpi)
1191{
1192 struct IBF_Key key;
1193 struct GNUNET_HashCode hashcode;
1194 int side;
1195
1196 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
1197
1198 for (;;)
1199 {
1200 int res;
1201 res = ibf_decode (cpi->ibf, &side, &key);
1202 if (GNUNET_SYSERR == res)
1203 {
1204 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1205 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1206 cpi->ibf_order++;
1207 prepare_ibf (cpi);
1208 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1209 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1210 cpi->ibf_bucket_counter = 0;
1211 send_ibf (cpi);
1212 return;
1213 }
1214 if (GNUNET_NO == res)
1215 {
1216 struct GNUNET_MessageHeader *msg;
1217 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
1218 msg = GNUNET_malloc (sizeof *msg);
1219 msg->size = htons (sizeof *msg);
1220 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
1221 queue_peer_message (cpi, msg);
1222 return;
1223 }
1224 if (-1 == side)
1225 {
1226 /* we have the element(s), send it to the other peer */
1227 ibf_hashcode_from_key (key, &hashcode);
1228 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi);
1229 }
1230 else
1231 {
1232 struct ElementRequest *msg;
1233 size_t msize;
1234 struct IBF_Key *p;
1235
1236 msize = (sizeof *msg) + sizeof (struct IBF_Key);
1237 msg = GNUNET_malloc (msize);
1238 if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round)
1239 {
1240 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1241 }
1242 else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
1243 {
1244 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1245 }
1246 else
1247 {
1248 GNUNET_assert (0);
1249 }
1250 msg->header.size = htons (msize);
1251 p = (struct IBF_Key *) &msg[1];
1252 *p = key;
1253 queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
1254 }
1255 }
1256}
1257
1258
1259/**
1028 * Functions with this signature are called whenever a 1260 * Functions with this signature are called whenever a
1029 * complete message is received by the tokenizer. 1261 * complete message is received by the tokenizer.
1030 * 1262 *
@@ -1049,16 +1281,17 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader
1049 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); 1281 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
1050 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: 1282 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
1051 return handle_p2p_element (cpi, message); 1283 return handle_p2p_element (cpi, message);
1052 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL: 1284 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT:
1053 return handle_p2p_missing_local (cpi, message); 1285 return handle_p2p_element_report (cpi, message);
1054 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE:
1055 return handle_p2p_missing_remote (cpi, message);
1056 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: 1286 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
1057 return handle_p2p_element_request (cpi, (struct ElementRequest *) message); 1287 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
1288 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED:
1289 return handle_p2p_synced (cpi, message);
1290 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN:
1291 return handle_p2p_fin (cpi, message);
1058 default: 1292 default:
1059 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); 1293 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s\n",
1060 /* FIXME: handle correctly */ 1294 ntohs (message->type), GNUNET_h2s (&cpi->peer_id.hashPubKey));
1061 GNUNET_assert (0);
1062 } 1295 }
1063 return GNUNET_OK; 1296 return GNUNET_OK;
1064} 1297}
@@ -1089,8 +1322,8 @@ mst_incoming_callback (void *cls, void *client, const struct GNUNET_MessageHeade
1089 default: 1322 default:
1090 if (NULL != inc->cpi) 1323 if (NULL != inc->cpi)
1091 return mst_session_callback (inc->cpi, client, message); 1324 return mst_session_callback (inc->cpi, client, message);
1092 /* FIXME: disconnect peer properly */ 1325 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ignoring unexpected message type (%u) from peer: %s (not in session)\n",
1093 GNUNET_assert (0); 1326 ntohs (message->type), GNUNET_h2s (&inc->peer_id.hashPubKey));
1094 } 1327 }
1095 return GNUNET_OK; 1328 return GNUNET_OK;
1096} 1329}
@@ -1114,21 +1347,14 @@ listen_cb (void *cls,
1114 const struct GNUNET_PeerIdentity *initiator) 1347 const struct GNUNET_PeerIdentity *initiator)
1115{ 1348{
1116 struct IncomingSocket *incoming; 1349 struct IncomingSocket *incoming;
1117
1118 GNUNET_assert (NULL != socket); 1350 GNUNET_assert (NULL != socket);
1119
1120 incoming = GNUNET_malloc (sizeof *incoming); 1351 incoming = GNUNET_malloc (sizeof *incoming);
1121
1122 incoming->socket = socket; 1352 incoming->socket = socket;
1123 incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); 1353 incoming->peer_id = *initiator;
1124
1125 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1354 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1126 &incoming_stream_data_processor, incoming); 1355 &incoming_stream_data_processor, incoming);
1127
1128 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); 1356 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
1129
1130 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); 1357 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
1131
1132 return GNUNET_OK; 1358 return GNUNET_OK;
1133} 1359}
1134 1360
@@ -1179,26 +1405,21 @@ disconnect_client (struct GNUNET_SERVER_Client *client)
1179 * Thus, if the local id of two consensus sessions coincide, but are not comprised of 1405 * Thus, if the local id of two consensus sessions coincide, but are not comprised of
1180 * exactly the same peers, the global id will be different. 1406 * exactly the same peers, the global id will be different.
1181 * 1407 *
1182 * @param local_id local id of the consensus session 1408 * @param session_id local id of the consensus session
1183 * @param peers array of all peers participating in the consensus session
1184 * @param num_peers number of elements in the peers array
1185 * @param dst where the result is stored, may not be NULL
1186 */ 1409 */
1187static void 1410static void
1188compute_global_id (const struct GNUNET_HashCode *local_id, 1411compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCode *session_id)
1189 const struct GNUNET_PeerIdentity *peers, int num_peers,
1190 struct GNUNET_HashCode *dst)
1191{ 1412{
1192 int i; 1413 int i;
1193 struct GNUNET_HashCode tmp; 1414 struct GNUNET_HashCode tmp;
1194 1415
1195 *dst = *local_id; 1416 session->global_id = *session_id;
1196 for (i = 0; i < num_peers; ++i) 1417 for (i = 0; i < session->num_peers; ++i)
1197 { 1418 {
1198 GNUNET_CRYPTO_hash_xor (dst, &peers[0].hashPubKey, &tmp); 1419 GNUNET_CRYPTO_hash_xor (&session->global_id, &session->info[i].peer_id.hashPubKey, &tmp);
1199 *dst = tmp; 1420 session->global_id = tmp;
1200 GNUNET_CRYPTO_hash (dst, sizeof (struct GNUNET_PeerIdentity), &tmp); 1421 GNUNET_CRYPTO_hash (&session->global_id, sizeof (struct GNUNET_PeerIdentity), &tmp);
1201 *dst = tmp; 1422 session->global_id = tmp;
1202 } 1423 }
1203} 1424}
1204 1425
@@ -1220,7 +1441,7 @@ transmit_queued (void *cls, size_t size,
1220 size_t msg_size; 1441 size_t msg_size;
1221 1442
1222 session = cls; 1443 session = cls;
1223 session->th = NULL; 1444 session->client_th = NULL;
1224 1445
1225 qmsg = session->client_messages_head; 1446 qmsg = session->client_messages_head;
1226 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); 1447 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
@@ -1240,7 +1461,7 @@ transmit_queued (void *cls, size_t size,
1240 GNUNET_free (qmsg->msg); 1461 GNUNET_free (qmsg->msg);
1241 GNUNET_free (qmsg); 1462 GNUNET_free (qmsg);
1242 1463
1243 send_next (session); 1464 client_send_next (session);
1244 1465
1245 return msg_size; 1466 return msg_size;
1246} 1467}
@@ -1252,21 +1473,21 @@ transmit_queued (void *cls, size_t size,
1252 * @param cli the client to send the next message to 1473 * @param cli the client to send the next message to
1253 */ 1474 */
1254static void 1475static void
1255send_next (struct ConsensusSession *session) 1476client_send_next (struct ConsensusSession *session)
1256{ 1477{
1257 1478
1258 GNUNET_assert (NULL != session); 1479 GNUNET_assert (NULL != session);
1259 1480
1260 if (NULL != session->th) 1481 if (NULL != session->client_th)
1261 return; 1482 return;
1262 1483
1263 if (NULL != session->client_messages_head) 1484 if (NULL != session->client_messages_head)
1264 { 1485 {
1265 int msize; 1486 int msize;
1266 msize = ntohs (session->client_messages_head->msg->size); 1487 msize = ntohs (session->client_messages_head->msg->size);
1267 session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 1488 session->client_th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
1268 GNUNET_TIME_UNIT_FOREVER_REL, 1489 GNUNET_TIME_UNIT_FOREVER_REL,
1269 &transmit_queued, session); 1490 &transmit_queued, session);
1270 } 1491 }
1271} 1492}
1272 1493
@@ -1297,11 +1518,11 @@ hash_cmp (const void *a, const void *b)
1297static int 1518static int
1298get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session) 1519get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
1299{ 1520{
1300 const struct GNUNET_PeerIdentity *needle; 1521 int i;
1301 needle = bsearch (peer, session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); 1522 for (i = 0; i < session->num_peers; i++)
1302 if (NULL == needle) 1523 if (0 == memcmp (peer, &session->info[i].peer_id, sizeof *peer))
1303 return -1; 1524 return i;
1304 return needle - session->peers; 1525 return -1;
1305} 1526}
1306 1527
1307 1528
@@ -1316,16 +1537,8 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1316 cpi = cls; 1537 cpi = cls;
1317 cpi->wh = NULL; 1538 cpi->wh = NULL;
1318 cpi->hello = GNUNET_YES; 1539 cpi->hello = GNUNET_YES;
1319
1320 GNUNET_assert (GNUNET_STREAM_OK == status); 1540 GNUNET_assert (GNUNET_STREAM_OK == status);
1321 1541 embrace_peer (cpi);
1322 /* FIXME: other rounds */
1323
1324 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) &&
1325 (GNUNET_YES == cpi->is_outgoing))
1326 {
1327 write_strata (cpi, GNUNET_STREAM_OK, 0);
1328 }
1329} 1542}
1330 1543
1331 1544
@@ -1342,62 +1555,20 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1342 struct ConsensusHello *hello; 1555 struct ConsensusHello *hello;
1343 1556
1344 cpi = cls; 1557 cpi = cls;
1345 cpi->is_connected = GNUNET_YES;
1346 cpi->wh = NULL; 1558 cpi->wh = NULL;
1347
1348 hello = GNUNET_malloc (sizeof *hello); 1559 hello = GNUNET_malloc (sizeof *hello);
1349 hello->header.size = htons (sizeof *hello); 1560 hello->header.size = htons (sizeof *hello);
1350 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); 1561 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
1351 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); 1562 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
1352 1563 GNUNET_assert (NULL == cpi->mst);
1564 cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
1353 cpi->wh = 1565 cpi->wh =
1354 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); 1566 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
1355
1356 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1567 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1357 &session_stream_data_processor, cpi); 1568 &session_stream_data_processor, cpi);
1358} 1569}
1359 1570
1360 1571
1361static void
1362initialize_session_info (struct ConsensusSession *session)
1363{
1364 int i;
1365 int last;
1366
1367 for (i = 0; i < session->num_peers; ++i)
1368 {
1369 /* initialize back-references, so consensus peer information can
1370 * be used as closure */
1371 session->info[i].session = session;
1372 }
1373
1374 session->current_round = CONSENSUS_ROUND_BEGIN;
1375
1376 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1377 i = (session->local_peer_idx + 1) % session->num_peers;
1378 while (i != last)
1379 {
1380 session->info[i].is_outgoing = GNUNET_YES;
1381 session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
1382 open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
1383 session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
1384 i = (i + 1) % session->num_peers;
1385
1386 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
1387 }
1388 // tie-breaker for even number of peers
1389 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
1390 {
1391 session->info[last].is_outgoing = GNUNET_YES;
1392 session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
1393 open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
1394 session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
1395
1396 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
1397 }
1398}
1399
1400
1401/** 1572/**
1402 * Create the sorted list of peers for the session, 1573 * Create the sorted list of peers for the session,
1403 * add the local peer if not in the join message. 1574 * add the local peer if not in the join message.
@@ -1408,6 +1579,7 @@ initialize_session_peer_list (struct ConsensusSession *session)
1408 unsigned int local_peer_in_list; 1579 unsigned int local_peer_in_list;
1409 uint32_t listed_peers; 1580 uint32_t listed_peers;
1410 const struct GNUNET_PeerIdentity *msg_peers; 1581 const struct GNUNET_PeerIdentity *msg_peers;
1582 struct GNUNET_PeerIdentity *peers;
1411 unsigned int i; 1583 unsigned int i;
1412 1584
1413 GNUNET_assert (NULL != session->join_msg); 1585 GNUNET_assert (NULL != session->join_msg);
@@ -1432,18 +1604,30 @@ initialize_session_peer_list (struct ConsensusSession *session)
1432 if (GNUNET_NO == local_peer_in_list) 1604 if (GNUNET_NO == local_peer_in_list)
1433 session->num_peers++; 1605 session->num_peers++;
1434 1606
1435 session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); 1607 peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity));
1436 1608
1437 if (GNUNET_NO == local_peer_in_list) 1609 if (GNUNET_NO == local_peer_in_list)
1438 session->peers[session->num_peers - 1] = *my_peer; 1610 peers[session->num_peers - 1] = *my_peer;
1611
1612 memcpy (peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity));
1613 qsort (peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp);
1614
1615 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1439 1616
1440 memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); 1617 for (i = 0; i < session->num_peers; ++i)
1441 qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &hash_cmp); 1618 {
1619 /* initialize back-references, so consensus peer information can
1620 * be used as closure */
1621 session->info[i].session = session;
1622 session->info[i].peer_id = peers[i];
1623 }
1624
1625 free (peers);
1442} 1626}
1443 1627
1444 1628
1445static void 1629static void
1446strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *key) 1630strata_estimator_insert (struct StrataEstimator *se, struct GNUNET_HashCode *key)
1447{ 1631{
1448 uint32_t v; 1632 uint32_t v;
1449 int i; 1633 int i;
@@ -1451,7 +1635,7 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke
1451 /* count trailing '1'-bits of v */ 1635 /* count trailing '1'-bits of v */
1452 for (i = 0; v & 1; v>>=1, i++) 1636 for (i = 0; v & 1; v>>=1, i++)
1453 /* empty */; 1637 /* empty */;
1454 ibf_insert (strata[i], ibf_key_from_hashcode (key)); 1638 ibf_insert (se->strata[i], ibf_key_from_hashcode (key));
1455} 1639}
1456 1640
1457 1641
@@ -1476,13 +1660,8 @@ add_incoming_peers (struct ConsensusSession *session)
1476 { 1660 {
1477 struct ConsensusPeerInformation *cpi; 1661 struct ConsensusPeerInformation *cpi;
1478 cpi = &session->info[i]; 1662 cpi = &session->info[i];
1479 if (0 == memcmp (inc->peer, &cpi->session->peers[i], sizeof (struct GNUNET_PeerIdentity))) 1663 if (0 == memcmp (&inc->peer_id, &cpi->peer_id, sizeof (struct GNUNET_PeerIdentity)))
1480 { 1664 {
1481 if (GNUNET_YES == cpi->is_outgoing)
1482 {
1483 /* FIXME: disconnect */
1484 continue;
1485 }
1486 cpi->socket = inc->socket; 1665 cpi->socket = inc->socket;
1487 inc->cpi = cpi; 1666 inc->cpi = cpi;
1488 inc->cpi->mst = inc->mst; 1667 inc->cpi->mst = inc->mst;
@@ -1505,15 +1684,12 @@ static void
1505initialize_session (struct ConsensusSession *session) 1684initialize_session (struct ConsensusSession *session)
1506{ 1685{
1507 const struct ConsensusSession *other_session; 1686 const struct ConsensusSession *other_session;
1508 int i;
1509 1687
1510 GNUNET_assert (NULL != session->join_msg); 1688 GNUNET_assert (NULL != session->join_msg);
1511
1512 initialize_session_peer_list (session); 1689 initialize_session_peer_list (session);
1513 1690 session->current_round = CONSENSUS_ROUND_BEGIN;
1514 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); 1691 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
1515 1692 compute_global_id (session, &session->join_msg->session_id);
1516 compute_global_id (&session->join_msg->session_id, session->peers, session->num_peers, &session->global_id);
1517 1693
1518 /* Check if some local client already owns the session. */ 1694 /* Check if some local client already owns the session. */
1519 other_session = sessions_head; 1695 other_session = sessions_head;
@@ -1531,26 +1707,14 @@ initialize_session (struct ConsensusSession *session)
1531 } 1707 }
1532 1708
1533 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO); 1709 session->values = GNUNET_CONTAINER_multihashmap_create (256, GNUNET_NO);
1534
1535 session->local_peer_idx = get_peer_idx (my_peer, session); 1710 session->local_peer_idx = get_peer_idx (my_peer, session);
1536 GNUNET_assert (-1 != session->local_peer_idx); 1711 GNUNET_assert (-1 != session->local_peer_idx);
1537
1538 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx); 1712 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d is the local peer\n", session->local_peer_idx);
1539 1713 session->se = strata_estimator_create ();
1540 session->strata = GNUNET_malloc (STRATA_COUNT * sizeof (struct InvertibleBloomFilter *));
1541 for (i = 0; i < STRATA_COUNT; i++)
1542 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1543
1544 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); 1714 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
1545
1546 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1547 initialize_session_info (session);
1548
1549 GNUNET_free (session->join_msg); 1715 GNUNET_free (session->join_msg);
1550 session->join_msg = NULL; 1716 session->join_msg = NULL;
1551
1552 add_incoming_peers (session); 1717 add_incoming_peers (session);
1553
1554 GNUNET_SERVER_receive_done (session->client, GNUNET_OK); 1718 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1555 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); 1719 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1556} 1720}
@@ -1658,46 +1822,19 @@ client_insert (void *cls,
1658 1822
1659 GNUNET_assert (NULL != element->data); 1823 GNUNET_assert (NULL != element->data);
1660 1824
1661 hash_for_ibf (element, element_size, &hash); 1825 hash_for_ibf (element->data, element_size, &hash);
1662
1663 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting with hash_for_ibf %s\n", GNUNET_h2s (&hash));
1664 1826
1665 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element, 1827 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
1666 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1828 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1667 1829
1668 strata_insert (session->strata, &hash); 1830 strata_estimator_insert (session->se, &hash);
1669 1831
1670 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1832 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1671 1833
1672 send_next (session); 1834 client_send_next (session);
1673} 1835}
1674 1836
1675 1837
1676/**
1677 * Functions of this signature are called whenever writing operations
1678 * on a stream are executed
1679 *
1680 * @param cls the closure from GNUNET_STREAM_write
1681 * @param status the status of the stream at the time this function is called;
1682 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1683 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1684 * (this doesn't mean that the data is never sent, the receiver may
1685 * have read the data but its ACKs may have been lost);
1686 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1687 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1688 * be processed.
1689 * @param size the number of bytes written
1690 */
1691static void
1692write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1693{
1694 struct ConsensusPeerInformation *cpi;
1695 cpi = cls;
1696 cpi->wh = NULL;
1697 GNUNET_assert (GNUNET_STREAM_OK == status);
1698 /* just wait for the ibf */
1699}
1700
1701 1838
1702/** 1839/**
1703 * Functions of this signature are called whenever writing operations 1840 * Functions of this signature are called whenever writing operations
@@ -1715,319 +1852,226 @@ write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1715 * @param size the number of bytes written 1852 * @param size the number of bytes written
1716 */ 1853 */
1717static void 1854static void
1718write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1855write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1719{ 1856{
1720 struct ConsensusPeerInformation *cpi; 1857 struct ConsensusPeerInformation *cpi;
1721 struct StrataMessage *strata_msg;
1722 void *buf;
1723 size_t msize;
1724 int i;
1725 1858
1859 GNUNET_assert (GNUNET_STREAM_OK == status);
1726 cpi = cls; 1860 cpi = cls;
1727 cpi->wh = NULL; 1861 cpi->wh = NULL;
1728 1862 if (NULL != cpi->messages_head)
1729 GNUNET_assert (GNUNET_STREAM_OK == status);
1730
1731 GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1732
1733 /* FIXME: handle this */
1734 GNUNET_assert (GNUNET_STREAM_OK == status);
1735
1736 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1737
1738 strata_msg = GNUNET_malloc (msize);
1739 strata_msg->header.size = htons (msize);
1740 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1741
1742 buf = &strata_msg[1];
1743 for (i = 0; i < STRATA_COUNT; i++)
1744 { 1863 {
1745 ibf_write (cpi->session->strata[i], &buf, NULL); 1864 struct QueuedMessage *qm;
1865 qm = cpi->messages_head;
1866 GNUNET_CONTAINER_DLL_remove (cpi->messages_head, cpi->messages_tail, qm);
1867 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
1868 GNUNET_TIME_UNIT_FOREVER_REL,
1869 write_queued, cpi);
1870 if (NULL != qm->cb)
1871 qm->cb (qm->cls);
1872 GNUNET_free (qm->msg);
1873 GNUNET_free (qm);
1874 GNUNET_assert (NULL != cpi->wh);
1746 } 1875 }
1747
1748 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1749 write_strata_done, cpi);
1750
1751 GNUNET_assert (NULL != cpi->wh);
1752
1753 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
1754} 1876}
1755 1877
1756 1878
1757static void 1879static void
1758write_ibf_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1880shuffle (struct ConsensusSession *session)
1759{ 1881{
1760 struct ConsensusPeerInformation *cpi; 1882 /* FIXME: implement */
1761 cpi = cls;
1762 cpi->wh = NULL;
1763 GNUNET_assert (GNUNET_STREAM_OK == status);
1764 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "write ibf done callback\n");
1765} 1883}
1766 1884
1767 1885
1768/** 1886/**
1769 * Functions of this signature are called whenever writing operations 1887 * Find and set the partner_incoming and partner_outgoing of our peer,
1770 * on a stream are executed 1888 * one of them may not exist in most cases.
1771 * 1889 *
1772 * @param cls the closure from GNUNET_STREAM_write 1890 * @param session the consensus session
1773 * @param status the status of the stream at the time this function is called;
1774 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1775 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1776 * (this doesn't mean that the data is never sent, the receiver may
1777 * have read the data but its ACKs may have been lost);
1778 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1779 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1780 * be processed.
1781 * @param size the number of bytes written
1782 */ 1891 */
1783static void 1892static void
1784write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1893find_partners (struct ConsensusSession *session)
1785{ 1894{
1786 struct ConsensusPeerInformation *cpi; 1895 int mark[session->num_peers];
1787 struct DifferenceDigest *digest; 1896 int i;
1788 int msize; 1897 memset (mark, 0, session->num_peers * sizeof (int));
1789 int num_buckets; 1898 session->partner_incoming = session->partner_outgoing = NULL;
1790 void *buf; 1899 for (i = 0; i < session->num_peers; i++)
1791 1900 {
1792 cpi = cls; 1901 int arc;
1793 cpi->wh = NULL; 1902 if (0 != mark[i])
1903 continue;
1904 arc = (i + (1 << session->exp_subround)) % session->num_peers;
1905 mark[i] = mark[arc] = 1;
1906 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d talks to %d\n", i, arc);
1907 GNUNET_assert (i != arc);
1908 if (i == session->local_peer_idx)
1909 {
1910 GNUNET_assert (NULL == session->partner_outgoing);
1911 session->partner_outgoing = &session->info[session->shuffle[arc]];
1912 }
1913 if (arc == session->local_peer_idx)
1914 {
1915 GNUNET_assert (NULL == session->partner_incoming);
1916 session->partner_incoming = &session->info[session->shuffle[i]];
1917 }
1918 if (0 != mark[session->local_peer_idx])
1919 {
1920 return;
1921 }
1922 }
1923}
1794 1924
1795 GNUNET_assert (GNUNET_STREAM_OK == status);
1796 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
1797 1925
1798 /* we should not be done here! */ 1926/**
1799 GNUNET_assert (cpi->ibf_bucket_counter != (1 << cpi->ibf_order)); 1927 * Do the next subround in the exp-scheme.
1928 *
1929 * @param cls the session
1930 * @param tc task context, for when this task is invoked by the scheduler,
1931 * NULL if invoked for another reason
1932 */
1933static void
1934subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1935{
1936 struct ConsensusSession *session;
1937 int i;
1800 1938
1801 /* remaining buckets */ 1939 /* don't kick off next subround if we're shutting down */
1802 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; 1940 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1941 return;
1803 1942
1804 /* limit to maximum */ 1943 session = cls;
1805 if (num_buckets > BUCKETS_PER_MESSAGE)
1806 num_buckets = BUCKETS_PER_MESSAGE;
1807 1944
1808 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order)); 1945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "subround over, exp_round=%d, exp_subround=%d\n", session->exp_round, session->exp_subround);
1809 1946
1810 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); 1947 for (i = 0; i < session->num_peers; i++)
1948 clear_peer_messages (&session->info[i]);
1811 1949
1812 digest = GNUNET_malloc (msize); 1950 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
1813 digest->header.size = htons (msize); 1951 {
1814 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); 1952 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
1815 digest->order = cpi->ibf_order; 1953 }
1954 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
1816 1955
1817 buf = &digest[1]; 1956 if ((session->num_peers == 2) && (session->exp_round == 1))
1818 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); 1957 {
1958 round_over (session, NULL);
1959 return;
1960 }
1819 1961
1820 cpi->ibf_bucket_counter += num_buckets; 1962 if (session->exp_round == NUM_EXP_ROUNDS)
1963 {
1964 round_over (session, NULL);
1965 return;
1966 }
1821 1967
1822 /* we have to set the new state here, because of non-deterministic schedulung */ 1968 if (session->exp_round == 0)
1823 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1824 { 1969 {
1825 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); 1970 session->exp_round = 1;
1826 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ 1971 session->exp_subround = 0;
1827 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; 1972 if (NULL == session->shuffle)
1828 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf_done, cpi); 1973 session->shuffle = GNUNET_malloc ((sizeof (int)) * session->num_peers);
1974 for (i = 0; i < session->num_peers; i++)
1975 session->shuffle[i] = i;
1829 } 1976 }
1830 else 1977 else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
1831 { 1978 {
1832 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf, cpi); 1979 session->exp_round++;
1980 session->exp_subround = 0;
1981 shuffle (session);
1982 }
1983 else
1984 {
1985 session->exp_subround++;
1833 } 1986 }
1834 1987
1835 GNUNET_assert (NULL != cpi->wh); 1988 find_partners (session);
1836}
1837
1838
1839/**
1840 * Functions of this signature are called whenever writing operations
1841 * on a stream are executed
1842 *
1843 * @param cls the closure from GNUNET_STREAM_write
1844 * @param status the status of the stream at the time this function is called;
1845 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1846 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1847 * (this doesn't mean that the data is never sent, the receiver may
1848 * have read the data but its ACKs may have been lost);
1849 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1850 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1851 * be processed.
1852 * @param size the number of bytes written
1853 */
1854static void
1855write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1856{
1857 struct ConsensusPeerInformation *cpi;
1858 struct IBF_Key key;
1859 struct GNUNET_HashCode hashcode;
1860 int side;
1861
1862 GNUNET_assert (GNUNET_STREAM_OK == status);
1863
1864 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
1865 1989
1866 cpi = cls; 1990 if (NULL != session->partner_outgoing)
1867 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); 1991 {
1868 cpi->wh = NULL; 1992 session->partner_outgoing->ibf_state = IBF_STATE_NONE;
1993 session->partner_outgoing->ibf_bucket_counter = 0;
1994 }
1869 1995
1870 if (NULL != cpi->requests_and_elements_head) 1996 if (NULL != session->partner_incoming)
1871 { 1997 {
1872 struct QueuedMessage *qm; 1998 session->partner_incoming->ibf_state = IBF_STATE_NONE;
1873 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending queued element\n"); 1999 session->partner_incoming->ibf_bucket_counter = 0;
1874 qm = cpi->requests_and_elements_head;
1875 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
1876 2000
1877 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), 2001 /* maybe there's an early strata estimator? */
1878 GNUNET_TIME_UNIT_FOREVER_REL, 2002 if (NULL != session->partner_incoming->premature_strata_message)
1879 write_requests_and_elements, cpi); 2003 {
1880 GNUNET_assert (NULL != cpi->wh); 2004 session->partner_incoming->replaying_strata_message = GNUNET_YES;
1881 /* some elements / requests have queued up, we have to transmit them first */ 2005 handle_p2p_strata (session->partner_incoming, session->partner_incoming->premature_strata_message);
1882 return; 2006 GNUNET_free (session->partner_incoming->premature_strata_message);
2007 session->partner_incoming->replaying_strata_message = GNUNET_NO;
2008 }
1883 } 2009 }
1884 2010
1885 for (;;) 2011 if (NULL != session->partner_outgoing)
1886 { 2012 {
1887 int res; 2013 if (NULL == session->partner_outgoing->socket)
1888 res = ibf_decode (cpi->ibf, &side, &key);
1889 if (GNUNET_SYSERR == res)
1890 { 2014 {
1891 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); 2015 session->partner_outgoing->socket =
1892 /* decoding failed, we tell the other peer by sending our ibf with a larger order */ 2016 GNUNET_STREAM_open (cfg, &session->partner_outgoing->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
1893 cpi->ibf_order++; 2017 open_cb, session->partner_outgoing,
1894 prepare_ibf (cpi); 2018 GNUNET_STREAM_OPTION_END);
1895 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1896 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1897 cpi->ibf_bucket_counter = 0;
1898 write_ibf (cls, status, size);
1899 return;
1900 } 2019 }
1901 if (GNUNET_NO == res) 2020 else if (GNUNET_YES == session->partner_outgoing->hello)
1902 { 2021 {
1903 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); 2022 send_strata_estimator (session->partner_outgoing);
1904 return;
1905 } 2023 }
1906 if (-1 == side) 2024 /* else: do nothing, the send hello cb will handle this */
1907 { 2025 }
1908 /* we have the element, send it to the other peer */
1909 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element\n");
1910 ibf_hashcode_from_key (key, &hashcode);
1911 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi);
1912 /* send the first message, because we can! */
1913 if (NULL != cpi->requests_and_elements_head)
1914 {
1915 struct QueuedMessage *qm;
1916 qm = cpi->requests_and_elements_head;
1917 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
1918 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing element\n");
1919 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
1920 GNUNET_TIME_UNIT_FOREVER_REL,
1921 write_requests_and_elements, cpi);
1922 GNUNET_assert (NULL != cpi->wh);
1923 }
1924 else
1925 {
1926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "no element found for decoded hash %s\n", GNUNET_h2s (&hashcode));
1927 }
1928 return;
1929 }
1930 else
1931 {
1932 struct ElementRequest *msg;
1933 size_t msize;
1934 struct IBF_Key *p;
1935 2026
1936 msize = (sizeof *msg) + sizeof (uint64_t); 2027 /*
1937 msg = GNUNET_malloc (msize); 2028 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
1938 if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) 2029 subround_over, session);
1939 { 2030 */
1940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request for element\n");
1941 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1942 }
1943 else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
1944 {
1945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending locally missing element\n");
1946 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL);
1947 }
1948 else
1949 {
1950 GNUNET_assert (0);
1951 }
1952 msg->header.size = htons (msize);
1953 p = (struct IBF_Key *) &msg[1];
1954 *p = key;
1955 2031
1956 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1957 write_requests_and_elements, cpi);
1958 GNUNET_assert (NULL != cpi->wh);
1959 GNUNET_free (msg);
1960 return;
1961 }
1962 }
1963} 2032}
1964 2033
1965 2034static void
1966static double 2035contact_peer_a2a (struct ConsensusPeerInformation *cpi)
1967compute_similarity (struct ConsensusSession *session, int p1, int p2)
1968{ 2036{
1969 /* FIXME: simplistic dummy implementation, use real set union/intersecion */ 2037 cpi->is_outgoing = GNUNET_YES;
1970 return (session->info[p1].num_missing_local + session->info[p2].num_missing_local) / 2038 if (NULL == cpi->socket)
1971 ((double) (session->info[p1].num_missing_remote + session->info[p2].num_missing_remote + 1)); 2039 {
2040 cpi->socket = GNUNET_STREAM_open (cfg, &cpi->peer_id, GNUNET_APPLICATION_TYPE_CONSENSUS,
2041 open_cb, cpi, GNUNET_STREAM_OPTION_END);
2042 }
2043 else if (GNUNET_YES == cpi->hello)
2044 {
2045 send_strata_estimator (cpi);
2046 }
1972} 2047}
1973 2048
1974
1975static void 2049static void
1976select_fittest_group (struct ConsensusSession *session) 2050start_inventory (struct ConsensusSession *session)
1977{ 2051{
1978 /* simplistic implementation: compute the similarity with the latest strata estimator,
1979 * rank the results once */
1980 struct GNUNET_PeerIdentity *group;
1981 double rating[session->num_peers];
1982 struct GNUNET_CONSENSUS_ConcludeDoneMessage *done_msg;
1983 size_t msize;
1984 int i; 2052 int i;
1985 int j; 2053 int last;
1986 /* number of peers in the consensus group */
1987 int k;
1988
1989 k = ceil(session->num_peers / 3.0) * 2;
1990 group = GNUNET_malloc (k * sizeof *group);
1991 2054
1992 /* do strata subtraction */ 2055 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1993 /* FIXME: we know the real sets, subtract them! */ 2056 i = (session->local_peer_idx + 1) % session->num_peers;
1994 for (i = 0; i < session->num_peers; i++) 2057 while (i != last)
1995 { 2058 {
1996 rating[i] = 0; 2059 contact_peer_a2a (&session->info[i]);
1997 for (j = 0; j < i; j++) 2060 i = (i + 1) % session->num_peers;
1998 { 2061 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
1999 double sim;
2000 sim = compute_similarity (session, i, j);
2001 rating[i] += sim;
2002 rating[j] += sim;
2003 }
2004 } 2062 }
2005 for (i = 0; i < k; i++) 2063 // tie-breaker for even number of peers
2064 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
2006 { 2065 {
2007 int best_idx = 0; 2066 contact_peer_a2a (&session->info[last]);
2008 for (j = 1; j < session->num_peers; j++) 2067 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
2009 if (rating[j] > rating[best_idx])
2010 best_idx = j;
2011 rating[best_idx] = -1;
2012 group[i] = session->peers[best_idx];
2013 } 2068 }
2014
2015 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got group!\n");
2016
2017 msize = sizeof *done_msg + k * sizeof *group;
2018
2019 done_msg = GNUNET_malloc (msize);
2020 done_msg->header.size = htons (msize);
2021 done_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2022 memcpy (&done_msg[1], group, k * sizeof *group);
2023
2024 queue_client_message (session, (struct GNUNET_MessageHeader *) done_msg);
2025 send_next (session);
2026} 2069}
2027 2070
2028 2071
2029/** 2072/**
2030 * Select and kick off the next round, based on the current round. 2073 * Select and kick off the next round, based on the current round.
2074 *
2031 * @param cls the session 2075 * @param cls the session
2032 * @param tc task context, for when this task is invoked by the scheduler, 2076 * @param tc task context, for when this task is invoked by the scheduler,
2033 * NULL if invoked for another reason 2077 * NULL if invoked for another reason
@@ -2045,61 +2089,50 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2045 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n"); 2089 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
2046 session = cls; 2090 session = cls;
2047 2091
2092 for (i = 0; i < session->num_peers; i++)
2093 clear_peer_messages (&session->info[i]);
2094
2048 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) 2095 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2049 { 2096 {
2050 GNUNET_SCHEDULER_cancel (session->round_timeout_tid); 2097 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2051 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; 2098 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2052 } 2099 }
2053 2100
2054 for (i = 0; i < session->num_peers; i++) 2101 /* FIXME: cancel current round */
2055 {
2056 if ((NULL != session->info) && (NULL != session->info[i].wh))
2057 GNUNET_STREAM_write_cancel (session->info[i].wh);
2058 }
2059 2102
2060 switch (session->current_round) 2103 switch (session->current_round)
2061 { 2104 {
2062 case CONSENSUS_ROUND_BEGIN: 2105 case CONSENSUS_ROUND_BEGIN:
2063 { 2106 session->current_round = CONSENSUS_ROUND_EXCHANGE;
2064 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; 2107 session->exp_round = 0;
2065 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4), 2108 subround_over (session, NULL);
2066 round_over, session); 2109 break;
2067 for (i = 0; i < session->num_peers; i++) 2110 case CONSENSUS_ROUND_EXCHANGE:
2111 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "done for now\n");
2112
2113 if (0)
2068 { 2114 {
2069 /* we can only talk to hello'ed peers */ 2115 struct GNUNET_MessageHeader *msg;
2070 if ( (GNUNET_YES == session->info[i].is_outgoing) && 2116 msg = GNUNET_malloc (sizeof *msg);
2071 (GNUNET_YES == session->info[i].hello) ) 2117 msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2072 { 2118 msg->size = htons (sizeof *msg);
2073 /* kick off transmitting strata by calling the write continuation */ 2119 queue_client_message (session, msg);
2074 write_strata (&session->info[i], GNUNET_STREAM_OK, 0); 2120 client_send_next (session);
2075 }
2076 } 2121 }
2077 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude started, timeout=%llu\n", session->conclude_timeout.rel_value); 2122
2078 break; 2123 if (0)
2079 }
2080 case CONSENSUS_ROUND_A2A_EXCHANGE:
2081 {
2082 session->current_round = CONSENSUS_ROUND_A2A_INVENTORY;
2083 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting inventory round\n");
2084 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
2085 round_over, session);
2086 for (i = 0; i < session->num_peers; i++)
2087 { 2124 {
2088 session->info[i].ibf_state = IBF_STATE_NONE; 2125 session->current_round = CONSENSUS_ROUND_INVENTORY;
2089 if ( (GNUNET_YES == session->info[i].is_outgoing) && 2126 start_inventory (session);
2090 (GNUNET_YES == session->info[i].hello) )
2091 {
2092 /* kick off transmitting strata by calling the write continuation */
2093 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
2094 }
2095 } 2127 }
2096 break; 2128 break;
2097 } 2129 case CONSENSUS_ROUND_INVENTORY:
2098 case CONSENSUS_ROUND_A2A_INVENTORY: 2130 session->current_round = CONSENSUS_ROUND_STOCK;
2099 /* finally, we are done and select the most fitting group */ 2131 /* FIXME: exchange stock */
2100 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "protocol rounds done\n"); 2132 break;
2133 case CONSENSUS_ROUND_STOCK:
2101 session->current_round = CONSENSUS_ROUND_FINISH; 2134 session->current_round = CONSENSUS_ROUND_FINISH;
2102 select_fittest_group (session); 2135 /* FIXME: send elements to client */
2103 break; 2136 break;
2104 default: 2137 default:
2105 GNUNET_assert (0); 2138 GNUNET_assert (0);
@@ -2138,8 +2171,8 @@ client_conclude (void *cls,
2138 if (CONSENSUS_ROUND_BEGIN != session->current_round) 2171 if (CONSENSUS_ROUND_BEGIN != session->current_round)
2139 { 2172 {
2140 /* client requested conclude twice */ 2173 /* client requested conclude twice */
2141 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "unexpected round at conclude: %d\n", session->current_round);
2142 GNUNET_break (0); 2174 GNUNET_break (0);
2175 /* client may still own a session, destroy it */
2143 disconnect_client (client); 2176 disconnect_client (client);
2144 return; 2177 return;
2145 } 2178 }
@@ -2150,7 +2183,7 @@ client_conclude (void *cls,
2150 round_over (session, NULL); 2183 round_over (session, NULL);
2151 2184
2152 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2185 GNUNET_SERVER_receive_done (client, GNUNET_OK);
2153 send_next (session); 2186 client_send_next (session);
2154} 2187}
2155 2188
2156 2189
@@ -2186,9 +2219,9 @@ client_ack (void *cls,
2186 return; 2219 return;
2187 } 2220 }
2188 2221
2189 pending = session->approval_pending_head; 2222 pending = session->client_approval_head;
2190 2223
2191 GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending); 2224 GNUNET_CONTAINER_DLL_remove (session->client_approval_head, session->client_approval_tail, pending);
2192 2225
2193 msg = (struct GNUNET_CONSENSUS_AckMessage *) message; 2226 msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
2194 2227
@@ -2196,11 +2229,12 @@ client_ack (void *cls,
2196 { 2229 {
2197 int i; 2230 int i;
2198 element = pending->element; 2231 element = pending->element;
2199 hash_for_ibf (element, element->size, &key); 2232 hash_for_ibf (element->data, element->size, &key);
2200 2233
2201 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 2234 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
2202 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 2235 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2203 strata_insert (session->strata, &key); 2236 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
2237 strata_estimator_insert (session->se, &key);
2204 2238
2205 for (i = 0; i <= MAX_IBF_ORDER; i++) 2239 for (i = 0; i <= MAX_IBF_ORDER; i++)
2206 { 2240 {
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index d3218ff9b..5246b63a6 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -92,19 +92,20 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
92 struct IBF_Key key, int *dst) 92 struct IBF_Key key, int *dst)
93{ 93{
94 struct GNUNET_HashCode bucket_indices; 94 struct GNUNET_HashCode bucket_indices;
95 unsigned int filled = 0; 95 unsigned int filled;
96 int i; 96 int i;
97 GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); 97 GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices);
98 filled = 0;
98 for (i = 0; filled < ibf->hash_num; i++) 99 for (i = 0; filled < ibf->hash_num; i++)
99 { 100 {
100 unsigned int bucket; 101 unsigned int bucket;
101 unsigned int j; 102 unsigned int j;
102 if ( (0 != i) && (0 == (i % 16)) ) 103 if ( (0 != i) && (0 == (i % 16)) )
103 GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); 104 GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices);
104 bucket = bucket_indices.bits[i] % ibf->size; 105 bucket = bucket_indices.bits[i % 16] % ibf->size;
105 for (j = 0; j < filled; j++) 106 for (j = 0; j < filled; j++)
106 if (dst[j] == bucket) 107 if (dst[j] == bucket)
107 goto try_next;; 108 goto try_next;
108 dst[filled++] = bucket; 109 dst[filled++] = bucket;
109 try_next: ; 110 try_next: ;
110 } 111 }
@@ -141,6 +142,7 @@ void
141ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) 142ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
142{ 143{
143 int buckets[ibf->hash_num]; 144 int buckets[ibf->hash_num];
145 GNUNET_assert (ibf->hash_num <= ibf->size);
144 ibf_get_indices (ibf, key, buckets); 146 ibf_get_indices (ibf, key, buckets);
145 ibf_insert_into (ibf, key, buckets, 1); 147 ibf_insert_into (ibf, key, buckets, 1);
146} 148}