diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-01-24 02:55:31 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-01-24 02:55:31 +0000 |
commit | f7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1 (patch) | |
tree | 33a7d6f9fecf0c260a1b5b57403a9d13c6bbc859 /src | |
parent | d6a27b576d197ac823e8494f351d43a78125a35f (diff) | |
download | gnunet-f7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1.tar.gz gnunet-f7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1.zip |
implemented value exchange, various fixes
Diffstat (limited to 'src')
-rw-r--r-- | src/consensus/consensus_api.c | 32 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 477 | ||||
-rw-r--r-- | src/consensus/ibf.c | 2 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 1 |
6 files changed, 453 insertions, 67 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 5c0494254..ba0e69e48 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -176,7 +176,6 @@ transmit_queued (void *cls, size_t size, | |||
176 | 176 | ||
177 | qmsg = consensus->messages_head; | 177 | qmsg = consensus->messages_head; |
178 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); | 178 | GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); |
179 | GNUNET_assert (qmsg); | ||
180 | 179 | ||
181 | if (NULL == buf) | 180 | if (NULL == buf) |
182 | { | 181 | { |
@@ -196,8 +195,8 @@ transmit_queued (void *cls, size_t size, | |||
196 | { | 195 | { |
197 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); | 196 | qmsg->idc (qmsg->idc_cls, GNUNET_YES); |
198 | } | 197 | } |
199 | GNUNET_free (qmsg->msg); | 198 | |
200 | GNUNET_free (qmsg); | 199 | /* FIXME: free the messages */ |
201 | 200 | ||
202 | send_next (consensus); | 201 | send_next (consensus); |
203 | 202 | ||
@@ -218,7 +217,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) | |||
218 | 217 | ||
219 | if (NULL != consensus->messages_head) | 218 | if (NULL != consensus->messages_head) |
220 | { | 219 | { |
221 | LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); | ||
222 | consensus->th = | 220 | consensus->th = |
223 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), | 221 | GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), |
224 | GNUNET_TIME_UNIT_FOREVER_REL, | 222 | GNUNET_TIME_UNIT_FOREVER_REL, |
@@ -226,6 +224,15 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) | |||
226 | } | 224 | } |
227 | } | 225 | } |
228 | 226 | ||
227 | static void | ||
228 | queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) | ||
229 | { | ||
230 | struct QueuedMessage *qm; | ||
231 | qm = GNUNET_malloc (sizeof *qm); | ||
232 | qm->msg = msg; | ||
233 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); | ||
234 | } | ||
235 | |||
229 | 236 | ||
230 | /** | 237 | /** |
231 | * Called when the server has sent is a new element | 238 | * Called when the server has sent is a new element |
@@ -239,23 +246,24 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, | |||
239 | { | 246 | { |
240 | struct GNUNET_CONSENSUS_Element element; | 247 | struct GNUNET_CONSENSUS_Element element; |
241 | struct GNUNET_CONSENSUS_AckMessage *ack_msg; | 248 | struct GNUNET_CONSENSUS_AckMessage *ack_msg; |
242 | struct QueuedMessage *queued_msg; | ||
243 | int ret; | 249 | int ret; |
244 | 250 | ||
251 | LOG (GNUNET_ERROR_TYPE_INFO, "received new element\n"); | ||
252 | |||
245 | element.type = msg->element_type; | 253 | element.type = msg->element_type; |
246 | element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 254 | element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
247 | element.data = &msg[1]; | 255 | element.data = &msg[1]; |
248 | 256 | ||
249 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); | 257 | ret = consensus->new_element_cb (consensus->new_element_cls, &element); |
250 | 258 | ||
251 | queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage)); | 259 | ack_msg = GNUNET_malloc (sizeof *ack_msg); |
252 | queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1]; | 260 | ack_msg->header.size = htons (sizeof *ack_msg); |
253 | 261 | ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); | |
254 | ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg; | ||
255 | ack_msg->keep = ret; | 262 | ack_msg->keep = ret; |
256 | 263 | ||
257 | GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, | 264 | queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg); |
258 | queued_msg); | 265 | |
266 | send_next (consensus); | ||
259 | } | 267 | } |
260 | 268 | ||
261 | 269 | ||
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index 105708ee9..c84aad263 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -49,13 +49,15 @@ struct StrataMessage | |||
49 | 49 | ||
50 | struct DifferenceDigest | 50 | struct DifferenceDigest |
51 | { | 51 | { |
52 | |||
53 | struct GNUNET_MessageHeader header; | 52 | struct GNUNET_MessageHeader header; |
53 | uint8_t order; | ||
54 | uint8_t round; | ||
54 | }; | 55 | }; |
55 | 56 | ||
56 | struct Element | 57 | struct Element |
57 | { | 58 | { |
58 | struct GNUNET_MessageHeader header; | 59 | struct GNUNET_MessageHeader header; |
60 | struct GNUNET_HashCode hash; | ||
59 | }; | 61 | }; |
60 | 62 | ||
61 | struct ConsensusHello | 63 | struct ConsensusHello |
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index c8a5593f1..222ec3e9d 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -177,6 +177,7 @@ static int | |||
177 | new_element_cb (void *cls, | 177 | new_element_cb (void *cls, |
178 | struct GNUNET_CONSENSUS_Element *element) | 178 | struct GNUNET_CONSENSUS_Element *element) |
179 | { | 179 | { |
180 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); | ||
180 | return GNUNET_YES; | 181 | return GNUNET_YES; |
181 | } | 182 | } |
182 | 183 | ||
@@ -263,8 +264,11 @@ test_master (void *cls, | |||
263 | int i; | 264 | int i; |
264 | 265 | ||
265 | 266 | ||
267 | GNUNET_log_setup ("gnunet-consensus", "INFO", NULL); | ||
268 | |||
266 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); | 269 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); |
267 | 270 | ||
271 | |||
268 | peers = started_peers; | 272 | peers = started_peers; |
269 | 273 | ||
270 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); | 274 | peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index ad0266954..e80bee331 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -55,12 +55,16 @@ | |||
55 | */ | 55 | */ |
56 | #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) | 56 | #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) |
57 | 57 | ||
58 | #define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | ||
59 | |||
60 | #define MAX_IBF_ORDER (64) | ||
58 | 61 | ||
59 | 62 | ||
60 | /* forward declarations */ | 63 | /* forward declarations */ |
61 | 64 | ||
62 | struct ConsensusSession; | 65 | struct ConsensusSession; |
63 | struct IncomingSocket; | 66 | struct IncomingSocket; |
67 | struct ConsensusPeerInformation; | ||
64 | 68 | ||
65 | static void | 69 | static void |
66 | send_next (struct ConsensusSession *session); | 70 | send_next (struct ConsensusSession *session); |
@@ -68,6 +72,12 @@ send_next (struct ConsensusSession *session); | |||
68 | static void | 72 | static void |
69 | write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); | 73 | write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); |
70 | 74 | ||
75 | static void | ||
76 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); | ||
77 | |||
78 | static void | ||
79 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size); | ||
80 | |||
71 | static int | 81 | static int |
72 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); | 82 | get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); |
73 | 83 | ||
@@ -91,6 +101,9 @@ struct PendingElement | |||
91 | * The actual element | 101 | * The actual element |
92 | */ | 102 | */ |
93 | struct GNUNET_CONSENSUS_Element *element; | 103 | struct GNUNET_CONSENSUS_Element *element; |
104 | |||
105 | /* peer this element is coming from */ | ||
106 | struct ConsensusPeerInformation *cpi; | ||
94 | }; | 107 | }; |
95 | 108 | ||
96 | struct ConsensusPeerInformation | 109 | struct ConsensusPeerInformation |
@@ -130,13 +143,21 @@ struct ConsensusPeerInformation | |||
130 | */ | 143 | */ |
131 | int strata_counter; | 144 | int strata_counter; |
132 | 145 | ||
133 | struct InvertibleBloomFilter *my_ibf; | 146 | int ibf_order; |
147 | |||
148 | struct InvertibleBloomFilter *outgoing_ibf; | ||
134 | 149 | ||
135 | int my_ibf_bucket_counter; | 150 | int outgoing_bucket_counter; |
136 | 151 | ||
137 | struct InvertibleBloomFilter *peer_ibf; | 152 | struct InvertibleBloomFilter *incoming_ibf; |
138 | 153 | ||
139 | int peer_ibf_bucket_counter; | 154 | int incoming_bucket_counter; |
155 | |||
156 | /** | ||
157 | * NULL or incoming_ibf - outgoing_ibf. | ||
158 | * Decoded values of side '1' are to be requested from the the peer. | ||
159 | */ | ||
160 | struct InvertibleBloomFilter *diff_ibf; | ||
140 | 161 | ||
141 | /** | 162 | /** |
142 | * Strata estimator of the peer, NULL if our peer | 163 | * Strata estimator of the peer, NULL if our peer |
@@ -144,6 +165,8 @@ struct ConsensusPeerInformation | |||
144 | */ | 165 | */ |
145 | struct InvertibleBloomFilter **strata; | 166 | struct InvertibleBloomFilter **strata; |
146 | 167 | ||
168 | unsigned int diff; | ||
169 | |||
147 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | 170 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; |
148 | 171 | ||
149 | struct ConsensusSession *session; | 172 | struct ConsensusSession *session; |
@@ -206,16 +229,6 @@ struct ConsensusSession | |||
206 | struct GNUNET_CONTAINER_MultiHashMap *values; | 229 | struct GNUNET_CONTAINER_MultiHashMap *values; |
207 | 230 | ||
208 | /** | 231 | /** |
209 | * Elements that have not been sent to the client yet. | ||
210 | */ | ||
211 | struct PendingElement *transmit_pending_head; | ||
212 | |||
213 | /** | ||
214 | * Elements that have not been sent to the client yet. | ||
215 | */ | ||
216 | struct PendingElement *transmit_pending_tail; | ||
217 | |||
218 | /** | ||
219 | * Elements that have not been approved (or rejected) by the client yet. | 232 | * Elements that have not been approved (or rejected) by the client yet. |
220 | */ | 233 | */ |
221 | struct PendingElement *approval_pending_head; | 234 | struct PendingElement *approval_pending_head; |
@@ -281,6 +294,8 @@ struct ConsensusSession | |||
281 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; | 294 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; |
282 | 295 | ||
283 | struct InvertibleBloomFilter **strata; | 296 | struct InvertibleBloomFilter **strata; |
297 | |||
298 | struct InvertibleBloomFilter **ibfs; | ||
284 | }; | 299 | }; |
285 | 300 | ||
286 | 301 | ||
@@ -365,6 +380,16 @@ static struct GNUNET_CORE_Handle *core; | |||
365 | static struct GNUNET_STREAM_ListenSocket *listener; | 380 | static struct GNUNET_STREAM_ListenSocket *listener; |
366 | 381 | ||
367 | 382 | ||
383 | static void | ||
384 | queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) | ||
385 | { | ||
386 | struct QueuedMessage *qm; | ||
387 | qm = GNUNET_malloc (sizeof *qm); | ||
388 | qm->msg = msg; | ||
389 | GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); | ||
390 | } | ||
391 | |||
392 | |||
368 | static int | 393 | static int |
369 | estimate_difference (struct InvertibleBloomFilter** strata1, | 394 | estimate_difference (struct InvertibleBloomFilter** strata1, |
370 | struct InvertibleBloomFilter** strata2) | 395 | struct InvertibleBloomFilter** strata2) |
@@ -400,6 +425,7 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
400 | } | 425 | } |
401 | 426 | ||
402 | 427 | ||
428 | |||
403 | /** | 429 | /** |
404 | * Functions of this signature are called whenever data is available from the | 430 | * Functions of this signature are called whenever data is available from the |
405 | * stream. | 431 | * stream. |
@@ -412,7 +438,48 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
412 | * given to the next time the read processor is called). | 438 | * given to the next time the read processor is called). |
413 | */ | 439 | */ |
414 | static size_t | 440 | static size_t |
415 | stream_data_processor (void *cls, | 441 | session_stream_data_processor (void *cls, |
442 | enum GNUNET_STREAM_Status status, | ||
443 | const void *data, | ||
444 | size_t size) | ||
445 | { | ||
446 | struct ConsensusPeerInformation *cpi; | ||
447 | int ret; | ||
448 | |||
449 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
450 | |||
451 | cpi = cls; | ||
452 | |||
453 | GNUNET_assert (NULL != cpi->mst); | ||
454 | |||
455 | ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); | ||
456 | if (GNUNET_SYSERR == ret) | ||
457 | { | ||
458 | /* FIXME: handle this correctly */ | ||
459 | GNUNET_assert (0); | ||
460 | } | ||
461 | |||
462 | /* read again */ | ||
463 | cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
464 | &session_stream_data_processor, cpi); | ||
465 | |||
466 | /* we always read all data */ | ||
467 | return size; | ||
468 | } | ||
469 | |||
470 | /** | ||
471 | * Functions of this signature are called whenever data is available from the | ||
472 | * stream. | ||
473 | * | ||
474 | * @param cls the closure from GNUNET_STREAM_read | ||
475 | * @param status the status of the stream at the time this function is called | ||
476 | * @param data traffic from the other side | ||
477 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
478 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
479 | * given to the next time the read processor is called). | ||
480 | */ | ||
481 | static size_t | ||
482 | incoming_stream_data_processor (void *cls, | ||
416 | enum GNUNET_STREAM_Status status, | 483 | enum GNUNET_STREAM_Status status, |
417 | const void *data, | 484 | const void *data, |
418 | size_t size) | 485 | size_t size) |
@@ -422,9 +489,9 @@ stream_data_processor (void *cls, | |||
422 | 489 | ||
423 | GNUNET_assert (GNUNET_STREAM_OK == status); | 490 | GNUNET_assert (GNUNET_STREAM_OK == status); |
424 | 491 | ||
425 | incoming = (struct IncomingSocket *) cls; | 492 | incoming = cls; |
426 | 493 | ||
427 | ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO); | 494 | ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); |
428 | if (GNUNET_SYSERR == ret) | 495 | if (GNUNET_SYSERR == ret) |
429 | { | 496 | { |
430 | /* FIXME: handle this correctly */ | 497 | /* FIXME: handle this correctly */ |
@@ -433,12 +500,46 @@ stream_data_processor (void *cls, | |||
433 | 500 | ||
434 | /* read again */ | 501 | /* read again */ |
435 | incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 502 | incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, |
436 | &stream_data_processor, incoming); | 503 | &incoming_stream_data_processor, incoming); |
437 | 504 | ||
438 | /* we always read all data */ | 505 | /* we always read all data */ |
439 | return size; | 506 | return size; |
440 | } | 507 | } |
441 | 508 | ||
509 | |||
510 | /** | ||
511 | * Iterator over hash map entries. | ||
512 | * | ||
513 | * @param cls closure | ||
514 | * @param key current key code | ||
515 | * @param value value in the hash map | ||
516 | * @return GNUNET_YES if we should continue to | ||
517 | * iterate, | ||
518 | * GNUNET_NO if not. | ||
519 | */ | ||
520 | static int | ||
521 | ibf_values_iterator (void *cls, | ||
522 | const struct GNUNET_HashCode *key, | ||
523 | void *value) | ||
524 | { | ||
525 | struct ConsensusPeerInformation *cpi; | ||
526 | cpi = cls; | ||
527 | ibf_insert (cpi->session->ibfs[cpi->ibf_order], key); | ||
528 | return GNUNET_YES; | ||
529 | } | ||
530 | |||
531 | |||
532 | static void | ||
533 | create_outgoing_ibf (struct ConsensusPeerInformation *cpi) | ||
534 | { | ||
535 | if (NULL == cpi->session->ibfs[cpi->ibf_order]) | ||
536 | { | ||
537 | cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | ||
538 | GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); | ||
539 | } | ||
540 | cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
541 | } | ||
542 | |||
442 | static int | 543 | static int |
443 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | 544 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) |
444 | { | 545 | { |
@@ -477,8 +578,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
477 | 578 | ||
478 | for (i = 0; i < num_strata; i++) | 579 | for (i = 0; i < num_strata; i++) |
479 | { | 580 | { |
480 | uint8_t zero[STRATA_IBF_BUCKETS]; | ||
481 | memset (zero, 0, STRATA_IBF_BUCKETS); | ||
482 | memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); | 581 | memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); |
483 | count_src += STRATA_IBF_BUCKETS; | 582 | count_src += STRATA_IBF_BUCKETS; |
484 | } | 583 | } |
@@ -489,9 +588,17 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
489 | 588 | ||
490 | if (STRATA_COUNT == cpi->strata_counter) | 589 | if (STRATA_COUNT == cpi->strata_counter) |
491 | { | 590 | { |
492 | int diff; | 591 | |
493 | diff = estimate_difference (cpi->session->strata, cpi->strata); | 592 | cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); |
494 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff); | 593 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); |
594 | cpi->ibf_order = 0; | ||
595 | while ((1 << cpi->ibf_order) < cpi->diff) | ||
596 | cpi->ibf_order++; | ||
597 | if (cpi->ibf_order > MAX_IBF_ORDER) | ||
598 | cpi->ibf_order = MAX_IBF_ORDER; | ||
599 | cpi->ibf_order += 2; | ||
600 | create_outgoing_ibf (cpi); | ||
601 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
495 | } | 602 | } |
496 | 603 | ||
497 | return GNUNET_YES; | 604 | return GNUNET_YES; |
@@ -499,15 +606,97 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
499 | 606 | ||
500 | 607 | ||
501 | static int | 608 | static int |
502 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata) | 609 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) |
503 | { | 610 | { |
611 | struct GNUNET_HashCode *hash_src; | ||
612 | int num_buckets; | ||
613 | uint8_t *count_src; | ||
614 | |||
615 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | ||
616 | |||
617 | if (cpi->is_outgoing == GNUNET_YES) | ||
618 | { | ||
619 | /* we receive the ibf as an initiator, thus we're interested in the order */ | ||
620 | cpi->ibf_order = digest->order; | ||
621 | if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh)) | ||
622 | { | ||
623 | create_outgoing_ibf (cpi); | ||
624 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
625 | } | ||
626 | /* FIXME: ensure that orders do not differ each time */ | ||
627 | } | ||
628 | else | ||
629 | { | ||
630 | /* FIXME: handle correctly */ | ||
631 | GNUNET_assert (cpi->ibf_order == digest->order); | ||
632 | } | ||
633 | |||
634 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order)); | ||
635 | |||
636 | if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order)) | ||
637 | { | ||
638 | /* TODO: handle this */ | ||
639 | GNUNET_assert (0); | ||
640 | } | ||
641 | |||
642 | if (NULL == cpi->incoming_ibf) | ||
643 | cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | ||
644 | |||
645 | hash_src = (struct GNUNET_HashCode *) &digest[1]; | ||
646 | |||
647 | memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src); | ||
648 | hash_src += num_buckets; | ||
649 | |||
650 | memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); | ||
651 | hash_src += num_buckets; | ||
652 | |||
653 | count_src = (uint8_t *) hash_src; | ||
654 | |||
655 | memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src); | ||
656 | |||
657 | cpi->incoming_bucket_counter += num_buckets; | ||
658 | |||
659 | if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) | ||
660 | { | ||
661 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); | ||
662 | if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))) | ||
663 | write_values (cpi, GNUNET_STREAM_OK, 0); | ||
664 | } | ||
504 | return GNUNET_YES; | 665 | return GNUNET_YES; |
505 | } | 666 | } |
506 | 667 | ||
507 | 668 | ||
508 | static int | 669 | static int |
509 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata) | 670 | handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) |
510 | { | 671 | { |
672 | struct PendingElement *pending_element; | ||
673 | struct GNUNET_CONSENSUS_Element *element; | ||
674 | struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; | ||
675 | size_t size; | ||
676 | |||
677 | size = ntohs (element_msg->size) - sizeof *element_msg; | ||
678 | |||
679 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size); | ||
680 | |||
681 | element = GNUNET_malloc (size + sizeof *element); | ||
682 | element->size = size; | ||
683 | memcpy (&element[1], &element_msg[1], size); | ||
684 | |||
685 | pending_element = GNUNET_malloc (sizeof *pending_element); | ||
686 | pending_element->element = element; | ||
687 | GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element); | ||
688 | |||
689 | client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); | ||
690 | client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); | ||
691 | client_element_msg->header.size = htons (size + sizeof *client_element_msg); | ||
692 | memcpy (&client_element_msg[1], &element[1], size); | ||
693 | |||
694 | queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); | ||
695 | |||
696 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n"); | ||
697 | |||
698 | send_next (cpi->session); | ||
699 | |||
511 | return GNUNET_YES; | 700 | return GNUNET_YES; |
512 | } | 701 | } |
513 | 702 | ||
@@ -556,16 +745,17 @@ static int | |||
556 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) | 745 | mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) |
557 | { | 746 | { |
558 | struct ConsensusPeerInformation *cpi; | 747 | struct ConsensusPeerInformation *cpi; |
559 | cpi = (struct ConsensusPeerInformation *) cls; | 748 | cpi = cls; |
560 | switch (ntohs( message->type)) | 749 | switch (ntohs (message->type)) |
561 | { | 750 | { |
562 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: | 751 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: |
563 | return handle_p2p_strata (cpi, (struct StrataMessage *) message); | 752 | return handle_p2p_strata (cpi, (struct StrataMessage *) message); |
564 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: | 753 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: |
565 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | 754 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); |
566 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | 755 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: |
567 | return handle_p2p_element (cpi, (struct Element *) message); | 756 | return handle_p2p_element (cpi, message); |
568 | default: | 757 | default: |
758 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); | ||
569 | /* FIXME: handle correctly */ | 759 | /* FIXME: handle correctly */ |
570 | GNUNET_assert (0); | 760 | GNUNET_assert (0); |
571 | } | 761 | } |
@@ -632,7 +822,7 @@ listen_cb (void *cls, | |||
632 | incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); | 822 | incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); |
633 | 823 | ||
634 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 824 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
635 | &stream_data_processor, incoming); | 825 | &incoming_stream_data_processor, incoming); |
636 | 826 | ||
637 | 827 | ||
638 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | 828 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); |
@@ -727,7 +917,7 @@ transmit_queued (void *cls, size_t size, | |||
727 | struct QueuedMessage *qmsg; | 917 | struct QueuedMessage *qmsg; |
728 | size_t msg_size; | 918 | size_t msg_size; |
729 | 919 | ||
730 | session = (struct ConsensusSession *) cls; | 920 | session = cls; |
731 | session->th = NULL; | 921 | session->th = NULL; |
732 | 922 | ||
733 | 923 | ||
@@ -773,7 +963,6 @@ send_next (struct ConsensusSession *session) | |||
773 | { | 963 | { |
774 | int msize; | 964 | int msize; |
775 | msize = ntohs (session->client_messages_head->msg->size); | 965 | msize = ntohs (session->client_messages_head->msg->size); |
776 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); | ||
777 | session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, | 966 | session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, |
778 | GNUNET_TIME_UNIT_FOREVER_REL, | 967 | GNUNET_TIME_UNIT_FOREVER_REL, |
779 | &transmit_queued, session); | 968 | &transmit_queued, session); |
@@ -821,13 +1010,11 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
821 | { | 1010 | { |
822 | struct ConsensusPeerInformation *cpi; | 1011 | struct ConsensusPeerInformation *cpi; |
823 | 1012 | ||
824 | cpi = (struct ConsensusPeerInformation *) cls; | 1013 | cpi = cls; |
825 | cpi->hello = GNUNET_YES; | 1014 | cpi->hello = GNUNET_YES; |
826 | 1015 | ||
827 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1016 | GNUNET_assert (GNUNET_STREAM_OK == status); |
828 | 1017 | ||
829 | cpi = (struct ConsensusPeerInformation *) cls; | ||
830 | |||
831 | if (cpi->session->conclude_requested) | 1018 | if (cpi->session->conclude_requested) |
832 | { | 1019 | { |
833 | write_strata (cpi, GNUNET_STREAM_OK, 0); | 1020 | write_strata (cpi, GNUNET_STREAM_OK, 0); |
@@ -848,7 +1035,7 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | |||
848 | struct ConsensusHello *hello; | 1035 | struct ConsensusHello *hello; |
849 | 1036 | ||
850 | 1037 | ||
851 | cpi = (struct ConsensusPeerInformation *) cls; | 1038 | cpi = cls; |
852 | cpi->is_connected = GNUNET_YES; | 1039 | cpi->is_connected = GNUNET_YES; |
853 | 1040 | ||
854 | hello = GNUNET_malloc (sizeof *hello); | 1041 | hello = GNUNET_malloc (sizeof *hello); |
@@ -856,10 +1043,12 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | |||
856 | hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); | 1043 | hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); |
857 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); | 1044 | memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); |
858 | 1045 | ||
859 | |||
860 | cpi->wh = | 1046 | cpi->wh = |
861 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); | 1047 | GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); |
862 | 1048 | ||
1049 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1050 | &session_stream_data_processor, cpi); | ||
1051 | |||
863 | } | 1052 | } |
864 | 1053 | ||
865 | 1054 | ||
@@ -874,18 +1063,19 @@ initialize_session_info (struct ConsensusSession *session) | |||
874 | /* initialize back-references, so consensus peer information can | 1063 | /* initialize back-references, so consensus peer information can |
875 | * be used as closure */ | 1064 | * be used as closure */ |
876 | session->info[i].session = session; | 1065 | session->info[i].session = session; |
877 | |||
878 | } | 1066 | } |
879 | 1067 | ||
880 | last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers; | 1068 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
881 | i = (session->local_peer_idx + 1) % session->num_peers; | 1069 | i = (session->local_peer_idx + 1) % session->num_peers; |
882 | while (i != last) | 1070 | while (i != last) |
883 | { | 1071 | { |
884 | session->info[i].is_outgoing = GNUNET_YES; | 1072 | session->info[i].is_outgoing = GNUNET_YES; |
885 | session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, | 1073 | session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, |
886 | open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); | 1074 | open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); |
887 | session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session); | 1075 | session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]); |
888 | i = (i + 1) % session->num_peers; | 1076 | i = (i + 1) % session->num_peers; |
1077 | |||
1078 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i); | ||
889 | } | 1079 | } |
890 | // tie-breaker for even number of peers | 1080 | // tie-breaker for even number of peers |
891 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) | 1081 | if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) |
@@ -893,6 +1083,9 @@ initialize_session_info (struct ConsensusSession *session) | |||
893 | session->info[last].is_outgoing = GNUNET_YES; | 1083 | session->info[last].is_outgoing = GNUNET_YES; |
894 | session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, | 1084 | session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, |
895 | open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); | 1085 | open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); |
1086 | session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]); | ||
1087 | |||
1088 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); | ||
896 | } | 1089 | } |
897 | } | 1090 | } |
898 | 1091 | ||
@@ -949,9 +1142,6 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke | |||
949 | v = key->bits[0]; | 1142 | v = key->bits[0]; |
950 | /* count trailing '1'-bits of v */ | 1143 | /* count trailing '1'-bits of v */ |
951 | for (i = 0; v & 1; v>>=1, i++); | 1144 | for (i = 0; v & 1; v>>=1, i++); |
952 | |||
953 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i); | ||
954 | |||
955 | ibf_insert (strata[i], key); | 1145 | ibf_insert (strata[i], key); |
956 | } | 1146 | } |
957 | 1147 | ||
@@ -1001,8 +1191,9 @@ initialize_session (struct ConsensusSession *session) | |||
1001 | for (i = 0; i < STRATA_COUNT; i++) | 1191 | for (i = 0; i < STRATA_COUNT; i++) |
1002 | session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | 1192 | session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); |
1003 | 1193 | ||
1004 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | 1194 | session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); |
1005 | 1195 | ||
1196 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | ||
1006 | initialize_session_info (session); | 1197 | initialize_session_info (session); |
1007 | 1198 | ||
1008 | GNUNET_free (session->join_msg); | 1199 | GNUNET_free (session->join_msg); |
@@ -1053,11 +1244,9 @@ client_join (void *cls, | |||
1053 | if (NULL == my_peer) | 1244 | if (NULL == my_peer) |
1054 | { | 1245 | { |
1055 | GNUNET_SERVER_disable_receive_done_warning (client); | 1246 | GNUNET_SERVER_disable_receive_done_warning (client); |
1056 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n"); | ||
1057 | return; | 1247 | return; |
1058 | } | 1248 | } |
1059 | 1249 | ||
1060 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n"); | ||
1061 | initialize_session (session); | 1250 | initialize_session (session); |
1062 | } | 1251 | } |
1063 | 1252 | ||
@@ -1097,7 +1286,7 @@ client_insert (void *cls, | |||
1097 | } | 1286 | } |
1098 | 1287 | ||
1099 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; | 1288 | msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; |
1100 | element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); | 1289 | element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); |
1101 | 1290 | ||
1102 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); | 1291 | element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); |
1103 | 1292 | ||
@@ -1146,7 +1335,8 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1146 | uint8_t *count_dst; | 1335 | uint8_t *count_dst; |
1147 | int num_strata; | 1336 | int num_strata; |
1148 | 1337 | ||
1149 | cpi = (struct ConsensusPeerInformation *) cls; | 1338 | cpi = cls; |
1339 | cpi->wh = NULL; | ||
1150 | 1340 | ||
1151 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); | 1341 | GNUNET_assert (GNUNET_YES == cpi->is_outgoing); |
1152 | 1342 | ||
@@ -1156,6 +1346,7 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1156 | if (STRATA_COUNT == cpi->strata_counter) | 1346 | if (STRATA_COUNT == cpi->strata_counter) |
1157 | { | 1347 | { |
1158 | /* strata have been written, wait for other side's IBF */ | 1348 | /* strata have been written, wait for other side's IBF */ |
1349 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n"); | ||
1159 | return; | 1350 | return; |
1160 | } | 1351 | } |
1161 | 1352 | ||
@@ -1223,8 +1414,57 @@ static void | |||
1223 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1414 | write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1224 | { | 1415 | { |
1225 | struct ConsensusPeerInformation *cpi; | 1416 | struct ConsensusPeerInformation *cpi; |
1417 | struct DifferenceDigest *digest; | ||
1418 | int msize; | ||
1419 | struct GNUNET_HashCode *hash_dst; | ||
1420 | uint8_t *count_dst; | ||
1421 | int num_buckets; | ||
1422 | |||
1423 | cpi = cls; | ||
1424 | cpi->wh = NULL; | ||
1425 | |||
1426 | if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)) | ||
1427 | { | ||
1428 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); | ||
1429 | if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) | ||
1430 | write_values (cpi, GNUNET_STREAM_OK, 0); | ||
1431 | return; | ||
1432 | } | ||
1433 | |||
1434 | /* remaining buckets */ | ||
1435 | num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter; | ||
1436 | |||
1437 | /* limit to maximum */ | ||
1438 | if (num_buckets > BUCKETS_PER_MESSAGE) | ||
1439 | num_buckets = BUCKETS_PER_MESSAGE; | ||
1440 | |||
1441 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order)); | ||
1442 | |||
1443 | msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); | ||
1444 | |||
1445 | digest = GNUNET_malloc (msize); | ||
1446 | digest->header.size = htons (msize); | ||
1447 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | ||
1448 | digest->order = cpi->ibf_order; | ||
1449 | |||
1450 | hash_dst = (struct GNUNET_HashCode *) &digest[1]; | ||
1451 | |||
1452 | memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst); | ||
1453 | hash_dst += num_buckets; | ||
1226 | 1454 | ||
1227 | cpi = (struct ConsensusPeerInformation *) cls; | 1455 | memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst); |
1456 | hash_dst += num_buckets; | ||
1457 | |||
1458 | count_dst = (uint8_t *) hash_dst; | ||
1459 | |||
1460 | memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst); | ||
1461 | |||
1462 | cpi->outgoing_bucket_counter += num_buckets; | ||
1463 | |||
1464 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1465 | write_ibf, cpi); | ||
1466 | |||
1467 | GNUNET_assert (NULL != cpi->wh); | ||
1228 | } | 1468 | } |
1229 | 1469 | ||
1230 | 1470 | ||
@@ -1247,8 +1487,71 @@ static void | |||
1247 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1487 | write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1248 | { | 1488 | { |
1249 | struct ConsensusPeerInformation *cpi; | 1489 | struct ConsensusPeerInformation *cpi; |
1490 | struct GNUNET_HashCode key; | ||
1491 | struct GNUNET_CONSENSUS_Element *element; | ||
1492 | struct GNUNET_MessageHeader *element_msg; | ||
1493 | int side; | ||
1494 | int msize; | ||
1495 | |||
1496 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); | ||
1497 | |||
1498 | cpi = cls; | ||
1499 | cpi->wh = NULL; | ||
1500 | |||
1501 | if (NULL == cpi->diff_ibf) | ||
1502 | { | ||
1503 | GNUNET_assert (NULL != cpi->incoming_ibf); | ||
1504 | GNUNET_assert (NULL != cpi->outgoing_ibf); | ||
1505 | GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size); | ||
1506 | cpi->diff_ibf = ibf_dup (cpi->incoming_ibf); | ||
1507 | ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf); | ||
1508 | } | ||
1509 | |||
1510 | for (;;) | ||
1511 | { | ||
1512 | int res; | ||
1513 | res = ibf_decode (cpi->diff_ibf, &side, &key); | ||
1514 | if (GNUNET_SYSERR == res) | ||
1515 | { | ||
1516 | /* TODO: handle this correctly, request new ibf */ | ||
1517 | GNUNET_break (0); | ||
1518 | return; | ||
1519 | } | ||
1520 | if (GNUNET_NO == res) | ||
1521 | { | ||
1522 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); | ||
1523 | return; | ||
1524 | } | ||
1525 | if (-1 == side) | ||
1526 | break; | ||
1527 | } | ||
1528 | |||
1529 | element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key); | ||
1530 | |||
1531 | if (NULL == element) | ||
1532 | { | ||
1533 | /* FIXME: handle correctly */ | ||
1534 | GNUNET_break (0); | ||
1535 | return; | ||
1536 | } | ||
1537 | |||
1538 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | ||
1539 | |||
1540 | element_msg = GNUNET_malloc (msize); | ||
1541 | element_msg->size = htons (msize); | ||
1542 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
1543 | |||
1544 | memcpy (&element_msg[1], element->data, element->size); | ||
1545 | |||
1546 | cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | ||
1547 | write_values, cpi); | ||
1250 | 1548 | ||
1251 | cpi = (struct ConsensusPeerInformation *) cls; | 1549 | GNUNET_free (element_msg); |
1550 | |||
1551 | |||
1552 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); | ||
1553 | |||
1554 | GNUNET_assert (NULL != cpi->wh); | ||
1252 | } | 1555 | } |
1253 | 1556 | ||
1254 | 1557 | ||
@@ -1301,7 +1604,6 @@ client_conclude (void *cls, | |||
1301 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | 1604 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); |
1302 | } | 1605 | } |
1303 | } | 1606 | } |
1304 | |||
1305 | 1607 | ||
1306 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1608 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1307 | send_next (session); | 1609 | send_next (session); |
@@ -1320,7 +1622,48 @@ client_ack (void *cls, | |||
1320 | struct GNUNET_SERVER_Client *client, | 1622 | struct GNUNET_SERVER_Client *client, |
1321 | const struct GNUNET_MessageHeader *message) | 1623 | const struct GNUNET_MessageHeader *message) |
1322 | { | 1624 | { |
1323 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n"); | 1625 | struct ConsensusSession *session; |
1626 | struct GNUNET_CONSENSUS_AckMessage *msg; | ||
1627 | struct PendingElement *pending; | ||
1628 | struct GNUNET_CONSENSUS_Element *element; | ||
1629 | struct GNUNET_HashCode key; | ||
1630 | |||
1631 | session = sessions_head; | ||
1632 | while (NULL != session) | ||
1633 | { | ||
1634 | if (session->client == client) | ||
1635 | break; | ||
1636 | } | ||
1637 | |||
1638 | if (NULL == session) | ||
1639 | { | ||
1640 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n"); | ||
1641 | GNUNET_SERVER_client_disconnect (client); | ||
1642 | return; | ||
1643 | } | ||
1644 | |||
1645 | pending = session->approval_pending_head; | ||
1646 | |||
1647 | GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending); | ||
1648 | |||
1649 | msg = (struct GNUNET_CONSENSUS_AckMessage *) message; | ||
1650 | |||
1651 | if (msg->keep) | ||
1652 | { | ||
1653 | |||
1654 | element = pending->element; | ||
1655 | |||
1656 | GNUNET_CRYPTO_hash (element, element->size, &key); | ||
1657 | |||
1658 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | ||
1659 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
1660 | |||
1661 | strata_insert (session->strata, &key); | ||
1662 | } | ||
1663 | |||
1664 | /* FIXME: also remove element from strata */ | ||
1665 | |||
1666 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||
1324 | } | 1667 | } |
1325 | 1668 | ||
1326 | /** | 1669 | /** |
@@ -1371,10 +1714,41 @@ static void | |||
1371 | shutdown_task (void *cls, | 1714 | shutdown_task (void *cls, |
1372 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 1715 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
1373 | { | 1716 | { |
1717 | |||
1718 | /* FIXME: complete; write separate destructors for different data types */ | ||
1719 | |||
1720 | while (NULL != incoming_sockets_head) | ||
1721 | { | ||
1722 | struct IncomingSocket *socket; | ||
1723 | socket = incoming_sockets_head; | ||
1724 | if (NULL == socket->cpi) | ||
1725 | { | ||
1726 | GNUNET_STREAM_close (socket->socket); | ||
1727 | } | ||
1728 | incoming_sockets_head = incoming_sockets_head->next; | ||
1729 | GNUNET_free (socket); | ||
1730 | } | ||
1731 | |||
1374 | while (NULL != sessions_head) | 1732 | while (NULL != sessions_head) |
1375 | { | 1733 | { |
1376 | struct ConsensusSession *session; | 1734 | struct ConsensusSession *session; |
1735 | int i; | ||
1736 | |||
1377 | session = sessions_head; | 1737 | session = sessions_head; |
1738 | |||
1739 | for (i = 0; session->num_peers; i++) | ||
1740 | { | ||
1741 | struct ConsensusPeerInformation *cpi; | ||
1742 | cpi = &session->info[i]; | ||
1743 | if ((NULL != cpi) && (NULL != cpi->socket)) | ||
1744 | { | ||
1745 | GNUNET_STREAM_close (cpi->socket); | ||
1746 | } | ||
1747 | } | ||
1748 | |||
1749 | if (NULL != session->client) | ||
1750 | GNUNET_SERVER_client_disconnect (session->client); | ||
1751 | |||
1378 | sessions_head = sessions_head->next; | 1752 | sessions_head = sessions_head->next; |
1379 | GNUNET_free (session); | 1753 | GNUNET_free (session); |
1380 | } | 1754 | } |
@@ -1436,7 +1810,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
1436 | GNUNET_assert (NULL != core); | 1810 | GNUNET_assert (NULL != core); |
1437 | 1811 | ||
1438 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); | 1812 | GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); |
1439 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE); | ||
1440 | } | 1813 | } |
1441 | 1814 | ||
1442 | 1815 | ||
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 2d06fc29b..629fde3fc 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -111,8 +111,6 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf, | |||
111 | 111 | ||
112 | ibf->count[bucket] += side; | 112 | ibf->count[bucket] += side; |
113 | 113 | ||
114 | GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d \n", bucket); | ||
115 | |||
116 | GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], | 114 | GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], |
117 | &ibf->id_sum[bucket]); | 115 | &ibf->id_sum[bucket]); |
118 | GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket], | 116 | GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket], |
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 01266c2a9..61c382f4c 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf | |||
@@ -5,6 +5,7 @@ HOSTNAME = localhost | |||
5 | HOME = $SERVICEHOME | 5 | HOME = $SERVICEHOME |
6 | BINARY = gnunet-service-consensus | 6 | BINARY = gnunet-service-consensus |
7 | #PREFIX = gdbserver :12345 | 7 | #PREFIX = gdbserver :12345 |
8 | PREFIX = valgrind | ||
8 | ACCEPT_FROM = 127.0.0.1; | 9 | ACCEPT_FROM = 127.0.0.1; |
9 | ACCEPT_FROM6 = ::1; | 10 | ACCEPT_FROM6 = ::1; |
10 | UNIXPATH = /tmp/gnunet-service-consensus.sock | 11 | UNIXPATH = /tmp/gnunet-service-consensus.sock |