aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-02-07 10:51:17 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-02-07 10:51:17 +0000
commit4dc3faf5e88b8ca602602aa28a6ff76c02d34848 (patch)
treed50989d9fe71816bba1fdcef690684daa4f7873b /src/consensus
parent5ceb669384bd36b3213f7f1e7dbaf31b913e06a0 (diff)
downloadgnunet-4dc3faf5e88b8ca602602aa28a6ff76c02d34848.tar.gz
gnunet-4dc3faf5e88b8ca602602aa28a6ff76c02d34848.zip
- improved ibf decoding algorithm
- strata estimator now fits in one message - work on consensus, not quite working yet
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/consensus_protocol.h7
-rw-r--r--src/consensus/gnunet-consensus-ibf.c65
-rw-r--r--src/consensus/gnunet-service-consensus.c546
-rw-r--r--src/consensus/ibf.c185
-rw-r--r--src/consensus/ibf.h39
5 files changed, 542 insertions, 300 deletions
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index c84aad263..0da959f8f 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -60,6 +60,13 @@ struct Element
60 struct GNUNET_HashCode hash; 60 struct GNUNET_HashCode hash;
61}; 61};
62 62
63
64struct ElementRequest
65{
66 struct GNUNET_MessageHeader header;
67 /* struct GNUNET_HashCode[] rest */
68};
69
63struct ConsensusHello 70struct ConsensusHello
64{ 71{
65 struct GNUNET_MessageHeader header; 72 struct GNUNET_MessageHeader header;
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c
index c9bcc1ab0..a16ca3247 100644
--- a/src/consensus/gnunet-consensus-ibf.c
+++ b/src/consensus/gnunet-consensus-ibf.c
@@ -38,16 +38,39 @@ static unsigned int csize = 10;
38static unsigned int hash_num = 3; 38static unsigned int hash_num = 3;
39static unsigned int ibf_size = 80; 39static unsigned int ibf_size = 80;
40 40
41/* FIXME: add parameter for this */
42static enum GNUNET_CRYPTO_Quality random_quality = GNUNET_CRYPTO_QUALITY_WEAK;
41 43
42static struct GNUNET_CONTAINER_MultiHashMap *set_a; 44static struct GNUNET_CONTAINER_MultiHashMap *set_a;
43static struct GNUNET_CONTAINER_MultiHashMap *set_b; 45static struct GNUNET_CONTAINER_MultiHashMap *set_b;
44/* common elements in a and b */ 46/* common elements in a and b */
45static struct GNUNET_CONTAINER_MultiHashMap *set_c; 47static struct GNUNET_CONTAINER_MultiHashMap *set_c;
46 48
49static struct GNUNET_CONTAINER_MultiHashMap *key_to_hashcode;
50
47static struct InvertibleBloomFilter *ibf_a; 51static struct InvertibleBloomFilter *ibf_a;
48static struct InvertibleBloomFilter *ibf_b; 52static struct InvertibleBloomFilter *ibf_b;
49 53
50 54
55static void
56register_hashcode (struct GNUNET_HashCode *hash)
57{
58 struct GNUNET_HashCode replicated;
59 uint64_t key;
60 key = ibf_key_from_hashcode (hash);
61 ibf_hashcode_from_key (key, &replicated);
62 GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash),
63 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
64}
65
66static void
67iter_hashcodes (uint64_t key, GNUNET_CONTAINER_HashMapIterator iter, void *cls)
68{
69 struct GNUNET_HashCode replicated;
70 ibf_hashcode_from_key (key, &replicated);
71 GNUNET_CONTAINER_multihashmap_get_multiple (key_to_hashcode, &replicated, iter, cls);
72}
73
51 74
52static int 75static int
53insert_iterator (void *cls, 76insert_iterator (void *cls,
@@ -55,7 +78,19 @@ insert_iterator (void *cls,
55 void *value) 78 void *value)
56{ 79{
57 struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls; 80 struct InvertibleBloomFilter *ibf = (struct InvertibleBloomFilter *) cls;
58 ibf_insert (ibf, key); 81 ibf_insert (ibf, ibf_key_from_hashcode (key));
82 return GNUNET_YES;
83}
84
85
86static int
87remove_iterator (void *cls,
88 const struct GNUNET_HashCode *key,
89 void *value)
90{
91 struct GNUNET_CONTAINER_MultiHashMap *hashmap = cls;
92 /* if remove fails, there just was a collision with another key */
93 (void) GNUNET_CONTAINER_multihashmap_remove (hashmap, value, NULL);
59 return GNUNET_YES; 94 return GNUNET_YES;
60} 95}
61 96
@@ -65,6 +100,7 @@ run (void *cls, char *const *args, const char *cfgfile,
65 const struct GNUNET_CONFIGURATION_Handle *cfg) 100 const struct GNUNET_CONFIGURATION_Handle *cfg)
66{ 101{
67 struct GNUNET_HashCode id; 102 struct GNUNET_HashCode id;
103 uint64_t ibf_key;
68 int i; 104 int i;
69 int side; 105 int side;
70 int res; 106 int res;
@@ -78,47 +114,57 @@ run (void *cls, char *const *args, const char *cfgfile,
78 set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize), 114 set_c = GNUNET_CONTAINER_multihashmap_create (((csize == 0) ? 1 : csize),
79 GNUNET_NO); 115 GNUNET_NO);
80 116
117 key_to_hashcode = GNUNET_CONTAINER_multihashmap_create (((asize+bsize+csize == 0) ? 1 : (asize+bsize+csize)),
118 GNUNET_NO);
119
81 printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n", 120 printf ("hash-num=%u, size=%u, #(A-B)=%u, #(B-A)=%u, #(A&B)=%u\n",
82 hash_num, ibf_size, asize, bsize, csize); 121 hash_num, ibf_size, asize, bsize, csize);
83 122
84 i = 0; 123 i = 0;
85 while (i < asize) 124 while (i < asize)
86 { 125 {
87 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &id); 126 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
88 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) 127 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
89 continue; 128 continue;
90 GNUNET_CONTAINER_multihashmap_put ( 129 GNUNET_CONTAINER_multihashmap_put (
91 set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 130 set_a, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
131 register_hashcode (&id);
92 i++; 132 i++;
93 } 133 }
94 i = 0; 134 i = 0;
95 while (i < bsize) 135 while (i < bsize)
96 { 136 {
97 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &id); 137 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
98 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) 138 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
99 continue; 139 continue;
100 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) 140 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
101 continue; 141 continue;
102 GNUNET_CONTAINER_multihashmap_put ( 142 GNUNET_CONTAINER_multihashmap_put (
103 set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 143 set_b, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
144 register_hashcode (&id);
104 i++; 145 i++;
105 } 146 }
106 i = 0; 147 i = 0;
107 while (i < csize) 148 while (i < csize)
108 { 149 {
109 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &id); 150 GNUNET_CRYPTO_hash_create_random (random_quality, &id);
110 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id)) 151 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_a, &id))
111 continue; 152 continue;
112 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id)) 153 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_b, &id))
113 continue; 154 continue;
155 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (set_c, &id))
156 continue;
114 GNUNET_CONTAINER_multihashmap_put ( 157 GNUNET_CONTAINER_multihashmap_put (
115 set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 158 set_c, &id, NULL, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
159 register_hashcode (&id);
116 i++; 160 i++;
117 } 161 }
118 162
119 ibf_a = ibf_create (ibf_size, hash_num, 0); 163 ibf_a = ibf_create (ibf_size, hash_num, 0);
120 ibf_b = ibf_create (ibf_size, hash_num, 0); 164 ibf_b = ibf_create (ibf_size, hash_num, 0);
121 165
166 printf ("generated sets\n");
167
122 start_time = GNUNET_TIME_absolute_get (); 168 start_time = GNUNET_TIME_absolute_get ();
123 169
124 GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a); 170 GNUNET_CONTAINER_multihashmap_iterate (set_a, &insert_iterator, ibf_a);
@@ -137,7 +183,7 @@ run (void *cls, char *const *args, const char *cfgfile,
137 183
138 for (;;) 184 for (;;)
139 { 185 {
140 res = ibf_decode (ibf_a, &side, &id); 186 res = ibf_decode (ibf_a, &side, &ibf_key);
141 if (GNUNET_SYSERR == res) 187 if (GNUNET_SYSERR == res)
142 { 188 {
143 printf ("decode failed\n"); 189 printf ("decode failed\n");
@@ -157,14 +203,9 @@ run (void *cls, char *const *args, const char *cfgfile,
157 } 203 }
158 204
159 if (side == 1) 205 if (side == 1)
160 res = GNUNET_CONTAINER_multihashmap_remove (set_a, &id, NULL); 206 iter_hashcodes (ibf_key, remove_iterator, set_a);
161 if (side == -1) 207 if (side == -1)
162 res = GNUNET_CONTAINER_multihashmap_remove (set_b, &id, NULL); 208 iter_hashcodes (ibf_key, remove_iterator, set_b);
163 if (GNUNET_YES != res)
164 {
165 printf ("decoded wrong element\n");
166 return;
167 }
168 } 209 }
169} 210}
170 211
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 1cbb9d021..d223360dc 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -18,7 +18,6 @@
18 Boston, MA 02111-1307, USA. 18 Boston, MA 02111-1307, USA.
19*/ 19*/
20 20
21
22/** 21/**
23 * @file consensus/gnunet-service-consensus.c 22 * @file consensus/gnunet-service-consensus.c
24 * @brief 23 * @brief
@@ -47,17 +46,21 @@
47 */ 46 */
48#define STRATA_IBF_BUCKETS 80 47#define STRATA_IBF_BUCKETS 80
49/** 48/**
50 * hash num parameter of the IBF 49 * hash num parameter for the difference digests and strata estimators
51 */ 50 */
52#define STRATA_HASH_NUM 3 51#define STRATA_HASH_NUM 3
52
53/** 53/**
54 * Number of strata that can be transmitted in one message. 54 * Number of buckets that can be transmitted in one message.
55 */ 55 */
56#define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS))
57
58#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) 56#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
59 57
60#define MAX_IBF_ORDER (64) 58/**
59 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
60 * Choose this value so that computing the IBF is still cheaper
61 * than transmitting all values.
62 */
63#define MAX_IBF_ORDER (32)
61 64
62 65
63/* forward declarations */ 66/* forward declarations */
@@ -76,14 +79,14 @@ static void
76write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); 79write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size);
77 80
78static void 81static void
79write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size); 82write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size);
80 83
81static int 84static int
82get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); 85get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
83 86
84 87
85/** 88/**
86 * An element that is waiting to be transmitted to a client. 89 * An element that is waiting to be transmitted.
87 */ 90 */
88struct PendingElement 91struct PendingElement
89{ 92{
@@ -106,6 +109,9 @@ struct PendingElement
106 struct ConsensusPeerInformation *cpi; 109 struct ConsensusPeerInformation *cpi;
107}; 110};
108 111
112/**
113 * Information about a peer that is in a consensus session.
114 */
109struct ConsensusPeerInformation 115struct ConsensusPeerInformation
110{ 116{
111 struct GNUNET_STREAM_Socket *socket; 117 struct GNUNET_STREAM_Socket *socket;
@@ -123,6 +129,12 @@ struct ConsensusPeerInformation
123 int is_outgoing; 129 int is_outgoing;
124 130
125 /** 131 /**
132 * if the peer did something wrong, and was disconnected,
133 * never interact with this peer again.
134 */
135 int is_bad;
136
137 /**
126 * Did we receive/send a consensus hello? 138 * Did we receive/send a consensus hello?
127 */ 139 */
128 int hello; 140 int hello;
@@ -137,27 +149,29 @@ struct ConsensusPeerInformation
137 */ 149 */
138 struct GNUNET_STREAM_WriteHandle *wh; 150 struct GNUNET_STREAM_WriteHandle *wh;
139 151
152 enum {
153 IBF_STATE_NONE,
154 IBF_STATE_RECEIVING,
155 IBF_STATE_TRANSMITTING,
156 IBF_STATE_DECODING
157 } ibf_state ;
158
140 /** 159 /**
141 * How many of the strate in the ibf were 160 * What is the order (=log2 size) of the ibf
142 * sent or received in this round? 161 * we're currently dealing with?
143 */ 162 */
144 int strata_counter;
145
146 int ibf_order; 163 int ibf_order;
147 164
148 struct InvertibleBloomFilter *outgoing_ibf; 165 /**
149 166 * The current IBF for this peer,
150 int outgoing_bucket_counter; 167 * purpose dependent on ibf_state
151 168 */
152 struct InvertibleBloomFilter *incoming_ibf; 169 struct InvertibleBloomFilter *ibf;
153
154 int incoming_bucket_counter;
155 170
156 /** 171 /**
157 * NULL or incoming_ibf - outgoing_ibf. 172 * How many buckets have we transmitted/received (depending on state)?
158 * Decoded values of side '1' are to be requested from the the peer.
159 */ 173 */
160 struct InvertibleBloomFilter *diff_ibf; 174 int ibf_bucket_counter;
161 175
162 /** 176 /**
163 * Strata estimator of the peer, NULL if our peer 177 * Strata estimator of the peer, NULL if our peer
@@ -165,11 +179,21 @@ struct ConsensusPeerInformation
165 */ 179 */
166 struct InvertibleBloomFilter **strata; 180 struct InvertibleBloomFilter **strata;
167 181
182 /**
183 * difference estimated with the current strata estimator
184 */
168 unsigned int diff; 185 unsigned int diff;
169 186
170 struct GNUNET_SERVER_MessageStreamTokenizer *mst; 187 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
171 188
189 /**
190 * Back-reference to the consensus session,
191 * to that ConsensusPeerInformation can be used as a closure
192 */
172 struct ConsensusSession *session; 193 struct ConsensusSession *session;
194
195 struct PendingElement *send_pending_head;
196 struct PendingElement *send_pending_tail;
173}; 197};
174 198
175struct QueuedMessage 199struct QueuedMessage
@@ -187,9 +211,31 @@ struct QueuedMessage
187 struct QueuedMessage *prev; 211 struct QueuedMessage *prev;
188}; 212};
189 213
214enum ConsensusRound
215{
216 /**
217 * distribution of information with the exponential scheme
218 */
219 CONSENSUS_ROUND_EXP_EXCHANGE,
220 /**
221 * All-to-all, exchange missing values
222 */
223 CONSENSUS_ROUND_A2A_EXCHANGE,
224 /**
225 * All-to-all, check what values are missing, don't exchange anything
226 */
227 CONSENSUS_ROUND_A2A_INVENTORY
228
229 /*
230 a round to exchange the information for fraud-detection
231 CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT
232 */
233};
234
190 235
191/** 236/**
192 * A consensus session consists of one local client and the remote authorities. 237 * A consensus session consists of one local client and the remote authorities.
238 *
193 */ 239 */
194struct ConsensusSession 240struct ConsensusSession
195{ 241{
@@ -225,6 +271,7 @@ struct ConsensusSession
225 /** 271 /**
226 * Values in the consensus set of this session, 272 * Values in the consensus set of this session,
227 * all of them either have been sent by or approved by the client. 273 * all of them either have been sent by or approved by the client.
274 * Contains GNUNET_CONSENSUS_Element.
228 */ 275 */
229 struct GNUNET_CONTAINER_MultiHashMap *values; 276 struct GNUNET_CONTAINER_MultiHashMap *values;
230 277
@@ -238,8 +285,14 @@ struct ConsensusSession
238 */ 285 */
239 struct PendingElement *approval_pending_tail; 286 struct PendingElement *approval_pending_tail;
240 287
288 /**
289 * Messages to be sent to the local client that owns this session
290 */
241 struct QueuedMessage *client_messages_head; 291 struct QueuedMessage *client_messages_head;
242 292
293 /**
294 * Messages to be sent to the local client that owns this session
295 */
243 struct QueuedMessage *client_messages_tail; 296 struct QueuedMessage *client_messages_tail;
244 297
245 /** 298 /**
@@ -259,22 +312,14 @@ struct ConsensusSession
259 int conclude_group_min; 312 int conclude_group_min;
260 313
261 /** 314 /**
262 * Current round of the conclusion
263 */
264 int current_round;
265
266 /**
267 * Soft deadline for conclude.
268 * Speed up the speed of the consensus at the cost of consensus quality, as
269 * the time approached or crosses the deadline.
270 */
271 struct GNUNET_TIME_Absolute conclude_deadline;
272
273 /**
274 * Number of other peers in the consensus 315 * Number of other peers in the consensus
275 */ 316 */
276 unsigned int num_peers; 317 unsigned int num_peers;
277 318
319 /**
320 * Information about the other peers,
321 * their state, etc.
322 */
278 struct ConsensusPeerInformation *info; 323 struct ConsensusPeerInformation *info;
279 324
280 /** 325 /**
@@ -289,13 +334,19 @@ struct ConsensusSession
289 int local_peer_idx; 334 int local_peer_idx;
290 335
291 /** 336 /**
292 * Task identifier for the round timeout task 337 * Strata estimator, computed online
293 */ 338 */
294 GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid;
295
296 struct InvertibleBloomFilter **strata; 339 struct InvertibleBloomFilter **strata;
297 340
341 /**
342 * Pre-computed IBFs
343 */
298 struct InvertibleBloomFilter **ibfs; 344 struct InvertibleBloomFilter **ibfs;
345
346 /**
347 * Current round
348 */
349 enum ConsensusRound current_round;
299}; 350};
300 351
301 352
@@ -339,6 +390,14 @@ struct IncomingSocket
339 * Peer-in-session this socket belongs to, once known, otherwise NULL. 390 * Peer-in-session this socket belongs to, once known, otherwise NULL.
340 */ 391 */
341 struct ConsensusPeerInformation *cpi; 392 struct ConsensusPeerInformation *cpi;
393
394 /**
395 * Set to the global session id, if the peer sent us a hello-message,
396 * but the session does not exist yet.
397 *
398 * FIXME: not implemented yet
399 */
400 struct GNUNET_HashCode *requested_gid;
342}; 401};
343 402
344static struct IncomingSocket *incoming_sockets_head; 403static struct IncomingSocket *incoming_sockets_head;
@@ -380,6 +439,12 @@ static struct GNUNET_CORE_Handle *core;
380static struct GNUNET_STREAM_ListenSocket *listener; 439static struct GNUNET_STREAM_ListenSocket *listener;
381 440
382 441
442/**
443 * Queue a message to be sent to the inhabiting client of a sessino
444 *
445 * @param session session
446 * @param msg message we want to queue
447 */
383static void 448static void
384queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) 449queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
385{ 450{
@@ -389,7 +454,32 @@ queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHea
389 GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); 454 GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
390} 455}
391 456
457/**
458 * Get peer index associated with the peer information,
459 * unique for every session among all peers.
460 */
461static int
462get_cpi_index (struct ConsensusPeerInformation *cpi)
463{
464 return cpi - cpi->session->info;
465}
466
467/**
468 * Mark the peer as bad, free as state we don't need anymore.
469 */
470static void
471mark_peer_bad (struct ConsensusPeerInformation *cpi)
472{
473 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%u marked as bad\n", get_cpi_index (cpi));
474 cpi->is_bad = GNUNET_YES;
475 /* FIXME: free ibfs etc. */
476}
477
392 478
479/**
480 * Estimate set difference with two strata estimators,
481 * i.e. arrays of IBFs.
482 */
393static int 483static int
394estimate_difference (struct InvertibleBloomFilter** strata1, 484estimate_difference (struct InvertibleBloomFilter** strata1,
395 struct InvertibleBloomFilter** strata2) 485 struct InvertibleBloomFilter** strata2)
@@ -415,6 +505,7 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
415 } 505 }
416 if (GNUNET_SYSERR == more) 506 if (GNUNET_SYSERR == more)
417 { 507 {
508 ibf_destroy (diff);
418 return count * (1 << (i + 1)); 509 return count * (1 << (i + 1));
419 } 510 }
420 ibf_count++; 511 ibf_count++;
@@ -425,10 +516,9 @@ estimate_difference (struct InvertibleBloomFilter** strata1,
425} 516}
426 517
427 518
428
429/** 519/**
430 * Functions of this signature are called whenever data is available from the 520 * Called when receiving data from a peer that is member of
431 * stream. 521 * an inhabited consensus session.
432 * 522 *
433 * @param cls the closure from GNUNET_STREAM_read 523 * @param cls the closure from GNUNET_STREAM_read
434 * @param status the status of the stream at the time this function is called 524 * @param status the status of the stream at the time this function is called
@@ -468,8 +558,8 @@ session_stream_data_processor (void *cls,
468} 558}
469 559
470/** 560/**
471 * Functions of this signature are called whenever data is available from the 561 * Called when we receive data from a peer that is not member of
472 * stream. 562 * a session yet, or the session is not yet inhabited.
473 * 563 *
474 * @param cls the closure from GNUNET_STREAM_read 564 * @param cls the closure from GNUNET_STREAM_read
475 * @param status the status of the stream at the time this function is called 565 * @param status the status of the stream at the time this function is called
@@ -508,7 +598,7 @@ incoming_stream_data_processor (void *cls,
508 598
509 599
510/** 600/**
511 * Iterator over hash map entries. 601 * Iterator to insert values into an ibf.
512 * 602 *
513 * @param cls closure 603 * @param cls closure
514 * @param key current key code 604 * @param key current key code
@@ -524,28 +614,34 @@ ibf_values_iterator (void *cls,
524{ 614{
525 struct ConsensusPeerInformation *cpi; 615 struct ConsensusPeerInformation *cpi;
526 cpi = cls; 616 cpi = cls;
527 ibf_insert (cpi->session->ibfs[cpi->ibf_order], key); 617 ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key));
528 return GNUNET_YES; 618 return GNUNET_YES;
529} 619}
530 620
531
532static void 621static void
533create_outgoing_ibf (struct ConsensusPeerInformation *cpi) 622prepare_ibf (struct ConsensusPeerInformation *cpi)
534{ 623{
535 if (NULL == cpi->session->ibfs[cpi->ibf_order]) 624 if (NULL == cpi->session->ibfs[cpi->ibf_order])
536 { 625 {
537 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 626 cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
538 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); 627 GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi);
539 } 628 }
540 cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
541} 629}
542 630
631
632/**
633 * Called when a peer sends us its strata estimator.
634 * In response, we sent out IBF of appropriate size back.
635 *
636 * @param cpi session
637 * @param strata_msg message
638 */
543static int 639static int
544handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) 640handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg)
545{ 641{
546 int i; 642 int i;
547 int num_strata; 643 uint64_t *key_src;
548 struct GNUNET_HashCode *hash_src; 644 uint32_t *hash_src;
549 uint8_t *count_src; 645 uint8_t *count_src;
550 646
551 GNUNET_assert (GNUNET_NO == cpi->is_outgoing); 647 GNUNET_assert (GNUNET_NO == cpi->is_outgoing);
@@ -557,49 +653,46 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
557 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); 653 cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0);
558 } 654 }
559 655
560 num_strata = ntohs (strata_msg->num_strata);
561
562 /* for correct message alignment, copy bucket types seperately */ 656 /* for correct message alignment, copy bucket types seperately */
563 hash_src = (struct GNUNET_HashCode *) &strata_msg[1]; 657 key_src = (uint64_t *) &strata_msg[1];
564 658
565 for (i = 0; i < num_strata; i++) 659 for (i = 0; i < STRATA_COUNT; i++)
566 { 660 {
567 memcpy (cpi->strata[cpi->strata_counter+i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); 661 memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src);
568 hash_src += STRATA_IBF_BUCKETS; 662 key_src += STRATA_IBF_BUCKETS;
569 } 663 }
570 664
571 for (i = 0; i < num_strata; i++) 665 hash_src = (uint32_t *) key_src;
666
667 for (i = 0; i < STRATA_COUNT; i++)
572 { 668 {
573 memcpy (cpi->strata[cpi->strata_counter+i]->id_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); 669 memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src);
574 hash_src += STRATA_IBF_BUCKETS; 670 hash_src += STRATA_IBF_BUCKETS;
575 } 671 }
576 672
577 count_src = (uint8_t *) hash_src; 673 count_src = (uint8_t *) hash_src;
578 674
579 for (i = 0; i < num_strata; i++) 675 for (i = 0; i < STRATA_COUNT; i++)
580 { 676 {
581 memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); 677 memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS);
582 count_src += STRATA_IBF_BUCKETS; 678 count_src += STRATA_IBF_BUCKETS;
583 } 679 }
584 680
585 GNUNET_assert (count_src == (((uint8_t *) &strata_msg[1]) + STRATA_IBF_BUCKETS * num_strata * IBF_BUCKET_SIZE)); 681 cpi->diff = estimate_difference (cpi->session->strata, cpi->strata);
586 682 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff);
587 cpi->strata_counter += num_strata; 683
588 684 /* send IBF of the right size */
589 if (STRATA_COUNT == cpi->strata_counter) 685 cpi->ibf_order = 0;
590 { 686 while ((1 << cpi->ibf_order) < cpi->diff)
591 687 cpi->ibf_order++;
592 cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); 688 if (cpi->ibf_order > MAX_IBF_ORDER)
593 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); 689 cpi->ibf_order = MAX_IBF_ORDER;
594 cpi->ibf_order = 0; 690 cpi->ibf_order += 2;
595 while ((1 << cpi->ibf_order) < cpi->diff) 691 /* create ibf if not already pre-computed */
596 cpi->ibf_order++; 692 prepare_ibf (cpi);
597 if (cpi->ibf_order > MAX_IBF_ORDER) 693 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
598 cpi->ibf_order = MAX_IBF_ORDER; 694 cpi->ibf_state = IBF_STATE_TRANSMITTING;
599 cpi->ibf_order += 2; 695 write_ibf (cpi, GNUNET_STREAM_OK, 0);
600 create_outgoing_ibf (cpi);
601 write_ibf (cpi, GNUNET_STREAM_OK, 0);
602 }
603 696
604 return GNUNET_YES; 697 return GNUNET_YES;
605} 698}
@@ -608,59 +701,57 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess
608static int 701static int
609handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) 702handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest)
610{ 703{
611 struct GNUNET_HashCode *hash_src;
612 int num_buckets; 704 int num_buckets;
705 uint64_t *key_src;
706 uint32_t *hash_src;
613 uint8_t *count_src; 707 uint8_t *count_src;
614 708
615 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; 709 num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE;
616 710
617 if (cpi->is_outgoing == GNUNET_YES) 711 if (IBF_STATE_NONE == cpi->ibf_state)
618 { 712 {
619 /* we receive the ibf as an initiator, thus we're interested in the order */ 713 cpi->ibf_state = IBF_STATE_RECEIVING;
620 cpi->ibf_order = digest->order; 714 cpi->ibf_order = digest->order;
621 if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh)) 715 cpi->ibf_bucket_counter = 0;
622 {
623 create_outgoing_ibf (cpi);
624 write_ibf (cpi, GNUNET_STREAM_OK, 0);
625 }
626 /* FIXME: ensure that orders do not differ each time */
627 } 716 }
628 else 717
718 if ( (IBF_STATE_RECEIVING != cpi->ibf_state) ||
719 (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) )
629 { 720 {
630 /* FIXME: handle correctly */ 721 mark_peer_bad (cpi);
631 GNUNET_assert (cpi->ibf_order == digest->order); 722 return GNUNET_NO;
632 } 723 }
633 724
634 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order));
635 725
636 if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order)) 726 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
637 {
638 /* TODO: handle this */
639 GNUNET_assert (0);
640 }
641 727
642 if (NULL == cpi->incoming_ibf) 728 if (NULL == cpi->ibf)
643 cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); 729 cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
644 730
645 hash_src = (struct GNUNET_HashCode *) &digest[1]; 731 key_src = (uint64_t *) &digest[1];
646 732
647 memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src); 733 memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src);
648 hash_src += num_buckets; 734 hash_src += num_buckets;
649 735
650 memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); 736 hash_src = (uint32_t *) key_src;
737
738 memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src);
651 hash_src += num_buckets; 739 hash_src += num_buckets;
652 740
653 count_src = (uint8_t *) hash_src; 741 count_src = (uint8_t *) hash_src;
654 742
655 memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src); 743 memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src);
656 744
657 cpi->incoming_bucket_counter += num_buckets; 745 cpi->ibf_bucket_counter += num_buckets;
658 746
659 if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) 747 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
660 { 748 {
661 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); 749 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n");
662 if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))) 750 GNUNET_assert (NULL != cpi->wh);
663 write_values (cpi, GNUNET_STREAM_OK, 0); 751 cpi->ibf_state = IBF_STATE_DECODING;
752 prepare_ibf (cpi);
753 ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]);
754 write_requests_and_elements (cpi, GNUNET_STREAM_OK, 0);
664 } 755 }
665 return GNUNET_YES; 756 return GNUNET_YES;
666} 757}
@@ -702,10 +793,28 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me
702} 793}
703 794
704 795
796/**
797 * Handle a request for elements.
798 * Only allowed in exchange-rounds.
799 *
800 * FIXME: implement
801 */
802static int
803handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg)
804{
805 /* FIXME: implement */
806 return GNUNET_YES;
807}
808
809
810/**
811 * Handle a HELLO-message, send when another peer wants to join a session where
812 * our peer is a member. The session may or may not be inhabited yet.
813 */
705static int 814static int
706handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello) 815handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello)
707{ 816{
708 /* FIXME: session might not exist yet */ 817 /* FIXME: session might not exist yet. create an uninhabited session and wait for a client */
709 struct ConsensusSession *session; 818 struct ConsensusSession *session;
710 session = sessions_head; 819 session = sessions_head;
711 while (NULL != session) 820 while (NULL != session)
@@ -726,7 +835,8 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello
726 } 835 }
727 session = session->next; 836 session = session->next;
728 } 837 }
729 GNUNET_assert (0); 838 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer tried to HELLO uninhabited session\n");
839 GNUNET_break (0);
730 return GNUNET_NO; 840 return GNUNET_NO;
731} 841}
732 842
@@ -756,6 +866,8 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader
756 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); 866 return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message);
757 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: 867 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS:
758 return handle_p2p_element (cpi, message); 868 return handle_p2p_element (cpi, message);
869 case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST:
870 return handle_p2p_element_request (cpi, (struct ElementRequest *) message);
759 default: 871 default:
760 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); 872 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type));
761 /* FIXME: handle correctly */ 873 /* FIXME: handle correctly */
@@ -1007,6 +1119,9 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess
1007 1119
1008 1120
1009 1121
1122/**
1123 * Called when stream has finishes writing the hello message
1124 */
1010static void 1125static void
1011hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1126hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1012{ 1127{
@@ -1067,6 +1182,8 @@ initialize_session_info (struct ConsensusSession *session)
1067 session->info[i].session = session; 1182 session->info[i].session = session;
1068 } 1183 }
1069 1184
1185 session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE;
1186
1070 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; 1187 last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
1071 i = (session->local_peer_idx + 1) % session->num_peers; 1188 i = (session->local_peer_idx + 1) % session->num_peers;
1072 while (i != last) 1189 while (i != last)
@@ -1143,8 +1260,9 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke
1143 int i; 1260 int i;
1144 v = key->bits[0]; 1261 v = key->bits[0];
1145 /* count trailing '1'-bits of v */ 1262 /* count trailing '1'-bits of v */
1146 for (i = 0; v & 1; v>>=1, i++); 1263 for (i = 0; v & 1; v>>=1, i++)
1147 ibf_insert (strata[i], key); 1264 /* empty */;
1265 ibf_insert (strata[i], ibf_key_from_hashcode (key));
1148} 1266}
1149 1267
1150 1268
@@ -1309,6 +1427,29 @@ client_insert (void *cls,
1309 1427
1310 1428
1311 1429
1430
1431/**
1432 * Functions of this signature are called whenever writing operations
1433 * on a stream are executed
1434 *
1435 * @param cls the closure from GNUNET_STREAM_write
1436 * @param status the status of the stream at the time this function is called;
1437 * GNUNET_STREAM_OK if writing to stream was completed successfully;
1438 * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
1439 * (this doesn't mean that the data is never sent, the receiver may
1440 * have read the data but its ACKs may have been lost);
1441 * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
1442 * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
1443 * be processed.
1444 * @param size the number of bytes written
1445 */
1446static void
1447write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1448{
1449 GNUNET_assert (GNUNET_STREAM_OK == status);
1450 /* just wait for the ibf */
1451}
1452
1312/** 1453/**
1313 * Functions of this signature are called whenever writing operations 1454 * Functions of this signature are called whenever writing operations
1314 * on a stream are executed 1455 * on a stream are executed
@@ -1331,65 +1472,53 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1331 struct StrataMessage *strata_msg; 1472 struct StrataMessage *strata_msg;
1332 size_t msize; 1473 size_t msize;
1333 int i; 1474 int i;
1334 struct GNUNET_HashCode *hash_dst; 1475 uint64_t *key_dst;
1476 uint32_t *hash_dst;
1335 uint8_t *count_dst; 1477 uint8_t *count_dst;
1336 int num_strata;
1337 1478
1338 cpi = cls; 1479 cpi = cls;
1339 cpi->wh = NULL; 1480 cpi->wh = NULL;
1340 1481
1482 GNUNET_assert (GNUNET_STREAM_OK == status);
1483
1341 GNUNET_assert (GNUNET_YES == cpi->is_outgoing); 1484 GNUNET_assert (GNUNET_YES == cpi->is_outgoing);
1342 1485
1343 /* FIXME: handle this */ 1486 /* FIXME: handle this */
1344 GNUNET_assert (GNUNET_STREAM_OK == status); 1487 GNUNET_assert (GNUNET_STREAM_OK == status);
1345 1488
1346 if (STRATA_COUNT == cpi->strata_counter) 1489 msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1347 {
1348 /* strata have been written, wait for other side's IBF */
1349 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n");
1350 return;
1351 }
1352
1353 if ((STRATA_COUNT - cpi->strata_counter) < STRATA_PER_MESSAGE)
1354 num_strata = (STRATA_COUNT - cpi->strata_counter);
1355 else
1356 num_strata = STRATA_PER_MESSAGE;
1357
1358
1359 msize = (sizeof *strata_msg) + (num_strata * IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS);
1360 1490
1361 strata_msg = GNUNET_malloc (msize); 1491 strata_msg = GNUNET_malloc (msize);
1362 strata_msg->header.size = htons (msize); 1492 strata_msg->header.size = htons (msize);
1363 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); 1493 strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE);
1364 strata_msg->num_strata = htons (num_strata);
1365 1494
1366 /* for correct message alignment, copy bucket types seperately */ 1495 /* for correct message alignment, copy bucket types seperately */
1367 hash_dst = (struct GNUNET_HashCode *) &strata_msg[1]; 1496 key_dst = (uint64_t *) &strata_msg[1];
1368 1497
1369 for (i = 0; i < num_strata; i++) 1498 for (i = 0; i < STRATA_COUNT; i++)
1370 { 1499 {
1371 memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); 1500 memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst);
1372 hash_dst += STRATA_IBF_BUCKETS; 1501 key_dst += STRATA_IBF_BUCKETS;
1373 } 1502 }
1374 1503
1375 for (i = 0; i < num_strata; i++) 1504 hash_dst = (uint32_t *) key_dst;
1505
1506 for (i = 0; i < STRATA_COUNT; i++)
1376 { 1507 {
1377 memcpy (hash_dst, cpi->session->strata[cpi->strata_counter+i]->id_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); 1508 memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst);
1378 hash_dst += STRATA_IBF_BUCKETS; 1509 hash_dst += STRATA_IBF_BUCKETS;
1379 } 1510 }
1380 1511
1381 count_dst = (uint8_t *) hash_dst; 1512 count_dst = (uint8_t *) hash_dst;
1382 1513
1383 for (i = 0; i < num_strata; i++) 1514 for (i = 0; i < STRATA_COUNT; i++)
1384 { 1515 {
1385 memcpy (count_dst, cpi->session->strata[cpi->strata_counter+i]->count, STRATA_IBF_BUCKETS); 1516 memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS);
1386 count_dst += STRATA_IBF_BUCKETS; 1517 count_dst += STRATA_IBF_BUCKETS;
1387 } 1518 }
1388 1519
1389 cpi->strata_counter += num_strata;
1390
1391 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1520 cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1392 write_strata, cpi); 1521 write_strata_done, cpi);
1393 1522
1394 GNUNET_assert (NULL != cpi->wh); 1523 GNUNET_assert (NULL != cpi->wh);
1395} 1524}
@@ -1416,29 +1545,33 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1416 struct ConsensusPeerInformation *cpi; 1545 struct ConsensusPeerInformation *cpi;
1417 struct DifferenceDigest *digest; 1546 struct DifferenceDigest *digest;
1418 int msize; 1547 int msize;
1419 struct GNUNET_HashCode *hash_dst; 1548 uint64_t *key_dst;
1549 uint32_t *hash_dst;
1420 uint8_t *count_dst; 1550 uint8_t *count_dst;
1421 int num_buckets; 1551 int num_buckets;
1422 1552
1423 cpi = cls; 1553 cpi = cls;
1424 cpi->wh = NULL; 1554 cpi->wh = NULL;
1425 1555
1426 if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)) 1556 GNUNET_assert (GNUNET_STREAM_OK == status);
1557
1558 GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state);
1559
1560 if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order))
1427 { 1561 {
1428 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); 1562 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n");
1429 if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) 1563 /* we now wait for values / requests / another IBF because peer could not decode with our IBF */
1430 write_values (cpi, GNUNET_STREAM_OK, 0);
1431 return; 1564 return;
1432 } 1565 }
1433 1566
1434 /* remaining buckets */ 1567 /* remaining buckets */
1435 num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter; 1568 num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter;
1436 1569
1437 /* limit to maximum */ 1570 /* limit to maximum */
1438 if (num_buckets > BUCKETS_PER_MESSAGE) 1571 if (num_buckets > BUCKETS_PER_MESSAGE)
1439 num_buckets = BUCKETS_PER_MESSAGE; 1572 num_buckets = BUCKETS_PER_MESSAGE;
1440 1573
1441 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order)); 1574 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->ibf_bucket_counter, (1<<cpi->ibf_order));
1442 1575
1443 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); 1576 msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE);
1444 1577
@@ -1447,19 +1580,21 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1447 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); 1580 digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST);
1448 digest->order = cpi->ibf_order; 1581 digest->order = cpi->ibf_order;
1449 1582
1450 hash_dst = (struct GNUNET_HashCode *) &digest[1]; 1583 key_dst = (uint64_t *) &digest[1];
1451 1584
1452 memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst); 1585 memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst);
1453 hash_dst += num_buckets; 1586 key_dst += num_buckets;
1587
1588 hash_dst = (uint32_t *) key_dst;
1454 1589
1455 memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst); 1590 memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst);
1456 hash_dst += num_buckets; 1591 hash_dst += num_buckets;
1457 1592
1458 count_dst = (uint8_t *) hash_dst; 1593 count_dst = (uint8_t *) hash_dst;
1459 1594
1460 memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst); 1595 memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst);
1461 1596
1462 cpi->outgoing_bucket_counter += num_buckets; 1597 cpi->ibf_bucket_counter += num_buckets;
1463 1598
1464 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, 1599 cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1465 write_ibf, cpi); 1600 write_ibf, cpi);
@@ -1484,37 +1619,34 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1484 * @param size the number of bytes written 1619 * @param size the number of bytes written
1485 */ 1620 */
1486static void 1621static void
1487write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) 1622write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1488{ 1623{
1489 struct ConsensusPeerInformation *cpi; 1624 struct ConsensusPeerInformation *cpi;
1490 struct GNUNET_HashCode key; 1625 uint64_t key;
1491 struct GNUNET_CONSENSUS_Element *element; 1626 struct GNUNET_HashCode hashcode;
1492 struct GNUNET_MessageHeader *element_msg;
1493 int side; 1627 int side;
1494 int msize; 1628 int msize;
1495 1629
1630 GNUNET_assert (GNUNET_STREAM_OK == status);
1631
1496 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); 1632 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n");
1497 1633
1498 cpi = cls; 1634 cpi = cls;
1499 cpi->wh = NULL; 1635 cpi->wh = NULL;
1500 1636
1501 if (NULL == cpi->diff_ibf) 1637 GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state);
1502 {
1503 GNUNET_assert (NULL != cpi->incoming_ibf);
1504 GNUNET_assert (NULL != cpi->outgoing_ibf);
1505 GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size);
1506 cpi->diff_ibf = ibf_dup (cpi->incoming_ibf);
1507 ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf);
1508 }
1509 1638
1510 for (;;) 1639 for (;;)
1511 { 1640 {
1512 int res; 1641 int res;
1513 res = ibf_decode (cpi->diff_ibf, &side, &key); 1642 res = ibf_decode (cpi->ibf, &side, &key);
1514 if (GNUNET_SYSERR == res) 1643 if (GNUNET_SYSERR == res)
1515 { 1644 {
1516 /* TODO: handle this correctly, request new ibf */ 1645 cpi->ibf_order++;
1517 GNUNET_break (0); 1646 prepare_ibf (cpi);
1647 cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
1648 cpi->ibf_state = IBF_STATE_TRANSMITTING;
1649 write_ibf (cls, status, size);
1518 return; 1650 return;
1519 } 1651 }
1520 if (GNUNET_NO == res) 1652 if (GNUNET_NO == res)
@@ -1523,40 +1655,59 @@ write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1523 return; 1655 return;
1524 } 1656 }
1525 if (-1 == side) 1657 if (-1 == side)
1526 break; 1658 {
1527 } 1659 struct GNUNET_CONSENSUS_Element *element;
1528 1660 struct GNUNET_MessageHeader *element_msg;
1529 element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key); 1661 ibf_hashcode_from_key (key, &hashcode);
1530 1662 /* FIXME: this only transmits one element stored with the key */
1531 if (NULL == element) 1663 element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
1532 { 1664 if (NULL == element)
1533 /* FIXME: handle correctly */ 1665 continue;
1534 GNUNET_break (0); 1666 msize = sizeof (struct GNUNET_MessageHeader) + element->size;
1535 return; 1667 element_msg = GNUNET_malloc (msize);
1668 element_msg->size = htons (msize);
1669 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
1670 GNUNET_assert (NULL != element->data);
1671 memcpy (&element_msg[1], element->data, element->size);
1672 cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1673 write_requests_and_elements, cpi);
1674 GNUNET_free (element_msg);
1675 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
1676
1677 GNUNET_assert (NULL != cpi->wh);
1678 return;
1679 }
1680 else
1681 {
1682 struct ElementRequest *msg;
1683 size_t msize;
1684 uint64_t *p;
1685
1686 msize = (sizeof *msg) + sizeof (uint64_t);
1687 msg = GNUNET_malloc (msize);
1688 msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
1689 msg->header.size = htons (msize);
1690 p = (uint64_t *) &msg[1];
1691 *p = key;
1692
1693 cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1694 write_requests_and_elements, cpi);
1695 GNUNET_assert (NULL != cpi->wh);
1696 GNUNET_free (msg);
1697 return;
1698 }
1536 } 1699 }
1537 1700
1538 msize = sizeof (struct GNUNET_MessageHeader) + element->size; 1701}
1539
1540 element_msg = GNUNET_malloc (msize);
1541 element_msg->size = htons (msize);
1542 element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
1543
1544
1545 GNUNET_assert (NULL != element->data);
1546
1547
1548 memcpy (&element_msg[1], element->data, element->size);
1549
1550 cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL,
1551 write_values, cpi);
1552
1553 GNUNET_free (element_msg);
1554 1702
1555 1703
1556 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n");
1557 1704
1558 GNUNET_assert (NULL != cpi->wh); 1705/*
1706static void
1707select_best_group (struct ConsensusSession *session)
1708{
1559} 1709}
1710*/
1560 1711
1561 1712
1562/** 1713/**
@@ -1566,7 +1717,7 @@ write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size)
1566 * @param client client handle 1717 * @param client client handle
1567 * @param message message sent by the client 1718 * @param message message sent by the client
1568 */ 1719 */
1569void 1720static void
1570client_conclude (void *cls, 1721client_conclude (void *cls,
1571 struct GNUNET_SERVER_Client *client, 1722 struct GNUNET_SERVER_Client *client,
1572 const struct GNUNET_MessageHeader *message) 1723 const struct GNUNET_MessageHeader *message)
@@ -1597,8 +1748,6 @@ client_conclude (void *cls,
1597 1748
1598 session->conclude_requested = GNUNET_YES; 1749 session->conclude_requested = GNUNET_YES;
1599 1750
1600 /* FIXME: write to already connected sockets */
1601
1602 for (i = 0; i < session->num_peers; i++) 1751 for (i = 0; i < session->num_peers; i++)
1603 { 1752 {
1604 if ( (GNUNET_YES == session->info[i].is_outgoing) && 1753 if ( (GNUNET_YES == session->info[i].is_outgoing) &&
@@ -1654,19 +1803,14 @@ client_ack (void *cls,
1654 1803
1655 if (msg->keep) 1804 if (msg->keep)
1656 { 1805 {
1657
1658 element = pending->element; 1806 element = pending->element;
1659
1660 GNUNET_CRYPTO_hash (element, element->size, &key); 1807 GNUNET_CRYPTO_hash (element, element->size, &key);
1661 1808
1662 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, 1809 GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
1663 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 1810 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1664
1665 strata_insert (session->strata, &key); 1811 strata_insert (session->strata, &key);
1666 } 1812 }
1667 1813
1668 /* FIXME: also remove element from strata */
1669
1670 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1814 GNUNET_SERVER_receive_done (client, GNUNET_OK);
1671} 1815}
1672 1816
@@ -1783,6 +1927,7 @@ shutdown_task (void *cls,
1783static void 1927static void
1784run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) 1928run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
1785{ 1929{
1930 /* core is only used to retrieve the peer identity */
1786 static const struct GNUNET_CORE_MessageHandler core_handlers[] = { 1931 static const struct GNUNET_CORE_MessageHandler core_handlers[] = {
1787 {NULL, 0, 0} 1932 {NULL, 0, 0}
1788 }; 1933 };
@@ -1803,7 +1948,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
1803 1948
1804 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); 1949 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
1805 1950
1806
1807 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS, 1951 listener = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_CONSENSUS,
1808 listen_cb, NULL, 1952 listen_cb, NULL,
1809 GNUNET_STREAM_OPTION_END); 1953 GNUNET_STREAM_OPTION_END);
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c
index 629fde3fc..fb3bbe7cd 100644
--- a/src/consensus/ibf.c
+++ b/src/consensus/ibf.c
@@ -30,6 +30,37 @@
30 30
31 31
32/** 32/**
33 * Create a key from a hashcode.
34 *
35 * @param hash the hashcode
36 * @return a key
37 */
38uint64_t
39ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
40{
41 return GNUNET_ntohll (*(uint64_t *) hash);
42}
43
44
45/**
46 * Create a hashcode from a key, by replicating the key
47 * until the hascode is filled
48 *
49 * @param key the key
50 * @param dst hashcode to store the result in
51 */
52void
53ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst)
54{
55 uint64_t *p;
56 int i;
57 p = (uint64_t *) dst;
58 for (i = 0; i < 8; i++)
59 *p++ = key;
60}
61
62
63/**
33 * Create an invertible bloom filter. 64 * Create an invertible bloom filter.
34 * 65 *
35 * @param size number of IBF buckets 66 * @param size number of IBF buckets
@@ -39,7 +70,7 @@
39 * @return the newly created invertible bloom filter 70 * @return the newly created invertible bloom filter
40 */ 71 */
41struct InvertibleBloomFilter * 72struct InvertibleBloomFilter *
42ibf_create (unsigned int size, unsigned int hash_num, uint32_t salt) 73ibf_create (uint32_t size, unsigned int hash_num, uint32_t salt)
43{ 74{
44 struct InvertibleBloomFilter *ibf; 75 struct InvertibleBloomFilter *ibf;
45 76
@@ -53,72 +84,53 @@ ibf_create (unsigned int size, unsigned int hash_num, uint32_t salt)
53 return ibf; 84 return ibf;
54} 85}
55 86
56
57
58/** 87/**
59 * Insert an element into an IBF, with either positive or negative sign. 88 * Store unique bucket indices for the specified key in dst.
60 *
61 * @param ibf the IBF
62 * @param id the element's hash code
63 * @param side the sign of side determines the sign of the
64 * inserted element.
65 */ 89 */
66void 90static inline void
67ibf_insert_on_side (struct InvertibleBloomFilter *ibf, 91ibf_get_indices (const struct InvertibleBloomFilter *ibf,
68 const struct GNUNET_HashCode *key, 92 uint64_t key, int *dst)
69 int side)
70{ 93{
71 struct GNUNET_HashCode bucket_indices; 94 struct GNUNET_HashCode bucket_indices;
72 struct GNUNET_HashCode key_copy; 95 unsigned int filled = 0;
73 struct GNUNET_HashCode key_hash; 96 int i;
74 unsigned int i; 97 GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices);
98 for (i = 0; filled < ibf->hash_num; i++)
99 {
100 unsigned int bucket;
101 unsigned int j;
102 if ( (0 != i) && (0 == (i % 16)) )
103 GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices);
104 bucket = bucket_indices.bits[i] % ibf->size;
105 for (j = 0; j < filled; j++)
106 if (dst[j] == bucket)
107 goto try_next;;
108 dst[filled++] = bucket;
109 try_next: ;
110 }
111}
75 112
76 113
77 GNUNET_assert ((1 == side) || (-1 == side)); 114static void
78 GNUNET_assert (NULL != ibf); 115ibf_insert_into (struct InvertibleBloomFilter *ibf,
79 116 uint64_t key,
117 const int *buckets, int side)
118{
119 int i;
120 struct GNUNET_HashCode key_hash_sha;
121 uint32_t key_hash;
122 GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha);
123 key_hash = key_hash_sha.bits[0];
124 for (i = 0; i < ibf->hash_num; i++)
80 { 125 {
81 int used_buckets[ibf->hash_num]; 126 const int bucket = buckets[i];
82 127 ibf->count[bucket] += side;
83 /* copy the key, if key and an entry in the IBF alias */ 128 ibf->id_sum[bucket] ^= key;
84 key_copy = *key; 129 ibf->hash_sum[bucket] ^= key_hash;
85
86 bucket_indices = key_copy;
87 GNUNET_CRYPTO_hash (key, sizeof (struct GNUNET_HashCode), &key_hash);
88
89 for (i = 0; i < ibf->hash_num; i++)
90 {
91 unsigned int bucket;
92 unsigned int j;
93 int collided;
94
95 if ( (0 != i) &&
96 (0 == (i % 16)) )
97 GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode),
98 &bucket_indices);
99
100 bucket = bucket_indices.bits[i%16] % ibf->size;
101 collided = GNUNET_NO;
102 for (j = 0; j < i; j++)
103 if (used_buckets[j] == bucket)
104 collided = GNUNET_YES;
105 if (GNUNET_YES == collided)
106 {
107 used_buckets[i] = -1;
108 continue;
109 }
110 used_buckets[i] = bucket;
111
112 ibf->count[bucket] += side;
113
114 GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket],
115 &ibf->id_sum[bucket]);
116 GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket],
117 &ibf->hash_sum[bucket]);
118 }
119 } 130 }
120} 131}
121 132
133
122/** 134/**
123 * Insert an element into an IBF. 135 * Insert an element into an IBF.
124 * 136 *
@@ -126,27 +138,28 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf,
126 * @param id the element's hash code 138 * @param id the element's hash code
127 */ 139 */
128void 140void
129ibf_insert (struct InvertibleBloomFilter *ibf, const struct GNUNET_HashCode *key) 141ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t key)
130{ 142{
131 ibf_insert_on_side (ibf, key, 1); 143 int buckets[ibf->hash_num];
144 ibf_get_indices (ibf, key, buckets);
145 ibf_insert_into (ibf, key, buckets, 1);
132} 146}
133 147
148/**
149 * Test is the IBF is empty, i.e. all counts, keys and key hashes are zero.
150 */
134static int 151static int
135ibf_is_empty (struct InvertibleBloomFilter *ibf) 152ibf_is_empty (struct InvertibleBloomFilter *ibf)
136{ 153{
137 int i; 154 int i;
138 for (i = 0; i < ibf->size; i++) 155 for (i = 0; i < ibf->size; i++)
139 { 156 {
140 int j;
141 if (0 != ibf->count[i]) 157 if (0 != ibf->count[i])
142 return GNUNET_NO; 158 return GNUNET_NO;
143 for (j = 0; j < 16; ++j) 159 if (0 != ibf->hash_sum[i])
144 { 160 return GNUNET_NO;
145 if (0 != ibf->hash_sum[i].bits[j]) 161 if (0 != ibf->id_sum[i])
146 return GNUNET_NO; 162 return GNUNET_NO;
147 if (0 != ibf->id_sum[i].bits[j])
148 return GNUNET_NO;
149 }
150 } 163 }
151 return GNUNET_YES; 164 return GNUNET_YES;
152} 165}
@@ -166,21 +179,40 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf)
166 */ 179 */
167int 180int
168ibf_decode (struct InvertibleBloomFilter *ibf, 181ibf_decode (struct InvertibleBloomFilter *ibf,
169 int *ret_side, struct GNUNET_HashCode *ret_id) 182 int *ret_side, uint64_t *ret_id)
170{ 183{
171 struct GNUNET_HashCode hash; 184 uint32_t hash;
172 int i; 185 int i;
186 struct GNUNET_HashCode key_hash_sha;
187 int buckets[ibf->hash_num];
173 188
174 GNUNET_assert (NULL != ibf); 189 GNUNET_assert (NULL != ibf);
175 190
176 for (i = 0; i < ibf->size; i++) 191 for (i = 0; i < ibf->size; i++)
177 { 192 {
193 int j;
194 int hit;
195
196 /* we can only decode from pure buckets */
178 if ((1 != ibf->count[i]) && (-1 != ibf->count[i])) 197 if ((1 != ibf->count[i]) && (-1 != ibf->count[i]))
179 continue; 198 continue;
180 199
181 GNUNET_CRYPTO_hash (&ibf->id_sum[i], sizeof (struct GNUNET_HashCode), &hash); 200 GNUNET_CRYPTO_hash (&ibf->id_sum[i], sizeof (uint64_t), &key_hash_sha);
201 hash = key_hash_sha.bits[0];
182 202
183 if (0 != memcmp (&hash, &ibf->hash_sum[i], sizeof (struct GNUNET_HashCode))) 203 /* test if the hash matches the key */
204 if (hash != ibf->hash_sum[i])
205 continue;
206
207 /* test if key in bucket hits its own location,
208 * if not, the key hash was subject to collision */
209 hit = GNUNET_NO;
210 ibf_get_indices (ibf, ibf->id_sum[i], buckets);
211 for (j = 0; j < ibf->hash_num; j++)
212 if (buckets[j] == i)
213 hit = GNUNET_YES;
214
215 if (GNUNET_NO == hit)
184 continue; 216 continue;
185 217
186 if (NULL != ret_side) 218 if (NULL != ret_side)
@@ -189,7 +221,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
189 *ret_id = ibf->id_sum[i]; 221 *ret_id = ibf->id_sum[i];
190 222
191 /* insert on the opposite side, effectively removing the element */ 223 /* insert on the opposite side, effectively removing the element */
192 ibf_insert_on_side (ibf, &ibf->id_sum[i], -ibf->count[i]); 224 ibf_insert_into (ibf, ibf->id_sum[i], buckets, -ibf->count[i]);
193 225
194 return GNUNET_YES; 226 return GNUNET_YES;
195 } 227 }
@@ -200,7 +232,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
200} 232}
201 233
202 234
203
204/** 235/**
205 * Subtract ibf2 from ibf1, storing the result in ibf1. 236 * Subtract ibf2 from ibf1, storing the result in ibf1.
206 * The two IBF's must have the same parameters size and hash_num. 237 * The two IBF's must have the same parameters size and hash_num.
@@ -209,7 +240,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf,
209 * @param ibf2 IBF that will be subtracted from ibf1 240 * @param ibf2 IBF that will be subtracted from ibf1
210 */ 241 */
211void 242void
212ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *ibf2) 243ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
213{ 244{
214 int i; 245 int i;
215 246
@@ -220,10 +251,8 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *
220 for (i = 0; i < ibf1->size; i++) 251 for (i = 0; i < ibf1->size; i++)
221 { 252 {
222 ibf1->count[i] -= ibf2->count[i]; 253 ibf1->count[i] -= ibf2->count[i];
223 GNUNET_CRYPTO_hash_xor (&ibf1->id_sum[i], &ibf2->id_sum[i], 254 ibf1->hash_sum[i] ^= ibf2->hash_sum[i];
224 &ibf1->id_sum[i]); 255 ibf1->id_sum[i] ^= ibf2->id_sum[i];
225 GNUNET_CRYPTO_hash_xor (&ibf1->hash_sum[i], &ibf2->hash_sum[i],
226 &ibf1->hash_sum[i]);
227 } 256 }
228} 257}
229 258
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h
index 26f101905..72345d3e1 100644
--- a/src/consensus/ibf.h
+++ b/src/consensus/ibf.h
@@ -42,7 +42,7 @@ extern "C"
42/** 42/**
43 * Size of one ibf bucket in bytes 43 * Size of one ibf bucket in bytes
44 */ 44 */
45#define IBF_BUCKET_SIZE (64+64+1) 45#define IBF_BUCKET_SIZE (8+4+1)
46 46
47 47
48/** 48/**
@@ -56,7 +56,7 @@ struct InvertibleBloomFilter
56 /** 56 /**
57 * How many cells does this IBF have? 57 * How many cells does this IBF have?
58 */ 58 */
59 unsigned int size; 59 uint32_t size;
60 60
61 /** 61 /**
62 * In how many cells do we hash one element? 62 * In how many cells do we hash one element?
@@ -72,12 +72,12 @@ struct InvertibleBloomFilter
72 /** 72 /**
73 * xor sums of the elements' hash codes, used to identify the elements. 73 * xor sums of the elements' hash codes, used to identify the elements.
74 */ 74 */
75 struct GNUNET_HashCode *id_sum; 75 uint64_t *id_sum;
76 76
77 /** 77 /**
78 * xor sums of the "hash of the hash". 78 * xor sums of the "hash of the hash".
79 */ 79 */
80 struct GNUNET_HashCode *hash_sum; 80 uint32_t *hash_sum;
81 81
82 /** 82 /**
83 * How many times has a bucket been hit? 83 * How many times has a bucket been hit?
@@ -88,16 +88,37 @@ struct InvertibleBloomFilter
88 88
89 89
90/** 90/**
91 * Create a key from a hashcode.
92 *
93 * @param hash the hashcode
94 * @return a key
95 */
96uint64_t
97ibf_key_from_hashcode (const struct GNUNET_HashCode *hash);
98
99
100/**
101 * Create a hashcode from a key, by replicating the key
102 * until the hascode is filled
103 *
104 * @param key the key
105 * @param dst hashcode to store the result in
106 */
107void
108ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst);
109
110
111/**
91 * Create an invertible bloom filter. 112 * Create an invertible bloom filter.
92 * 113 *
93 * @param size number of IBF buckets 114 * @param size number of IBF buckets
94 * @param hash_num number of buckets one element is hashed in 115 * @param hash_num number of buckets one element is hashed in, usually 3 or 4
95 * @param salt salt for mingling hashes, different salt may 116 * @param salt salt for mingling hashes, different salt may
96 * result in less (or more) collisions 117 * result in less (or more) collisions
97 * @return the newly created invertible bloom filter 118 * @return the newly created invertible bloom filter
98 */ 119 */
99struct InvertibleBloomFilter * 120struct InvertibleBloomFilter *
100ibf_create(unsigned int size, unsigned int hash_num, uint32_t salt); 121ibf_create(uint32_t size, unsigned int hash_num, uint32_t salt);
101 122
102 123
103/** 124/**
@@ -107,7 +128,7 @@ ibf_create(unsigned int size, unsigned int hash_num, uint32_t salt);
107 * @param id the element's hash code 128 * @param id the element's hash code
108 */ 129 */
109void 130void
110ibf_insert (struct InvertibleBloomFilter *ibf, const struct GNUNET_HashCode *id); 131ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t id);
111 132
112 133
113/** 134/**
@@ -118,7 +139,7 @@ ibf_insert (struct InvertibleBloomFilter *ibf, const struct GNUNET_HashCode *id)
118 * @param ibf2 IBF that will be subtracted from ibf1 139 * @param ibf2 IBF that will be subtracted from ibf1
119 */ 140 */
120void 141void
121ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *ibf2); 142ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2);
122 143
123 144
124/** 145/**
@@ -133,7 +154,7 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, struct InvertibleBloomFilter *
133 * GNUNET_SYSERR if the decoding has faile 154 * GNUNET_SYSERR if the decoding has faile
134 */ 155 */
135int 156int
136ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct GNUNET_HashCode *ret_id); 157ibf_decode (struct InvertibleBloomFilter *ibf, int *side, uint64_t *ret_id);
137 158
138 159
139/** 160/**