aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-03-07 14:12:11 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-03-07 14:12:11 +0000
commit159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0 (patch)
treee4e60aec526c9a0ffbe3dd69896d394587e8586a /src/consensus
parent1a17d075effa5fbc3b3521ab0d15b2d035599969 (diff)
downloadgnunet-159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0.tar.gz
gnunet-159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0.zip
consensus now implemented with primitive conclusion group selection
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/consensus_api.c13
-rw-r--r--src/consensus/consensus_protocol.h4
-rw-r--r--src/consensus/gnunet-consensus-ibf.c6
-rw-r--r--src/consensus/gnunet-consensus.c15
-rw-r--r--src/consensus/gnunet-service-consensus.c855
-rw-r--r--src/consensus/ibf.c213
-rw-r--r--src/consensus/ibf.h107
7 files changed, 919 insertions, 294 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 7ebb0a9d9..e970040e1 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -139,6 +139,12 @@ struct GNUNET_CONSENSUS_Handle
139 139
140 struct QueuedMessage *messages_head; 140 struct QueuedMessage *messages_head;
141 struct QueuedMessage *messages_tail; 141 struct QueuedMessage *messages_tail;
142
143 /**
144 * GNUNET_YES when currently in a section where destroy may not be
145 * called.
146 */
147 int may_not_destroy;
142}; 148};
143 149
144 150
@@ -279,7 +285,9 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus,
279 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) 285 struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg)
280{ 286{
281 GNUNET_assert (NULL != consensus->conclude_cb); 287 GNUNET_assert (NULL != consensus->conclude_cb);
288 consensus->may_not_destroy = GNUNET_YES;
282 consensus->conclude_cb (consensus->conclude_cls, NULL); 289 consensus->conclude_cb (consensus->conclude_cls, NULL);
290 consensus->may_not_destroy = GNUNET_NO;
283 consensus->conclude_cb = NULL; 291 consensus->conclude_cb = NULL;
284} 292}
285 293
@@ -523,6 +531,11 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus,
523void 531void
524GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) 532GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus)
525{ 533{
534 if (GNUNET_YES == consensus->may_not_destroy)
535 {
536 LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n");
537 GNUNET_assert (0);
538 }
526 if (consensus->client != NULL) 539 if (consensus->client != NULL)
527 { 540 {
528 GNUNET_CLIENT_disconnect (consensus->client); 541 GNUNET_CLIENT_disconnect (consensus->client);
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index 0da959f8f..e8b2c8a34 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -39,6 +39,10 @@ struct StrataMessage
39{ 39{
40 struct GNUNET_MessageHeader header; 40 struct GNUNET_MessageHeader header;
41 /** 41 /**
42 * Number of elements the sender currently has.
43 */
44 uint16_t num_elements;
45 /**
42 * Number of strata in this estimator. 46 * Number of strata in this estimator.
43 */ 47 */
44 uint16_t num_strata; 48 uint16_t num_strata;
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c
index a16ca3247..f4a233ece 100644
--- a/src/consensus/gnunet-consensus-ibf.c
+++ b/src/consensus/gnunet-consensus-ibf.c
@@ -56,7 +56,7 @@ static void
56register_hashcode (struct GNUNET_HashCode *hash) 56register_hashcode (struct GNUNET_HashCode *hash)
57{ 57{
58 struct GNUNET_HashCode replicated; 58 struct GNUNET_HashCode replicated;
59 uint64_t key; 59 struct IBF_Key key;
60 key = ibf_key_from_hashcode (hash); 60 key = ibf_key_from_hashcode (hash);
61 ibf_hashcode_from_key (key, &replicated); 61 ibf_hashcode_from_key (key, &replicated);
62 GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash), 62 GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash),
@@ -64,7 +64,7 @@ register_hashcode (struct GNUNET_HashCode *hash)
64} 64}
65 65
66static void 66static void
67iter_hashcodes (uint64_t key, GNUNET_CONTAINER_HashMapIterator iter, void *cls) 67iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, void *cls)
68{ 68{
69 struct GNUNET_HashCode replicated; 69 struct GNUNET_HashCode replicated;
70 ibf_hashcode_from_key (key, &replicated); 70 ibf_hashcode_from_key (key, &replicated);
@@ -100,7 +100,7 @@ run (void *cls, char *const *args, const char *cfgfile,
100 const struct GNUNET_CONFIGURATION_Handle *cfg) 100 const struct GNUNET_CONFIGURATION_Handle *cfg)
101{ 101{
102 struct GNUNET_HashCode id; 102 struct GNUNET_HashCode id;
103 uint64_t ibf_key; 103 struct IBF_Key ibf_key;
104 int i; 104 int i;
105 int side; 105 int side;
106 int res; 106 int res;
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index a63575825..fd6019c20 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -65,6 +65,16 @@ controller_cb(void *cls,
65} 65}
66 66
67 67
68static void
69destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx)
70{
71 struct GNUNET_CONSENSUS_Handle *consensus;
72 consensus = cls;
73 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n");
74 GNUNET_CONSENSUS_destroy (consensus);
75}
76
77
68/** 78/**
69 * Called when a conclusion was successful. 79 * Called when a conclusion was successful.
70 * 80 *
@@ -72,14 +82,13 @@ controller_cb(void *cls,
72 * @param group 82 * @param group
73 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not 83 * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not
74 */ 84 */
75static int 85static void
76conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group) 86conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group)
77{ 87{
78 return GNUNET_NO; 88 GNUNET_SCHEDULER_add_now (destroy, cls);
79} 89}
80 90
81 91
82
83static void 92static void
84generate_indices (int *indices) 93generate_indices (int *indices)
85{ 94{
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index d223360dc..2f59b86bc 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -20,7 +20,7 @@
20 20
21/** 21/**
22 * @file consensus/gnunet-service-consensus.c 22 * @file consensus/gnunet-service-consensus.c
23 * @brief 23 * @brief multi-peer set reconciliation
24 * @author Florian Dold 24 * @author Florian Dold
25 */ 25 */
26 26
@@ -60,7 +60,7 @@
60 * Choose this value so that computing the IBF is still cheaper 60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values. 61 * than transmitting all values.
62 */ 62 */
63#define MAX_IBF_ORDER (32) 63#define MAX_IBF_ORDER (16)
64 64
65 65
66/* forward declarations */ 66/* forward declarations */
@@ -114,11 +114,20 @@ struct PendingElement
114 */ 114 */
115struct ConsensusPeerInformation 115struct ConsensusPeerInformation
116{ 116{
117 /**
118 * Socket for communicating with the peer, either created by the local peer,
119 * or the remote peer.
120 */
117 struct GNUNET_STREAM_Socket *socket; 121 struct GNUNET_STREAM_Socket *socket;
118 122
119 /** 123 /**
124 * Message tokenizer, for the data received from this peer via the stream socket.
125 */
126 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
127
128 /**
120 * Is socket's connection established, i.e. can we write to it? 129 * Is socket's connection established, i.e. can we write to it?
121 * Only relevent on outgoing cpi. 130 * Only relevent to outgoing cpi.
122 */ 131 */
123 int is_connected; 132 int is_connected;
124 133
@@ -150,15 +159,22 @@ struct ConsensusPeerInformation
150 struct GNUNET_STREAM_WriteHandle *wh; 159 struct GNUNET_STREAM_WriteHandle *wh;
151 160
152 enum { 161 enum {
153 IBF_STATE_NONE, 162 /* beginning of round */
163 IBF_STATE_NONE=0,
164 /* we currently receive an ibf */
154 IBF_STATE_RECEIVING, 165 IBF_STATE_RECEIVING,
166 /* we currently transmit an ibf */
155 IBF_STATE_TRANSMITTING, 167 IBF_STATE_TRANSMITTING,
156 IBF_STATE_DECODING 168 /* we decode a received ibf */
169 IBF_STATE_DECODING,
170 /* wait for elements and element requests */
171 IBF_STATE_ANTICIPATE_DIFF
157 } ibf_state ; 172 } ibf_state ;
158 173
159 /** 174 /**
160 * What is the order (=log2 size) of the ibf 175 * What is the order (=log2 size) of the ibf
161 * we're currently dealing with? 176 * we're currently dealing with?
177 * Interpretation depends on ibf_state.
162 */ 178 */
163 int ibf_order; 179 int ibf_order;
164 180
@@ -169,7 +185,8 @@ struct ConsensusPeerInformation
169 struct InvertibleBloomFilter *ibf; 185 struct InvertibleBloomFilter *ibf;
170 186
171 /** 187 /**
172 * How many buckets have we transmitted/received (depending on state)? 188 * How many buckets have we transmitted/received?
189 * Interpretatin depends on ibf_state
173 */ 190 */
174 int ibf_bucket_counter; 191 int ibf_bucket_counter;
175 192
@@ -180,11 +197,25 @@ struct ConsensusPeerInformation
180 struct InvertibleBloomFilter **strata; 197 struct InvertibleBloomFilter **strata;
181 198
182 /** 199 /**
183 * difference estimated with the current strata estimator 200 * Elements that the peer is missing from us.
184 */ 201 */
185 unsigned int diff; 202 uint64_t *missing_local;
186 203
187 struct GNUNET_SERVER_MessageStreamTokenizer *mst; 204 /**
205 * Number of elements in missing_local
206 */
207 unsigned int num_missing_local;
208
209 /**
210 * Elements that this peer told us *we* don't have,
211 * i.e. we are the remote peer that has some values missing.
212 */
213 uint64_t *missing_remote;
214
215 /**
216 * Number of elements in missing_local
217 */
218 unsigned int num_missing_remote;
188 219
189 /** 220 /**
190 * Back-reference to the consensus session, 221 * Back-reference to the consensus session,
@@ -192,10 +223,17 @@ struct ConsensusPeerInformation
192 */ 223 */
193 struct ConsensusSession *session; 224 struct ConsensusSession *session;
194 225
195 struct PendingElement *send_pending_head; 226 /**
196 struct PendingElement *send_pending_tail; 227 * When decoding the IBF, requests for elements and outgoing elements
228 * have to be queued, to ensure that messages actually fit in the stream buffer.
229 */
230 struct QueuedMessage *requests_and_elements_head;
231 struct QueuedMessage *requests_and_elements_tail;
197}; 232};
198 233
234/**
235 * A doubly linked list of messages.
236 */
199struct QueuedMessage 237struct QueuedMessage
200{ 238{
201 struct GNUNET_MessageHeader *msg; 239 struct GNUNET_MessageHeader *msg;
@@ -211,31 +249,40 @@ struct QueuedMessage
211 struct QueuedMessage *prev; 249 struct QueuedMessage *prev;
212}; 250};
213 251
252/**
253 * Describes the current round a consensus session is in.
254 */
214enum ConsensusRound 255enum ConsensusRound
215{ 256{
216 /** 257 /**
217 * distribution of information with the exponential scheme 258 * Not started the protocl yet
259 */
260 CONSENSUS_ROUND_BEGIN=0,
261 /**
262 * distribution of information with the exponential scheme.
218 */ 263 */
219 CONSENSUS_ROUND_EXP_EXCHANGE, 264 CONSENSUS_ROUND_EXP_EXCHANGE,
220 /** 265 /**
221 * All-to-all, exchange missing values 266 * All-to-all, exchange missing values.
222 */ 267 */
223 CONSENSUS_ROUND_A2A_EXCHANGE, 268 CONSENSUS_ROUND_A2A_EXCHANGE,
224 /** 269 /**
225 * All-to-all, check what values are missing, don't exchange anything 270 * All-to-all, check what values are missing, don't exchange anything.
226 */ 271 */
227 CONSENSUS_ROUND_A2A_INVENTORY 272 CONSENSUS_ROUND_A2A_INVENTORY,
228 273 /**
229 /* 274 * All-to-all round to exchange information for byzantine fault detection.
230 a round to exchange the information for fraud-detection 275 */
231 CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT 276 CONSENSUS_ROUND_A2A_INVENTORY_AGREEMENT,
232 */ 277 /**
278 * Rounds are over
279 */
280 CONSENSUS_ROUND_FINISH
233}; 281};
234 282
235 283
236/** 284/**
237 * A consensus session consists of one local client and the remote authorities. 285 * A consensus session consists of one local client and the remote authorities.
238 *
239 */ 286 */
240struct ConsensusSession 287struct ConsensusSession
241{ 288{
@@ -301,15 +348,15 @@ struct ConsensusSession
301 struct GNUNET_SERVER_TransmitHandle *th; 348 struct GNUNET_SERVER_TransmitHandle *th;
302 349
303 /** 350 /**
304 * Once conclude_requested is GNUNET_YES, the client may not 351 * Timeout for all rounds together, single rounds will schedule a timeout task
305 * insert any more values. 352 * with a fraction of the conclude timeout.
306 */ 353 */
307 int conclude_requested; 354 struct GNUNET_TIME_Relative conclude_timeout;
308 355
309 /** 356 /**
310 * Minimum number of peers to form a consensus group 357 * Timeout task identifier for the current round
311 */ 358 */
312 int conclude_group_min; 359 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
313 360
314 /** 361 /**
315 * Number of other peers in the consensus 362 * Number of other peers in the consensus
@@ -353,6 +400,7 @@ struct ConsensusSession
353/** 400/**
354 * Sockets from other peers who want to communicate with us. 401 * Sockets from other peers who want to communicate with us.
355 * It may not be known yet which consensus session they belong to. 402 * It may not be known yet which consensus session they belong to.
403 * Also, the session might not exist yet locally.
356 */ 404 */
357struct IncomingSocket 405struct IncomingSocket
358{ 406{
@@ -400,6 +448,7 @@ struct IncomingSocket
400 struct GNUNET_HashCode *requested_gid; 448 struct GNUNET_HashCode *requested_gid;
401}; 449};
402 450
451
403static struct IncomingSocket *incoming_sockets_head; 452static struct IncomingSocket *incoming_sockets_head;
404static struct IncomingSocket *incoming_sockets_tail; 453static struct IncomingSocket *incoming_sockets_tail;
405 454
@@ -440,7 +489,7 @@ static struct GNUNET_STREAM_ListenSocket *listener;
440 489
441 490
442/** 491/**
443 * Queue a message to be sent to the inhabiting client of a sessino 492 * Queue a message to be sent to the inhabiting client of a session.
444 * 493 *
445 * @param session session 494 * @param session session
446 * @param msg message we want to queue 495 * @param msg message we want to queue
@@ -465,7 +514,9 @@ get_cpi_index (struct ConsensusPeerInformation *cpi)
465} 514}
466 515
467/** 516/**
468 * Mark the peer as bad, free as state we don't need anymore. 517 * Mark the peer as bad, free state we don't need anymore.
518 *
519 * @param cpi consensus peer information of the bad peer
469 */ 520 */
470static void 521static void
471mark_peer_bad (struct ConsensusPeerInformation *cpi) 522mark_peer_bad (struct ConsensusPeerInformation *cpi)
@@ -479,6 +530,11 @@ mark_peer_bad (struct ConsensusPeerInformation *cpi)
479/** 530/**
480 * Estimate set difference with two strata estimators, 531 * Estimate set difference with two strata estimators,
481 * i.e. arrays of IBFs. 532 * i.e. arrays of IBFs.
533 * Does not not modify its arguments.
534 *
535 * @param strata1 first strata estimator
536 * @param strata2 second strata estimator
537 * @return the estimated difference
482 */ 538 */
483static int 539static int
484estimate_difference (struct InvertibleBloomFilter** strata1, 540estimate_difference (struct InvertibleBloomFilter** strata1,
@@ -490,9 +546,11 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
490 for (i = STRATA_COUNT - 1; i >= 0; i--) 546 for (i = STRATA_COUNT - 1; i >= 0; i--)
491 { 547 {
492 struct InvertibleBloomFilter *diff; 548 struct InvertibleBloomFilter *diff;
549 /* number of keys decoded from the ibf */
493 int ibf_count; 550 int ibf_count;
494 int more; 551 int more;
495 ibf_count = 0; 552 ibf_count = 0;
553 /* FIXME: implement this without always allocating new IBFs */
496 diff = ibf_dup (strata1[i]); 554 diff = ibf_dup (strata1[i]);
497 ibf_subtract (diff, strata2[i]); 555 ibf_subtract (diff, strata2[i]);
498 for (;;) 556 for (;;)
@@ -537,22 +595,17 @@ session_stream_data_processor (void *cls,
537 int ret; 595 int ret;
538 596
539 GNUNET_assert (GNUNET_STREAM_OK == status); 597 GNUNET_assert (GNUNET_STREAM_OK == status);
540
541 cpi = cls; 598 cpi = cls;
542
543 GNUNET_assert (NULL != cpi->mst); 599 GNUNET_assert (NULL != cpi->mst);
544
545 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); 600 ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES);
546 if (GNUNET_SYSERR == ret) 601 if (GNUNET_SYSERR == ret)
547 { 602 {
548 /* FIXME: handle this correctly */ 603 /* FIXME: handle this correctly */
549 GNUNET_assert (0); 604 GNUNET_assert (0);
550 } 605 }
551
552 /* read again */ 606 /* read again */
553 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, 607 cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL,
554 &session_stream_data_processor, cpi); 608 &session_stream_data_processor, cpi);
555
556 /* we always read all data */ 609 /* we always read all data */
557 return size; 610 return size;
558} 611}
@@ -578,26 +631,62 @@ incoming_stream_data_processor (void *cls,
578 int ret; 631 int ret;
579 632
580 GNUNET_assert (GNUNET_STREAM_OK == status); 633 GNUNET_assert (GNUNET_STREAM_OK == status);
581
582 incoming = cls; 634 incoming = cls;
583
584 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); 635 ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES);
585 if (GNUNET_SYSERR == ret) 636 if (GNUNET_SYSERR == ret)
586 { 637 {
587 /* FIXME: handle this correctly */ 638 /* FIXME: handle this correctly */
588 GNUNET_assert (0); 639 GNUNET_assert (0);
589 } 640 }
590
591 /* read again */ 641 /* read again */
592 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, 642 incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL,
593 &incoming_stream_data_processor, incoming); 643 &incoming_stream_data_processor, incoming);
594
595 /* we always read all data */ 644 /* we always read all data */
596 return size; 645 return size;
597} 646}
598 647
599 648
600/** 649/**
650 * Iterator over hash map entries.
651 * Queue elements to be sent to the peer in cls.
652 *
653 * @param cls closure
654 * @param key current key code
655 * @param value value in the hash map
656 * @return GNUNET_YES if we should continue to
657 * iterate,
658 * GNUNET_NO if not.
659 */
660static int
661send_element_iter (void *cls,
662 const struct GNUNET_HashCode *key,
663 void *value)
664{
665 struct ConsensusPeerInformation *cpi;
666 struct GNUNET_CONSENSUS_Element *element;
667 struct QueuedMessage *qm;
668 struct GNUNET_MessageHeader *element_msg;
669 size_t msize;
670 cpi = cls;
671 element = value;
672 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
673 element_msg = GNUNET_malloc (msize);
674 element_msg->size = htons (msize);
675 if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round)
676 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
677 else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
678 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE);
679 else
680 GNUNET_assert (0);
681 GNUNET_assert (NULL != element->data);
682 memcpy (&element_msg[1], element->data, element->size);
683 qm = GNUNET_malloc (sizeof *qm);
684 qm->msg = element_msg;
685 GNUNET_CONTAINER_DLL_insert (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
686 return GNUNET_YES;
687}
688
689/**
601 * Iterator to insert values into an ibf. 690 * Iterator to insert values into an ibf.
602 * 691 *
603 * @param cls closure 692 * @param cls closure
@@ -618,6 +707,12 @@ ibf_values_iterator (void *cls,
618 return GNUNET_YES; 707 return GNUNET_YES;
619} 708}
620 709
710/**
711 * Create and populate an IBF for the specified peer,
712 * if it does not already exist.
713 *
714 * @param peer to create the ibf for
715 */
621static void 716static void
622prepare_ibf (struct ConsensusPeerInformation *cpi) 717prepare_ibf (struct ConsensusPeerInformation *cpi)
623{ 718{
@@ -630,6 +725,42 @@ prepare_ibf (struct ConsensusPeerInformation *cpi)
630 725
631 726
632/** 727/**
728 * Called when a remote peer wants to inform the local peer
729 * that the remote peer misses elements.
730 * Elements are not reconciled.
731 *
732 * @param cpi session
733 * @param msg message
734 */
735static int
736handle_p2p_missing_local (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
737{
738 uint64_t key;
739 key = *(uint64_t *) &msg[1];
740 GNUNET_array_append (cpi->missing_remote, cpi->num_missing_remote, key);
741 return GNUNET_OK;
742}
743
744
745/**
746 * Called when a remote peer wants to inform the local peer
747 * that the local peer misses elements.
748 * Elements are not reconciled.
749 *
750 * @param cpi session
751 * @param msg message
752 */
753static int
754handle_p2p_missing_remote (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg)
755{
756 uint64_t key;
757 key = *(uint64_t *) &msg[1];
758 GNUNET_array_append (cpi->missing_local, cpi->num_missing_local, key);
759 return GNUNET_OK;
760}
761
762
763/**
633 * Called when a peer sends us its strata estimator. 764 * Called when a peer sends us its strata estimator.
634 * In response, we sent out IBF of appropriate size back. 765 * In response, we sent out IBF of appropriate size back.
635 * 766 *
@@ -640,9 +771,9 @@ static int
640handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) 771handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
641{ 772{
642 int i; 773 int i;
643 uint64_t *key_src; 774 unsigned int diff;
644 uint32_t *hash_src; 775 void *buf;
645 uint8_t *count_src; 776 size_t size;
646 777
647 GNUNET_assert (GNUNET_NO == cpi->is_outgoing); 778 GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
648 779
@@ -653,47 +784,36 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
653 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 784 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
654 } 785 }
655 786
656 /* for correct message alignment, copy bucket types seperately */ 787 size = ntohs (strata_msg->header.size);
657 key_src = (uint64_t *) &strata_msg[1]; 788 buf = (void *) &strata_msg[1];
658
659 for (i = 0; i < STRATA_COUNT; i++) 789 for (i = 0; i < STRATA_COUNT; i++)
660 { 790 {
661 memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src); 791 int res;
662 key_src += STRATA_IBF_BUCKETS; 792 res = ibf_read (&buf, &size, cpi->strata[i]);
663 } 793 GNUNET_assert (GNUNET_OK == res);
664
665 hash_src = (uint32_t *) key_src;
666
667 for (i = 0; i < STRATA_COUNT; i++)
668 {
669 memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
670 hash_src += STRATA_IBF_BUCKETS;
671 } 794 }
672 795
673 count_src = (uint8_t *) hash_src; 796 diff = estimate_difference (cpi->session->strata, cpi->strata);
797 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff);
674 798
675 for (i = 0; i < STRATA_COUNT; i++) 799 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) ||
800 (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round))
676 { 801 {
677 memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS); 802 /* send IBF of the right size */
678 count_src += STRATA_IBF_BUCKETS; 803 cpi->ibf_order = 0;
804 while ((1 << cpi->ibf_order) < diff)
805 cpi->ibf_order++;
806 if (cpi->ibf_order > MAX_IBF_ORDER)
807 cpi->ibf_order = MAX_IBF_ORDER;
808 cpi->ibf_order += 1;
809 /* create ibf if not already pre-computed */
810 prepare_ibf (cpi);
811 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
812 cpi->ibf_state = IBF_STATE_TRANSMITTING;
813 cpi->ibf_bucket_counter = 0;
814 write_ibf (cpi, GNUNET_STREAM_OK, 0);
679 } 815 }
680 816
681 cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
682 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
683
684 /* send IBF of the right size */
685 cpi->ibf_order = 0;
686 while ((1 << cpi->ibf_order) < cpi->diff)
687 cpi->ibf_order++;
688 if (cpi->ibf_order > MAX_IBF_ORDER)
689 cpi->ibf_order = MAX_IBF_ORDER;
690 cpi->ibf_order += 2;
691 /* create ibf if not already pre-computed */
692 prepare_ibf (cpi);
693 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
694 cpi->ibf_state = IBF_STATE_TRANSMITTING;
695 write_ibf (cpi, GNUNET_STREAM_OK, 0);
696
697 return GNUNET_YES; 817 return GNUNET_YES;
698} 818}
699 819
@@ -702,52 +822,62 @@ static int
702handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) 822handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
703{ 823{
704 int num_buckets; 824 int num_buckets;
705 uint64_t *key_src; 825 void *buf;
706 uint32_t *hash_src;
707 uint8_t *count_src;
708 826
709 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; 827 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
710 828 switch (cpi->ibf_state)
711 if (IBF_STATE_NONE == cpi->ibf_state)
712 { 829 {
713 cpi->ibf_state = IBF_STATE_RECEIVING; 830 case IBF_STATE_NONE:
714 cpi->ibf_order = digest->order; 831 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", digest->order);
715 cpi->ibf_bucket_counter = 0; 832 cpi->ibf_state = IBF_STATE_RECEIVING;
833 cpi->ibf_order = digest->order;
834 cpi->ibf_bucket_counter = 0;
835 if (NULL != cpi->ibf)
836 {
837 GNUNET_free (cpi->ibf);
838 cpi->ibf = NULL;
839 }
840 break;
841 case IBF_STATE_ANTICIPATE_DIFF:
842 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order %d\n", digest->order);
843 cpi->ibf_state = IBF_STATE_RECEIVING;
844 cpi->ibf_order = digest->order;
845 cpi->ibf_bucket_counter = 0;
846 if (NULL != cpi->ibf)
847 {
848 ibf_destroy (cpi->ibf);
849 cpi->ibf = NULL;
850 }
851 break;
852 case IBF_STATE_RECEIVING:
853 break;
854 default:
855 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state);
856 mark_peer_bad (cpi);
857 return GNUNET_NO;
716 } 858 }
717 859
718 if ( (IBF_STATE_RECEIVING != cpi->ibf_state) || 860 if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
719 (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) )
720 { 861 {
862 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
721 mark_peer_bad (cpi); 863 mark_peer_bad (cpi);
722 return GNUNET_NO; 864 return GNUNET_NO;
723 } 865 }
724 866
725 867 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets,
726 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); 868 cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
727 869
728 if (NULL == cpi->ibf) 870 if (NULL == cpi->ibf)
729 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 871 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
730 872
731 key_src = (uint64_t *) &digest[1]; 873 buf = (void *) &digest[1];
732 874 ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf);
733 memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src);
734 hash_src += num_buckets;
735
736 hash_src = (uint32_t *) key_src;
737
738 memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
739 hash_src += num_buckets;
740
741 count_src = (uint8_t *) hash_src;
742
743 memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src);
744 875
745 cpi->ibf_bucket_counter += num_buckets; 876 cpi->ibf_bucket_counter += num_buckets;
746 877
747 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 878 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
748 { 879 {
749 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); 880 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
750 GNUNET_assert (NULL != cpi->wh);
751 cpi->ibf_state = IBF_STATE_DECODING; 881 cpi->ibf_state = IBF_STATE_DECODING;
752 prepare_ibf (cpi); 882 prepare_ibf (cpi);
753 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); 883 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
@@ -794,15 +924,62 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
794 924
795 925
796/** 926/**
927 * Functions of this signature are called whenever writing operations
928 * on a stream are executed
929 *
930 * @param cls the closure from GNUNET_STREAM_write
931 * @param status the status of the stream at the time this function is called;
932 * GNUNET_STREAM_OK if writing to stream was completed successfully;
933 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
934 * (this doesn't mean that the data is never sent, the receiver may
935 * have read the data but its ACKs may have been lost);
936 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
937 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
938 * be processed.
939 * @param size the number of bytes written
940 */
941static void
942write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
943{
944 struct ConsensusPeerInformation *cpi;
945 cpi = cls;
946 GNUNET_assert (NULL == cpi->wh);
947 cpi->wh = NULL;
948 if (NULL != cpi->requests_and_elements_head)
949 {
950 struct QueuedMessage *qm;
951 qm = cpi->requests_and_elements_head;
952 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
953
954 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
955 GNUNET_TIME_UNIT_FOREVER_REL,
956 write_requested_elements, cpi);
957 GNUNET_assert (NULL != cpi->wh);
958 }
959}
960
961
962/**
797 * Handle a request for elements. 963 * Handle a request for elements.
798 * Only allowed in exchange-rounds. 964 * Only allowed in exchange-rounds.
799 *
800 * FIXME: implement
801 */ 965 */
802static int 966static int
803handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) 967handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
804{ 968{
805 /* FIXME: implement */ 969 struct GNUNET_HashCode *hashcode;
970 unsigned int num;
971
972 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n");
973 num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode);
974 hashcode = (struct GNUNET_HashCode *) &msg[1];
975 while (num--)
976 {
977 GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state);
978 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi);
979 if (NULL == cpi->wh)
980 write_requested_elements (cpi, GNUNET_STREAM_OK, 0);
981 hashcode++;
982 }
806 return GNUNET_YES; 983 return GNUNET_YES;
807} 984}
808 985
@@ -831,6 +1008,12 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello
831 inc->cpi->mst = inc->mst; 1008 inc->cpi->mst = inc->mst;
832 inc->cpi->hello = GNUNET_YES; 1009 inc->cpi->hello = GNUNET_YES;
833 inc->cpi->socket = inc->socket; 1010 inc->cpi->socket = inc->socket;
1011
1012 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) &&
1013 (GNUNET_YES == inc->cpi->is_outgoing))
1014 {
1015 write_strata (&session->info[idx], GNUNET_STREAM_OK, 0);
1016 }
834 return GNUNET_YES; 1017 return GNUNET_YES;
835 } 1018 }
836 session = session->next; 1019 session = session->next;
@@ -866,6 +1049,10 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader
866 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); 1049 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
867 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: 1050 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
868 return handle_p2p_element (cpi, message); 1051 return handle_p2p_element (cpi, message);
1052 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL:
1053 return handle_p2p_missing_local (cpi, message);
1054 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE:
1055 return handle_p2p_missing_remote (cpi, message);
869 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: 1056 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
870 return handle_p2p_element_request (cpi, (struct ElementRequest *) message); 1057 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
871 default: 1058 default:
@@ -938,7 +1125,6 @@ listen_cb (void *cls,
938 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1125 incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
939 &incoming_stream_data_processor, incoming); 1126 &incoming_stream_data_processor, incoming);
940 1127
941
942 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); 1128 incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming);
943 1129
944 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); 1130 GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming);
@@ -947,6 +1133,11 @@ listen_cb (void *cls,
947} 1133}
948 1134
949 1135
1136/**
1137 * Destroy a session, free all resources associated with it.
1138 *
1139 * @param session the session to destroy
1140 */
950static void 1141static void
951destroy_session (struct ConsensusSession *session) 1142destroy_session (struct ConsensusSession *session)
952{ 1143{
@@ -1013,10 +1204,7 @@ compute_global_id (const struct GNUNET_HashCode *local_id,
1013 1204
1014 1205
1015/** 1206/**
1016 * Function called to notify a client about the connection 1207 * Transmit a queued message to the session's client.
1017 * begin ready to queue more data. "buf" will be
1018 * NULL and "size" zero if the connection was closed for
1019 * writing in the meantime.
1020 * 1208 *
1021 * @param cls consensus session 1209 * @param cls consensus session
1022 * @param size number of bytes available in buf 1210 * @param size number of bytes available in buf
@@ -1034,7 +1222,6 @@ transmit_queued (void *cls, size_t size,
1034 session = cls; 1222 session = cls;
1035 session->th = NULL; 1223 session->th = NULL;
1036 1224
1037
1038 qmsg = session->client_messages_head; 1225 qmsg = session->client_messages_head;
1039 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); 1226 GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg);
1040 GNUNET_assert (qmsg); 1227 GNUNET_assert (qmsg);
@@ -1060,7 +1247,7 @@ transmit_queued (void *cls, size_t size,
1060 1247
1061 1248
1062/** 1249/**
1063 * Schedule sending the next message (if there is any) to a client. 1250 * Schedule transmitting the next queued message (if any) to a client.
1064 * 1251 *
1065 * @param cli the client to send the next message to 1252 * @param cli the client to send the next message to
1066 */ 1253 */
@@ -1118,7 +1305,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
1118} 1305}
1119 1306
1120 1307
1121
1122/** 1308/**
1123 * Called when stream has finishes writing the hello message 1309 * Called when stream has finishes writing the hello message
1124 */ 1310 */
@@ -1128,21 +1314,25 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1128 struct ConsensusPeerInformation *cpi; 1314 struct ConsensusPeerInformation *cpi;
1129 1315
1130 cpi = cls; 1316 cpi = cls;
1317 cpi->wh = NULL;
1131 cpi->hello = GNUNET_YES; 1318 cpi->hello = GNUNET_YES;
1132 1319
1133 GNUNET_assert (GNUNET_STREAM_OK == status); 1320 GNUNET_assert (GNUNET_STREAM_OK == status);
1134 1321
1135 if (cpi->session->conclude_requested) 1322 /* FIXME: other rounds */
1323
1324 if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) &&
1325 (GNUNET_YES == cpi->is_outgoing))
1136 { 1326 {
1137 write_strata (cpi, GNUNET_STREAM_OK, 0); 1327 write_strata (cpi, GNUNET_STREAM_OK, 0);
1138 } 1328 }
1139} 1329}
1140 1330
1141 1331
1142/** 1332/**
1143 * Functions of this type will be called when a stream is established 1333 * Called when we established a stream connection to another peer
1144 * 1334 *
1145 * @param cls the closure from GNUNET_STREAM_open 1335 * @param cls cpi of the peer we just connected to
1146 * @param socket socket to use to communicate with the other side (read/write) 1336 * @param socket socket to use to communicate with the other side (read/write)
1147 */ 1337 */
1148static void 1338static void
@@ -1151,9 +1341,9 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1151 struct ConsensusPeerInformation *cpi; 1341 struct ConsensusPeerInformation *cpi;
1152 struct ConsensusHello *hello; 1342 struct ConsensusHello *hello;
1153 1343
1154
1155 cpi = cls; 1344 cpi = cls;
1156 cpi->is_connected = GNUNET_YES; 1345 cpi->is_connected = GNUNET_YES;
1346 cpi->wh = NULL;
1157 1347
1158 hello = GNUNET_malloc (sizeof *hello); 1348 hello = GNUNET_malloc (sizeof *hello);
1159 hello->header.size = htons (sizeof *hello); 1349 hello->header.size = htons (sizeof *hello);
@@ -1165,7 +1355,6 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket)
1165 1355
1166 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, 1356 cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
1167 &session_stream_data_processor, cpi); 1357 &session_stream_data_processor, cpi);
1168
1169} 1358}
1170 1359
1171 1360
@@ -1182,7 +1371,7 @@ initialize_session_info (struct ConsensusSession *session)
1182 session->info[i].session = session; 1371 session->info[i].session = session;
1183 } 1372 }
1184 1373
1185 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; 1374 session->current_round = CONSENSUS_ROUND_BEGIN;
1186 1375
1187 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; 1376 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1188 i = (session->local_peer_idx + 1) % session->num_peers; 1377 i = (session->local_peer_idx + 1) % session->num_peers;
@@ -1267,6 +1456,47 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke
1267 1456
1268 1457
1269/** 1458/**
1459 * Add incoming peer connections to the session,
1460 * for peers who have connected to us before the local session has been established
1461 *
1462 * @param session ...
1463 */
1464static void
1465add_incoming_peers (struct ConsensusSession *session)
1466{
1467 struct IncomingSocket *inc;
1468 inc = incoming_sockets_head;
1469
1470 while (NULL != inc)
1471 {
1472 if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid))
1473 {
1474 int i;
1475 for (i = 0; i < session->num_peers; i++)
1476 {
1477 struct ConsensusPeerInformation *cpi;
1478 cpi = &session->info[i];
1479 if (0 == memcmp (inc->peer, &cpi->session->peers[i], sizeof (struct GNUNET_PeerIdentity)))
1480 {
1481 if (GNUNET_YES == cpi->is_outgoing)
1482 {
1483 /* FIXME: disconnect */
1484 continue;
1485 }
1486 cpi->socket = inc->socket;
1487 inc->cpi = cpi;
1488 inc->cpi->mst = inc->mst;
1489 inc->cpi->hello = GNUNET_YES;
1490 break;
1491 }
1492 }
1493 }
1494 inc = inc->next;
1495 }
1496}
1497
1498
1499/**
1270 * Initialize the session, continue receiving messages from the owning client 1500 * Initialize the session, continue receiving messages from the owning client
1271 * 1501 *
1272 * @param session the session to initialize 1502 * @param session the session to initialize
@@ -1311,7 +1541,7 @@ initialize_session (struct ConsensusSession *session)
1311 for (i = 0; i < STRATA_COUNT; i++) 1541 for (i = 0; i < STRATA_COUNT; i++)
1312 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 1542 session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
1313 1543
1314 session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); 1544 session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *));
1315 1545
1316 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); 1546 session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation));
1317 initialize_session_info (session); 1547 initialize_session_info (session);
@@ -1319,6 +1549,8 @@ initialize_session (struct ConsensusSession *session)
1319 GNUNET_free (session->join_msg); 1549 GNUNET_free (session->join_msg);
1320 session->join_msg = NULL; 1550 session->join_msg = NULL;
1321 1551
1552 add_incoming_peers (session);
1553
1322 GNUNET_SERVER_receive_done (session->client, GNUNET_OK); 1554 GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1323 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); 1555 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id));
1324} 1556}
@@ -1370,6 +1602,19 @@ client_join (void *cls,
1370 1602
1371 1603
1372/** 1604/**
1605 * Hash a block of data, producing a replicated ibf hash.
1606 */
1607static void
1608hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret)
1609{
1610 struct IBF_Key ibf_key;
1611 GNUNET_CRYPTO_hash (block, size, ret);
1612 ibf_key = ibf_key_from_hashcode (ret);
1613 ibf_hashcode_from_key (ibf_key, ret);
1614}
1615
1616
1617/**
1373 * Called when a client performs an insert operation. 1618 * Called when a client performs an insert operation.
1374 * 1619 *
1375 * @param cls (unused) 1620 * @param cls (unused)
@@ -1384,7 +1629,7 @@ client_insert (void *cls,
1384 struct ConsensusSession *session; 1629 struct ConsensusSession *session;
1385 struct GNUNET_CONSENSUS_ElementMessage *msg; 1630 struct GNUNET_CONSENSUS_ElementMessage *msg;
1386 struct GNUNET_CONSENSUS_Element *element; 1631 struct GNUNET_CONSENSUS_Element *element;
1387 struct GNUNET_HashCode key; 1632 struct GNUNET_HashCode hash;
1388 int element_size; 1633 int element_size;
1389 1634
1390 session = sessions_head; 1635 session = sessions_head;
@@ -1413,12 +1658,14 @@ client_insert (void *cls,
1413 1658
1414 GNUNET_assert (NULL != element->data); 1659 GNUNET_assert (NULL != element->data);
1415 1660
1416 GNUNET_CRYPTO_hash (element, element_size, &key); 1661 hash_for_ibf (element, element_size, &hash);
1417 1662
1418 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 1663 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting with hash_for_ibf %s\n", GNUNET_h2s (&hash));
1664
1665 GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
1419 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1666 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1420 1667
1421 strata_insert (session->strata, &key); 1668 strata_insert (session->strata, &hash);
1422 1669
1423 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1670 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1424 1671
@@ -1426,8 +1673,6 @@ client_insert (void *cls,
1426} 1673}
1427 1674
1428 1675
1429
1430
1431/** 1676/**
1432 * Functions of this signature are called whenever writing operations 1677 * Functions of this signature are called whenever writing operations
1433 * on a stream are executed 1678 * on a stream are executed
@@ -1446,10 +1691,14 @@ client_insert (void *cls,
1446static void 1691static void
1447write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1692write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1448{ 1693{
1694 struct ConsensusPeerInformation *cpi;
1695 cpi = cls;
1696 cpi->wh = NULL;
1449 GNUNET_assert (GNUNET_STREAM_OK == status); 1697 GNUNET_assert (GNUNET_STREAM_OK == status);
1450 /* just wait for the ibf */ 1698 /* just wait for the ibf */
1451} 1699}
1452 1700
1701
1453/** 1702/**
1454 * Functions of this signature are called whenever writing operations 1703 * Functions of this signature are called whenever writing operations
1455 * on a stream are executed 1704 * on a stream are executed
@@ -1470,11 +1719,9 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1470{ 1719{
1471 struct ConsensusPeerInformation *cpi; 1720 struct ConsensusPeerInformation *cpi;
1472 struct StrataMessage *strata_msg; 1721 struct StrataMessage *strata_msg;
1722 void *buf;
1473 size_t msize; 1723 size_t msize;
1474 int i; 1724 int i;
1475 uint64_t *key_dst;
1476 uint32_t *hash_dst;
1477 uint8_t *count_dst;
1478 1725
1479 cpi = cls; 1726 cpi = cls;
1480 cpi->wh = NULL; 1727 cpi->wh = NULL;
@@ -1491,36 +1738,30 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1491 strata_msg = GNUNET_malloc (msize); 1738 strata_msg = GNUNET_malloc (msize);
1492 strata_msg->header.size = htons (msize); 1739 strata_msg->header.size = htons (msize);
1493 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); 1740 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1494 1741
1495 /* for correct message alignment, copy bucket types seperately */ 1742 buf = &strata_msg[1];
1496 key_dst = (uint64_t *) &strata_msg[1];
1497
1498 for (i = 0; i < STRATA_COUNT; i++)
1499 {
1500 memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst);
1501 key_dst += STRATA_IBF_BUCKETS;
1502 }
1503
1504 hash_dst = (uint32_t *) key_dst;
1505
1506 for (i = 0; i < STRATA_COUNT; i++)
1507 {
1508 memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1509 hash_dst += STRATA_IBF_BUCKETS;
1510 }
1511
1512 count_dst = (uint8_t *) hash_dst;
1513
1514 for (i = 0; i < STRATA_COUNT; i++) 1743 for (i = 0; i < STRATA_COUNT; i++)
1515 { 1744 {
1516 memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS); 1745 ibf_write (cpi->session->strata[i], &buf, NULL);
1517 count_dst += STRATA_IBF_BUCKETS;
1518 } 1746 }
1519 1747
1520 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1748 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1521 write_strata_done, cpi); 1749 write_strata_done, cpi);
1522 1750
1523 GNUNET_assert (NULL != cpi->wh); 1751 GNUNET_assert (NULL != cpi->wh);
1752
1753 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
1754}
1755
1756
1757static void
1758write_ibf_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1759{
1760 struct ConsensusPeerInformation *cpi;
1761 cpi = cls;
1762 cpi->wh = NULL;
1763 GNUNET_assert (GNUNET_STREAM_OK == status);
1764 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "write ibf done callback\n");
1524} 1765}
1525 1766
1526 1767
@@ -1545,24 +1786,17 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1545 struct ConsensusPeerInformation *cpi; 1786 struct ConsensusPeerInformation *cpi;
1546 struct DifferenceDigest *digest; 1787 struct DifferenceDigest *digest;
1547 int msize; 1788 int msize;
1548 uint64_t *key_dst;
1549 uint32_t *hash_dst;
1550 uint8_t *count_dst;
1551 int num_buckets; 1789 int num_buckets;
1790 void *buf;
1552 1791
1553 cpi = cls; 1792 cpi = cls;
1554 cpi->wh = NULL; 1793 cpi->wh = NULL;
1555 1794
1556 GNUNET_assert (GNUNET_STREAM_OK == status); 1795 GNUNET_assert (GNUNET_STREAM_OK == status);
1557
1558 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); 1796 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
1559 1797
1560 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) 1798 /* we should not be done here! */
1561 { 1799 GNUNET_assert (cpi->ibf_bucket_counter != (1 << cpi->ibf_order));
1562 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1563 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1564 return;
1565 }
1566 1800
1567 /* remaining buckets */ 1801 /* remaining buckets */
1568 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; 1802 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
@@ -1580,24 +1814,23 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1580 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); 1814 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1581 digest->order = cpi->ibf_order; 1815 digest->order = cpi->ibf_order;
1582 1816
1583 key_dst = (uint64_t *) &digest[1]; 1817 buf = &digest[1];
1584 1818 ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL);
1585 memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst);
1586 key_dst += num_buckets;
1587
1588 hash_dst = (uint32_t *) key_dst;
1589
1590 memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst);
1591 hash_dst += num_buckets;
1592
1593 count_dst = (uint8_t *) hash_dst;
1594
1595 memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst);
1596 1819
1597 cpi->ibf_bucket_counter += num_buckets; 1820 cpi->ibf_bucket_counter += num_buckets;
1598 1821
1599 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1822 /* we have to set the new state here, because of non-deterministic schedulung */
1600 write_ibf, cpi); 1823 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1824 {
1825 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1826 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1827 cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
1828 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf_done, cpi);
1829 }
1830 else
1831 {
1832 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf, cpi);
1833 }
1601 1834
1602 GNUNET_assert (NULL != cpi->wh); 1835 GNUNET_assert (NULL != cpi->wh);
1603} 1836}
@@ -1622,19 +1855,32 @@ static void
1622write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1855write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1623{ 1856{
1624 struct ConsensusPeerInformation *cpi; 1857 struct ConsensusPeerInformation *cpi;
1625 uint64_t key; 1858 struct IBF_Key key;
1626 struct GNUNET_HashCode hashcode; 1859 struct GNUNET_HashCode hashcode;
1627 int side; 1860 int side;
1628 int msize;
1629 1861
1630 GNUNET_assert (GNUNET_STREAM_OK == status); 1862 GNUNET_assert (GNUNET_STREAM_OK == status);
1631 1863
1632 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); 1864 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
1633 1865
1634 cpi = cls; 1866 cpi = cls;
1867 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state);
1635 cpi->wh = NULL; 1868 cpi->wh = NULL;
1636 1869
1637 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); 1870 if (NULL != cpi->requests_and_elements_head)
1871 {
1872 struct QueuedMessage *qm;
1873 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending queued element\n");
1874 qm = cpi->requests_and_elements_head;
1875 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
1876
1877 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
1878 GNUNET_TIME_UNIT_FOREVER_REL,
1879 write_requests_and_elements, cpi);
1880 GNUNET_assert (NULL != cpi->wh);
1881 /* some elements / requests have queued up, we have to transmit them first */
1882 return;
1883 }
1638 1884
1639 for (;;) 1885 for (;;)
1640 { 1886 {
@@ -1642,10 +1888,13 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t
1642 res = ibf_decode (cpi->ibf, &side, &key); 1888 res = ibf_decode (cpi->ibf, &side, &key);
1643 if (GNUNET_SYSERR == res) 1889 if (GNUNET_SYSERR == res)
1644 { 1890 {
1891 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n");
1892 /* decoding failed, we tell the other peer by sending our ibf with a larger order */
1645 cpi->ibf_order++; 1893 cpi->ibf_order++;
1646 prepare_ibf (cpi); 1894 prepare_ibf (cpi);
1647 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); 1895 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1648 cpi->ibf_state = IBF_STATE_TRANSMITTING; 1896 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1897 cpi->ibf_bucket_counter = 0;
1649 write_ibf (cls, status, size); 1898 write_ibf (cls, status, size);
1650 return; 1899 return;
1651 } 1900 }
@@ -1656,38 +1905,52 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t
1656 } 1905 }
1657 if (-1 == side) 1906 if (-1 == side)
1658 { 1907 {
1659 struct GNUNET_CONSENSUS_Element *element; 1908 /* we have the element, send it to the other peer */
1660 struct GNUNET_MessageHeader *element_msg; 1909 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element\n");
1661 ibf_hashcode_from_key (key, &hashcode); 1910 ibf_hashcode_from_key (key, &hashcode);
1662 /* FIXME: this only transmits one element stored with the key */ 1911 GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi);
1663 element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); 1912 /* send the first message, because we can! */
1664 if (NULL == element) 1913 if (NULL != cpi->requests_and_elements_head)
1665 continue; 1914 {
1666 msize = sizeof (struct GNUNET_MessageHeader) + element->size; 1915 struct QueuedMessage *qm;
1667 element_msg = GNUNET_malloc (msize); 1916 qm = cpi->requests_and_elements_head;
1668 element_msg->size = htons (msize); 1917 GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm);
1669 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); 1918 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing element\n");
1670 GNUNET_assert (NULL != element->data); 1919 cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size),
1671 memcpy (&element_msg[1], element->data, element->size); 1920 GNUNET_TIME_UNIT_FOREVER_REL,
1672 cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1921 write_requests_and_elements, cpi);
1673 write_requests_and_elements, cpi); 1922 GNUNET_assert (NULL != cpi->wh);
1674 GNUNET_free (element_msg); 1923 }
1675 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); 1924 else
1676 1925 {
1677 GNUNET_assert (NULL != cpi->wh); 1926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "no element found for decoded hash %s\n", GNUNET_h2s (&hashcode));
1927 }
1678 return; 1928 return;
1679 } 1929 }
1680 else 1930 else
1681 { 1931 {
1682 struct ElementRequest *msg; 1932 struct ElementRequest *msg;
1683 size_t msize; 1933 size_t msize;
1684 uint64_t *p; 1934 struct IBF_Key *p;
1685 1935
1686 msize = (sizeof *msg) + sizeof (uint64_t); 1936 msize = (sizeof *msg) + sizeof (uint64_t);
1687 msg = GNUNET_malloc (msize); 1937 msg = GNUNET_malloc (msize);
1688 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); 1938 if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round)
1939 {
1940 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request for element\n");
1941 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1942 }
1943 else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)
1944 {
1945 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending locally missing element\n");
1946 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL);
1947 }
1948 else
1949 {
1950 GNUNET_assert (0);
1951 }
1689 msg->header.size = htons (msize); 1952 msg->header.size = htons (msize);
1690 p = (uint64_t *) &msg[1]; 1953 p = (struct IBF_Key *) &msg[1];
1691 *p = key; 1954 *p = key;
1692 1955
1693 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1956 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1697,17 +1960,151 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t
1697 return; 1960 return;
1698 } 1961 }
1699 } 1962 }
1700
1701} 1963}
1702 1964
1703 1965
1966static double
1967compute_similarity (struct ConsensusSession *session, int p1, int p2)
1968{
1969 /* FIXME: simplistic dummy implementation, use real set union/intersecion */
1970 return (session->info[p1].num_missing_local + session->info[p2].num_missing_local) /
1971 ((double) (session->info[p1].num_missing_remote + session->info[p2].num_missing_remote + 1));
1972}
1973
1704 1974
1705/*
1706static void 1975static void
1707select_best_group (struct ConsensusSession *session) 1976select_fittest_group (struct ConsensusSession *session)
1708{ 1977{
1978 /* simplistic implementation: compute the similarity with the latest strata estimator,
1979 * rank the results once */
1980 struct GNUNET_PeerIdentity *group;
1981 double rating[session->num_peers];
1982 struct GNUNET_CONSENSUS_ConcludeDoneMessage *done_msg;
1983 size_t msize;
1984 int i;
1985 int j;
1986 /* number of peers in the consensus group */
1987 int k;
1988
1989 k = ceil(session->num_peers / 3.0) * 2;
1990 group = GNUNET_malloc (k * sizeof *group);
1991
1992 /* do strata subtraction */
1993 /* FIXME: we know the real sets, subtract them! */
1994 for (i = 0; i < session->num_peers; i++)
1995 {
1996 rating[i] = 0;
1997 for (j = 0; j < i; j++)
1998 {
1999 double sim;
2000 sim = compute_similarity (session, i, j);
2001 rating[i] += sim;
2002 rating[j] += sim;
2003 }
2004 }
2005 for (i = 0; i < k; i++)
2006 {
2007 int best_idx = 0;
2008 for (j = 1; j < session->num_peers; j++)
2009 if (rating[j] > rating[best_idx])
2010 best_idx = j;
2011 rating[best_idx] = -1;
2012 group[i] = session->peers[best_idx];
2013 }
2014
2015 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got group!\n");
2016
2017 msize = sizeof *done_msg + k * sizeof *group;
2018
2019 done_msg = GNUNET_malloc (msize);
2020 done_msg->header.size = htons (msize);
2021 done_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
2022 memcpy (&done_msg[1], group, k * sizeof *group);
2023
2024 queue_client_message (session, (struct GNUNET_MessageHeader *) done_msg);
2025 send_next (session);
2026}
2027
2028
2029/**
2030 * Select and kick off the next round, based on the current round.
2031 * @param cls the session
2032 * @param tc task context, for when this task is invoked by the scheduler,
2033 * NULL if invoked for another reason
2034 */
2035static void
2036round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2037{
2038 struct ConsensusSession *session;
2039 int i;
2040
2041 /* don't kick off next round if we're shutting down */
2042 if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
2043 return;
2044
2045 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
2046 session = cls;
2047
2048 if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
2049 {
2050 GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
2051 session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
2052 }
2053
2054 for (i = 0; i < session->num_peers; i++)
2055 {
2056 if ((NULL != session->info) && (NULL != session->info[i].wh))
2057 GNUNET_STREAM_write_cancel (session->info[i].wh);
2058 }
2059
2060 switch (session->current_round)
2061 {
2062 case CONSENSUS_ROUND_BEGIN:
2063 {
2064 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
2065 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
2066 round_over, session);
2067 for (i = 0; i < session->num_peers; i++)
2068 {
2069 /* we can only talk to hello'ed peers */
2070 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
2071 (GNUNET_YES == session->info[i].hello) )
2072 {
2073 /* kick off transmitting strata by calling the write continuation */
2074 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
2075 }
2076 }
2077 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude started, timeout=%llu\n", session->conclude_timeout.rel_value);
2078 break;
2079 }
2080 case CONSENSUS_ROUND_A2A_EXCHANGE:
2081 {
2082 session->current_round = CONSENSUS_ROUND_A2A_INVENTORY;
2083 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting inventory round\n");
2084 session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4),
2085 round_over, session);
2086 for (i = 0; i < session->num_peers; i++)
2087 {
2088 session->info[i].ibf_state = IBF_STATE_NONE;
2089 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
2090 (GNUNET_YES == session->info[i].hello) )
2091 {
2092 /* kick off transmitting strata by calling the write continuation */
2093 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
2094 }
2095 }
2096 break;
2097 }
2098 case CONSENSUS_ROUND_A2A_INVENTORY:
2099 /* finally, we are done and select the most fitting group */
2100 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "protocol rounds done\n");
2101 session->current_round = CONSENSUS_ROUND_FINISH;
2102 select_fittest_group (session);
2103 break;
2104 default:
2105 GNUNET_assert (0);
2106 }
1709} 2107}
1710*/
1711 2108
1712 2109
1713/** 2110/**
@@ -1719,13 +2116,13 @@ select_best_group (struct ConsensusSession *session)
1719 */ 2116 */
1720static void 2117static void
1721client_conclude (void *cls, 2118client_conclude (void *cls,
1722 struct GNUNET_SERVER_Client *client, 2119 struct GNUNET_SERVER_Client *client,
1723 const struct GNUNET_MessageHeader *message) 2120 const struct GNUNET_MessageHeader *message)
1724{ 2121{
1725 struct ConsensusSession *session; 2122 struct ConsensusSession *session;
1726 int i; 2123 struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
1727 2124
1728 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); 2125 cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
1729 2126
1730 session = sessions_head; 2127 session = sessions_head;
1731 while ((session != NULL) && (session->client != client)) 2128 while ((session != NULL) && (session->client != client))
@@ -1738,25 +2135,19 @@ client_conclude (void *cls,
1738 return; 2135 return;
1739 } 2136 }
1740 2137
1741 if (GNUNET_YES == session->conclude_requested) 2138 if (CONSENSUS_ROUND_BEGIN != session->current_round)
1742 { 2139 {
1743 /* client requested conclude twice */ 2140 /* client requested conclude twice */
2141 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "unexpected round at conclude: %d\n", session->current_round);
1744 GNUNET_break (0); 2142 GNUNET_break (0);
1745 disconnect_client (client); 2143 disconnect_client (client);
1746 return; 2144 return;
1747 } 2145 }
1748 2146
1749 session->conclude_requested = GNUNET_YES; 2147 session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
1750 2148
1751 for (i = 0; i < session->num_peers; i++) 2149 /* the 'begin' round is over, start with the next, real round */
1752 { 2150 round_over (session, NULL);
1753 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
1754 (GNUNET_YES == session->info[i].hello) )
1755 {
1756 /* kick off transmitting strata by calling the write continuation */
1757 write_strata (&session->info[i], GNUNET_STREAM_OK, 0);
1758 }
1759 }
1760 2151
1761 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2152 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1762 send_next (session); 2153 send_next (session);
@@ -1803,12 +2194,19 @@ client_ack (void *cls,
1803 2194
1804 if (msg->keep) 2195 if (msg->keep)
1805 { 2196 {
2197 int i;
1806 element = pending->element; 2198 element = pending->element;
1807 GNUNET_CRYPTO_hash (element, element->size, &key); 2199 hash_for_ibf (element, element->size, &key);
1808 2200
1809 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 2201 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1810 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 2202 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1811 strata_insert (session->strata, &key); 2203 strata_insert (session->strata, &key);
2204
2205 for (i = 0; i <= MAX_IBF_ORDER; i++)
2206 {
2207 if (NULL != session->ibfs[i])
2208 ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key));
2209 }
1812 } 2210 }
1813 2211
1814 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2212 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1862,7 +2260,6 @@ static void
1862shutdown_task (void *cls, 2260shutdown_task (void *cls,
1863 const struct GNUNET_SCHEDULER_TaskContext *tc) 2261 const struct GNUNET_SCHEDULER_TaskContext *tc)
1864{ 2262{
1865
1866 /* FIXME: complete; write separate destructors for different data types */ 2263 /* FIXME: complete; write separate destructors for different data types */
1867 2264
1868 while (NULL != incoming_sockets_head) 2265 while (NULL != incoming_sockets_head)
@@ -1884,15 +2281,16 @@ shutdown_task (void *cls,
1884 2281
1885 session = sessions_head; 2282 session = sessions_head;
1886 2283
1887 for (i = 0; session->num_peers; i++) 2284 if (NULL != session->info)
1888 { 2285 for (i = 0; i < session->num_peers; i++)
1889 struct ConsensusPeerInformation *cpi;
1890 cpi = &session->info[i];
1891 if ((NULL != cpi) && (NULL != cpi->socket))
1892 { 2286 {
1893 GNUNET_STREAM_close (cpi->socket); 2287 struct ConsensusPeerInformation *cpi;
2288 cpi = &session->info[i];
2289 if ((NULL != cpi) && (NULL != cpi->socket))
2290 {
2291 GNUNET_STREAM_close (cpi->socket);
2292 }
1894 } 2293 }
1895 }
1896 2294
1897 if (NULL != session->client) 2295 if (NULL != session->client)
1898 GNUNET_SERVER_client_disconnect (session->client); 2296 GNUNET_SERVER_client_disconnect (session->client);
@@ -1952,7 +2350,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
1952 listen_cb, NULL, 2350 listen_cb, NULL,
1953 GNUNET_STREAM_OPTION_END); 2351 GNUNET_STREAM_OPTION_END);
1954 2352
1955
1956 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ 2353 /* we have to wait for the core_startup callback before proceeding with the consensus service startup */
1957 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers); 2354 core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers);
1958 GNUNET_assert (NULL != core); 2355 GNUNET_assert (NULL != core);
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index fb3bbe7cd..d3218ff9b 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -18,30 +18,27 @@
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20 20
21
22/** 21/**
23 * @file consensus/ibf.c 22 * @file consensus/ibf.c
24 * @brief implementation of the invertible bloom filter 23 * @brief implementation of the invertible bloom filter
25 * @author Florian Dold 24 * @author Florian Dold
26 */ 25 */
27 26
28
29#include "ibf.h" 27#include "ibf.h"
30 28
31
32/** 29/**
33 * Create a key from a hashcode. 30 * Create a key from a hashcode.
34 * 31 *
35 * @param hash the hashcode 32 * @param hash the hashcode
36 * @return a key 33 * @return a key
37 */ 34 */
38uint64_t 35struct IBF_Key
39ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) 36ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
40{ 37{
41 return GNUNET_ntohll (*(uint64_t *) hash); 38 /* FIXME: endianess */
39 return *(struct IBF_Key *) hash;
42} 40}
43 41
44
45/** 42/**
46 * Create a hashcode from a key, by replicating the key 43 * Create a hashcode from a key, by replicating the key
47 * until the hascode is filled 44 * until the hascode is filled
@@ -50,12 +47,13 @@ ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
50 * @param dst hashcode to store the result in 47 * @param dst hashcode to store the result in
51 */ 48 */
52void 49void
53ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst) 50ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst)
54{ 51{
55 uint64_t *p; 52 struct IBF_Key *p;
56 int i; 53 unsigned int i;
57 p = (uint64_t *) dst; 54 const unsigned int keys_per_hashcode = sizeof (struct GNUNET_HashCode) / sizeof (struct IBF_Key);
58 for (i = 0; i < 8; i++) 55 p = (struct IBF_Key *) dst;
56 for (i = 0; i < keys_per_hashcode; i++)
59 *p++ = key; 57 *p++ = key;
60} 58}
61 59
@@ -70,14 +68,16 @@ ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst)
70 * @return the newly created invertible bloom filter 68 * @return the newly created invertible bloom filter
71 */ 69 */
72struct InvertibleBloomFilter * 70struct InvertibleBloomFilter *
73ibf_create (uint32_t size, unsigned int hash_num, uint32_t salt) 71ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt)
74{ 72{
75 struct InvertibleBloomFilter *ibf; 73 struct InvertibleBloomFilter *ibf;
76 74
75 /* TODO: use malloc_large */
76
77 ibf = GNUNET_malloc (sizeof (struct InvertibleBloomFilter)); 77 ibf = GNUNET_malloc (sizeof (struct InvertibleBloomFilter));
78 ibf->count = GNUNET_malloc (size * sizeof (uint8_t)); 78 ibf->count = GNUNET_malloc (size * sizeof (uint8_t));
79 ibf->id_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); 79 ibf->key_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode));
80 ibf->hash_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); 80 ibf->key_hash_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode));
81 ibf->size = size; 81 ibf->size = size;
82 ibf->hash_num = hash_num; 82 ibf->hash_num = hash_num;
83 83
@@ -89,7 +89,7 @@ ibf_create (uint32_t size, unsigned int hash_num, uint32_t salt)
89 */ 89 */
90static inline void 90static inline void
91ibf_get_indices (const struct InvertibleBloomFilter *ibf, 91ibf_get_indices (const struct InvertibleBloomFilter *ibf,
92 uint64_t key, int *dst) 92 struct IBF_Key key, int *dst)
93{ 93{
94 struct GNUNET_HashCode bucket_indices; 94 struct GNUNET_HashCode bucket_indices;
95 unsigned int filled = 0; 95 unsigned int filled = 0;
@@ -113,20 +113,20 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf,
113 113
114static void 114static void
115ibf_insert_into (struct InvertibleBloomFilter *ibf, 115ibf_insert_into (struct InvertibleBloomFilter *ibf,
116 uint64_t key, 116 struct IBF_Key key,
117 const int *buckets, int side) 117 const int *buckets, int side)
118{ 118{
119 int i; 119 int i;
120 struct GNUNET_HashCode key_hash_sha; 120 struct GNUNET_HashCode key_hash_sha;
121 uint32_t key_hash; 121 struct IBF_KeyHash key_hash;
122 GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); 122 GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha);
123 key_hash = key_hash_sha.bits[0]; 123 key_hash.key_hash_val = key_hash_sha.bits[0];
124 for (i = 0; i < ibf->hash_num; i++) 124 for (i = 0; i < ibf->hash_num; i++)
125 { 125 {
126 const int bucket = buckets[i]; 126 const int bucket = buckets[i];
127 ibf->count[bucket] += side; 127 ibf->count[bucket].count_val += side;
128 ibf->id_sum[bucket] ^= key; 128 ibf->key_sum[bucket].key_val ^= key.key_val;
129 ibf->hash_sum[bucket] ^= key_hash; 129 ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val;
130 } 130 }
131} 131}
132 132
@@ -138,7 +138,7 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf,
138 * @param id the element's hash code 138 * @param id the element's hash code
139 */ 139 */
140void 140void
141ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t key) 141ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
142{ 142{
143 int buckets[ibf->hash_num]; 143 int buckets[ibf->hash_num];
144 ibf_get_indices (ibf, key, buckets); 144 ibf_get_indices (ibf, key, buckets);
@@ -154,11 +154,11 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf)
154 int i; 154 int i;
155 for (i = 0; i < ibf->size; i++) 155 for (i = 0; i < ibf->size; i++)
156 { 156 {
157 if (0 != ibf->count[i]) 157 if (0 != ibf->count[i].count_val)
158 return GNUNET_NO; 158 return GNUNET_NO;
159 if (0 != ibf->hash_sum[i]) 159 if (0 != ibf->key_hash_sum[i].key_hash_val)
160 return GNUNET_NO; 160 return GNUNET_NO;
161 if (0 != ibf->id_sum[i]) 161 if (0 != ibf->key_sum[i].key_val)
162 return GNUNET_NO; 162 return GNUNET_NO;
163 } 163 }
164 return GNUNET_YES; 164 return GNUNET_YES;
@@ -179,9 +179,9 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf)
179 */ 179 */
180int 180int
181ibf_decode (struct InvertibleBloomFilter *ibf, 181ibf_decode (struct InvertibleBloomFilter *ibf,
182 int *ret_side, uint64_t *ret_id) 182 int *ret_side, struct IBF_Key *ret_id)
183{ 183{
184 uint32_t hash; 184 struct IBF_KeyHash hash;
185 int i; 185 int i;
186 struct GNUNET_HashCode key_hash_sha; 186 struct GNUNET_HashCode key_hash_sha;
187 int buckets[ibf->hash_num]; 187 int buckets[ibf->hash_num];
@@ -194,20 +194,20 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
194 int hit; 194 int hit;
195 195
196 /* we can only decode from pure buckets */ 196 /* we can only decode from pure buckets */
197 if ((1 != ibf->count[i]) && (-1 != ibf->count[i])) 197 if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val))
198 continue; 198 continue;
199 199
200 GNUNET_CRYPTO_hash (&ibf->id_sum[i], sizeof (uint64_t), &key_hash_sha); 200 GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha);
201 hash = key_hash_sha.bits[0]; 201 hash.key_hash_val = key_hash_sha.bits[0];
202 202
203 /* test if the hash matches the key */ 203 /* test if the hash matches the key */
204 if (hash != ibf->hash_sum[i]) 204 if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val)
205 continue; 205 continue;
206 206
207 /* test if key in bucket hits its own location, 207 /* test if key in bucket hits its own location,
208 * if not, the key hash was subject to collision */ 208 * if not, the key hash was subject to collision */
209 hit = GNUNET_NO; 209 hit = GNUNET_NO;
210 ibf_get_indices (ibf, ibf->id_sum[i], buckets); 210 ibf_get_indices (ibf, ibf->key_sum[i], buckets);
211 for (j = 0; j < ibf->hash_num; j++) 211 for (j = 0; j < ibf->hash_num; j++)
212 if (buckets[j] == i) 212 if (buckets[j] == i)
213 hit = GNUNET_YES; 213 hit = GNUNET_YES;
@@ -216,12 +216,12 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
216 continue; 216 continue;
217 217
218 if (NULL != ret_side) 218 if (NULL != ret_side)
219 *ret_side = ibf->count[i]; 219 *ret_side = ibf->count[i].count_val;
220 if (NULL != ret_id) 220 if (NULL != ret_id)
221 *ret_id = ibf->id_sum[i]; 221 *ret_id = ibf->key_sum[i];
222 222
223 /* insert on the opposite side, effectively removing the element */ 223 /* insert on the opposite side, effectively removing the element */
224 ibf_insert_into (ibf, ibf->id_sum[i], buckets, -ibf->count[i]); 224 ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val);
225 225
226 return GNUNET_YES; 226 return GNUNET_YES;
227 } 227 }
@@ -233,6 +233,128 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
233 233
234 234
235/** 235/**
236 * Write an ibf.
237 *
238 * @param ibf the ibf to write
239 * @param start with which bucket to start
240 * @param count how many buckets to write
241 * @param buf buffer to write the data to, will be updated to point to the
242 * first byte after the written data
243 * @param size pointer to the size of the buffer, will be updated, can be NULL
244 */
245void
246ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size)
247{
248 struct IBF_Key *key_dst;
249 struct IBF_KeyHash *key_hash_dst;
250 struct IBF_Count *count_dst;
251
252 /* update size and check for overflow */
253 if (NULL != size)
254 {
255 size_t old_size;
256 old_size = *size;
257 *size = *size - count * IBF_BUCKET_SIZE;
258 GNUNET_assert (*size < old_size);
259 }
260 /* copy keys */
261 key_dst = (struct IBF_Key *) *buf;
262 memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst);
263 key_dst += count;
264 /* copy key hashes */
265 key_hash_dst = (struct IBF_KeyHash *) key_dst;
266 memcpy (key_hash_dst, ibf->key_hash_sum + start, count * sizeof *key_hash_dst);
267 key_hash_dst += count;
268 /* copy counts */
269 count_dst = (struct IBF_Count *) key_hash_dst;
270 memcpy (count_dst, ibf->count + start, count * sizeof *count_dst);
271 count_dst += count;
272 /* returned buffer is at the end of written data*/
273 *buf = (void *) count_dst;
274}
275
276
277/**
278 * Read an ibf.
279 *
280 * @param buf pointer to the buffer to write to, will point to first
281 * byte after the written data
282 * @param size size of the buffer, will be updated
283 * @param start which bucket to start at
284 * @param count how many buckets to read
285 * @param dst ibf to write buckets to
286 * @return GNUNET_OK on success
287 */
288int
289ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
290{
291 struct IBF_Key *key_src;
292 struct IBF_KeyHash *key_hash_src;
293 struct IBF_Count *count_src;
294
295 /* update size and check for overflow */
296 if (NULL != size)
297 {
298 size_t old_size;
299 old_size = *size;
300 *size = *size - count * IBF_BUCKET_SIZE;
301 if (*size > old_size)
302 return GNUNET_SYSERR;
303 }
304 /* copy keys */
305 key_src = (struct IBF_Key *) *buf;
306 memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src);
307 key_src += count;
308 /* copy key hashes */
309 key_hash_src = (struct IBF_KeyHash *) key_src;
310 memcpy (ibf->key_hash_sum + start, key_hash_src, count * sizeof *key_hash_src);
311 key_hash_src += count;
312 /* copy counts */
313 count_src = (struct IBF_Count *) key_hash_src;
314 memcpy (ibf->count + start, count_src, count * sizeof *count_src);
315 count_src += count;
316 /* returned buffer is at the end of written data*/
317 *buf = (void *) count_src;
318 return GNUNET_OK;
319}
320
321
322/**
323 * Write an ibf.
324 *
325 * @param ibf the ibf to write
326 * @param start with which bucket to start
327 * @param count how many buckets to write
328 * @param buf buffer to write the data to, will be updated to point to the
329 * first byte after the written data
330 * @param size pointer to the size of the buffer, will be updated, can be NULL
331 */
332void
333ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size)
334{
335 ibf_write_slice (ibf, 0, ibf->size, buf, size);
336}
337
338
339/**
340 * Read an ibf.
341 *
342 * @param buf pointer to the buffer to write to, will point to first
343 * byte after the written data
344 * @param size size of the buffer, will be updated
345 * @param start which bucket to start at
346 * @param count how many buckets to read
347 * @param dst ibf to write buckets to
348 * @return GNUNET_OK on success
349 */
350int
351ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst)
352{
353 return ibf_read_slice (buf, size, 0, dst->size, dst);
354}
355
356
357/**
236 * Subtract ibf2 from ibf1, storing the result in ibf1. 358 * Subtract ibf2 from ibf1, storing the result in ibf1.
237 * The two IBF's must have the same parameters size and hash_num. 359 * The two IBF's must have the same parameters size and hash_num.
238 * 360 *
@@ -250,31 +372,33 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi
250 372
251 for (i = 0; i < ibf1->size; i++) 373 for (i = 0; i < ibf1->size; i++)
252 { 374 {
253 ibf1->count[i] -= ibf2->count[i]; 375 ibf1->count[i].count_val -= ibf2->count[i].count_val;
254 ibf1->hash_sum[i] ^= ibf2->hash_sum[i]; 376 ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val;
255 ibf1->id_sum[i] ^= ibf2->id_sum[i]; 377 ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val;
256 } 378 }
257} 379}
258 380
381
259/** 382/**
260 * Create a copy of an IBF, the copy has to be destroyed properly. 383 * Create a copy of an IBF, the copy has to be destroyed properly.
261 * 384 *
262 * @param ibf the IBF to copy 385 * @param ibf the IBF to copy
263 */ 386 */
264struct InvertibleBloomFilter * 387struct InvertibleBloomFilter *
265ibf_dup (struct InvertibleBloomFilter *ibf) 388ibf_dup (const struct InvertibleBloomFilter *ibf)
266{ 389{
267 struct InvertibleBloomFilter *copy; 390 struct InvertibleBloomFilter *copy;
268 copy = GNUNET_malloc (sizeof *copy); 391 copy = GNUNET_malloc (sizeof *copy);
269 copy->hash_num = ibf->hash_num; 392 copy->hash_num = ibf->hash_num;
270 copy->salt = ibf->salt; 393 copy->salt = ibf->salt;
271 copy->size = ibf->size; 394 copy->size = ibf->size;
272 copy->hash_sum = GNUNET_memdup (ibf->hash_sum, ibf->size * sizeof (struct GNUNET_HashCode)); 395 copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash));
273 copy->id_sum = GNUNET_memdup (ibf->id_sum, ibf->size * sizeof (struct GNUNET_HashCode)); 396 copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key));
274 copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (uint8_t)); 397 copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (struct IBF_Count));
275 return copy; 398 return copy;
276} 399}
277 400
401
278/** 402/**
279 * Destroy all resources associated with the invertible bloom filter. 403 * Destroy all resources associated with the invertible bloom filter.
280 * No more ibf_*-functions may be called on ibf after calling destroy. 404 * No more ibf_*-functions may be called on ibf after calling destroy.
@@ -284,8 +408,9 @@ ibf_dup (struct InvertibleBloomFilter *ibf)
284void 408void
285ibf_destroy (struct InvertibleBloomFilter *ibf) 409ibf_destroy (struct InvertibleBloomFilter *ibf)
286{ 410{
287 GNUNET_free (ibf->hash_sum); 411 GNUNET_free (ibf->key_sum);
288 GNUNET_free (ibf->id_sum); 412 GNUNET_free (ibf->key_hash_sum);
289 GNUNET_free (ibf->count); 413 GNUNET_free (ibf->count);
290 GNUNET_free (ibf); 414 GNUNET_free (ibf);
291} 415}
416
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h
index 72345d3e1..cafe55c8d 100644
--- a/src/consensus/ibf.h
+++ b/src/consensus/ibf.h
@@ -39,11 +39,27 @@ extern "C"
39#endif 39#endif
40#endif 40#endif
41 41
42
43struct IBF_Key
44{
45 uint64_t key_val;
46};
47
48struct IBF_KeyHash
49{
50 uint32_t key_hash_val;
51};
52
53struct IBF_Count
54{
55 int8_t count_val;
56};
57
42/** 58/**
43 * Size of one ibf bucket in bytes 59 * Size of one ibf bucket in bytes
44 */ 60 */
45#define IBF_BUCKET_SIZE (8+4+1) 61#define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \
46 62 sizeof (struct IBF_KeyHash))
47 63
48/** 64/**
49 * Invertible bloom filter (IBF). 65 * Invertible bloom filter (IBF).
@@ -62,7 +78,7 @@ struct InvertibleBloomFilter
62 * In how many cells do we hash one element? 78 * In how many cells do we hash one element?
63 * Usually 4 or 3. 79 * Usually 4 or 3.
64 */ 80 */
65 unsigned int hash_num; 81 uint8_t hash_num;
66 82
67 /** 83 /**
68 * Salt for mingling hashes 84 * Salt for mingling hashes
@@ -70,30 +86,91 @@ struct InvertibleBloomFilter
70 uint32_t salt; 86 uint32_t salt;
71 87
72 /** 88 /**
73 * xor sums of the elements' hash codes, used to identify the elements. 89 * Xor sums of the elements' keys, used to identify the elements.
90 * Array of 'size' elements.
74 */ 91 */
75 uint64_t *id_sum; 92 struct IBF_Key *key_sum;
76 93
77 /** 94 /**
78 * xor sums of the "hash of the hash". 95 * Xor sums of the hashes of the keys of inserted elements.
96 * Array of 'size' elements.
79 */ 97 */
80 uint32_t *hash_sum; 98 struct IBF_KeyHash *key_hash_sum;
81 99
82 /** 100 /**
83 * How many times has a bucket been hit? 101 * How many times has a bucket been hit?
84 * Can be negative, as a result of IBF subtraction. 102 * Can be negative, as a result of IBF subtraction.
103 * Array of 'size' elements.
85 */ 104 */
86 int8_t *count; 105 struct IBF_Count *count;
87}; 106};
88 107
89 108
90/** 109/**
110 * Write an ibf.
111 *
112 * @param ibf the ibf to write
113 * @param start with which bucket to start
114 * @param count how many buckets to write
115 * @param buf buffer to write the data to, will be updated to point to the
116 * first byte after the written data
117 * @param size pointer to the size of the buffer, will be updated, can be NULL
118 */
119void
120ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size);
121
122
123/**
124 * Read an ibf.
125 *
126 * @param buf pointer to the buffer to write to, will point to first
127 * byte after the written data
128 * @param size size of the buffer, will be updated
129 * @param start which bucket to start at
130 * @param count how many buckets to read
131 * @param dst ibf to write buckets to
132 * @return GNUNET_OK on success
133 */
134int
135ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *dst);
136
137
138/**
139 * Write an ibf.
140 *
141 * @param ibf the ibf to write
142 * @param start with which bucket to start
143 * @param count how many buckets to write
144 * @param buf buffer to write the data to, will be updated to point to the
145 * first byte after the written data
146 * @param size pointer to the size of the buffer, will be updated, can be NULL
147 */
148void
149ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size);
150
151
152/**
153 * Read an ibf.
154 *
155 * @param buf pointer to the buffer to write to, will point to first
156 * byte after the written data
157 * @param size size of the buffer, will be updated
158 * @param start which bucket to start at
159 * @param count how many buckets to read
160 * @param dst ibf to write buckets to
161 * @return GNUNET_OK on success
162 */
163int
164ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst);
165
166
167/**
91 * Create a key from a hashcode. 168 * Create a key from a hashcode.
92 * 169 *
93 * @param hash the hashcode 170 * @param hash the hashcode
94 * @return a key 171 * @return a key
95 */ 172 */
96uint64_t 173struct IBF_Key
97ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); 174ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
98 175
99 176
@@ -105,7 +182,7 @@ ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
105 * @param dst hashcode to store the result in 182 * @param dst hashcode to store the result in
106 */ 183 */
107void 184void
108ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst); 185ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
109 186
110 187
111/** 188/**
@@ -118,17 +195,17 @@ ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst);
118 * @return the newly created invertible bloom filter 195 * @return the newly created invertible bloom filter
119 */ 196 */
120struct InvertibleBloomFilter * 197struct InvertibleBloomFilter *
121ibf_create(uint32_t size, unsigned int hash_num, uint32_t salt); 198ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt);
122 199
123 200
124/** 201/**
125 * Insert an element into an IBF. 202 * Insert an element into an IBF.
126 * 203 *
127 * @param ibf the IBF 204 * @param ibf the IBF
128 * @param id the element's hash code 205 * @param key the element's hash code
129 */ 206 */
130void 207void
131ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t id); 208ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
132 209
133 210
134/** 211/**
@@ -154,7 +231,7 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi
154 * GNUNET_SYSERR if the decoding has faile 231 * GNUNET_SYSERR if the decoding has faile
155 */ 232 */
156int 233int
157ibf_decode (struct InvertibleBloomFilter *ibf, int *side, uint64_t *ret_id); 234ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct IBF_Key *ret_key);
158 235
159 236
160/** 237/**
@@ -163,7 +240,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, int *side, uint64_t *ret_id);
163 * @param ibf the IBF to copy 240 * @param ibf the IBF to copy
164 */ 241 */
165struct InvertibleBloomFilter * 242struct InvertibleBloomFilter *
166ibf_dup (struct InvertibleBloomFilter *ibf); 243ibf_dup (const struct InvertibleBloomFilter *ibf);
167 244
168/** 245/**
169 * Destroy all resources associated with the invertible bloom filter. 246 * Destroy all resources associated with the invertible bloom filter.