diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-02-07 10:51:17 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-02-07 10:51:17 +0000 |
commit | 4dc3faf5e88b8ca602602aa28a6ff76c02d34848 (patch) | |
tree | d50989d9fe71816bba1fdcef690684daa4f7873b /src/consensus/gnunet-service-consensus.c | |
parent | 5ceb669384bd36b3213f7f1e7dbaf31b913e06a0 (diff) | |
download | gnunet-4dc3faf5e88b8ca602602aa28a6ff76c02d34848.tar.gz gnunet-4dc3faf5e88b8ca602602aa28a6ff76c02d34848.zip |
- improved ibf decoding algorithm
- strata estimator now fits in one message
- work on consensus, not quite working yet
Diffstat (limited to 'src/consensus/gnunet-service-consensus.c')
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 546 |
1 files changed, 345 insertions, 201 deletions
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 1cbb9d021..d223360dc 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -18,7 +18,6 @@ | |||
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | |||
22 | /** | 21 | /** |
23 | * @file consensus/gnunet-service-consensus.c | 22 | * @file consensus/gnunet-service-consensus.c |
24 | * @brief | 23 | * @brief |
@@ -47,17 +46,21 @@ | |||
47 | */ | 46 | */ |
48 | #define STRATA_IBF_BUCKETS 80 | 47 | #define STRATA_IBF_BUCKETS 80 |
49 | /** | 48 | /** |
50 | * hash num parameter of the IBF | 49 | * hash num parameter for the difference digests and strata estimators |
51 | */ | 50 | */ |
52 | #define STRATA_HASH_NUM 3 | 51 | #define STRATA_HASH_NUM 3 |
52 | |||
53 | /** | 53 | /** |
54 | * Number of strata that can be transmitted in one message. | 54 | * Number of buckets that can be transmitted in one message. |
55 | */ | 55 | */ |
56 | #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) | ||
57 | |||
58 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | 56 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) |
59 | 57 | ||
60 | #define MAX_IBF_ORDER (64) | 58 | /** |
59 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | ||
60 | * Choose this value so that computing the IBF is still cheaper | ||
61 | * than transmitting all values. | ||
62 | */ | ||
63 | #define MAX_IBF_ORDER (32) | ||
61 | 64 | ||
62 | 65 | ||
63 | /* forward declarations */ | 66 | /* forward declarations */ |
@@ -76,14 +79,14 @@ static void | |||
76 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 79 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); |
77 | 80 | ||
78 | static void | 81 | static void |
79 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 82 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size); |
80 | 83 | ||
81 | static int | 84 | static int |
82 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); | 85 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); |
83 | 86 | ||
84 | 87 | ||
85 | /** | 88 | /** |
86 | * An element that is waiting to be transmitted to a client. | 89 | * An element that is waiting to be transmitted. |
87 | */ | 90 | */ |
88 | struct PendingElement | 91 | struct PendingElement |
89 | { | 92 | { |
@@ -106,6 +109,9 @@ struct PendingElement | |||
106 | struct ConsensusPeerInformation *cpi; | 109 | struct ConsensusPeerInformation *cpi; |
107 | }; | 110 | }; |
108 | 111 | ||
112 | /** | ||
113 | * Information about a peer that is in a consensus session. | ||
114 | */ | ||
109 | struct ConsensusPeerInformation | 115 | struct ConsensusPeerInformation |
110 | { | 116 | { |
111 | struct GNUNET_STREAM_Socket *socket; | 117 | struct GNUNET_STREAM_Socket *socket; |
@@ -123,6 +129,12 @@ struct ConsensusPeerInformation | |||
123 | int is_outgoing; | 129 | int is_outgoing; |
124 | 130 | ||
125 | /** | 131 | /** |
132 | * if the peer did something wrong, and was disconnected, | ||
133 | * never interact with this peer again. | ||
134 | */ | ||
135 | int is_bad; | ||
136 | |||
137 | /** | ||
126 | * Did we receive/send a consensus hello? | 138 | * Did we receive/send a consensus hello? |
127 | */ | 139 | */ |
128 | int hello; | 140 | int hello; |
@@ -137,27 +149,29 @@ struct ConsensusPeerInformation | |||
137 | */ | 149 | */ |
138 | struct GNUNET_STREAM_WriteHandle *wh; | 150 | struct GNUNET_STREAM_WriteHandle *wh; |
139 | 151 | ||
152 | enum { | ||
153 | IBF_STATE_NONE, | ||
154 | IBF_STATE_RECEIVING, | ||
155 | IBF_STATE_TRANSMITTING, | ||
156 | IBF_STATE_DECODING | ||
157 | } ibf_state ; | ||
158 | |||
140 | /** | 159 | /** |
141 | * How many of the strate in the ibf were | 160 | * What is the order (=log2 size) of the ibf |
142 | * sent or received in this round? | 161 | * we're currently dealing with? |
143 | */ | 162 | */ |
144 | int strata_counter; | ||
145 | |||
146 | int ibf_order; | 163 | int ibf_order; |
147 | 164 | ||
148 | struct InvertibleBloomFilter *outgoing_ibf; | 165 | /** |
149 | 166 | * The current IBF for this peer, | |
150 | int outgoing_bucket_counter; | 167 | * purpose dependent on ibf_state |
151 | 168 | */ | |
152 | struct InvertibleBloomFilter *incoming_ibf; | 169 | struct InvertibleBloomFilter *ibf; |
153 | |||
154 | int incoming_bucket_counter; | ||
155 | 170 | ||
156 | /** | 171 | /** |
157 | * NULL or incoming_ibf - outgoing_ibf. | 172 | * How many buckets have we transmitted/received (depending on state)? |
158 | * Decoded values of side '1' are to be requested from the the peer. | ||
159 | */ | 173 | */ |
160 | struct InvertibleBloomFilter *diff_ibf; | 174 | int ibf_bucket_counter; |
161 | 175 | ||
162 | /** | 176 | /** |
163 | * Strata estimator of the peer, NULL if our peer | 177 | * Strata estimator of the peer, NULL if our peer |
@@ -165,11 +179,21 @@ struct ConsensusPeerInformation | |||
165 | */ | 179 | */ |
166 | struct InvertibleBloomFilter **strata; | 180 | struct InvertibleBloomFilter **strata; |
167 | 181 | ||
182 | /** | ||
183 | * difference estimated with the current strata estimator | ||
184 | */ | ||
168 | unsigned int diff; | 185 | unsigned int diff; |
169 | 186 | ||
170 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | 187 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; |
171 | 188 | ||
189 | /** | ||
190 | * Back-reference to the consensus session, | ||
191 | * to that ConsensusPeerInformation can be used as a closure | ||
192 | */ | ||
172 | struct ConsensusSession *session; | 193 | struct ConsensusSession *session; |
194 | |||
195 | struct PendingElement *send_pending_head; | ||
196 | struct PendingElement *send_pending_tail; | ||
173 | }; | 197 | }; |
174 | 198 | ||
175 | struct QueuedMessage | 199 | struct QueuedMessage |
@@ -187,9 +211,31 @@ struct QueuedMessage | |||
187 | struct QueuedMessage *prev; | 211 | struct QueuedMessage *prev; |
188 | }; | 212 | }; |
189 | 213 | ||
214 | enum ConsensusRound | ||
215 | { | ||
216 | /** | ||
217 | * distribution of information with the exponential scheme | ||
218 | */ | ||
219 | CONSENSUS_ROUND_EXP_EXCHANGE, | ||
220 | /** | ||
221 | * All-to-all, exchange missing values | ||
222 | */ | ||
223 | CONSENSUS_ROUND_A2A_EXCHANGE, | ||
224 | /** | ||
225 | * All-to-all, check what values are missing, don't exchange anything | ||
226 | */ | ||
227 | CONSENSUS_ROUND_A2A_INVENTORY | ||
228 | |||
229 | /* | ||
230 | a round to exchange the information for fraud-detection | ||
231 | CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT | ||
232 | */ | ||
233 | }; | ||
234 | |||
190 | 235 | ||
191 | /** | 236 | /** |
192 | * A consensus session consists of one local client and the remote authorities. | 237 | * A consensus session consists of one local client and the remote authorities. |
238 | * | ||
193 | */ | 239 | */ |
194 | struct ConsensusSession | 240 | struct ConsensusSession |
195 | { | 241 | { |
@@ -225,6 +271,7 @@ struct ConsensusSession | |||
225 | /** | 271 | /** |
226 | * Values in the consensus set of this session, | 272 | * Values in the consensus set of this session, |
227 | * all of them either have been sent by or approved by the client. | 273 | * all of them either have been sent by or approved by the client. |
274 | * Contains GNUNET_CONSENSUS_Element. | ||
228 | */ | 275 | */ |
229 | struct GNUNET_CONTAINER_MultiHashMap *values; | 276 | struct GNUNET_CONTAINER_MultiHashMap *values; |
230 | 277 | ||
@@ -238,8 +285,14 @@ struct ConsensusSession | |||
238 | */ | 285 | */ |
239 | struct PendingElement *approval_pending_tail; | 286 | struct PendingElement *approval_pending_tail; |
240 | 287 | ||
288 | /** | ||
289 | * Messages to be sent to the local client that owns this session | ||
290 | */ | ||
241 | struct QueuedMessage *client_messages_head; | 291 | struct QueuedMessage *client_messages_head; |
242 | 292 | ||
293 | /** | ||
294 | * Messages to be sent to the local client that owns this session | ||
295 | */ | ||
243 | struct QueuedMessage *client_messages_tail; | 296 | struct QueuedMessage *client_messages_tail; |
244 | 297 | ||
245 | /** | 298 | /** |
@@ -259,22 +312,14 @@ struct ConsensusSession | |||
259 | int conclude_group_min; | 312 | int conclude_group_min; |
260 | 313 | ||
261 | /** | 314 | /** |
262 | * Current round of the conclusion | ||
263 | */ | ||
264 | int current_round; | ||
265 | |||
266 | /** | ||
267 | * Soft deadline for conclude. | ||
268 | * Speed up the speed of the consensus at the cost of consensus quality, as | ||
269 | * the time approached or crosses the deadline. | ||
270 | */ | ||
271 | struct GNUNET_TIME_Absolute conclude_deadline; | ||
272 | |||
273 | /** | ||
274 | * Number of other peers in the consensus | 315 | * Number of other peers in the consensus |
275 | */ | 316 | */ |
276 | unsigned int num_peers; | 317 | unsigned int num_peers; |
277 | 318 | ||
319 | /** | ||
320 | * Information about the other peers, | ||
321 | * their state, etc. | ||
322 | */ | ||
278 | struct ConsensusPeerInformation *info; | 323 | struct ConsensusPeerInformation *info; |
279 | 324 | ||
280 | /** | 325 | /** |
@@ -289,13 +334,19 @@ struct ConsensusSession | |||
289 | int local_peer_idx; | 334 | int local_peer_idx; |
290 | 335 | ||
291 | /** | 336 | /** |
292 | * Task identifier for the round timeout task | 337 | * Strata estimator, computed online |
293 | */ | 338 | */ |
294 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | ||
295 | |||
296 | struct InvertibleBloomFilter **strata; | 339 | struct InvertibleBloomFilter **strata; |
297 | 340 | ||
341 | /** | ||
342 | * Pre-computed IBFs | ||
343 | */ | ||
298 | struct InvertibleBloomFilter **ibfs; | 344 | struct InvertibleBloomFilter **ibfs; |
345 | |||
346 | /** | ||
347 | * Current round | ||
348 | */ | ||
349 | enum ConsensusRound current_round; | ||
299 | }; | 350 | }; |
300 | 351 | ||
301 | 352 | ||
@@ -339,6 +390,14 @@ struct IncomingSocket | |||
339 | * Peer-in-session this socket belongs to, once known, otherwise NULL. | 390 | * Peer-in-session this socket belongs to, once known, otherwise NULL. |
340 | */ | 391 | */ |
341 | struct ConsensusPeerInformation *cpi; | 392 | struct ConsensusPeerInformation *cpi; |
393 | |||
394 | /** | ||
395 | * Set to the global session id, if the peer sent us a hello-message, | ||
396 | * but the session does not exist yet. | ||
397 | * | ||
398 | * FIXME: not implemented yet | ||
399 | */ | ||
400 | struct GNUNET_HashCode *requested_gid; | ||
342 | }; | 401 | }; |
343 | 402 | ||
344 | static struct IncomingSocket *incoming_sockets_head; | 403 | static struct IncomingSocket *incoming_sockets_head; |
@@ -380,6 +439,12 @@ static struct GNUNET_CORE_Handle *core; | |||
380 | static struct GNUNET_STREAM_ListenSocket *listener; | 439 | static struct GNUNET_STREAM_ListenSocket *listener; |
381 | 440 | ||
382 | 441 | ||
442 | /** | ||
443 | * Queue a message to be sent to the inhabiting client of a sessino | ||
444 | * | ||
445 | * @param session session | ||
446 | * @param msg message we want to queue | ||
447 | */ | ||
383 | static void | 448 | static void |
384 | queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) | 449 | queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) |
385 | { | 450 | { |
@@ -389,7 +454,32 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea | |||
389 | GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); | 454 | GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); |
390 | } | 455 | } |
391 | 456 | ||
457 | /** | ||
458 | * Get peer index associated with the peer information, | ||
459 | * unique for every session among all peers. | ||
460 | */ | ||
461 | static int | ||
462 | get_cpi_index (struct ConsensusPeerInformation *cpi) | ||
463 | { | ||
464 | return cpi - cpi->session->info; | ||
465 | } | ||
466 | |||
467 | /** | ||
468 | * Mark the peer as bad, free as state we don't need anymore. | ||
469 | */ | ||
470 | static void | ||
471 | mark_peer_bad (struct ConsensusPeerInformation *cpi) | ||
472 | { | ||
473 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi)); | ||
474 | cpi->is_bad = GNUNET_YES; | ||
475 | /* FIXME: free ibfs etc. */ | ||
476 | } | ||
477 | |||
392 | 478 | ||
479 | /** | ||
480 | * Estimate set difference with two strata estimators, | ||
481 | * i.e. arrays of IBFs. | ||
482 | */ | ||
393 | static int | 483 | static int |
394 | estimate_difference (struct InvertibleBloomFilter** strata1, | 484 | estimate_difference (struct InvertibleBloomFilter** strata1, |
395 | struct InvertibleBloomFilter** strata2) | 485 | struct InvertibleBloomFilter** strata2) |
@@ -415,6 +505,7 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
415 | } | 505 | } |
416 | if (GNUNET_SYSERR == more) | 506 | if (GNUNET_SYSERR == more) |
417 | { | 507 | { |
508 | ibf_destroy (diff); | ||
418 | return count * (1 << (i + 1)); | 509 | return count * (1 << (i + 1)); |
419 | } | 510 | } |
420 | ibf_count++; | 511 | ibf_count++; |
@@ -425,10 +516,9 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
425 | } | 516 | } |
426 | 517 | ||
427 | 518 | ||
428 | |||
429 | /** | 519 | /** |
430 | * Functions of this signature are called whenever data is available from the | 520 | * Called when receiving data from a peer that is member of |
431 | * stream. | 521 | * an inhabited consensus session. |
432 | * | 522 | * |
433 | * @param cls the closure from GNUNET_STREAM_read | 523 | * @param cls the closure from GNUNET_STREAM_read |
434 | * @param status the status of the stream at the time this function is called | 524 | * @param status the status of the stream at the time this function is called |
@@ -468,8 +558,8 @@ session_stream_data_processor (void *cls, | |||
468 | } | 558 | } |
469 | 559 | ||
470 | /** | 560 | /** |
471 | * Functions of this signature are called whenever data is available from the | 561 | * Called when we receive data from a peer that is not member of |
472 | * stream. | 562 | * a session yet, or the session is not yet inhabited. |
473 | * | 563 | * |
474 | * @param cls the closure from GNUNET_STREAM_read | 564 | * @param cls the closure from GNUNET_STREAM_read |
475 | * @param status the status of the stream at the time this function is called | 565 | * @param status the status of the stream at the time this function is called |
@@ -508,7 +598,7 @@ incoming_stream_data_processor (void *cls, | |||
508 | 598 | ||
509 | 599 | ||
510 | /** | 600 | /** |
511 | * Iterator over hash map entries. | 601 | * Iterator to insert values into an ibf. |
512 | * | 602 | * |
513 | * @param cls closure | 603 | * @param cls closure |
514 | * @param key current key code | 604 | * @param key current key code |
@@ -524,28 +614,34 @@ ibf_values_iterator (void *cls, | |||
524 | { | 614 | { |
525 | struct ConsensusPeerInformation *cpi; | 615 | struct ConsensusPeerInformation *cpi; |
526 | cpi = cls; | 616 | cpi = cls; |
527 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], key); | 617 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key)); |
528 | return GNUNET_YES; | 618 | return GNUNET_YES; |
529 | } | 619 | } |
530 | 620 | ||
531 | |||
532 | static void | 621 | static void |
533 | create_outgoing_ibf (struct ConsensusPeerInformation *cpi) | 622 | prepare_ibf (struct ConsensusPeerInformation *cpi) |
534 | { | 623 | { |
535 | if (NULL == cpi->session->ibfs[cpi->ibf_order]) | 624 | if (NULL == cpi->session->ibfs[cpi->ibf_order]) |
536 | { | 625 | { |
537 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 626 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); |
538 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); | 627 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); |
539 | } | 628 | } |
540 | cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
541 | } | 629 | } |
542 | 630 | ||
631 | |||
632 | /** | ||
633 | * Called when a peer sends us its strata estimator. | ||
634 | * In response, we sent out IBF of appropriate size back. | ||
635 | * | ||
636 | * @param cpi session | ||
637 | * @param strata_msg message | ||
638 | */ | ||
543 | static int | 639 | static int |
544 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | 640 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) |
545 | { | 641 | { |
546 | int i; | 642 | int i; |
547 | int num_strata; | 643 | uint64_t *key_src; |
548 | struct GNUNET_HashCode *hash_src; | 644 | uint32_t *hash_src; |
549 | uint8_t *count_src; | 645 | uint8_t *count_src; |
550 | 646 | ||
551 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); | 647 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); |
@@ -557,49 +653,46 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
557 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | 653 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); |
558 | } | 654 | } |
559 | 655 | ||
560 | num_strata = ntohs (strata_msg->num_strata); | ||
561 | |||
562 | /* for correct message alignment, copy bucket types seperately */ | 656 | /* for correct message alignment, copy bucket types seperately */ |
563 | hash_src = (struct GNUNET_HashCode *) &strata_msg[1]; | 657 | key_src = (uint64_t *) &strata_msg[1]; |
564 | 658 | ||
565 | for (i = 0; i < num_strata; i++) | 659 | for (i = 0; i < STRATA_COUNT; i++) |
566 | { | 660 | { |
567 | memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | 661 | memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src); |
568 | hash_src += STRATA_IBF_BUCKETS; | 662 | key_src += STRATA_IBF_BUCKETS; |
569 | } | 663 | } |
570 | 664 | ||
571 | for (i = 0; i < num_strata; i++) | 665 | hash_src = (uint32_t *) key_src; |
666 | |||
667 | for (i = 0; i < STRATA_COUNT; i++) | ||
572 | { | 668 | { |
573 | memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | 669 | memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); |
574 | hash_src += STRATA_IBF_BUCKETS; | 670 | hash_src += STRATA_IBF_BUCKETS; |
575 | } | 671 | } |
576 | 672 | ||
577 | count_src = (uint8_t *) hash_src; | 673 | count_src = (uint8_t *) hash_src; |
578 | 674 | ||
579 | for (i = 0; i < num_strata; i++) | 675 | for (i = 0; i < STRATA_COUNT; i++) |
580 | { | 676 | { |
581 | memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); | 677 | memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS); |
582 | count_src += STRATA_IBF_BUCKETS; | 678 | count_src += STRATA_IBF_BUCKETS; |
583 | } | 679 | } |
584 | 680 | ||
585 | GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE)); | 681 | cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); |
586 | 682 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); | |
587 | cpi->strata_counter += num_strata; | 683 | |
588 | 684 | /* send IBF of the right size */ | |
589 | if (STRATA_COUNT == cpi->strata_counter) | 685 | cpi->ibf_order = 0; |
590 | { | 686 | while ((1 << cpi->ibf_order) < cpi->diff) |
591 | 687 | cpi->ibf_order++; | |
592 | cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); | 688 | if (cpi->ibf_order > MAX_IBF_ORDER) |
593 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); | 689 | cpi->ibf_order = MAX_IBF_ORDER; |
594 | cpi->ibf_order = 0; | 690 | cpi->ibf_order += 2; |
595 | while ((1 << cpi->ibf_order) < cpi->diff) | 691 | /* create ibf if not already pre-computed */ |
596 | cpi->ibf_order++; | 692 | prepare_ibf (cpi); |
597 | if (cpi->ibf_order > MAX_IBF_ORDER) | 693 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); |
598 | cpi->ibf_order = MAX_IBF_ORDER; | 694 | cpi->ibf_state = IBF_STATE_TRANSMITTING; |
599 | cpi->ibf_order += 2; | 695 | write_ibf (cpi, GNUNET_STREAM_OK, 0); |
600 | create_outgoing_ibf (cpi); | ||
601 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
602 | } | ||
603 | 696 | ||
604 | return GNUNET_YES; | 697 | return GNUNET_YES; |
605 | } | 698 | } |
@@ -608,59 +701,57 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
608 | static int | 701 | static int |
609 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) | 702 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) |
610 | { | 703 | { |
611 | struct GNUNET_HashCode *hash_src; | ||
612 | int num_buckets; | 704 | int num_buckets; |
705 | uint64_t *key_src; | ||
706 | uint32_t *hash_src; | ||
613 | uint8_t *count_src; | 707 | uint8_t *count_src; |
614 | 708 | ||
615 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | 709 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; |
616 | 710 | ||
617 | if (cpi->is_outgoing == GNUNET_YES) | 711 | if (IBF_STATE_NONE == cpi->ibf_state) |
618 | { | 712 | { |
619 | /* we receive the ibf as an initiator, thus we're interested in the order */ | 713 | cpi->ibf_state = IBF_STATE_RECEIVING; |
620 | cpi->ibf_order = digest->order; | 714 | cpi->ibf_order = digest->order; |
621 | if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh)) | 715 | cpi->ibf_bucket_counter = 0; |
622 | { | ||
623 | create_outgoing_ibf (cpi); | ||
624 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
625 | } | ||
626 | /* FIXME: ensure that orders do not differ each time */ | ||
627 | } | 716 | } |
628 | else | 717 | |
718 | if ( (IBF_STATE_RECEIVING != cpi->ibf_state) || | ||
719 | (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) ) | ||
629 | { | 720 | { |
630 | /* FIXME: handle correctly */ | 721 | mark_peer_bad (cpi); |
631 | GNUNET_assert (cpi->ibf_order == digest->order); | 722 | return GNUNET_NO; |
632 | } | 723 | } |
633 | 724 | ||
634 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order)); | ||
635 | 725 | ||
636 | if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | 726 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); |
637 | { | ||
638 | /* TODO: handle this */ | ||
639 | GNUNET_assert (0); | ||
640 | } | ||
641 | 727 | ||
642 | if (NULL == cpi->incoming_ibf) | 728 | if (NULL == cpi->ibf) |
643 | cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 729 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); |
644 | 730 | ||
645 | hash_src = (struct GNUNET_HashCode *) &digest[1]; | 731 | key_src = (uint64_t *) &digest[1]; |
646 | 732 | ||
647 | memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src); | 733 | memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src); |
648 | hash_src += num_buckets; | 734 | hash_src += num_buckets; |
649 | 735 | ||
650 | memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); | 736 | hash_src = (uint32_t *) key_src; |
737 | |||
738 | memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); | ||
651 | hash_src += num_buckets; | 739 | hash_src += num_buckets; |
652 | 740 | ||
653 | count_src = (uint8_t *) hash_src; | 741 | count_src = (uint8_t *) hash_src; |
654 | 742 | ||
655 | memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src); | 743 | memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src); |
656 | 744 | ||
657 | cpi->incoming_bucket_counter += num_buckets; | 745 | cpi->ibf_bucket_counter += num_buckets; |
658 | 746 | ||
659 | if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) | 747 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
660 | { | 748 | { |
661 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); | 749 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); |
662 | if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))) | 750 | GNUNET_assert (NULL != cpi->wh); |
663 | write_values (cpi, GNUNET_STREAM_OK, 0); | 751 | cpi->ibf_state = IBF_STATE_DECODING; |
752 | prepare_ibf (cpi); | ||
753 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | ||
754 | write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0); | ||
664 | } | 755 | } |
665 | return GNUNET_YES; | 756 | return GNUNET_YES; |
666 | } | 757 | } |
@@ -702,10 +793,28 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
702 | } | 793 | } |
703 | 794 | ||
704 | 795 | ||
796 | /** | ||
797 | * Handle a request for elements. | ||
798 | * Only allowed in exchange-rounds. | ||
799 | * | ||
800 | * FIXME: implement | ||
801 | */ | ||
802 | static int | ||
803 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | ||
804 | { | ||
805 | /* FIXME: implement */ | ||
806 | return GNUNET_YES; | ||
807 | } | ||
808 | |||
809 | |||
810 | /** | ||
811 | * Handle a HELLO-message, send when another peer wants to join a session where | ||
812 | * our peer is a member. The session may or may not be inhabited yet. | ||
813 | */ | ||
705 | static int | 814 | static int |
706 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) | 815 | handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) |
707 | { | 816 | { |
708 | /* FIXME: session might not exist yet */ | 817 | /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */ |
709 | struct ConsensusSession *session; | 818 | struct ConsensusSession *session; |
710 | session = sessions_head; | 819 | session = sessions_head; |
711 | while (NULL != session) | 820 | while (NULL != session) |
@@ -726,7 +835,8 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello | |||
726 | } | 835 | } |
727 | session = session->next; | 836 | session = session->next; |
728 | } | 837 | } |
729 | GNUNET_assert (0); | 838 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n"); |
839 | GNUNET_break (0); | ||
730 | return GNUNET_NO; | 840 | return GNUNET_NO; |
731 | } | 841 | } |
732 | 842 | ||
@@ -756,6 +866,8 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader | |||
756 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | 866 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); |
757 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | 867 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: |
758 | return handle_p2p_element (cpi, message); | 868 | return handle_p2p_element (cpi, message); |
869 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: | ||
870 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); | ||
759 | default: | 871 | default: |
760 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); | 872 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); |
761 | /* FIXME: handle correctly */ | 873 | /* FIXME: handle correctly */ |
@@ -1007,6 +1119,9 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
1007 | 1119 | ||
1008 | 1120 | ||
1009 | 1121 | ||
1122 | /** | ||
1123 | * Called when stream has finishes writing the hello message | ||
1124 | */ | ||
1010 | static void | 1125 | static void |
1011 | hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1126 | hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1012 | { | 1127 | { |
@@ -1067,6 +1182,8 @@ initialize_session_info (struct ConsensusSession *session) | |||
1067 | session->info[i].session = session; | 1182 | session->info[i].session = session; |
1068 | } | 1183 | } |
1069 | 1184 | ||
1185 | session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; | ||
1186 | |||
1070 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | 1187 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1071 | i = (session->local_peer_idx + 1) % session->num_peers; | 1188 | i = (session->local_peer_idx + 1) % session->num_peers; |
1072 | while (i != last) | 1189 | while (i != last) |
@@ -1143,8 +1260,9 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke | |||
1143 | int i; | 1260 | int i; |
1144 | v = key->bits[0]; | 1261 | v = key->bits[0]; |
1145 | /* count trailing '1'-bits of v */ | 1262 | /* count trailing '1'-bits of v */ |
1146 | for (i = 0; v & 1; v>>=1, i++); | 1263 | for (i = 0; v & 1; v>>=1, i++) |
1147 | ibf_insert (strata[i], key); | 1264 | /* empty */; |
1265 | ibf_insert (strata[i], ibf_key_from_hashcode (key)); | ||
1148 | } | 1266 | } |
1149 | 1267 | ||
1150 | 1268 | ||
@@ -1309,6 +1427,29 @@ client_insert (void *cls, | |||
1309 | 1427 | ||
1310 | 1428 | ||
1311 | 1429 | ||
1430 | |||
1431 | /** | ||
1432 | * Functions of this signature are called whenever writing operations | ||
1433 | * on a stream are executed | ||
1434 | * | ||
1435 | * @param cls the closure from GNUNET_STREAM_write | ||
1436 | * @param status the status of the stream at the time this function is called; | ||
1437 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
1438 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
1439 | * (this doesn't mean that the data is never sent, the receiver may | ||
1440 | * have read the data but its ACKs may have been lost); | ||
1441 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
1442 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
1443 | * be processed. | ||
1444 | * @param size the number of bytes written | ||
1445 | */ | ||
1446 | static void | ||
1447 | write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1448 | { | ||
1449 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1450 | /* just wait for the ibf */ | ||
1451 | } | ||
1452 | |||
1312 | /** | 1453 | /** |
1313 | * Functions of this signature are called whenever writing operations | 1454 | * Functions of this signature are called whenever writing operations |
1314 | * on a stream are executed | 1455 | * on a stream are executed |
@@ -1331,65 +1472,53 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1331 | struct StrataMessage *strata_msg; | 1472 | struct StrataMessage *strata_msg; |
1332 | size_t msize; | 1473 | size_t msize; |
1333 | int i; | 1474 | int i; |
1334 | struct GNUNET_HashCode *hash_dst; | 1475 | uint64_t *key_dst; |
1476 | uint32_t *hash_dst; | ||
1335 | uint8_t *count_dst; | 1477 | uint8_t *count_dst; |
1336 | int num_strata; | ||
1337 | 1478 | ||
1338 | cpi = cls; | 1479 | cpi = cls; |
1339 | cpi->wh = NULL; | 1480 | cpi->wh = NULL; |
1340 | 1481 | ||
1482 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1483 | |||
1341 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); | 1484 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); |
1342 | 1485 | ||
1343 | /* FIXME: handle this */ | 1486 | /* FIXME: handle this */ |
1344 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1487 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1345 | 1488 | ||
1346 | if (STRATA_COUNT == cpi->strata_counter) | 1489 | msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); |
1347 | { | ||
1348 | /* strata have been written, wait for other side's IBF */ | ||
1349 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n"); | ||
1350 | return; | ||
1351 | } | ||
1352 | |||
1353 | if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE) | ||
1354 | num_strata = (STRATA_COUNT - cpi->strata_counter); | ||
1355 | else | ||
1356 | num_strata = STRATA_PER_MESSAGE; | ||
1357 | |||
1358 | |||
1359 | msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS); | ||
1360 | 1490 | ||
1361 | strata_msg = GNUNET_malloc (msize); | 1491 | strata_msg = GNUNET_malloc (msize); |
1362 | strata_msg->header.size = htons (msize); | 1492 | strata_msg->header.size = htons (msize); |
1363 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | 1493 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); |
1364 | strata_msg->num_strata = htons (num_strata); | ||
1365 | 1494 | ||
1366 | /* for correct message alignment, copy bucket types seperately */ | 1495 | /* for correct message alignment, copy bucket types seperately */ |
1367 | hash_dst = (struct GNUNET_HashCode *) &strata_msg[1]; | 1496 | key_dst = (uint64_t *) &strata_msg[1]; |
1368 | 1497 | ||
1369 | for (i = 0; i < num_strata; i++) | 1498 | for (i = 0; i < STRATA_COUNT; i++) |
1370 | { | 1499 | { |
1371 | memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | 1500 | memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst); |
1372 | hash_dst += STRATA_IBF_BUCKETS; | 1501 | key_dst += STRATA_IBF_BUCKETS; |
1373 | } | 1502 | } |
1374 | 1503 | ||
1375 | for (i = 0; i < num_strata; i++) | 1504 | hash_dst = (uint32_t *) key_dst; |
1505 | |||
1506 | for (i = 0; i < STRATA_COUNT; i++) | ||
1376 | { | 1507 | { |
1377 | memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | 1508 | memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); |
1378 | hash_dst += STRATA_IBF_BUCKETS; | 1509 | hash_dst += STRATA_IBF_BUCKETS; |
1379 | } | 1510 | } |
1380 | 1511 | ||
1381 | count_dst = (uint8_t *) hash_dst; | 1512 | count_dst = (uint8_t *) hash_dst; |
1382 | 1513 | ||
1383 | for (i = 0; i < num_strata; i++) | 1514 | for (i = 0; i < STRATA_COUNT; i++) |
1384 | { | 1515 | { |
1385 | memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS); | 1516 | memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS); |
1386 | count_dst += STRATA_IBF_BUCKETS; | 1517 | count_dst += STRATA_IBF_BUCKETS; |
1387 | } | 1518 | } |
1388 | 1519 | ||
1389 | cpi->strata_counter += num_strata; | ||
1390 | |||
1391 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1520 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, |
1392 | write_strata, cpi); | 1521 | write_strata_done, cpi); |
1393 | 1522 | ||
1394 | GNUNET_assert (NULL != cpi->wh); | 1523 | GNUNET_assert (NULL != cpi->wh); |
1395 | } | 1524 | } |
@@ -1416,29 +1545,33 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1416 | struct ConsensusPeerInformation *cpi; | 1545 | struct ConsensusPeerInformation *cpi; |
1417 | struct DifferenceDigest *digest; | 1546 | struct DifferenceDigest *digest; |
1418 | int msize; | 1547 | int msize; |
1419 | struct GNUNET_HashCode *hash_dst; | 1548 | uint64_t *key_dst; |
1549 | uint32_t *hash_dst; | ||
1420 | uint8_t *count_dst; | 1550 | uint8_t *count_dst; |
1421 | int num_buckets; | 1551 | int num_buckets; |
1422 | 1552 | ||
1423 | cpi = cls; | 1553 | cpi = cls; |
1424 | cpi->wh = NULL; | 1554 | cpi->wh = NULL; |
1425 | 1555 | ||
1426 | if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)) | 1556 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1557 | |||
1558 | GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); | ||
1559 | |||
1560 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | ||
1427 | { | 1561 | { |
1428 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); | 1562 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); |
1429 | if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) | 1563 | /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ |
1430 | write_values (cpi, GNUNET_STREAM_OK, 0); | ||
1431 | return; | 1564 | return; |
1432 | } | 1565 | } |
1433 | 1566 | ||
1434 | /* remaining buckets */ | 1567 | /* remaining buckets */ |
1435 | num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter; | 1568 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; |
1436 | 1569 | ||
1437 | /* limit to maximum */ | 1570 | /* limit to maximum */ |
1438 | if (num_buckets > BUCKETS_PER_MESSAGE) | 1571 | if (num_buckets > BUCKETS_PER_MESSAGE) |
1439 | num_buckets = BUCKETS_PER_MESSAGE; | 1572 | num_buckets = BUCKETS_PER_MESSAGE; |
1440 | 1573 | ||
1441 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order)); | 1574 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order)); |
1442 | 1575 | ||
1443 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); | 1576 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); |
1444 | 1577 | ||
@@ -1447,19 +1580,21 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1447 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | 1580 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); |
1448 | digest->order = cpi->ibf_order; | 1581 | digest->order = cpi->ibf_order; |
1449 | 1582 | ||
1450 | hash_dst = (struct GNUNET_HashCode *) &digest[1]; | 1583 | key_dst = (uint64_t *) &digest[1]; |
1451 | 1584 | ||
1452 | memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst); | 1585 | memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst); |
1453 | hash_dst += num_buckets; | 1586 | key_dst += num_buckets; |
1587 | |||
1588 | hash_dst = (uint32_t *) key_dst; | ||
1454 | 1589 | ||
1455 | memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst); | 1590 | memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst); |
1456 | hash_dst += num_buckets; | 1591 | hash_dst += num_buckets; |
1457 | 1592 | ||
1458 | count_dst = (uint8_t *) hash_dst; | 1593 | count_dst = (uint8_t *) hash_dst; |
1459 | 1594 | ||
1460 | memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst); | 1595 | memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst); |
1461 | 1596 | ||
1462 | cpi->outgoing_bucket_counter += num_buckets; | 1597 | cpi->ibf_bucket_counter += num_buckets; |
1463 | 1598 | ||
1464 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1599 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, |
1465 | write_ibf, cpi); | 1600 | write_ibf, cpi); |
@@ -1484,37 +1619,34 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1484 | * @param size the number of bytes written | 1619 | * @param size the number of bytes written |
1485 | */ | 1620 | */ |
1486 | static void | 1621 | static void |
1487 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1622 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1488 | { | 1623 | { |
1489 | struct ConsensusPeerInformation *cpi; | 1624 | struct ConsensusPeerInformation *cpi; |
1490 | struct GNUNET_HashCode key; | 1625 | uint64_t key; |
1491 | struct GNUNET_CONSENSUS_Element *element; | 1626 | struct GNUNET_HashCode hashcode; |
1492 | struct GNUNET_MessageHeader *element_msg; | ||
1493 | int side; | 1627 | int side; |
1494 | int msize; | 1628 | int msize; |
1495 | 1629 | ||
1630 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1631 | |||
1496 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); | 1632 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); |
1497 | 1633 | ||
1498 | cpi = cls; | 1634 | cpi = cls; |
1499 | cpi->wh = NULL; | 1635 | cpi->wh = NULL; |
1500 | 1636 | ||
1501 | if (NULL == cpi->diff_ibf) | 1637 | GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); |
1502 | { | ||
1503 | GNUNET_assert (NULL != cpi->incoming_ibf); | ||
1504 | GNUNET_assert (NULL != cpi->outgoing_ibf); | ||
1505 | GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size); | ||
1506 | cpi->diff_ibf = ibf_dup (cpi->incoming_ibf); | ||
1507 | ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf); | ||
1508 | } | ||
1509 | 1638 | ||
1510 | for (;;) | 1639 | for (;;) |
1511 | { | 1640 | { |
1512 | int res; | 1641 | int res; |
1513 | res = ibf_decode (cpi->diff_ibf, &side, &key); | 1642 | res = ibf_decode (cpi->ibf, &side, &key); |
1514 | if (GNUNET_SYSERR == res) | 1643 | if (GNUNET_SYSERR == res) |
1515 | { | 1644 | { |
1516 | /* TODO: handle this correctly, request new ibf */ | 1645 | cpi->ibf_order++; |
1517 | GNUNET_break (0); | 1646 | prepare_ibf (cpi); |
1647 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
1648 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
1649 | write_ibf (cls, status, size); | ||
1518 | return; | 1650 | return; |
1519 | } | 1651 | } |
1520 | if (GNUNET_NO == res) | 1652 | if (GNUNET_NO == res) |
@@ -1523,40 +1655,59 @@ write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1523 | return; | 1655 | return; |
1524 | } | 1656 | } |
1525 | if (-1 == side) | 1657 | if (-1 == side) |
1526 | break; | 1658 | { |
1527 | } | 1659 | struct GNUNET_CONSENSUS_Element *element; |
1528 | 1660 | struct GNUNET_MessageHeader *element_msg; | |
1529 | element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key); | 1661 | ibf_hashcode_from_key (key, &hashcode); |
1530 | 1662 | /* FIXME: this only transmits one element stored with the key */ | |
1531 | if (NULL == element) | 1663 | element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); |
1532 | { | 1664 | if (NULL == element) |
1533 | /* FIXME: handle correctly */ | 1665 | continue; |
1534 | GNUNET_break (0); | 1666 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; |
1535 | return; | 1667 | element_msg = GNUNET_malloc (msize); |
1668 | element_msg->size = htons (msize); | ||
1669 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
1670 | GNUNET_assert (NULL != element->data); | ||
1671 | memcpy (&element_msg[1], element->data, element->size); | ||
1672 | cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1673 | write_requests_and_elements, cpi); | ||
1674 | GNUNET_free (element_msg); | ||
1675 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); | ||
1676 | |||
1677 | GNUNET_assert (NULL != cpi->wh); | ||
1678 | return; | ||
1679 | } | ||
1680 | else | ||
1681 | { | ||
1682 | struct ElementRequest *msg; | ||
1683 | size_t msize; | ||
1684 | uint64_t *p; | ||
1685 | |||
1686 | msize = (sizeof *msg) + sizeof (uint64_t); | ||
1687 | msg = GNUNET_malloc (msize); | ||
1688 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1689 | msg->header.size = htons (msize); | ||
1690 | p = (uint64_t *) &msg[1]; | ||
1691 | *p = key; | ||
1692 | |||
1693 | cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1694 | write_requests_and_elements, cpi); | ||
1695 | GNUNET_assert (NULL != cpi->wh); | ||
1696 | GNUNET_free (msg); | ||
1697 | return; | ||
1698 | } | ||
1536 | } | 1699 | } |
1537 | 1700 | ||
1538 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | 1701 | } |
1539 | |||
1540 | element_msg = GNUNET_malloc (msize); | ||
1541 | element_msg->size = htons (msize); | ||
1542 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
1543 | |||
1544 | |||
1545 | GNUNET_assert (NULL != element->data); | ||
1546 | |||
1547 | |||
1548 | memcpy (&element_msg[1], element->data, element->size); | ||
1549 | |||
1550 | cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1551 | write_values, cpi); | ||
1552 | |||
1553 | GNUNET_free (element_msg); | ||
1554 | 1702 | ||
1555 | 1703 | ||
1556 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); | ||
1557 | 1704 | ||
1558 | GNUNET_assert (NULL != cpi->wh); | 1705 | /* |
1706 | static void | ||
1707 | select_best_group (struct ConsensusSession *session) | ||
1708 | { | ||
1559 | } | 1709 | } |
1710 | */ | ||
1560 | 1711 | ||
1561 | 1712 | ||
1562 | /** | 1713 | /** |
@@ -1566,7 +1717,7 @@ write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1566 | * @param client client handle | 1717 | * @param client client handle |
1567 | * @param message message sent by the client | 1718 | * @param message message sent by the client |
1568 | */ | 1719 | */ |
1569 | void | 1720 | static void |
1570 | client_conclude (void *cls, | 1721 | client_conclude (void *cls, |
1571 | struct GNUNET_SERVER_Client *client, | 1722 | struct GNUNET_SERVER_Client *client, |
1572 | const struct GNUNET_MessageHeader *message) | 1723 | const struct GNUNET_MessageHeader *message) |
@@ -1597,8 +1748,6 @@ client_conclude (void *cls, | |||
1597 | 1748 | ||
1598 | session->conclude_requested = GNUNET_YES; | 1749 | session->conclude_requested = GNUNET_YES; |
1599 | 1750 | ||
1600 | /* FIXME: write to already connected sockets */ | ||
1601 | |||
1602 | for (i = 0; i < session->num_peers; i++) | 1751 | for (i = 0; i < session->num_peers; i++) |
1603 | { | 1752 | { |
1604 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | 1753 | if ( (GNUNET_YES == session->info[i].is_outgoing) && |
@@ -1654,19 +1803,14 @@ client_ack (void *cls, | |||
1654 | 1803 | ||
1655 | if (msg->keep) | 1804 | if (msg->keep) |
1656 | { | 1805 | { |
1657 | |||
1658 | element = pending->element; | 1806 | element = pending->element; |
1659 | |||
1660 | GNUNET_CRYPTO_hash (element, element->size, &key); | 1807 | GNUNET_CRYPTO_hash (element, element->size, &key); |
1661 | 1808 | ||
1662 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | 1809 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, |
1663 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1810 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1664 | |||
1665 | strata_insert (session->strata, &key); | 1811 | strata_insert (session->strata, &key); |
1666 | } | 1812 | } |
1667 | 1813 | ||
1668 | /* FIXME: also remove element from strata */ | ||
1669 | |||
1670 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1814 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1671 | } | 1815 | } |
1672 | 1816 | ||
@@ -1783,6 +1927,7 @@ shutdown_task (void *cls, | |||
1783 | static void | 1927 | static void |
1784 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) | 1928 | run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) |
1785 | { | 1929 | { |
1930 | /* core is only used to retrieve the peer identity */ | ||
1786 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { | 1931 | static const struct GNUNET_CORE_MessageHandler core_handlers[] = { |
1787 | {NULL, 0, 0} | 1932 | {NULL, 0, 0} |
1788 | }; | 1933 | }; |
@@ -1803,7 +1948,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
1803 | 1948 | ||
1804 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); | 1949 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); |
1805 | 1950 | ||
1806 | |||
1807 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, | 1951 | listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, |
1808 | listen_cb, NULL, | 1952 | listen_cb, NULL, |
1809 | GNUNET_STREAM_OPTION_END); | 1953 | GNUNET_STREAM_OPTION_END); |