aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-01-24 02:55:31 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-01-24 02:55:31 +0000
commitf7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1 (patch)
tree33a7d6f9fecf0c260a1b5b57403a9d13c6bbc859 /src/consensus
parentd6a27b576d197ac823e8494f351d43a78125a35f (diff)
downloadgnunet-f7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1.tar.gz
gnunet-f7eb3ed6bb391e9f87bcb3535bf04c4aeec2f7c1.zip
implemented value exchange, various fixes
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/consensus_api.c32
-rw-r--r--src/consensus/consensus_protocol.h4
-rw-r--r--src/consensus/gnunet-consensus.c4
-rw-r--r--src/consensus/gnunet-service-consensus.c477
-rw-r--r--src/consensus/ibf.c2
-rw-r--r--src/consensus/test_consensus.conf1
6 files changed, 453 insertions, 67 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 5c0494254..ba0e69e48 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -176,7 +176,6 @@ transmit_queued (void *cls, size_t size,
176 176
177 qmsg = consensus->messages_head; 177 qmsg = consensus->messages_head;
178 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); 178 GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg);
179 GNUNET_assert (qmsg);
180 179
181 if (NULL == buf) 180 if (NULL == buf)
182 { 181 {
@@ -196,8 +195,8 @@ transmit_queued (void *cls, size_t size,
196 { 195 {
197 qmsg->idc (qmsg->idc_cls, GNUNET_YES); 196 qmsg->idc (qmsg->idc_cls, GNUNET_YES);
198 } 197 }
199 GNUNET_free (qmsg->msg); 198
200 GNUNET_free (qmsg); 199 /* FIXME: free the messages */
201 200
202 send_next (consensus); 201 send_next (consensus);
203 202
@@ -218,7 +217,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
218 217
219 if (NULL != consensus->messages_head) 218 if (NULL != consensus->messages_head)
220 { 219 {
221 LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
222 consensus->th = 220 consensus->th =
223 GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), 221 GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size),
224 GNUNET_TIME_UNIT_FOREVER_REL, 222 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -226,6 +224,15 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
226 } 224 }
227} 225}
228 226
227static void
228queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
229{
230 struct QueuedMessage *qm;
231 qm = GNUNET_malloc (sizeof *qm);
232 qm->msg = msg;
233 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
234}
235
229 236
230/** 237/**
231 * Called when the server has sent is a new element 238 * Called when the server has sent is a new element
@@ -239,23 +246,24 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
239{ 246{
240 struct GNUNET_CONSENSUS_Element element; 247 struct GNUNET_CONSENSUS_Element element;
241 struct GNUNET_CONSENSUS_AckMessage *ack_msg; 248 struct GNUNET_CONSENSUS_AckMessage *ack_msg;
242 struct QueuedMessage *queued_msg;
243 int ret; 249 int ret;
244 250
251 LOG (GNUNET_ERROR_TYPE_INFO, "received new element\n");
252
245 element.type = msg->element_type; 253 element.type = msg->element_type;
246 element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 254 element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
247 element.data = &msg[1]; 255 element.data = &msg[1];
248 256
249 ret = consensus->new_element_cb (consensus->new_element_cls, &element); 257 ret = consensus->new_element_cb (consensus->new_element_cls, &element);
250 258
251 queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage)); 259 ack_msg = GNUNET_malloc (sizeof *ack_msg);
252 queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1]; 260 ack_msg->header.size = htons (sizeof *ack_msg);
253 261 ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
254 ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg;
255 ack_msg->keep = ret; 262 ack_msg->keep = ret;
256 263
257 GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, 264 queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg);
258 queued_msg); 265
266 send_next (consensus);
259} 267}
260 268
261 269
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index 105708ee9..c84aad263 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -49,13 +49,15 @@ struct StrataMessage
49 49
50struct DifferenceDigest 50struct DifferenceDigest
51{ 51{
52
53 struct GNUNET_MessageHeader header; 52 struct GNUNET_MessageHeader header;
53 uint8_t order;
54 uint8_t round;
54}; 55};
55 56
56struct Element 57struct Element
57{ 58{
58 struct GNUNET_MessageHeader header; 59 struct GNUNET_MessageHeader header;
60 struct GNUNET_HashCode hash;
59}; 61};
60 62
61struct ConsensusHello 63struct ConsensusHello
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index c8a5593f1..222ec3e9d 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -177,6 +177,7 @@ static int
177new_element_cb (void *cls, 177new_element_cb (void *cls,
178 struct GNUNET_CONSENSUS_Element *element) 178 struct GNUNET_CONSENSUS_Element *element)
179{ 179{
180 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
180 return GNUNET_YES; 181 return GNUNET_YES;
181} 182}
182 183
@@ -263,8 +264,11 @@ test_master (void *cls,
263 int i; 264 int i;
264 265
265 266
267 GNUNET_log_setup ("gnunet-consensus", "INFO", NULL);
268
266 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); 269 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
267 270
271
268 peers = started_peers; 272 peers = started_peers;
269 273
270 peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); 274 peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index ad0266954..e80bee331 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -55,12 +55,16 @@
55 */ 55 */
56#define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) 56#define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
57 57
58#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59
60#define MAX_IBF_ORDER (64)
58 61
59 62
60/* forward declarations */ 63/* forward declarations */
61 64
62struct ConsensusSession; 65struct ConsensusSession;
63struct IncomingSocket; 66struct IncomingSocket;
67struct ConsensusPeerInformation;
64 68
65static void 69static void
66send_next (struct ConsensusSession *session); 70send_next (struct ConsensusSession *session);
@@ -68,6 +72,12 @@ send_next (struct ConsensusSession *session);
68static void 72static void
69write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); 73write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size);
70 74
75static void
76write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
77
78static void
79write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size);
80
71static int 81static int
72get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); 82get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
73 83
@@ -91,6 +101,9 @@ struct PendingElement
91 * The actual element 101 * The actual element
92 */ 102 */
93 struct GNUNET_CONSENSUS_Element *element; 103 struct GNUNET_CONSENSUS_Element *element;
104
105 /* peer this element is coming from */
106 struct ConsensusPeerInformation *cpi;
94}; 107};
95 108
96struct ConsensusPeerInformation 109struct ConsensusPeerInformation
@@ -130,13 +143,21 @@ struct ConsensusPeerInformation
130 */ 143 */
131 int strata_counter; 144 int strata_counter;
132 145
133 struct InvertibleBloomFilter *my_ibf; 146 int ibf_order;
147
148 struct InvertibleBloomFilter *outgoing_ibf;
134 149
135 int my_ibf_bucket_counter; 150 int outgoing_bucket_counter;
136 151
137 struct InvertibleBloomFilter *peer_ibf; 152 struct InvertibleBloomFilter *incoming_ibf;
138 153
139 int peer_ibf_bucket_counter; 154 int incoming_bucket_counter;
155
156 /**
157 * NULL or incoming_ibf - outgoing_ibf.
158 * Decoded values of side '1' are to be requested from the the peer.
159 */
160 struct InvertibleBloomFilter *diff_ibf;
140 161
141 /** 162 /**
142 * Strata estimator of the peer, NULL if our peer 163 * Strata estimator of the peer, NULL if our peer
@@ -144,6 +165,8 @@ struct ConsensusPeerInformation
144 */ 165 */
145 struct InvertibleBloomFilter **strata; 166 struct InvertibleBloomFilter **strata;
146 167
168 unsigned int diff;
169
147 struct GNUNET_SERVER_MessageStreamTokenizer *mst; 170 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
148 171
149 struct ConsensusSession *session; 172 struct ConsensusSession *session;
@@ -206,16 +229,6 @@ struct ConsensusSession
206 struct GNUNET_CONTAINER_MultiHashMap *values; 229 struct GNUNET_CONTAINER_MultiHashMap *values;
207 230
208 /** 231 /**
209 * Elements that have not been sent to the client yet.
210 */
211 struct PendingElement *transmit_pending_head;
212
213 /**
214 * Elements that have not been sent to the client yet.
215 */
216 struct PendingElement *transmit_pending_tail;
217
218 /**
219 * Elements that have not been approved (or rejected) by the client yet. 232 * Elements that have not been approved (or rejected) by the client yet.
220 */ 233 */
221 struct PendingElement *approval_pending_head; 234 struct PendingElement *approval_pending_head;
@@ -281,6 +294,8 @@ struct ConsensusSession
281 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; 294 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
282 295
283 struct InvertibleBloomFilter **strata; 296 struct InvertibleBloomFilter **strata;
297
298 struct InvertibleBloomFilter **ibfs;
284}; 299};
285 300
286 301
@@ -365,6 +380,16 @@ static struct GNUNET_CORE_Handle *core;
365static struct GNUNET_STREAM_ListenSocket *listener; 380static struct GNUNET_STREAM_ListenSocket *listener;
366 381
367 382
383static void
384queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
385{
386 struct QueuedMessage *qm;
387 qm = GNUNET_malloc (sizeof *qm);
388 qm->msg = msg;
389 GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
390}
391
392
368static int 393static int
369estimate_difference (struct InvertibleBloomFilter** strata1, 394estimate_difference (struct InvertibleBloomFilter** strata1,
370 struct InvertibleBloomFilter** strata2) 395 struct InvertibleBloomFilter** strata2)
@@ -400,6 +425,7 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
400} 425}
401 426
402 427
428
403/** 429/**
404 * Functions of this signature are called whenever data is available from the 430 * Functions of this signature are called whenever data is available from the
405 * stream. 431 * stream.
@@ -412,7 +438,48 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
412 * given to the next time the read processor is called). 438 * given to the next time the read processor is called).
413 */ 439 */
414static size_t 440static size_t
415stream_data_processor (void *cls, 441session_stream_data_processor (void *cls,
442 enum GNUNET_STREAM_Status status,
443 const void *data,
444 size_t size)
445{
446 struct ConsensusPeerInformation *cpi;
447 int ret;
448
449 GNUNET_assert (GNUNET_STREAM_OK == status);
450
451 cpi = cls;
452
453 GNUNET_assert (NULL != cpi->mst);
454
455 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
456 if (GNUNET_SYSERR == ret)
457 {
458 /* FIXME: handle this correctly */
459 GNUNET_assert (0);
460 }
461
462 /* read again */
463 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
464 &session_stream_data_processor, cpi);
465
466 /* we always read all data */
467 return size;
468}
469
470/**
471 * Functions of this signature are called whenever data is available from the
472 * stream.
473 *
474 * @param cls the closure from GNUNET_STREAM_read
475 * @param status the status of the stream at the time this function is called
476 * @param data traffic from the other side
477 * @param size the number of bytes available in data read; will be 0 on timeout
478 * @return number of bytes of processed from 'data' (any data remaining should be
479 * given to the next time the read processor is called).
480 */
481static size_t
482incoming_stream_data_processor (void *cls,
416 enum GNUNET_STREAM_Status status, 483 enum GNUNET_STREAM_Status status,
417 const void *data, 484 const void *data,
418 size_t size) 485 size_t size)
@@ -422,9 +489,9 @@ stream_data_processor (void *cls,
422 489
423 GNUNET_assert (GNUNET_STREAM_OK == status); 490 GNUNET_assert (GNUNET_STREAM_OK == status);
424 491
425 incoming = (struct IncomingSocket *) cls; 492 incoming = cls;
426 493
427 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO); 494 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
428 if (GNUNET_SYSERR == ret) 495 if (GNUNET_SYSERR == ret)
429 { 496 {
430 /* FIXME: handle this correctly */ 497 /* FIXME: handle this correctly */
@@ -433,12 +500,46 @@ stream_data_processor (void *cls,
433 500
434 /* read again */ 501 /* read again */
435 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, 502 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
436 &stream_data_processor, incoming); 503 &incoming_stream_data_processor, incoming);
437 504
438 /* we always read all data */ 505 /* we always read all data */
439 return size; 506 return size;
440} 507}
441 508
509
510/**
511 * Iterator over hash map entries.
512 *
513 * @param cls closure
514 * @param key current key code
515 * @param value value in the hash map
516 * @return GNUNET_YES if we should continue to
517 * iterate,
518 * GNUNET_NO if not.
519 */
520static int
521ibf_values_iterator (void *cls,
522 const struct GNUNET_HashCode *key,
523 void *value)
524{
525 struct ConsensusPeerInformation *cpi;
526 cpi = cls;
527 ibf_insert (cpi->session->ibfs[cpi->ibf_order], key);
528 return GNUNET_YES;
529}
530
531
532static void
533create_outgoing_ibf (struct ConsensusPeerInformation *cpi)
534{
535 if (NULL == cpi->session->ibfs[cpi->ibf_order])
536 {
537 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
538 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
539 }
540 cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
541}
542
442static int 543static int
443handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) 544handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
444{ 545{
@@ -477,8 +578,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
477 578
478 for (i = 0; i < num_strata; i++) 579 for (i = 0; i < num_strata; i++)
479 { 580 {
480 uint8_t zero[STRATA_IBF_BUCKETS];
481 memset (zero, 0, STRATA_IBF_BUCKETS);
482 memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); 581 memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS);
483 count_src += STRATA_IBF_BUCKETS; 582 count_src += STRATA_IBF_BUCKETS;
484 } 583 }
@@ -489,9 +588,17 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
489 588
490 if (STRATA_COUNT == cpi->strata_counter) 589 if (STRATA_COUNT == cpi->strata_counter)
491 { 590 {
492 int diff; 591
493 diff = estimate_difference (cpi->session->strata, cpi->strata); 592 cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
494 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff); 593 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
594 cpi->ibf_order = 0;
595 while ((1 << cpi->ibf_order) < cpi->diff)
596 cpi->ibf_order++;
597 if (cpi->ibf_order > MAX_IBF_ORDER)
598 cpi->ibf_order = MAX_IBF_ORDER;
599 cpi->ibf_order += 2;
600 create_outgoing_ibf (cpi);
601 write_ibf (cpi, GNUNET_STREAM_OK, 0);
495 } 602 }
496 603
497 return GNUNET_YES; 604 return GNUNET_YES;
@@ -499,15 +606,97 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
499 606
500 607
501static int 608static int
502handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata) 609handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
503{ 610{
611 struct GNUNET_HashCode *hash_src;
612 int num_buckets;
613 uint8_t *count_src;
614
615 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
616
617 if (cpi->is_outgoing == GNUNET_YES)
618 {
619 /* we receive the ibf as an initiator, thus we're interested in the order */
620 cpi->ibf_order = digest->order;
621 if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh))
622 {
623 create_outgoing_ibf (cpi);
624 write_ibf (cpi, GNUNET_STREAM_OK, 0);
625 }
626 /* FIXME: ensure that orders do not differ each time */
627 }
628 else
629 {
630 /* FIXME: handle correctly */
631 GNUNET_assert (cpi->ibf_order == digest->order);
632 }
633
634 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order));
635
636 if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order))
637 {
638 /* TODO: handle this */
639 GNUNET_assert (0);
640 }
641
642 if (NULL == cpi->incoming_ibf)
643 cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
644
645 hash_src = (struct GNUNET_HashCode *) &digest[1];
646
647 memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src);
648 hash_src += num_buckets;
649
650 memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
651 hash_src += num_buckets;
652
653 count_src = (uint8_t *) hash_src;
654
655 memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src);
656
657 cpi->incoming_bucket_counter += num_buckets;
658
659 if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order))
660 {
661 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
662 if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)))
663 write_values (cpi, GNUNET_STREAM_OK, 0);
664 }
504 return GNUNET_YES; 665 return GNUNET_YES;
505} 666}
506 667
507 668
508static int 669static int
509handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata) 670handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg)
510{ 671{
672 struct PendingElement *pending_element;
673 struct GNUNET_CONSENSUS_Element *element;
674 struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
675 size_t size;
676
677 size = ntohs (element_msg->size) - sizeof *element_msg;
678
679 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size);
680
681 element = GNUNET_malloc (size + sizeof *element);
682 element->size = size;
683 memcpy (&element[1], &element_msg[1], size);
684
685 pending_element = GNUNET_malloc (sizeof *pending_element);
686 pending_element->element = element;
687 GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element);
688
689 client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg);
690 client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
691 client_element_msg->header.size = htons (size + sizeof *client_element_msg);
692 memcpy (&client_element_msg[1], &element[1], size);
693
694 queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg);
695
696 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n");
697
698 send_next (cpi->session);
699
511 return GNUNET_YES; 700 return GNUNET_YES;
512} 701}
513 702
@@ -556,16 +745,17 @@ static int
556mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) 745mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message)
557{ 746{
558 struct ConsensusPeerInformation *cpi; 747 struct ConsensusPeerInformation *cpi;
559 cpi = (struct ConsensusPeerInformation *) cls; 748 cpi = cls;
560 switch (ntohs( message->type)) 749 switch (ntohs (message->type))
561 { 750 {
562 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: 751 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE:
563 return handle_p2p_strata (cpi, (struct StrataMessage *) message); 752 return handle_p2p_strata (cpi, (struct StrataMessage *) message);
564 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: 753 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST:
565 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); 754 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
566 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: 755 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
567 return handle_p2p_element (cpi, (struct Element *) message); 756 return handle_p2p_element (cpi, message);
568 default: 757 default:
758 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type));
569 /* FIXME: handle correctly */ 759 /* FIXME: handle correctly */
570 GNUNET_assert (0); 760 GNUNET_assert (0);
571 } 761 }
@@ -632,7 +822,7 @@ listen_cb (void *cls,
632 incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); 822 incoming->peer = GNUNET_memdup (initiator, sizeof *initiator);
633 823
634 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 824 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
635 &stream_data_processor, incoming); 825 &incoming_stream_data_processor, incoming);
636 826
637 827
638 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); 828 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
@@ -727,7 +917,7 @@ transmit_queued (void *cls, size_t size,
727 struct QueuedMessage *qmsg; 917 struct QueuedMessage *qmsg;
728 size_t msg_size; 918 size_t msg_size;
729 919
730 session = (struct ConsensusSession *) cls; 920 session = cls;
731 session->th = NULL; 921 session->th = NULL;
732 922
733 923
@@ -773,7 +963,6 @@ send_next (struct ConsensusSession *session)
773 { 963 {
774 int msize; 964 int msize;
775 msize = ntohs (session->client_messages_head->msg->size); 965 msize = ntohs (session->client_messages_head->msg->size);
776 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n");
777 session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, 966 session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize,
778 GNUNET_TIME_UNIT_FOREVER_REL, 967 GNUNET_TIME_UNIT_FOREVER_REL,
779 &transmit_queued, session); 968 &transmit_queued, session);
@@ -821,13 +1010,11 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
821{ 1010{
822 struct ConsensusPeerInformation *cpi; 1011 struct ConsensusPeerInformation *cpi;
823 1012
824 cpi = (struct ConsensusPeerInformation *) cls; 1013 cpi = cls;
825 cpi->hello = GNUNET_YES; 1014 cpi->hello = GNUNET_YES;
826 1015
827 GNUNET_assert (GNUNET_STREAM_OK == status); 1016 GNUNET_assert (GNUNET_STREAM_OK == status);
828 1017
829 cpi = (struct ConsensusPeerInformation *) cls;
830
831 if (cpi->session->conclude_requested) 1018 if (cpi->session->conclude_requested)
832 { 1019 {
833 write_strata (cpi, GNUNET_STREAM_OK, 0); 1020 write_strata (cpi, GNUNET_STREAM_OK, 0);
@@ -848,7 +1035,7 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
848 struct ConsensusHello *hello; 1035 struct ConsensusHello *hello;
849 1036
850 1037
851 cpi = (struct ConsensusPeerInformation *) cls; 1038 cpi = cls;
852 cpi->is_connected = GNUNET_YES; 1039 cpi->is_connected = GNUNET_YES;
853 1040
854 hello = GNUNET_malloc (sizeof *hello); 1041 hello = GNUNET_malloc (sizeof *hello);
@@ -856,10 +1043,12 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
856 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); 1043 hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO);
857 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); 1044 memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode));
858 1045
859
860 cpi->wh = 1046 cpi->wh =
861 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); 1047 GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
862 1048
1049 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1050 &session_stream_data_processor, cpi);
1051
863} 1052}
864 1053
865 1054
@@ -874,18 +1063,19 @@ initialize_session_info (struct ConsensusSession *session)
874 /* initialize back-references, so consensus peer information can 1063 /* initialize back-references, so consensus peer information can
875 * be used as closure */ 1064 * be used as closure */
876 session->info[i].session = session; 1065 session->info[i].session = session;
877
878 } 1066 }
879 1067
880 last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers; 1068 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
881 i = (session->local_peer_idx + 1) % session->num_peers; 1069 i = (session->local_peer_idx + 1) % session->num_peers;
882 while (i != last) 1070 while (i != last)
883 { 1071 {
884 session->info[i].is_outgoing = GNUNET_YES; 1072 session->info[i].is_outgoing = GNUNET_YES;
885 session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, 1073 session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS,
886 open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); 1074 open_cb, &session->info[i], GNUNET_STREAM_OPTION_END);
887 session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session); 1075 session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]);
888 i = (i + 1) % session->num_peers; 1076 i = (i + 1) % session->num_peers;
1077
1078 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i);
889 } 1079 }
890 // tie-breaker for even number of peers 1080 // tie-breaker for even number of peers
891 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) 1081 if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
@@ -893,6 +1083,9 @@ initialize_session_info (struct ConsensusSession *session)
893 session->info[last].is_outgoing = GNUNET_YES; 1083 session->info[last].is_outgoing = GNUNET_YES;
894 session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, 1084 session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS,
895 open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); 1085 open_cb, &session->info[last], GNUNET_STREAM_OPTION_END);
1086 session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]);
1087
1088 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last);
896 } 1089 }
897} 1090}
898 1091
@@ -949,9 +1142,6 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke
949 v = key->bits[0]; 1142 v = key->bits[0];
950 /* count trailing '1'-bits of v */ 1143 /* count trailing '1'-bits of v */
951 for (i = 0; v & 1; v>>=1, i++); 1144 for (i = 0; v & 1; v>>=1, i++);
952
953 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i);
954
955 ibf_insert (strata[i], key); 1145 ibf_insert (strata[i], key);
956} 1146}
957 1147
@@ -1001,8 +1191,9 @@ initialize_session (struct ConsensusSession *session)
1001 for (i = 0; i < STRATA_COUNT; i++) 1191 for (i = 0; i < STRATA_COUNT; i++)
1002 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 1192 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1003 1193
1004 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); 1194 session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *));
1005 1195
1196 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1006 initialize_session_info (session); 1197 initialize_session_info (session);
1007 1198
1008 GNUNET_free (session->join_msg); 1199 GNUNET_free (session->join_msg);
@@ -1053,11 +1244,9 @@ client_join (void *cls,
1053 if (NULL == my_peer) 1244 if (NULL == my_peer)
1054 { 1245 {
1055 GNUNET_SERVER_disable_receive_done_warning (client); 1246 GNUNET_SERVER_disable_receive_done_warning (client);
1056 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n");
1057 return; 1247 return;
1058 } 1248 }
1059 1249
1060 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n");
1061 initialize_session (session); 1250 initialize_session (session);
1062} 1251}
1063 1252
@@ -1097,7 +1286,7 @@ client_insert (void *cls,
1097 } 1286 }
1098 1287
1099 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; 1288 msg = (struct GNUNET_CONSENSUS_ElementMessage *) m;
1100 element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); 1289 element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage);
1101 1290
1102 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); 1291 element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size);
1103 1292
@@ -1146,7 +1335,8 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1146 uint8_t *count_dst; 1335 uint8_t *count_dst;
1147 int num_strata; 1336 int num_strata;
1148 1337
1149 cpi = (struct ConsensusPeerInformation *) cls; 1338 cpi = cls;
1339 cpi->wh = NULL;
1150 1340
1151 GNUNET_assert (GNUNET_YES == cpi->is_outgoing); 1341 GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1152 1342
@@ -1156,6 +1346,7 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1156 if (STRATA_COUNT == cpi->strata_counter) 1346 if (STRATA_COUNT == cpi->strata_counter)
1157 { 1347 {
1158 /* strata have been written, wait for other side's IBF */ 1348 /* strata have been written, wait for other side's IBF */
1349 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
1159 return; 1350 return;
1160 } 1351 }
1161 1352
@@ -1223,8 +1414,57 @@ static void
1223write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1414write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1224{ 1415{
1225 struct ConsensusPeerInformation *cpi; 1416 struct ConsensusPeerInformation *cpi;
1417 struct DifferenceDigest *digest;
1418 int msize;
1419 struct GNUNET_HashCode *hash_dst;
1420 uint8_t *count_dst;
1421 int num_buckets;
1422
1423 cpi = cls;
1424 cpi->wh = NULL;
1425
1426 if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))
1427 {
1428 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1429 if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order))
1430 write_values (cpi, GNUNET_STREAM_OK, 0);
1431 return;
1432 }
1433
1434 /* remaining buckets */
1435 num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter;
1436
1437 /* limit to maximum */
1438 if (num_buckets > BUCKETS_PER_MESSAGE)
1439 num_buckets = BUCKETS_PER_MESSAGE;
1440
1441 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order));
1442
1443 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1444
1445 digest = GNUNET_malloc (msize);
1446 digest->header.size = htons (msize);
1447 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1448 digest->order = cpi->ibf_order;
1449
1450 hash_dst = (struct GNUNET_HashCode *) &digest[1];
1451
1452 memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst);
1453 hash_dst += num_buckets;
1226 1454
1227 cpi = (struct ConsensusPeerInformation *) cls; 1455 memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst);
1456 hash_dst += num_buckets;
1457
1458 count_dst = (uint8_t *) hash_dst;
1459
1460 memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst);
1461
1462 cpi->outgoing_bucket_counter += num_buckets;
1463
1464 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1465 write_ibf, cpi);
1466
1467 GNUNET_assert (NULL != cpi->wh);
1228} 1468}
1229 1469
1230 1470
@@ -1247,8 +1487,71 @@ static void
1247write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1487write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1248{ 1488{
1249 struct ConsensusPeerInformation *cpi; 1489 struct ConsensusPeerInformation *cpi;
1490 struct GNUNET_HashCode key;
1491 struct GNUNET_CONSENSUS_Element *element;
1492 struct GNUNET_MessageHeader *element_msg;
1493 int side;
1494 int msize;
1495
1496 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n");
1497
1498 cpi = cls;
1499 cpi->wh = NULL;
1500
1501 if (NULL == cpi->diff_ibf)
1502 {
1503 GNUNET_assert (NULL != cpi->incoming_ibf);
1504 GNUNET_assert (NULL != cpi->outgoing_ibf);
1505 GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size);
1506 cpi->diff_ibf = ibf_dup (cpi->incoming_ibf);
1507 ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf);
1508 }
1509
1510 for (;;)
1511 {
1512 int res;
1513 res = ibf_decode (cpi->diff_ibf, &side, &key);
1514 if (GNUNET_SYSERR == res)
1515 {
1516 /* TODO: handle this correctly, request new ibf */
1517 GNUNET_break (0);
1518 return;
1519 }
1520 if (GNUNET_NO == res)
1521 {
1522 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n");
1523 return;
1524 }
1525 if (-1 == side)
1526 break;
1527 }
1528
1529 element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key);
1530
1531 if (NULL == element)
1532 {
1533 /* FIXME: handle correctly */
1534 GNUNET_break (0);
1535 return;
1536 }
1537
1538 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
1539
1540 element_msg = GNUNET_malloc (msize);
1541 element_msg->size = htons (msize);
1542 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
1543
1544 memcpy (&element_msg[1], element->data, element->size);
1545
1546 cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1547 write_values, cpi);
1250 1548
1251 cpi = (struct ConsensusPeerInformation *) cls; 1549 GNUNET_free (element_msg);
1550
1551
1552 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
1553
1554 GNUNET_assert (NULL != cpi->wh);
1252} 1555}
1253 1556
1254 1557
@@ -1301,7 +1604,6 @@ client_conclude (void *cls,
1301 write_strata (&session->info[i], GNUNET_STREAM_OK, 0); 1604 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1302 } 1605 }
1303 } 1606 }
1304
1305 1607
1306 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1608 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1307 send_next (session); 1609 send_next (session);
@@ -1320,7 +1622,48 @@ client_ack (void *cls,
1320 struct GNUNET_SERVER_Client *client, 1622 struct GNUNET_SERVER_Client *client,
1321 const struct GNUNET_MessageHeader *message) 1623 const struct GNUNET_MessageHeader *message)
1322{ 1624{
1323 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n"); 1625 struct ConsensusSession *session;
1626 struct GNUNET_CONSENSUS_AckMessage *msg;
1627 struct PendingElement *pending;
1628 struct GNUNET_CONSENSUS_Element *element;
1629 struct GNUNET_HashCode key;
1630
1631 session = sessions_head;
1632 while (NULL != session)
1633 {
1634 if (session->client == client)
1635 break;
1636 }
1637
1638 if (NULL == session)
1639 {
1640 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n");
1641 GNUNET_SERVER_client_disconnect (client);
1642 return;
1643 }
1644
1645 pending = session->approval_pending_head;
1646
1647 GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending);
1648
1649 msg = (struct GNUNET_CONSENSUS_AckMessage *) message;
1650
1651 if (msg->keep)
1652 {
1653
1654 element = pending->element;
1655
1656 GNUNET_CRYPTO_hash (element, element->size, &key);
1657
1658 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1659 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1660
1661 strata_insert (session->strata, &key);
1662 }
1663
1664 /* FIXME: also remove element from strata */
1665
1666 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1324} 1667}
1325 1668
1326/** 1669/**
@@ -1371,10 +1714,41 @@ static void
1371shutdown_task (void *cls, 1714shutdown_task (void *cls,
1372 const struct GNUNET_SCHEDULER_TaskContext *tc) 1715 const struct GNUNET_SCHEDULER_TaskContext *tc)
1373{ 1716{
1717
1718 /* FIXME: complete; write separate destructors for different data types */
1719
1720 while (NULL != incoming_sockets_head)
1721 {
1722 struct IncomingSocket *socket;
1723 socket = incoming_sockets_head;
1724 if (NULL == socket->cpi)
1725 {
1726 GNUNET_STREAM_close (socket->socket);
1727 }
1728 incoming_sockets_head = incoming_sockets_head->next;
1729 GNUNET_free (socket);
1730 }
1731
1374 while (NULL != sessions_head) 1732 while (NULL != sessions_head)
1375 { 1733 {
1376 struct ConsensusSession *session; 1734 struct ConsensusSession *session;
1735 int i;
1736
1377 session = sessions_head; 1737 session = sessions_head;
1738
1739 for (i = 0; session->num_peers; i++)
1740 {
1741 struct ConsensusPeerInformation *cpi;
1742 cpi = &session->info[i];
1743 if ((NULL != cpi) && (NULL != cpi->socket))
1744 {
1745 GNUNET_STREAM_close (cpi->socket);
1746 }
1747 }
1748
1749 if (NULL != session->client)
1750 GNUNET_SERVER_client_disconnect (session->client);
1751
1378 sessions_head = sessions_head->next; 1752 sessions_head = sessions_head->next;
1379 GNUNET_free (session); 1753 GNUNET_free (session);
1380 } 1754 }
@@ -1436,7 +1810,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
1436 GNUNET_assert (NULL != core); 1810 GNUNET_assert (NULL != core);
1437 1811
1438 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); 1812 GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n");
1439 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE);
1440} 1813}
1441 1814
1442 1815
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index 2d06fc29b..629fde3fc 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -111,8 +111,6 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf,
111 111
112 ibf->count[bucket] += side; 112 ibf->count[bucket] += side;
113 113
114 GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d \n", bucket);
115
116 GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], 114 GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket],
117 &ibf->id_sum[bucket]); 115 &ibf->id_sum[bucket]);
118 GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket], 116 GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket],
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 01266c2a9..61c382f4c 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,6 +5,7 @@ HOSTNAME = localhost
5HOME = $SERVICEHOME 5HOME = $SERVICEHOME
6BINARY = gnunet-service-consensus 6BINARY = gnunet-service-consensus
7#PREFIX = gdbserver :12345 7#PREFIX = gdbserver :12345
8PREFIX = valgrind
8ACCEPT_FROM = 127.0.0.1; 9ACCEPT_FROM = 127.0.0.1;
9ACCEPT_FROM6 = ::1; 10ACCEPT_FROM6 = ::1;
10UNIXPATH = /tmp/gnunet-service-consensus.sock 11UNIXPATH = /tmp/gnunet-service-consensus.sock