diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-03-07 14:12:11 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-03-07 14:12:11 +0000 |
commit | 159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0 (patch) | |
tree | e4e60aec526c9a0ffbe3dd69896d394587e8586a /src/consensus | |
parent | 1a17d075effa5fbc3b3521ab0d15b2d035599969 (diff) | |
download | gnunet-159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0.tar.gz gnunet-159e38f1ed94c6b44ca20bc2a78fd5cad7027fd0.zip |
consensus now implemented with primitive conclusion group selection
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/consensus_api.c | 13 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-ibf.c | 6 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 15 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 855 | ||||
-rw-r--r-- | src/consensus/ibf.c | 213 | ||||
-rw-r--r-- | src/consensus/ibf.h | 107 |
7 files changed, 919 insertions, 294 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 7ebb0a9d9..e970040e1 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c | |||
@@ -139,6 +139,12 @@ struct GNUNET_CONSENSUS_Handle | |||
139 | 139 | ||
140 | struct QueuedMessage *messages_head; | 140 | struct QueuedMessage *messages_head; |
141 | struct QueuedMessage *messages_tail; | 141 | struct QueuedMessage *messages_tail; |
142 | |||
143 | /** | ||
144 | * GNUNET_YES when currently in a section where destroy may not be | ||
145 | * called. | ||
146 | */ | ||
147 | int may_not_destroy; | ||
142 | }; | 148 | }; |
143 | 149 | ||
144 | 150 | ||
@@ -279,7 +285,9 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, | |||
279 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) | 285 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) |
280 | { | 286 | { |
281 | GNUNET_assert (NULL != consensus->conclude_cb); | 287 | GNUNET_assert (NULL != consensus->conclude_cb); |
288 | consensus->may_not_destroy = GNUNET_YES; | ||
282 | consensus->conclude_cb (consensus->conclude_cls, NULL); | 289 | consensus->conclude_cb (consensus->conclude_cls, NULL); |
290 | consensus->may_not_destroy = GNUNET_NO; | ||
283 | consensus->conclude_cb = NULL; | 291 | consensus->conclude_cb = NULL; |
284 | } | 292 | } |
285 | 293 | ||
@@ -523,6 +531,11 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, | |||
523 | void | 531 | void |
524 | GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) | 532 | GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) |
525 | { | 533 | { |
534 | if (GNUNET_YES == consensus->may_not_destroy) | ||
535 | { | ||
536 | LOG (GNUNET_ERROR_TYPE_ERROR, "destroy may not be called right now\n"); | ||
537 | GNUNET_assert (0); | ||
538 | } | ||
526 | if (consensus->client != NULL) | 539 | if (consensus->client != NULL) |
527 | { | 540 | { |
528 | GNUNET_CLIENT_disconnect (consensus->client); | 541 | GNUNET_CLIENT_disconnect (consensus->client); |
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index 0da959f8f..e8b2c8a34 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h | |||
@@ -39,6 +39,10 @@ struct StrataMessage | |||
39 | { | 39 | { |
40 | struct GNUNET_MessageHeader header; | 40 | struct GNUNET_MessageHeader header; |
41 | /** | 41 | /** |
42 | * Number of elements the sender currently has. | ||
43 | */ | ||
44 | uint16_t num_elements; | ||
45 | /** | ||
42 | * Number of strata in this estimator. | 46 | * Number of strata in this estimator. |
43 | */ | 47 | */ |
44 | uint16_t num_strata; | 48 | uint16_t num_strata; |
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index a16ca3247..f4a233ece 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c | |||
@@ -56,7 +56,7 @@ static void | |||
56 | register_hashcode (struct GNUNET_HashCode *hash) | 56 | register_hashcode (struct GNUNET_HashCode *hash) |
57 | { | 57 | { |
58 | struct GNUNET_HashCode replicated; | 58 | struct GNUNET_HashCode replicated; |
59 | uint64_t key; | 59 | struct IBF_Key key; |
60 | key = ibf_key_from_hashcode (hash); | 60 | key = ibf_key_from_hashcode (hash); |
61 | ibf_hashcode_from_key (key, &replicated); | 61 | ibf_hashcode_from_key (key, &replicated); |
62 | GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash), | 62 | GNUNET_CONTAINER_multihashmap_put (key_to_hashcode, &replicated, GNUNET_memdup (hash, sizeof *hash), |
@@ -64,7 +64,7 @@ register_hashcode (struct GNUNET_HashCode *hash) | |||
64 | } | 64 | } |
65 | 65 | ||
66 | static void | 66 | static void |
67 | iter_hashcodes (uint64_t key, GNUNET_CONTAINER_HashMapIterator iter, void *cls) | 67 | iter_hashcodes (struct IBF_Key key, GNUNET_CONTAINER_HashMapIterator iter, void *cls) |
68 | { | 68 | { |
69 | struct GNUNET_HashCode replicated; | 69 | struct GNUNET_HashCode replicated; |
70 | ibf_hashcode_from_key (key, &replicated); | 70 | ibf_hashcode_from_key (key, &replicated); |
@@ -100,7 +100,7 @@ run (void *cls, char *const *args, const char *cfgfile, | |||
100 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 100 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
101 | { | 101 | { |
102 | struct GNUNET_HashCode id; | 102 | struct GNUNET_HashCode id; |
103 | uint64_t ibf_key; | 103 | struct IBF_Key ibf_key; |
104 | int i; | 104 | int i; |
105 | int side; | 105 | int side; |
106 | int res; | 106 | int res; |
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index a63575825..fd6019c20 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c | |||
@@ -65,6 +65,16 @@ controller_cb(void *cls, | |||
65 | } | 65 | } |
66 | 66 | ||
67 | 67 | ||
68 | static void | ||
69 | destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx) | ||
70 | { | ||
71 | struct GNUNET_CONSENSUS_Handle *consensus; | ||
72 | consensus = cls; | ||
73 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n"); | ||
74 | GNUNET_CONSENSUS_destroy (consensus); | ||
75 | } | ||
76 | |||
77 | |||
68 | /** | 78 | /** |
69 | * Called when a conclusion was successful. | 79 | * Called when a conclusion was successful. |
70 | * | 80 | * |
@@ -72,14 +82,13 @@ controller_cb(void *cls, | |||
72 | * @param group | 82 | * @param group |
73 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not | 83 | * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if not |
74 | */ | 84 | */ |
75 | static int | 85 | static void |
76 | conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group) | 86 | conclude_cb (void *cls, const struct GNUNET_CONSENSUS_Group *group) |
77 | { | 87 | { |
78 | return GNUNET_NO; | 88 | GNUNET_SCHEDULER_add_now (destroy, cls); |
79 | } | 89 | } |
80 | 90 | ||
81 | 91 | ||
82 | |||
83 | static void | 92 | static void |
84 | generate_indices (int *indices) | 93 | generate_indices (int *indices) |
85 | { | 94 | { |
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index d223360dc..2f59b86bc 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c | |||
@@ -20,7 +20,7 @@ | |||
20 | 20 | ||
21 | /** | 21 | /** |
22 | * @file consensus/gnunet-service-consensus.c | 22 | * @file consensus/gnunet-service-consensus.c |
23 | * @brief | 23 | * @brief multi-peer set reconciliation |
24 | * @author Florian Dold | 24 | * @author Florian Dold |
25 | */ | 25 | */ |
26 | 26 | ||
@@ -60,7 +60,7 @@ | |||
60 | * Choose this value so that computing the IBF is still cheaper | 60 | * Choose this value so that computing the IBF is still cheaper |
61 | * than transmitting all values. | 61 | * than transmitting all values. |
62 | */ | 62 | */ |
63 | #define MAX_IBF_ORDER (32) | 63 | #define MAX_IBF_ORDER (16) |
64 | 64 | ||
65 | 65 | ||
66 | /* forward declarations */ | 66 | /* forward declarations */ |
@@ -114,11 +114,20 @@ struct PendingElement | |||
114 | */ | 114 | */ |
115 | struct ConsensusPeerInformation | 115 | struct ConsensusPeerInformation |
116 | { | 116 | { |
117 | /** | ||
118 | * Socket for communicating with the peer, either created by the local peer, | ||
119 | * or the remote peer. | ||
120 | */ | ||
117 | struct GNUNET_STREAM_Socket *socket; | 121 | struct GNUNET_STREAM_Socket *socket; |
118 | 122 | ||
119 | /** | 123 | /** |
124 | * Message tokenizer, for the data received from this peer via the stream socket. | ||
125 | */ | ||
126 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
127 | |||
128 | /** | ||
120 | * Is socket's connection established, i.e. can we write to it? | 129 | * Is socket's connection established, i.e. can we write to it? |
121 | * Only relevent on outgoing cpi. | 130 | * Only relevent to outgoing cpi. |
122 | */ | 131 | */ |
123 | int is_connected; | 132 | int is_connected; |
124 | 133 | ||
@@ -150,15 +159,22 @@ struct ConsensusPeerInformation | |||
150 | struct GNUNET_STREAM_WriteHandle *wh; | 159 | struct GNUNET_STREAM_WriteHandle *wh; |
151 | 160 | ||
152 | enum { | 161 | enum { |
153 | IBF_STATE_NONE, | 162 | /* beginning of round */ |
163 | IBF_STATE_NONE=0, | ||
164 | /* we currently receive an ibf */ | ||
154 | IBF_STATE_RECEIVING, | 165 | IBF_STATE_RECEIVING, |
166 | /* we currently transmit an ibf */ | ||
155 | IBF_STATE_TRANSMITTING, | 167 | IBF_STATE_TRANSMITTING, |
156 | IBF_STATE_DECODING | 168 | /* we decode a received ibf */ |
169 | IBF_STATE_DECODING, | ||
170 | /* wait for elements and element requests */ | ||
171 | IBF_STATE_ANTICIPATE_DIFF | ||
157 | } ibf_state ; | 172 | } ibf_state ; |
158 | 173 | ||
159 | /** | 174 | /** |
160 | * What is the order (=log2 size) of the ibf | 175 | * What is the order (=log2 size) of the ibf |
161 | * we're currently dealing with? | 176 | * we're currently dealing with? |
177 | * Interpretation depends on ibf_state. | ||
162 | */ | 178 | */ |
163 | int ibf_order; | 179 | int ibf_order; |
164 | 180 | ||
@@ -169,7 +185,8 @@ struct ConsensusPeerInformation | |||
169 | struct InvertibleBloomFilter *ibf; | 185 | struct InvertibleBloomFilter *ibf; |
170 | 186 | ||
171 | /** | 187 | /** |
172 | * How many buckets have we transmitted/received (depending on state)? | 188 | * How many buckets have we transmitted/received? |
189 | * Interpretatin depends on ibf_state | ||
173 | */ | 190 | */ |
174 | int ibf_bucket_counter; | 191 | int ibf_bucket_counter; |
175 | 192 | ||
@@ -180,11 +197,25 @@ struct ConsensusPeerInformation | |||
180 | struct InvertibleBloomFilter **strata; | 197 | struct InvertibleBloomFilter **strata; |
181 | 198 | ||
182 | /** | 199 | /** |
183 | * difference estimated with the current strata estimator | 200 | * Elements that the peer is missing from us. |
184 | */ | 201 | */ |
185 | unsigned int diff; | 202 | uint64_t *missing_local; |
186 | 203 | ||
187 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | 204 | /** |
205 | * Number of elements in missing_local | ||
206 | */ | ||
207 | unsigned int num_missing_local; | ||
208 | |||
209 | /** | ||
210 | * Elements that this peer told us *we* don't have, | ||
211 | * i.e. we are the remote peer that has some values missing. | ||
212 | */ | ||
213 | uint64_t *missing_remote; | ||
214 | |||
215 | /** | ||
216 | * Number of elements in missing_local | ||
217 | */ | ||
218 | unsigned int num_missing_remote; | ||
188 | 219 | ||
189 | /** | 220 | /** |
190 | * Back-reference to the consensus session, | 221 | * Back-reference to the consensus session, |
@@ -192,10 +223,17 @@ struct ConsensusPeerInformation | |||
192 | */ | 223 | */ |
193 | struct ConsensusSession *session; | 224 | struct ConsensusSession *session; |
194 | 225 | ||
195 | struct PendingElement *send_pending_head; | 226 | /** |
196 | struct PendingElement *send_pending_tail; | 227 | * When decoding the IBF, requests for elements and outgoing elements |
228 | * have to be queued, to ensure that messages actually fit in the stream buffer. | ||
229 | */ | ||
230 | struct QueuedMessage *requests_and_elements_head; | ||
231 | struct QueuedMessage *requests_and_elements_tail; | ||
197 | }; | 232 | }; |
198 | 233 | ||
234 | /** | ||
235 | * A doubly linked list of messages. | ||
236 | */ | ||
199 | struct QueuedMessage | 237 | struct QueuedMessage |
200 | { | 238 | { |
201 | struct GNUNET_MessageHeader *msg; | 239 | struct GNUNET_MessageHeader *msg; |
@@ -211,31 +249,40 @@ struct QueuedMessage | |||
211 | struct QueuedMessage *prev; | 249 | struct QueuedMessage *prev; |
212 | }; | 250 | }; |
213 | 251 | ||
252 | /** | ||
253 | * Describes the current round a consensus session is in. | ||
254 | */ | ||
214 | enum ConsensusRound | 255 | enum ConsensusRound |
215 | { | 256 | { |
216 | /** | 257 | /** |
217 | * distribution of information with the exponential scheme | 258 | * Not started the protocl yet |
259 | */ | ||
260 | CONSENSUS_ROUND_BEGIN=0, | ||
261 | /** | ||
262 | * distribution of information with the exponential scheme. | ||
218 | */ | 263 | */ |
219 | CONSENSUS_ROUND_EXP_EXCHANGE, | 264 | CONSENSUS_ROUND_EXP_EXCHANGE, |
220 | /** | 265 | /** |
221 | * All-to-all, exchange missing values | 266 | * All-to-all, exchange missing values. |
222 | */ | 267 | */ |
223 | CONSENSUS_ROUND_A2A_EXCHANGE, | 268 | CONSENSUS_ROUND_A2A_EXCHANGE, |
224 | /** | 269 | /** |
225 | * All-to-all, check what values are missing, don't exchange anything | 270 | * All-to-all, check what values are missing, don't exchange anything. |
226 | */ | 271 | */ |
227 | CONSENSUS_ROUND_A2A_INVENTORY | 272 | CONSENSUS_ROUND_A2A_INVENTORY, |
228 | 273 | /** | |
229 | /* | 274 | * All-to-all round to exchange information for byzantine fault detection. |
230 | a round to exchange the information for fraud-detection | 275 | */ |
231 | CONSENSUS_ROUNT_A2_INVENTORY_AGREEMENT | 276 | CONSENSUS_ROUND_A2A_INVENTORY_AGREEMENT, |
232 | */ | 277 | /** |
278 | * Rounds are over | ||
279 | */ | ||
280 | CONSENSUS_ROUND_FINISH | ||
233 | }; | 281 | }; |
234 | 282 | ||
235 | 283 | ||
236 | /** | 284 | /** |
237 | * A consensus session consists of one local client and the remote authorities. | 285 | * A consensus session consists of one local client and the remote authorities. |
238 | * | ||
239 | */ | 286 | */ |
240 | struct ConsensusSession | 287 | struct ConsensusSession |
241 | { | 288 | { |
@@ -301,15 +348,15 @@ struct ConsensusSession | |||
301 | struct GNUNET_SERVER_TransmitHandle *th; | 348 | struct GNUNET_SERVER_TransmitHandle *th; |
302 | 349 | ||
303 | /** | 350 | /** |
304 | * Once conclude_requested is GNUNET_YES, the client may not | 351 | * Timeout for all rounds together, single rounds will schedule a timeout task |
305 | * insert any more values. | 352 | * with a fraction of the conclude timeout. |
306 | */ | 353 | */ |
307 | int conclude_requested; | 354 | struct GNUNET_TIME_Relative conclude_timeout; |
308 | 355 | ||
309 | /** | 356 | /** |
310 | * Minimum number of peers to form a consensus group | 357 | * Timeout task identifier for the current round |
311 | */ | 358 | */ |
312 | int conclude_group_min; | 359 | GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; |
313 | 360 | ||
314 | /** | 361 | /** |
315 | * Number of other peers in the consensus | 362 | * Number of other peers in the consensus |
@@ -353,6 +400,7 @@ struct ConsensusSession | |||
353 | /** | 400 | /** |
354 | * Sockets from other peers who want to communicate with us. | 401 | * Sockets from other peers who want to communicate with us. |
355 | * It may not be known yet which consensus session they belong to. | 402 | * It may not be known yet which consensus session they belong to. |
403 | * Also, the session might not exist yet locally. | ||
356 | */ | 404 | */ |
357 | struct IncomingSocket | 405 | struct IncomingSocket |
358 | { | 406 | { |
@@ -400,6 +448,7 @@ struct IncomingSocket | |||
400 | struct GNUNET_HashCode *requested_gid; | 448 | struct GNUNET_HashCode *requested_gid; |
401 | }; | 449 | }; |
402 | 450 | ||
451 | |||
403 | static struct IncomingSocket *incoming_sockets_head; | 452 | static struct IncomingSocket *incoming_sockets_head; |
404 | static struct IncomingSocket *incoming_sockets_tail; | 453 | static struct IncomingSocket *incoming_sockets_tail; |
405 | 454 | ||
@@ -440,7 +489,7 @@ static struct GNUNET_STREAM_ListenSocket *listener; | |||
440 | 489 | ||
441 | 490 | ||
442 | /** | 491 | /** |
443 | * Queue a message to be sent to the inhabiting client of a sessino | 492 | * Queue a message to be sent to the inhabiting client of a session. |
444 | * | 493 | * |
445 | * @param session session | 494 | * @param session session |
446 | * @param msg message we want to queue | 495 | * @param msg message we want to queue |
@@ -465,7 +514,9 @@ get_cpi_index (struct ConsensusPeerInformation *cpi) | |||
465 | } | 514 | } |
466 | 515 | ||
467 | /** | 516 | /** |
468 | * Mark the peer as bad, free as state we don't need anymore. | 517 | * Mark the peer as bad, free state we don't need anymore. |
518 | * | ||
519 | * @param cpi consensus peer information of the bad peer | ||
469 | */ | 520 | */ |
470 | static void | 521 | static void |
471 | mark_peer_bad (struct ConsensusPeerInformation *cpi) | 522 | mark_peer_bad (struct ConsensusPeerInformation *cpi) |
@@ -479,6 +530,11 @@ mark_peer_bad (struct ConsensusPeerInformation *cpi) | |||
479 | /** | 530 | /** |
480 | * Estimate set difference with two strata estimators, | 531 | * Estimate set difference with two strata estimators, |
481 | * i.e. arrays of IBFs. | 532 | * i.e. arrays of IBFs. |
533 | * Does not not modify its arguments. | ||
534 | * | ||
535 | * @param strata1 first strata estimator | ||
536 | * @param strata2 second strata estimator | ||
537 | * @return the estimated difference | ||
482 | */ | 538 | */ |
483 | static int | 539 | static int |
484 | estimate_difference (struct InvertibleBloomFilter** strata1, | 540 | estimate_difference (struct InvertibleBloomFilter** strata1, |
@@ -490,9 +546,11 @@ estimate_difference (struct InvertibleBloomFilter** strata1, | |||
490 | for (i = STRATA_COUNT - 1; i >= 0; i--) | 546 | for (i = STRATA_COUNT - 1; i >= 0; i--) |
491 | { | 547 | { |
492 | struct InvertibleBloomFilter *diff; | 548 | struct InvertibleBloomFilter *diff; |
549 | /* number of keys decoded from the ibf */ | ||
493 | int ibf_count; | 550 | int ibf_count; |
494 | int more; | 551 | int more; |
495 | ibf_count = 0; | 552 | ibf_count = 0; |
553 | /* FIXME: implement this without always allocating new IBFs */ | ||
496 | diff = ibf_dup (strata1[i]); | 554 | diff = ibf_dup (strata1[i]); |
497 | ibf_subtract (diff, strata2[i]); | 555 | ibf_subtract (diff, strata2[i]); |
498 | for (;;) | 556 | for (;;) |
@@ -537,22 +595,17 @@ session_stream_data_processor (void *cls, | |||
537 | int ret; | 595 | int ret; |
538 | 596 | ||
539 | GNUNET_assert (GNUNET_STREAM_OK == status); | 597 | GNUNET_assert (GNUNET_STREAM_OK == status); |
540 | |||
541 | cpi = cls; | 598 | cpi = cls; |
542 | |||
543 | GNUNET_assert (NULL != cpi->mst); | 599 | GNUNET_assert (NULL != cpi->mst); |
544 | |||
545 | ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); | 600 | ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); |
546 | if (GNUNET_SYSERR == ret) | 601 | if (GNUNET_SYSERR == ret) |
547 | { | 602 | { |
548 | /* FIXME: handle this correctly */ | 603 | /* FIXME: handle this correctly */ |
549 | GNUNET_assert (0); | 604 | GNUNET_assert (0); |
550 | } | 605 | } |
551 | |||
552 | /* read again */ | 606 | /* read again */ |
553 | cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 607 | cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, |
554 | &session_stream_data_processor, cpi); | 608 | &session_stream_data_processor, cpi); |
555 | |||
556 | /* we always read all data */ | 609 | /* we always read all data */ |
557 | return size; | 610 | return size; |
558 | } | 611 | } |
@@ -578,26 +631,62 @@ incoming_stream_data_processor (void *cls, | |||
578 | int ret; | 631 | int ret; |
579 | 632 | ||
580 | GNUNET_assert (GNUNET_STREAM_OK == status); | 633 | GNUNET_assert (GNUNET_STREAM_OK == status); |
581 | |||
582 | incoming = cls; | 634 | incoming = cls; |
583 | |||
584 | ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); | 635 | ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); |
585 | if (GNUNET_SYSERR == ret) | 636 | if (GNUNET_SYSERR == ret) |
586 | { | 637 | { |
587 | /* FIXME: handle this correctly */ | 638 | /* FIXME: handle this correctly */ |
588 | GNUNET_assert (0); | 639 | GNUNET_assert (0); |
589 | } | 640 | } |
590 | |||
591 | /* read again */ | 641 | /* read again */ |
592 | incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, | 642 | incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, |
593 | &incoming_stream_data_processor, incoming); | 643 | &incoming_stream_data_processor, incoming); |
594 | |||
595 | /* we always read all data */ | 644 | /* we always read all data */ |
596 | return size; | 645 | return size; |
597 | } | 646 | } |
598 | 647 | ||
599 | 648 | ||
600 | /** | 649 | /** |
650 | * Iterator over hash map entries. | ||
651 | * Queue elements to be sent to the peer in cls. | ||
652 | * | ||
653 | * @param cls closure | ||
654 | * @param key current key code | ||
655 | * @param value value in the hash map | ||
656 | * @return GNUNET_YES if we should continue to | ||
657 | * iterate, | ||
658 | * GNUNET_NO if not. | ||
659 | */ | ||
660 | static int | ||
661 | send_element_iter (void *cls, | ||
662 | const struct GNUNET_HashCode *key, | ||
663 | void *value) | ||
664 | { | ||
665 | struct ConsensusPeerInformation *cpi; | ||
666 | struct GNUNET_CONSENSUS_Element *element; | ||
667 | struct QueuedMessage *qm; | ||
668 | struct GNUNET_MessageHeader *element_msg; | ||
669 | size_t msize; | ||
670 | cpi = cls; | ||
671 | element = value; | ||
672 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | ||
673 | element_msg = GNUNET_malloc (msize); | ||
674 | element_msg->size = htons (msize); | ||
675 | if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) | ||
676 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | ||
677 | else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round) | ||
678 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE); | ||
679 | else | ||
680 | GNUNET_assert (0); | ||
681 | GNUNET_assert (NULL != element->data); | ||
682 | memcpy (&element_msg[1], element->data, element->size); | ||
683 | qm = GNUNET_malloc (sizeof *qm); | ||
684 | qm->msg = element_msg; | ||
685 | GNUNET_CONTAINER_DLL_insert (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
686 | return GNUNET_YES; | ||
687 | } | ||
688 | |||
689 | /** | ||
601 | * Iterator to insert values into an ibf. | 690 | * Iterator to insert values into an ibf. |
602 | * | 691 | * |
603 | * @param cls closure | 692 | * @param cls closure |
@@ -618,6 +707,12 @@ ibf_values_iterator (void *cls, | |||
618 | return GNUNET_YES; | 707 | return GNUNET_YES; |
619 | } | 708 | } |
620 | 709 | ||
710 | /** | ||
711 | * Create and populate an IBF for the specified peer, | ||
712 | * if it does not already exist. | ||
713 | * | ||
714 | * @param peer to create the ibf for | ||
715 | */ | ||
621 | static void | 716 | static void |
622 | prepare_ibf (struct ConsensusPeerInformation *cpi) | 717 | prepare_ibf (struct ConsensusPeerInformation *cpi) |
623 | { | 718 | { |
@@ -630,6 +725,42 @@ prepare_ibf (struct ConsensusPeerInformation *cpi) | |||
630 | 725 | ||
631 | 726 | ||
632 | /** | 727 | /** |
728 | * Called when a remote peer wants to inform the local peer | ||
729 | * that the remote peer misses elements. | ||
730 | * Elements are not reconciled. | ||
731 | * | ||
732 | * @param cpi session | ||
733 | * @param msg message | ||
734 | */ | ||
735 | static int | ||
736 | handle_p2p_missing_local (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
737 | { | ||
738 | uint64_t key; | ||
739 | key = *(uint64_t *) &msg[1]; | ||
740 | GNUNET_array_append (cpi->missing_remote, cpi->num_missing_remote, key); | ||
741 | return GNUNET_OK; | ||
742 | } | ||
743 | |||
744 | |||
745 | /** | ||
746 | * Called when a remote peer wants to inform the local peer | ||
747 | * that the local peer misses elements. | ||
748 | * Elements are not reconciled. | ||
749 | * | ||
750 | * @param cpi session | ||
751 | * @param msg message | ||
752 | */ | ||
753 | static int | ||
754 | handle_p2p_missing_remote (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *msg) | ||
755 | { | ||
756 | uint64_t key; | ||
757 | key = *(uint64_t *) &msg[1]; | ||
758 | GNUNET_array_append (cpi->missing_local, cpi->num_missing_local, key); | ||
759 | return GNUNET_OK; | ||
760 | } | ||
761 | |||
762 | |||
763 | /** | ||
633 | * Called when a peer sends us its strata estimator. | 764 | * Called when a peer sends us its strata estimator. |
634 | * In response, we sent out IBF of appropriate size back. | 765 | * In response, we sent out IBF of appropriate size back. |
635 | * | 766 | * |
@@ -640,9 +771,9 @@ static int | |||
640 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) | 771 | handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) |
641 | { | 772 | { |
642 | int i; | 773 | int i; |
643 | uint64_t *key_src; | 774 | unsigned int diff; |
644 | uint32_t *hash_src; | 775 | void *buf; |
645 | uint8_t *count_src; | 776 | size_t size; |
646 | 777 | ||
647 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); | 778 | GNUNET_assert (GNUNET_NO == cpi->is_outgoing); |
648 | 779 | ||
@@ -653,47 +784,36 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess | |||
653 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | 784 | cpi->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); |
654 | } | 785 | } |
655 | 786 | ||
656 | /* for correct message alignment, copy bucket types seperately */ | 787 | size = ntohs (strata_msg->header.size); |
657 | key_src = (uint64_t *) &strata_msg[1]; | 788 | buf = (void *) &strata_msg[1]; |
658 | |||
659 | for (i = 0; i < STRATA_COUNT; i++) | 789 | for (i = 0; i < STRATA_COUNT; i++) |
660 | { | 790 | { |
661 | memcpy (cpi->strata[i]->id_sum, key_src, STRATA_IBF_BUCKETS * sizeof *key_src); | 791 | int res; |
662 | key_src += STRATA_IBF_BUCKETS; | 792 | res = ibf_read (&buf, &size, cpi->strata[i]); |
663 | } | 793 | GNUNET_assert (GNUNET_OK == res); |
664 | |||
665 | hash_src = (uint32_t *) key_src; | ||
666 | |||
667 | for (i = 0; i < STRATA_COUNT; i++) | ||
668 | { | ||
669 | memcpy (cpi->strata[i]->hash_sum, hash_src, STRATA_IBF_BUCKETS * sizeof *hash_src); | ||
670 | hash_src += STRATA_IBF_BUCKETS; | ||
671 | } | 794 | } |
672 | 795 | ||
673 | count_src = (uint8_t *) hash_src; | 796 | diff = estimate_difference (cpi->session->strata, cpi->strata); |
797 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", diff); | ||
674 | 798 | ||
675 | for (i = 0; i < STRATA_COUNT; i++) | 799 | if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) || |
800 | (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round)) | ||
676 | { | 801 | { |
677 | memcpy (cpi->strata[i]->count, count_src, STRATA_IBF_BUCKETS); | 802 | /* send IBF of the right size */ |
678 | count_src += STRATA_IBF_BUCKETS; | 803 | cpi->ibf_order = 0; |
804 | while ((1 << cpi->ibf_order) < diff) | ||
805 | cpi->ibf_order++; | ||
806 | if (cpi->ibf_order > MAX_IBF_ORDER) | ||
807 | cpi->ibf_order = MAX_IBF_ORDER; | ||
808 | cpi->ibf_order += 1; | ||
809 | /* create ibf if not already pre-computed */ | ||
810 | prepare_ibf (cpi); | ||
811 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
812 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
813 | cpi->ibf_bucket_counter = 0; | ||
814 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
679 | } | 815 | } |
680 | 816 | ||
681 | cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); | ||
682 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); | ||
683 | |||
684 | /* send IBF of the right size */ | ||
685 | cpi->ibf_order = 0; | ||
686 | while ((1 << cpi->ibf_order) < cpi->diff) | ||
687 | cpi->ibf_order++; | ||
688 | if (cpi->ibf_order > MAX_IBF_ORDER) | ||
689 | cpi->ibf_order = MAX_IBF_ORDER; | ||
690 | cpi->ibf_order += 2; | ||
691 | /* create ibf if not already pre-computed */ | ||
692 | prepare_ibf (cpi); | ||
693 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | ||
694 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | ||
695 | write_ibf (cpi, GNUNET_STREAM_OK, 0); | ||
696 | |||
697 | return GNUNET_YES; | 817 | return GNUNET_YES; |
698 | } | 818 | } |
699 | 819 | ||
@@ -702,52 +822,62 @@ static int | |||
702 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) | 822 | handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) |
703 | { | 823 | { |
704 | int num_buckets; | 824 | int num_buckets; |
705 | uint64_t *key_src; | 825 | void *buf; |
706 | uint32_t *hash_src; | ||
707 | uint8_t *count_src; | ||
708 | 826 | ||
709 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; | 827 | num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; |
710 | 828 | switch (cpi->ibf_state) | |
711 | if (IBF_STATE_NONE == cpi->ibf_state) | ||
712 | { | 829 | { |
713 | cpi->ibf_state = IBF_STATE_RECEIVING; | 830 | case IBF_STATE_NONE: |
714 | cpi->ibf_order = digest->order; | 831 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", digest->order); |
715 | cpi->ibf_bucket_counter = 0; | 832 | cpi->ibf_state = IBF_STATE_RECEIVING; |
833 | cpi->ibf_order = digest->order; | ||
834 | cpi->ibf_bucket_counter = 0; | ||
835 | if (NULL != cpi->ibf) | ||
836 | { | ||
837 | GNUNET_free (cpi->ibf); | ||
838 | cpi->ibf = NULL; | ||
839 | } | ||
840 | break; | ||
841 | case IBF_STATE_ANTICIPATE_DIFF: | ||
842 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order %d\n", digest->order); | ||
843 | cpi->ibf_state = IBF_STATE_RECEIVING; | ||
844 | cpi->ibf_order = digest->order; | ||
845 | cpi->ibf_bucket_counter = 0; | ||
846 | if (NULL != cpi->ibf) | ||
847 | { | ||
848 | ibf_destroy (cpi->ibf); | ||
849 | cpi->ibf = NULL; | ||
850 | } | ||
851 | break; | ||
852 | case IBF_STATE_RECEIVING: | ||
853 | break; | ||
854 | default: | ||
855 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received ibf unexpectedly in state %d\n", cpi->ibf_state); | ||
856 | mark_peer_bad (cpi); | ||
857 | return GNUNET_NO; | ||
716 | } | 858 | } |
717 | 859 | ||
718 | if ( (IBF_STATE_RECEIVING != cpi->ibf_state) || | 860 | if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) |
719 | (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order)) ) | ||
720 | { | 861 | { |
862 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n"); | ||
721 | mark_peer_bad (cpi); | 863 | mark_peer_bad (cpi); |
722 | return GNUNET_NO; | 864 | return GNUNET_NO; |
723 | } | 865 | } |
724 | 866 | ||
725 | 867 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, | |
726 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); | 868 | cpi->ibf_bucket_counter, (1 << cpi->ibf_order)); |
727 | 869 | ||
728 | if (NULL == cpi->ibf) | 870 | if (NULL == cpi->ibf) |
729 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); | 871 | cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); |
730 | 872 | ||
731 | key_src = (uint64_t *) &digest[1]; | 873 | buf = (void *) &digest[1]; |
732 | 874 | ibf_read_slice (&buf, NULL, cpi->ibf_bucket_counter, num_buckets, cpi->ibf); | |
733 | memcpy (cpi->ibf->hash_sum, key_src, num_buckets * sizeof *key_src); | ||
734 | hash_src += num_buckets; | ||
735 | |||
736 | hash_src = (uint32_t *) key_src; | ||
737 | |||
738 | memcpy (cpi->ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); | ||
739 | hash_src += num_buckets; | ||
740 | |||
741 | count_src = (uint8_t *) hash_src; | ||
742 | |||
743 | memcpy (cpi->ibf->count, count_src, num_buckets * sizeof *count_src); | ||
744 | 875 | ||
745 | cpi->ibf_bucket_counter += num_buckets; | 876 | cpi->ibf_bucket_counter += num_buckets; |
746 | 877 | ||
747 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | 878 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
748 | { | 879 | { |
749 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); | 880 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); |
750 | GNUNET_assert (NULL != cpi->wh); | ||
751 | cpi->ibf_state = IBF_STATE_DECODING; | 881 | cpi->ibf_state = IBF_STATE_DECODING; |
752 | prepare_ibf (cpi); | 882 | prepare_ibf (cpi); |
753 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); | 883 | ibf_subtract (cpi->ibf, cpi->session->ibfs[cpi->ibf_order]); |
@@ -794,15 +924,62 @@ handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_Me | |||
794 | 924 | ||
795 | 925 | ||
796 | /** | 926 | /** |
927 | * Functions of this signature are called whenever writing operations | ||
928 | * on a stream are executed | ||
929 | * | ||
930 | * @param cls the closure from GNUNET_STREAM_write | ||
931 | * @param status the status of the stream at the time this function is called; | ||
932 | * GNUNET_STREAM_OK if writing to stream was completed successfully; | ||
933 | * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully | ||
934 | * (this doesn't mean that the data is never sent, the receiver may | ||
935 | * have read the data but its ACKs may have been lost); | ||
936 | * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the | ||
937 | * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot | ||
938 | * be processed. | ||
939 | * @param size the number of bytes written | ||
940 | */ | ||
941 | static void | ||
942 | write_requested_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
943 | { | ||
944 | struct ConsensusPeerInformation *cpi; | ||
945 | cpi = cls; | ||
946 | GNUNET_assert (NULL == cpi->wh); | ||
947 | cpi->wh = NULL; | ||
948 | if (NULL != cpi->requests_and_elements_head) | ||
949 | { | ||
950 | struct QueuedMessage *qm; | ||
951 | qm = cpi->requests_and_elements_head; | ||
952 | GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
953 | |||
954 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | ||
955 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
956 | write_requested_elements, cpi); | ||
957 | GNUNET_assert (NULL != cpi->wh); | ||
958 | } | ||
959 | } | ||
960 | |||
961 | |||
962 | /** | ||
797 | * Handle a request for elements. | 963 | * Handle a request for elements. |
798 | * Only allowed in exchange-rounds. | 964 | * Only allowed in exchange-rounds. |
799 | * | ||
800 | * FIXME: implement | ||
801 | */ | 965 | */ |
802 | static int | 966 | static int |
803 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) | 967 | handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct ElementRequest *msg) |
804 | { | 968 | { |
805 | /* FIXME: implement */ | 969 | struct GNUNET_HashCode *hashcode; |
970 | unsigned int num; | ||
971 | |||
972 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request\n"); | ||
973 | num = ntohs (msg->header.size) / sizeof (struct GNUNET_HashCode); | ||
974 | hashcode = (struct GNUNET_HashCode *) &msg[1]; | ||
975 | while (num--) | ||
976 | { | ||
977 | GNUNET_assert (IBF_STATE_ANTICIPATE_DIFF == cpi->ibf_state); | ||
978 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, hashcode, send_element_iter, cpi); | ||
979 | if (NULL == cpi->wh) | ||
980 | write_requested_elements (cpi, GNUNET_STREAM_OK, 0); | ||
981 | hashcode++; | ||
982 | } | ||
806 | return GNUNET_YES; | 983 | return GNUNET_YES; |
807 | } | 984 | } |
808 | 985 | ||
@@ -831,6 +1008,12 @@ handle_p2p_hello (struct IncomingSocket *inc, const struct ConsensusHello *hello | |||
831 | inc->cpi->mst = inc->mst; | 1008 | inc->cpi->mst = inc->mst; |
832 | inc->cpi->hello = GNUNET_YES; | 1009 | inc->cpi->hello = GNUNET_YES; |
833 | inc->cpi->socket = inc->socket; | 1010 | inc->cpi->socket = inc->socket; |
1011 | |||
1012 | if ( (CONSENSUS_ROUND_A2A_EXCHANGE == session->current_round) && | ||
1013 | (GNUNET_YES == inc->cpi->is_outgoing)) | ||
1014 | { | ||
1015 | write_strata (&session->info[idx], GNUNET_STREAM_OK, 0); | ||
1016 | } | ||
834 | return GNUNET_YES; | 1017 | return GNUNET_YES; |
835 | } | 1018 | } |
836 | session = session->next; | 1019 | session = session->next; |
@@ -866,6 +1049,10 @@ mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader | |||
866 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); | 1049 | return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); |
867 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: | 1050 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: |
868 | return handle_p2p_element (cpi, message); | 1051 | return handle_p2p_element (cpi, message); |
1052 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL: | ||
1053 | return handle_p2p_missing_local (cpi, message); | ||
1054 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_REMOTE: | ||
1055 | return handle_p2p_missing_remote (cpi, message); | ||
869 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: | 1056 | case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST: |
870 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); | 1057 | return handle_p2p_element_request (cpi, (struct ElementRequest *) message); |
871 | default: | 1058 | default: |
@@ -938,7 +1125,6 @@ listen_cb (void *cls, | |||
938 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 1125 | incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
939 | &incoming_stream_data_processor, incoming); | 1126 | &incoming_stream_data_processor, incoming); |
940 | 1127 | ||
941 | |||
942 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); | 1128 | incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); |
943 | 1129 | ||
944 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); | 1130 | GNUNET_CONTAINER_DLL_insert_tail (incoming_sockets_head, incoming_sockets_tail, incoming); |
@@ -947,6 +1133,11 @@ listen_cb (void *cls, | |||
947 | } | 1133 | } |
948 | 1134 | ||
949 | 1135 | ||
1136 | /** | ||
1137 | * Destroy a session, free all resources associated with it. | ||
1138 | * | ||
1139 | * @param session the session to destroy | ||
1140 | */ | ||
950 | static void | 1141 | static void |
951 | destroy_session (struct ConsensusSession *session) | 1142 | destroy_session (struct ConsensusSession *session) |
952 | { | 1143 | { |
@@ -1013,10 +1204,7 @@ compute_global_id (const struct GNUNET_HashCode *local_id, | |||
1013 | 1204 | ||
1014 | 1205 | ||
1015 | /** | 1206 | /** |
1016 | * Function called to notify a client about the connection | 1207 | * Transmit a queued message to the session's client. |
1017 | * begin ready to queue more data. "buf" will be | ||
1018 | * NULL and "size" zero if the connection was closed for | ||
1019 | * writing in the meantime. | ||
1020 | * | 1208 | * |
1021 | * @param cls consensus session | 1209 | * @param cls consensus session |
1022 | * @param size number of bytes available in buf | 1210 | * @param size number of bytes available in buf |
@@ -1034,7 +1222,6 @@ transmit_queued (void *cls, size_t size, | |||
1034 | session = cls; | 1222 | session = cls; |
1035 | session->th = NULL; | 1223 | session->th = NULL; |
1036 | 1224 | ||
1037 | |||
1038 | qmsg = session->client_messages_head; | 1225 | qmsg = session->client_messages_head; |
1039 | GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); | 1226 | GNUNET_CONTAINER_DLL_remove (session->client_messages_head, session->client_messages_tail, qmsg); |
1040 | GNUNET_assert (qmsg); | 1227 | GNUNET_assert (qmsg); |
@@ -1060,7 +1247,7 @@ transmit_queued (void *cls, size_t size, | |||
1060 | 1247 | ||
1061 | 1248 | ||
1062 | /** | 1249 | /** |
1063 | * Schedule sending the next message (if there is any) to a client. | 1250 | * Schedule transmitting the next queued message (if any) to a client. |
1064 | * | 1251 | * |
1065 | * @param cli the client to send the next message to | 1252 | * @param cli the client to send the next message to |
1066 | */ | 1253 | */ |
@@ -1118,7 +1305,6 @@ get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSess | |||
1118 | } | 1305 | } |
1119 | 1306 | ||
1120 | 1307 | ||
1121 | |||
1122 | /** | 1308 | /** |
1123 | * Called when stream has finishes writing the hello message | 1309 | * Called when stream has finishes writing the hello message |
1124 | */ | 1310 | */ |
@@ -1128,21 +1314,25 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1128 | struct ConsensusPeerInformation *cpi; | 1314 | struct ConsensusPeerInformation *cpi; |
1129 | 1315 | ||
1130 | cpi = cls; | 1316 | cpi = cls; |
1317 | cpi->wh = NULL; | ||
1131 | cpi->hello = GNUNET_YES; | 1318 | cpi->hello = GNUNET_YES; |
1132 | 1319 | ||
1133 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1320 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1134 | 1321 | ||
1135 | if (cpi->session->conclude_requested) | 1322 | /* FIXME: other rounds */ |
1323 | |||
1324 | if ( (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) && | ||
1325 | (GNUNET_YES == cpi->is_outgoing)) | ||
1136 | { | 1326 | { |
1137 | write_strata (cpi, GNUNET_STREAM_OK, 0); | 1327 | write_strata (cpi, GNUNET_STREAM_OK, 0); |
1138 | } | 1328 | } |
1139 | } | 1329 | } |
1140 | 1330 | ||
1141 | 1331 | ||
1142 | /** | 1332 | /** |
1143 | * Functions of this type will be called when a stream is established | 1333 | * Called when we established a stream connection to another peer |
1144 | * | 1334 | * |
1145 | * @param cls the closure from GNUNET_STREAM_open | 1335 | * @param cls cpi of the peer we just connected to |
1146 | * @param socket socket to use to communicate with the other side (read/write) | 1336 | * @param socket socket to use to communicate with the other side (read/write) |
1147 | */ | 1337 | */ |
1148 | static void | 1338 | static void |
@@ -1151,9 +1341,9 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | |||
1151 | struct ConsensusPeerInformation *cpi; | 1341 | struct ConsensusPeerInformation *cpi; |
1152 | struct ConsensusHello *hello; | 1342 | struct ConsensusHello *hello; |
1153 | 1343 | ||
1154 | |||
1155 | cpi = cls; | 1344 | cpi = cls; |
1156 | cpi->is_connected = GNUNET_YES; | 1345 | cpi->is_connected = GNUNET_YES; |
1346 | cpi->wh = NULL; | ||
1157 | 1347 | ||
1158 | hello = GNUNET_malloc (sizeof *hello); | 1348 | hello = GNUNET_malloc (sizeof *hello); |
1159 | hello->header.size = htons (sizeof *hello); | 1349 | hello->header.size = htons (sizeof *hello); |
@@ -1165,7 +1355,6 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) | |||
1165 | 1355 | ||
1166 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, | 1356 | cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, |
1167 | &session_stream_data_processor, cpi); | 1357 | &session_stream_data_processor, cpi); |
1168 | |||
1169 | } | 1358 | } |
1170 | 1359 | ||
1171 | 1360 | ||
@@ -1182,7 +1371,7 @@ initialize_session_info (struct ConsensusSession *session) | |||
1182 | session->info[i].session = session; | 1371 | session->info[i].session = session; |
1183 | } | 1372 | } |
1184 | 1373 | ||
1185 | session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; | 1374 | session->current_round = CONSENSUS_ROUND_BEGIN; |
1186 | 1375 | ||
1187 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; | 1376 | last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; |
1188 | i = (session->local_peer_idx + 1) % session->num_peers; | 1377 | i = (session->local_peer_idx + 1) % session->num_peers; |
@@ -1267,6 +1456,47 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke | |||
1267 | 1456 | ||
1268 | 1457 | ||
1269 | /** | 1458 | /** |
1459 | * Add incoming peer connections to the session, | ||
1460 | * for peers who have connected to us before the local session has been established | ||
1461 | * | ||
1462 | * @param session ... | ||
1463 | */ | ||
1464 | static void | ||
1465 | add_incoming_peers (struct ConsensusSession *session) | ||
1466 | { | ||
1467 | struct IncomingSocket *inc; | ||
1468 | inc = incoming_sockets_head; | ||
1469 | |||
1470 | while (NULL != inc) | ||
1471 | { | ||
1472 | if (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, inc->requested_gid)) | ||
1473 | { | ||
1474 | int i; | ||
1475 | for (i = 0; i < session->num_peers; i++) | ||
1476 | { | ||
1477 | struct ConsensusPeerInformation *cpi; | ||
1478 | cpi = &session->info[i]; | ||
1479 | if (0 == memcmp (inc->peer, &cpi->session->peers[i], sizeof (struct GNUNET_PeerIdentity))) | ||
1480 | { | ||
1481 | if (GNUNET_YES == cpi->is_outgoing) | ||
1482 | { | ||
1483 | /* FIXME: disconnect */ | ||
1484 | continue; | ||
1485 | } | ||
1486 | cpi->socket = inc->socket; | ||
1487 | inc->cpi = cpi; | ||
1488 | inc->cpi->mst = inc->mst; | ||
1489 | inc->cpi->hello = GNUNET_YES; | ||
1490 | break; | ||
1491 | } | ||
1492 | } | ||
1493 | } | ||
1494 | inc = inc->next; | ||
1495 | } | ||
1496 | } | ||
1497 | |||
1498 | |||
1499 | /** | ||
1270 | * Initialize the session, continue receiving messages from the owning client | 1500 | * Initialize the session, continue receiving messages from the owning client |
1271 | * | 1501 | * |
1272 | * @param session the session to initialize | 1502 | * @param session the session to initialize |
@@ -1311,7 +1541,7 @@ initialize_session (struct ConsensusSession *session) | |||
1311 | for (i = 0; i < STRATA_COUNT; i++) | 1541 | for (i = 0; i < STRATA_COUNT; i++) |
1312 | session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); | 1542 | session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); |
1313 | 1543 | ||
1314 | session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); | 1544 | session->ibfs = GNUNET_malloc ((MAX_IBF_ORDER+1) * sizeof (struct InvertibleBloomFilter *)); |
1315 | 1545 | ||
1316 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); | 1546 | session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); |
1317 | initialize_session_info (session); | 1547 | initialize_session_info (session); |
@@ -1319,6 +1549,8 @@ initialize_session (struct ConsensusSession *session) | |||
1319 | GNUNET_free (session->join_msg); | 1549 | GNUNET_free (session->join_msg); |
1320 | session->join_msg = NULL; | 1550 | session->join_msg = NULL; |
1321 | 1551 | ||
1552 | add_incoming_peers (session); | ||
1553 | |||
1322 | GNUNET_SERVER_receive_done (session->client, GNUNET_OK); | 1554 | GNUNET_SERVER_receive_done (session->client, GNUNET_OK); |
1323 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); | 1555 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session %s initialized\n", GNUNET_h2s (&session->global_id)); |
1324 | } | 1556 | } |
@@ -1370,6 +1602,19 @@ client_join (void *cls, | |||
1370 | 1602 | ||
1371 | 1603 | ||
1372 | /** | 1604 | /** |
1605 | * Hash a block of data, producing a replicated ibf hash. | ||
1606 | */ | ||
1607 | static void | ||
1608 | hash_for_ibf (const void *block, size_t size, struct GNUNET_HashCode *ret) | ||
1609 | { | ||
1610 | struct IBF_Key ibf_key; | ||
1611 | GNUNET_CRYPTO_hash (block, size, ret); | ||
1612 | ibf_key = ibf_key_from_hashcode (ret); | ||
1613 | ibf_hashcode_from_key (ibf_key, ret); | ||
1614 | } | ||
1615 | |||
1616 | |||
1617 | /** | ||
1373 | * Called when a client performs an insert operation. | 1618 | * Called when a client performs an insert operation. |
1374 | * | 1619 | * |
1375 | * @param cls (unused) | 1620 | * @param cls (unused) |
@@ -1384,7 +1629,7 @@ client_insert (void *cls, | |||
1384 | struct ConsensusSession *session; | 1629 | struct ConsensusSession *session; |
1385 | struct GNUNET_CONSENSUS_ElementMessage *msg; | 1630 | struct GNUNET_CONSENSUS_ElementMessage *msg; |
1386 | struct GNUNET_CONSENSUS_Element *element; | 1631 | struct GNUNET_CONSENSUS_Element *element; |
1387 | struct GNUNET_HashCode key; | 1632 | struct GNUNET_HashCode hash; |
1388 | int element_size; | 1633 | int element_size; |
1389 | 1634 | ||
1390 | session = sessions_head; | 1635 | session = sessions_head; |
@@ -1413,12 +1658,14 @@ client_insert (void *cls, | |||
1413 | 1658 | ||
1414 | GNUNET_assert (NULL != element->data); | 1659 | GNUNET_assert (NULL != element->data); |
1415 | 1660 | ||
1416 | GNUNET_CRYPTO_hash (element, element_size, &key); | 1661 | hash_for_ibf (element, element_size, &hash); |
1417 | 1662 | ||
1418 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | 1663 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "inserting with hash_for_ibf %s\n", GNUNET_h2s (&hash)); |
1664 | |||
1665 | GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element, | ||
1419 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 1666 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1420 | 1667 | ||
1421 | strata_insert (session->strata, &key); | 1668 | strata_insert (session->strata, &hash); |
1422 | 1669 | ||
1423 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 1670 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1424 | 1671 | ||
@@ -1426,8 +1673,6 @@ client_insert (void *cls, | |||
1426 | } | 1673 | } |
1427 | 1674 | ||
1428 | 1675 | ||
1429 | |||
1430 | |||
1431 | /** | 1676 | /** |
1432 | * Functions of this signature are called whenever writing operations | 1677 | * Functions of this signature are called whenever writing operations |
1433 | * on a stream are executed | 1678 | * on a stream are executed |
@@ -1446,10 +1691,14 @@ client_insert (void *cls, | |||
1446 | static void | 1691 | static void |
1447 | write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1692 | write_strata_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1448 | { | 1693 | { |
1694 | struct ConsensusPeerInformation *cpi; | ||
1695 | cpi = cls; | ||
1696 | cpi->wh = NULL; | ||
1449 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1697 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1450 | /* just wait for the ibf */ | 1698 | /* just wait for the ibf */ |
1451 | } | 1699 | } |
1452 | 1700 | ||
1701 | |||
1453 | /** | 1702 | /** |
1454 | * Functions of this signature are called whenever writing operations | 1703 | * Functions of this signature are called whenever writing operations |
1455 | * on a stream are executed | 1704 | * on a stream are executed |
@@ -1470,11 +1719,9 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1470 | { | 1719 | { |
1471 | struct ConsensusPeerInformation *cpi; | 1720 | struct ConsensusPeerInformation *cpi; |
1472 | struct StrataMessage *strata_msg; | 1721 | struct StrataMessage *strata_msg; |
1722 | void *buf; | ||
1473 | size_t msize; | 1723 | size_t msize; |
1474 | int i; | 1724 | int i; |
1475 | uint64_t *key_dst; | ||
1476 | uint32_t *hash_dst; | ||
1477 | uint8_t *count_dst; | ||
1478 | 1725 | ||
1479 | cpi = cls; | 1726 | cpi = cls; |
1480 | cpi->wh = NULL; | 1727 | cpi->wh = NULL; |
@@ -1491,36 +1738,30 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1491 | strata_msg = GNUNET_malloc (msize); | 1738 | strata_msg = GNUNET_malloc (msize); |
1492 | strata_msg->header.size = htons (msize); | 1739 | strata_msg->header.size = htons (msize); |
1493 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); | 1740 | strata_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE); |
1494 | 1741 | ||
1495 | /* for correct message alignment, copy bucket types seperately */ | 1742 | buf = &strata_msg[1]; |
1496 | key_dst = (uint64_t *) &strata_msg[1]; | ||
1497 | |||
1498 | for (i = 0; i < STRATA_COUNT; i++) | ||
1499 | { | ||
1500 | memcpy (key_dst, cpi->session->strata[i]->id_sum, STRATA_IBF_BUCKETS * sizeof *key_dst); | ||
1501 | key_dst += STRATA_IBF_BUCKETS; | ||
1502 | } | ||
1503 | |||
1504 | hash_dst = (uint32_t *) key_dst; | ||
1505 | |||
1506 | for (i = 0; i < STRATA_COUNT; i++) | ||
1507 | { | ||
1508 | memcpy (hash_dst, cpi->session->strata[i]->hash_sum, STRATA_IBF_BUCKETS * sizeof *hash_dst); | ||
1509 | hash_dst += STRATA_IBF_BUCKETS; | ||
1510 | } | ||
1511 | |||
1512 | count_dst = (uint8_t *) hash_dst; | ||
1513 | |||
1514 | for (i = 0; i < STRATA_COUNT; i++) | 1743 | for (i = 0; i < STRATA_COUNT; i++) |
1515 | { | 1744 | { |
1516 | memcpy (count_dst, cpi->session->strata[i]->count, STRATA_IBF_BUCKETS); | 1745 | ibf_write (cpi->session->strata[i], &buf, NULL); |
1517 | count_dst += STRATA_IBF_BUCKETS; | ||
1518 | } | 1746 | } |
1519 | 1747 | ||
1520 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1748 | cpi->wh = GNUNET_STREAM_write (cpi->socket, strata_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, |
1521 | write_strata_done, cpi); | 1749 | write_strata_done, cpi); |
1522 | 1750 | ||
1523 | GNUNET_assert (NULL != cpi->wh); | 1751 | GNUNET_assert (NULL != cpi->wh); |
1752 | |||
1753 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n"); | ||
1754 | } | ||
1755 | |||
1756 | |||
1757 | static void | ||
1758 | write_ibf_done (void *cls, enum GNUNET_STREAM_Status status, size_t size) | ||
1759 | { | ||
1760 | struct ConsensusPeerInformation *cpi; | ||
1761 | cpi = cls; | ||
1762 | cpi->wh = NULL; | ||
1763 | GNUNET_assert (GNUNET_STREAM_OK == status); | ||
1764 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "write ibf done callback\n"); | ||
1524 | } | 1765 | } |
1525 | 1766 | ||
1526 | 1767 | ||
@@ -1545,24 +1786,17 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1545 | struct ConsensusPeerInformation *cpi; | 1786 | struct ConsensusPeerInformation *cpi; |
1546 | struct DifferenceDigest *digest; | 1787 | struct DifferenceDigest *digest; |
1547 | int msize; | 1788 | int msize; |
1548 | uint64_t *key_dst; | ||
1549 | uint32_t *hash_dst; | ||
1550 | uint8_t *count_dst; | ||
1551 | int num_buckets; | 1789 | int num_buckets; |
1790 | void *buf; | ||
1552 | 1791 | ||
1553 | cpi = cls; | 1792 | cpi = cls; |
1554 | cpi->wh = NULL; | 1793 | cpi->wh = NULL; |
1555 | 1794 | ||
1556 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1795 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1557 | |||
1558 | GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); | 1796 | GNUNET_assert (IBF_STATE_TRANSMITTING == cpi->ibf_state); |
1559 | 1797 | ||
1560 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) | 1798 | /* we should not be done here! */ |
1561 | { | 1799 | GNUNET_assert (cpi->ibf_bucket_counter != (1 << cpi->ibf_order)); |
1562 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); | ||
1563 | /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ | ||
1564 | return; | ||
1565 | } | ||
1566 | 1800 | ||
1567 | /* remaining buckets */ | 1801 | /* remaining buckets */ |
1568 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; | 1802 | num_buckets = (1 << cpi->ibf_order) - cpi->ibf_bucket_counter; |
@@ -1580,24 +1814,23 @@ write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) | |||
1580 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); | 1814 | digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); |
1581 | digest->order = cpi->ibf_order; | 1815 | digest->order = cpi->ibf_order; |
1582 | 1816 | ||
1583 | key_dst = (uint64_t *) &digest[1]; | 1817 | buf = &digest[1]; |
1584 | 1818 | ibf_write_slice (cpi->ibf, cpi->ibf_bucket_counter, num_buckets, &buf, NULL); | |
1585 | memcpy (key_dst, cpi->ibf->id_sum, num_buckets * sizeof *key_dst); | ||
1586 | key_dst += num_buckets; | ||
1587 | |||
1588 | hash_dst = (uint32_t *) key_dst; | ||
1589 | |||
1590 | memcpy (hash_dst, cpi->ibf->id_sum, num_buckets * sizeof *hash_dst); | ||
1591 | hash_dst += num_buckets; | ||
1592 | |||
1593 | count_dst = (uint8_t *) hash_dst; | ||
1594 | |||
1595 | memcpy (count_dst, cpi->ibf->count, num_buckets * sizeof *count_dst); | ||
1596 | 1819 | ||
1597 | cpi->ibf_bucket_counter += num_buckets; | 1820 | cpi->ibf_bucket_counter += num_buckets; |
1598 | 1821 | ||
1599 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1822 | /* we have to set the new state here, because of non-deterministic schedulung */ |
1600 | write_ibf, cpi); | 1823 | if (cpi->ibf_bucket_counter == (1 << cpi->ibf_order)) |
1824 | { | ||
1825 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); | ||
1826 | /* we now wait for values / requests / another IBF because peer could not decode with our IBF */ | ||
1827 | cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF; | ||
1828 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf_done, cpi); | ||
1829 | } | ||
1830 | else | ||
1831 | { | ||
1832 | cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, write_ibf, cpi); | ||
1833 | } | ||
1601 | 1834 | ||
1602 | GNUNET_assert (NULL != cpi->wh); | 1835 | GNUNET_assert (NULL != cpi->wh); |
1603 | } | 1836 | } |
@@ -1622,19 +1855,32 @@ static void | |||
1622 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) | 1855 | write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t size) |
1623 | { | 1856 | { |
1624 | struct ConsensusPeerInformation *cpi; | 1857 | struct ConsensusPeerInformation *cpi; |
1625 | uint64_t key; | 1858 | struct IBF_Key key; |
1626 | struct GNUNET_HashCode hashcode; | 1859 | struct GNUNET_HashCode hashcode; |
1627 | int side; | 1860 | int side; |
1628 | int msize; | ||
1629 | 1861 | ||
1630 | GNUNET_assert (GNUNET_STREAM_OK == status); | 1862 | GNUNET_assert (GNUNET_STREAM_OK == status); |
1631 | 1863 | ||
1632 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); | 1864 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n"); |
1633 | 1865 | ||
1634 | cpi = cls; | 1866 | cpi = cls; |
1867 | GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); | ||
1635 | cpi->wh = NULL; | 1868 | cpi->wh = NULL; |
1636 | 1869 | ||
1637 | GNUNET_assert (IBF_STATE_DECODING == cpi->ibf_state); | 1870 | if (NULL != cpi->requests_and_elements_head) |
1871 | { | ||
1872 | struct QueuedMessage *qm; | ||
1873 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending queued element\n"); | ||
1874 | qm = cpi->requests_and_elements_head; | ||
1875 | GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); | ||
1876 | |||
1877 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), | ||
1878 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1879 | write_requests_and_elements, cpi); | ||
1880 | GNUNET_assert (NULL != cpi->wh); | ||
1881 | /* some elements / requests have queued up, we have to transmit them first */ | ||
1882 | return; | ||
1883 | } | ||
1638 | 1884 | ||
1639 | for (;;) | 1885 | for (;;) |
1640 | { | 1886 | { |
@@ -1642,10 +1888,13 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t | |||
1642 | res = ibf_decode (cpi->ibf, &side, &key); | 1888 | res = ibf_decode (cpi->ibf, &side, &key); |
1643 | if (GNUNET_SYSERR == res) | 1889 | if (GNUNET_SYSERR == res) |
1644 | { | 1890 | { |
1891 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding failed, transmitting larger IBF\n"); | ||
1892 | /* decoding failed, we tell the other peer by sending our ibf with a larger order */ | ||
1645 | cpi->ibf_order++; | 1893 | cpi->ibf_order++; |
1646 | prepare_ibf (cpi); | 1894 | prepare_ibf (cpi); |
1647 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); | 1895 | cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); |
1648 | cpi->ibf_state = IBF_STATE_TRANSMITTING; | 1896 | cpi->ibf_state = IBF_STATE_TRANSMITTING; |
1897 | cpi->ibf_bucket_counter = 0; | ||
1649 | write_ibf (cls, status, size); | 1898 | write_ibf (cls, status, size); |
1650 | return; | 1899 | return; |
1651 | } | 1900 | } |
@@ -1656,38 +1905,52 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t | |||
1656 | } | 1905 | } |
1657 | if (-1 == side) | 1906 | if (-1 == side) |
1658 | { | 1907 | { |
1659 | struct GNUNET_CONSENSUS_Element *element; | 1908 | /* we have the element, send it to the other peer */ |
1660 | struct GNUNET_MessageHeader *element_msg; | 1909 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element\n"); |
1661 | ibf_hashcode_from_key (key, &hashcode); | 1910 | ibf_hashcode_from_key (key, &hashcode); |
1662 | /* FIXME: this only transmits one element stored with the key */ | 1911 | GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, &hashcode, send_element_iter, cpi); |
1663 | element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode); | 1912 | /* send the first message, because we can! */ |
1664 | if (NULL == element) | 1913 | if (NULL != cpi->requests_and_elements_head) |
1665 | continue; | 1914 | { |
1666 | msize = sizeof (struct GNUNET_MessageHeader) + element->size; | 1915 | struct QueuedMessage *qm; |
1667 | element_msg = GNUNET_malloc (msize); | 1916 | qm = cpi->requests_and_elements_head; |
1668 | element_msg->size = htons (msize); | 1917 | GNUNET_CONTAINER_DLL_remove (cpi->requests_and_elements_head, cpi->requests_and_elements_tail, qm); |
1669 | element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); | 1918 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing element\n"); |
1670 | GNUNET_assert (NULL != element->data); | 1919 | cpi->wh = GNUNET_STREAM_write (cpi->socket, qm->msg, ntohs (qm->msg->size), |
1671 | memcpy (&element_msg[1], element->data, element->size); | 1920 | GNUNET_TIME_UNIT_FOREVER_REL, |
1672 | cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1921 | write_requests_and_elements, cpi); |
1673 | write_requests_and_elements, cpi); | 1922 | GNUNET_assert (NULL != cpi->wh); |
1674 | GNUNET_free (element_msg); | 1923 | } |
1675 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); | 1924 | else |
1676 | 1925 | { | |
1677 | GNUNET_assert (NULL != cpi->wh); | 1926 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "no element found for decoded hash %s\n", GNUNET_h2s (&hashcode)); |
1927 | } | ||
1678 | return; | 1928 | return; |
1679 | } | 1929 | } |
1680 | else | 1930 | else |
1681 | { | 1931 | { |
1682 | struct ElementRequest *msg; | 1932 | struct ElementRequest *msg; |
1683 | size_t msize; | 1933 | size_t msize; |
1684 | uint64_t *p; | 1934 | struct IBF_Key *p; |
1685 | 1935 | ||
1686 | msize = (sizeof *msg) + sizeof (uint64_t); | 1936 | msize = (sizeof *msg) + sizeof (uint64_t); |
1687 | msg = GNUNET_malloc (msize); | 1937 | msg = GNUNET_malloc (msize); |
1688 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | 1938 | if (CONSENSUS_ROUND_A2A_EXCHANGE == cpi->session->current_round) |
1939 | { | ||
1940 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request for element\n"); | ||
1941 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST); | ||
1942 | } | ||
1943 | else if (CONSENSUS_ROUND_A2A_INVENTORY == cpi->session->current_round) | ||
1944 | { | ||
1945 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending locally missing element\n"); | ||
1946 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_MISSING_LOCAL); | ||
1947 | } | ||
1948 | else | ||
1949 | { | ||
1950 | GNUNET_assert (0); | ||
1951 | } | ||
1689 | msg->header.size = htons (msize); | 1952 | msg->header.size = htons (msize); |
1690 | p = (uint64_t *) &msg[1]; | 1953 | p = (struct IBF_Key *) &msg[1]; |
1691 | *p = key; | 1954 | *p = key; |
1692 | 1955 | ||
1693 | cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, | 1956 | cpi->wh = GNUNET_STREAM_write (cpi->socket, msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, |
@@ -1697,17 +1960,151 @@ write_requests_and_elements (void *cls, enum GNUNET_STREAM_Status status, size_t | |||
1697 | return; | 1960 | return; |
1698 | } | 1961 | } |
1699 | } | 1962 | } |
1700 | |||
1701 | } | 1963 | } |
1702 | 1964 | ||
1703 | 1965 | ||
1966 | static double | ||
1967 | compute_similarity (struct ConsensusSession *session, int p1, int p2) | ||
1968 | { | ||
1969 | /* FIXME: simplistic dummy implementation, use real set union/intersecion */ | ||
1970 | return (session->info[p1].num_missing_local + session->info[p2].num_missing_local) / | ||
1971 | ((double) (session->info[p1].num_missing_remote + session->info[p2].num_missing_remote + 1)); | ||
1972 | } | ||
1973 | |||
1704 | 1974 | ||
1705 | /* | ||
1706 | static void | 1975 | static void |
1707 | select_best_group (struct ConsensusSession *session) | 1976 | select_fittest_group (struct ConsensusSession *session) |
1708 | { | 1977 | { |
1978 | /* simplistic implementation: compute the similarity with the latest strata estimator, | ||
1979 | * rank the results once */ | ||
1980 | struct GNUNET_PeerIdentity *group; | ||
1981 | double rating[session->num_peers]; | ||
1982 | struct GNUNET_CONSENSUS_ConcludeDoneMessage *done_msg; | ||
1983 | size_t msize; | ||
1984 | int i; | ||
1985 | int j; | ||
1986 | /* number of peers in the consensus group */ | ||
1987 | int k; | ||
1988 | |||
1989 | k = ceil(session->num_peers / 3.0) * 2; | ||
1990 | group = GNUNET_malloc (k * sizeof *group); | ||
1991 | |||
1992 | /* do strata subtraction */ | ||
1993 | /* FIXME: we know the real sets, subtract them! */ | ||
1994 | for (i = 0; i < session->num_peers; i++) | ||
1995 | { | ||
1996 | rating[i] = 0; | ||
1997 | for (j = 0; j < i; j++) | ||
1998 | { | ||
1999 | double sim; | ||
2000 | sim = compute_similarity (session, i, j); | ||
2001 | rating[i] += sim; | ||
2002 | rating[j] += sim; | ||
2003 | } | ||
2004 | } | ||
2005 | for (i = 0; i < k; i++) | ||
2006 | { | ||
2007 | int best_idx = 0; | ||
2008 | for (j = 1; j < session->num_peers; j++) | ||
2009 | if (rating[j] > rating[best_idx]) | ||
2010 | best_idx = j; | ||
2011 | rating[best_idx] = -1; | ||
2012 | group[i] = session->peers[best_idx]; | ||
2013 | } | ||
2014 | |||
2015 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got group!\n"); | ||
2016 | |||
2017 | msize = sizeof *done_msg + k * sizeof *group; | ||
2018 | |||
2019 | done_msg = GNUNET_malloc (msize); | ||
2020 | done_msg->header.size = htons (msize); | ||
2021 | done_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE); | ||
2022 | memcpy (&done_msg[1], group, k * sizeof *group); | ||
2023 | |||
2024 | queue_client_message (session, (struct GNUNET_MessageHeader *) done_msg); | ||
2025 | send_next (session); | ||
2026 | } | ||
2027 | |||
2028 | |||
2029 | /** | ||
2030 | * Select and kick off the next round, based on the current round. | ||
2031 | * @param cls the session | ||
2032 | * @param tc task context, for when this task is invoked by the scheduler, | ||
2033 | * NULL if invoked for another reason | ||
2034 | */ | ||
2035 | static void | ||
2036 | round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
2037 | { | ||
2038 | struct ConsensusSession *session; | ||
2039 | int i; | ||
2040 | |||
2041 | /* don't kick off next round if we're shutting down */ | ||
2042 | if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) | ||
2043 | return; | ||
2044 | |||
2045 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n"); | ||
2046 | session = cls; | ||
2047 | |||
2048 | if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK)) | ||
2049 | { | ||
2050 | GNUNET_SCHEDULER_cancel (session->round_timeout_tid); | ||
2051 | session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; | ||
2052 | } | ||
2053 | |||
2054 | for (i = 0; i < session->num_peers; i++) | ||
2055 | { | ||
2056 | if ((NULL != session->info) && (NULL != session->info[i].wh)) | ||
2057 | GNUNET_STREAM_write_cancel (session->info[i].wh); | ||
2058 | } | ||
2059 | |||
2060 | switch (session->current_round) | ||
2061 | { | ||
2062 | case CONSENSUS_ROUND_BEGIN: | ||
2063 | { | ||
2064 | session->current_round = CONSENSUS_ROUND_A2A_EXCHANGE; | ||
2065 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4), | ||
2066 | round_over, session); | ||
2067 | for (i = 0; i < session->num_peers; i++) | ||
2068 | { | ||
2069 | /* we can only talk to hello'ed peers */ | ||
2070 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | ||
2071 | (GNUNET_YES == session->info[i].hello) ) | ||
2072 | { | ||
2073 | /* kick off transmitting strata by calling the write continuation */ | ||
2074 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | ||
2075 | } | ||
2076 | } | ||
2077 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude started, timeout=%llu\n", session->conclude_timeout.rel_value); | ||
2078 | break; | ||
2079 | } | ||
2080 | case CONSENSUS_ROUND_A2A_EXCHANGE: | ||
2081 | { | ||
2082 | session->current_round = CONSENSUS_ROUND_A2A_INVENTORY; | ||
2083 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "starting inventory round\n"); | ||
2084 | session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_divide (session->conclude_timeout, 4), | ||
2085 | round_over, session); | ||
2086 | for (i = 0; i < session->num_peers; i++) | ||
2087 | { | ||
2088 | session->info[i].ibf_state = IBF_STATE_NONE; | ||
2089 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | ||
2090 | (GNUNET_YES == session->info[i].hello) ) | ||
2091 | { | ||
2092 | /* kick off transmitting strata by calling the write continuation */ | ||
2093 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | ||
2094 | } | ||
2095 | } | ||
2096 | break; | ||
2097 | } | ||
2098 | case CONSENSUS_ROUND_A2A_INVENTORY: | ||
2099 | /* finally, we are done and select the most fitting group */ | ||
2100 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "protocol rounds done\n"); | ||
2101 | session->current_round = CONSENSUS_ROUND_FINISH; | ||
2102 | select_fittest_group (session); | ||
2103 | break; | ||
2104 | default: | ||
2105 | GNUNET_assert (0); | ||
2106 | } | ||
1709 | } | 2107 | } |
1710 | */ | ||
1711 | 2108 | ||
1712 | 2109 | ||
1713 | /** | 2110 | /** |
@@ -1719,13 +2116,13 @@ select_best_group (struct ConsensusSession *session) | |||
1719 | */ | 2116 | */ |
1720 | static void | 2117 | static void |
1721 | client_conclude (void *cls, | 2118 | client_conclude (void *cls, |
1722 | struct GNUNET_SERVER_Client *client, | 2119 | struct GNUNET_SERVER_Client *client, |
1723 | const struct GNUNET_MessageHeader *message) | 2120 | const struct GNUNET_MessageHeader *message) |
1724 | { | 2121 | { |
1725 | struct ConsensusSession *session; | 2122 | struct ConsensusSession *session; |
1726 | int i; | 2123 | struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; |
1727 | 2124 | ||
1728 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); | 2125 | cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; |
1729 | 2126 | ||
1730 | session = sessions_head; | 2127 | session = sessions_head; |
1731 | while ((session != NULL) && (session->client != client)) | 2128 | while ((session != NULL) && (session->client != client)) |
@@ -1738,25 +2135,19 @@ client_conclude (void *cls, | |||
1738 | return; | 2135 | return; |
1739 | } | 2136 | } |
1740 | 2137 | ||
1741 | if (GNUNET_YES == session->conclude_requested) | 2138 | if (CONSENSUS_ROUND_BEGIN != session->current_round) |
1742 | { | 2139 | { |
1743 | /* client requested conclude twice */ | 2140 | /* client requested conclude twice */ |
2141 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "unexpected round at conclude: %d\n", session->current_round); | ||
1744 | GNUNET_break (0); | 2142 | GNUNET_break (0); |
1745 | disconnect_client (client); | 2143 | disconnect_client (client); |
1746 | return; | 2144 | return; |
1747 | } | 2145 | } |
1748 | 2146 | ||
1749 | session->conclude_requested = GNUNET_YES; | 2147 | session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); |
1750 | 2148 | ||
1751 | for (i = 0; i < session->num_peers; i++) | 2149 | /* the 'begin' round is over, start with the next, real round */ |
1752 | { | 2150 | round_over (session, NULL); |
1753 | if ( (GNUNET_YES == session->info[i].is_outgoing) && | ||
1754 | (GNUNET_YES == session->info[i].hello) ) | ||
1755 | { | ||
1756 | /* kick off transmitting strata by calling the write continuation */ | ||
1757 | write_strata (&session->info[i], GNUNET_STREAM_OK, 0); | ||
1758 | } | ||
1759 | } | ||
1760 | 2151 | ||
1761 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2152 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
1762 | send_next (session); | 2153 | send_next (session); |
@@ -1803,12 +2194,19 @@ client_ack (void *cls, | |||
1803 | 2194 | ||
1804 | if (msg->keep) | 2195 | if (msg->keep) |
1805 | { | 2196 | { |
2197 | int i; | ||
1806 | element = pending->element; | 2198 | element = pending->element; |
1807 | GNUNET_CRYPTO_hash (element, element->size, &key); | 2199 | hash_for_ibf (element, element->size, &key); |
1808 | 2200 | ||
1809 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, | 2201 | GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, |
1810 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 2202 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
1811 | strata_insert (session->strata, &key); | 2203 | strata_insert (session->strata, &key); |
2204 | |||
2205 | for (i = 0; i <= MAX_IBF_ORDER; i++) | ||
2206 | { | ||
2207 | if (NULL != session->ibfs[i]) | ||
2208 | ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key)); | ||
2209 | } | ||
1812 | } | 2210 | } |
1813 | 2211 | ||
1814 | GNUNET_SERVER_receive_done (client, GNUNET_OK); | 2212 | GNUNET_SERVER_receive_done (client, GNUNET_OK); |
@@ -1862,7 +2260,6 @@ static void | |||
1862 | shutdown_task (void *cls, | 2260 | shutdown_task (void *cls, |
1863 | const struct GNUNET_SCHEDULER_TaskContext *tc) | 2261 | const struct GNUNET_SCHEDULER_TaskContext *tc) |
1864 | { | 2262 | { |
1865 | |||
1866 | /* FIXME: complete; write separate destructors for different data types */ | 2263 | /* FIXME: complete; write separate destructors for different data types */ |
1867 | 2264 | ||
1868 | while (NULL != incoming_sockets_head) | 2265 | while (NULL != incoming_sockets_head) |
@@ -1884,15 +2281,16 @@ shutdown_task (void *cls, | |||
1884 | 2281 | ||
1885 | session = sessions_head; | 2282 | session = sessions_head; |
1886 | 2283 | ||
1887 | for (i = 0; session->num_peers; i++) | 2284 | if (NULL != session->info) |
1888 | { | 2285 | for (i = 0; i < session->num_peers; i++) |
1889 | struct ConsensusPeerInformation *cpi; | ||
1890 | cpi = &session->info[i]; | ||
1891 | if ((NULL != cpi) && (NULL != cpi->socket)) | ||
1892 | { | 2286 | { |
1893 | GNUNET_STREAM_close (cpi->socket); | 2287 | struct ConsensusPeerInformation *cpi; |
2288 | cpi = &session->info[i]; | ||
2289 | if ((NULL != cpi) && (NULL != cpi->socket)) | ||
2290 | { | ||
2291 | GNUNET_STREAM_close (cpi->socket); | ||
2292 | } | ||
1894 | } | 2293 | } |
1895 | } | ||
1896 | 2294 | ||
1897 | if (NULL != session->client) | 2295 | if (NULL != session->client) |
1898 | GNUNET_SERVER_client_disconnect (session->client); | 2296 | GNUNET_SERVER_client_disconnect (session->client); |
@@ -1952,7 +2350,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU | |||
1952 | listen_cb, NULL, | 2350 | listen_cb, NULL, |
1953 | GNUNET_STREAM_OPTION_END); | 2351 | GNUNET_STREAM_OPTION_END); |
1954 | 2352 | ||
1955 | |||
1956 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ | 2353 | /* we have to wait for the core_startup callback before proceeding with the consensus service startup */ |
1957 | core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers); | 2354 | core = GNUNET_CORE_connect (c, NULL, &core_startup, NULL, NULL, NULL, GNUNET_NO, NULL, GNUNET_NO, core_handlers); |
1958 | GNUNET_assert (NULL != core); | 2355 | GNUNET_assert (NULL != core); |
diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index fb3bbe7cd..d3218ff9b 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c | |||
@@ -18,30 +18,27 @@ | |||
18 | Boston, MA 02111-1307, USA. | 18 | Boston, MA 02111-1307, USA. |
19 | */ | 19 | */ |
20 | 20 | ||
21 | |||
22 | /** | 21 | /** |
23 | * @file consensus/ibf.c | 22 | * @file consensus/ibf.c |
24 | * @brief implementation of the invertible bloom filter | 23 | * @brief implementation of the invertible bloom filter |
25 | * @author Florian Dold | 24 | * @author Florian Dold |
26 | */ | 25 | */ |
27 | 26 | ||
28 | |||
29 | #include "ibf.h" | 27 | #include "ibf.h" |
30 | 28 | ||
31 | |||
32 | /** | 29 | /** |
33 | * Create a key from a hashcode. | 30 | * Create a key from a hashcode. |
34 | * | 31 | * |
35 | * @param hash the hashcode | 32 | * @param hash the hashcode |
36 | * @return a key | 33 | * @return a key |
37 | */ | 34 | */ |
38 | uint64_t | 35 | struct IBF_Key |
39 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) | 36 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) |
40 | { | 37 | { |
41 | return GNUNET_ntohll (*(uint64_t *) hash); | 38 | /* FIXME: endianess */ |
39 | return *(struct IBF_Key *) hash; | ||
42 | } | 40 | } |
43 | 41 | ||
44 | |||
45 | /** | 42 | /** |
46 | * Create a hashcode from a key, by replicating the key | 43 | * Create a hashcode from a key, by replicating the key |
47 | * until the hascode is filled | 44 | * until the hascode is filled |
@@ -50,12 +47,13 @@ ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) | |||
50 | * @param dst hashcode to store the result in | 47 | * @param dst hashcode to store the result in |
51 | */ | 48 | */ |
52 | void | 49 | void |
53 | ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst) | 50 | ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) |
54 | { | 51 | { |
55 | uint64_t *p; | 52 | struct IBF_Key *p; |
56 | int i; | 53 | unsigned int i; |
57 | p = (uint64_t *) dst; | 54 | const unsigned int keys_per_hashcode = sizeof (struct GNUNET_HashCode) / sizeof (struct IBF_Key); |
58 | for (i = 0; i < 8; i++) | 55 | p = (struct IBF_Key *) dst; |
56 | for (i = 0; i < keys_per_hashcode; i++) | ||
59 | *p++ = key; | 57 | *p++ = key; |
60 | } | 58 | } |
61 | 59 | ||
@@ -70,14 +68,16 @@ ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst) | |||
70 | * @return the newly created invertible bloom filter | 68 | * @return the newly created invertible bloom filter |
71 | */ | 69 | */ |
72 | struct InvertibleBloomFilter * | 70 | struct InvertibleBloomFilter * |
73 | ibf_create (uint32_t size, unsigned int hash_num, uint32_t salt) | 71 | ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt) |
74 | { | 72 | { |
75 | struct InvertibleBloomFilter *ibf; | 73 | struct InvertibleBloomFilter *ibf; |
76 | 74 | ||
75 | /* TODO: use malloc_large */ | ||
76 | |||
77 | ibf = GNUNET_malloc (sizeof (struct InvertibleBloomFilter)); | 77 | ibf = GNUNET_malloc (sizeof (struct InvertibleBloomFilter)); |
78 | ibf->count = GNUNET_malloc (size * sizeof (uint8_t)); | 78 | ibf->count = GNUNET_malloc (size * sizeof (uint8_t)); |
79 | ibf->id_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); | 79 | ibf->key_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); |
80 | ibf->hash_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); | 80 | ibf->key_hash_sum = GNUNET_malloc (size * sizeof (struct GNUNET_HashCode)); |
81 | ibf->size = size; | 81 | ibf->size = size; |
82 | ibf->hash_num = hash_num; | 82 | ibf->hash_num = hash_num; |
83 | 83 | ||
@@ -89,7 +89,7 @@ ibf_create (uint32_t size, unsigned int hash_num, uint32_t salt) | |||
89 | */ | 89 | */ |
90 | static inline void | 90 | static inline void |
91 | ibf_get_indices (const struct InvertibleBloomFilter *ibf, | 91 | ibf_get_indices (const struct InvertibleBloomFilter *ibf, |
92 | uint64_t key, int *dst) | 92 | struct IBF_Key key, int *dst) |
93 | { | 93 | { |
94 | struct GNUNET_HashCode bucket_indices; | 94 | struct GNUNET_HashCode bucket_indices; |
95 | unsigned int filled = 0; | 95 | unsigned int filled = 0; |
@@ -113,20 +113,20 @@ ibf_get_indices (const struct InvertibleBloomFilter *ibf, | |||
113 | 113 | ||
114 | static void | 114 | static void |
115 | ibf_insert_into (struct InvertibleBloomFilter *ibf, | 115 | ibf_insert_into (struct InvertibleBloomFilter *ibf, |
116 | uint64_t key, | 116 | struct IBF_Key key, |
117 | const int *buckets, int side) | 117 | const int *buckets, int side) |
118 | { | 118 | { |
119 | int i; | 119 | int i; |
120 | struct GNUNET_HashCode key_hash_sha; | 120 | struct GNUNET_HashCode key_hash_sha; |
121 | uint32_t key_hash; | 121 | struct IBF_KeyHash key_hash; |
122 | GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); | 122 | GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); |
123 | key_hash = key_hash_sha.bits[0]; | 123 | key_hash.key_hash_val = key_hash_sha.bits[0]; |
124 | for (i = 0; i < ibf->hash_num; i++) | 124 | for (i = 0; i < ibf->hash_num; i++) |
125 | { | 125 | { |
126 | const int bucket = buckets[i]; | 126 | const int bucket = buckets[i]; |
127 | ibf->count[bucket] += side; | 127 | ibf->count[bucket].count_val += side; |
128 | ibf->id_sum[bucket] ^= key; | 128 | ibf->key_sum[bucket].key_val ^= key.key_val; |
129 | ibf->hash_sum[bucket] ^= key_hash; | 129 | ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val; |
130 | } | 130 | } |
131 | } | 131 | } |
132 | 132 | ||
@@ -138,7 +138,7 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf, | |||
138 | * @param id the element's hash code | 138 | * @param id the element's hash code |
139 | */ | 139 | */ |
140 | void | 140 | void |
141 | ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t key) | 141 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key) |
142 | { | 142 | { |
143 | int buckets[ibf->hash_num]; | 143 | int buckets[ibf->hash_num]; |
144 | ibf_get_indices (ibf, key, buckets); | 144 | ibf_get_indices (ibf, key, buckets); |
@@ -154,11 +154,11 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf) | |||
154 | int i; | 154 | int i; |
155 | for (i = 0; i < ibf->size; i++) | 155 | for (i = 0; i < ibf->size; i++) |
156 | { | 156 | { |
157 | if (0 != ibf->count[i]) | 157 | if (0 != ibf->count[i].count_val) |
158 | return GNUNET_NO; | 158 | return GNUNET_NO; |
159 | if (0 != ibf->hash_sum[i]) | 159 | if (0 != ibf->key_hash_sum[i].key_hash_val) |
160 | return GNUNET_NO; | 160 | return GNUNET_NO; |
161 | if (0 != ibf->id_sum[i]) | 161 | if (0 != ibf->key_sum[i].key_val) |
162 | return GNUNET_NO; | 162 | return GNUNET_NO; |
163 | } | 163 | } |
164 | return GNUNET_YES; | 164 | return GNUNET_YES; |
@@ -179,9 +179,9 @@ ibf_is_empty (struct InvertibleBloomFilter *ibf) | |||
179 | */ | 179 | */ |
180 | int | 180 | int |
181 | ibf_decode (struct InvertibleBloomFilter *ibf, | 181 | ibf_decode (struct InvertibleBloomFilter *ibf, |
182 | int *ret_side, uint64_t *ret_id) | 182 | int *ret_side, struct IBF_Key *ret_id) |
183 | { | 183 | { |
184 | uint32_t hash; | 184 | struct IBF_KeyHash hash; |
185 | int i; | 185 | int i; |
186 | struct GNUNET_HashCode key_hash_sha; | 186 | struct GNUNET_HashCode key_hash_sha; |
187 | int buckets[ibf->hash_num]; | 187 | int buckets[ibf->hash_num]; |
@@ -194,20 +194,20 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
194 | int hit; | 194 | int hit; |
195 | 195 | ||
196 | /* we can only decode from pure buckets */ | 196 | /* we can only decode from pure buckets */ |
197 | if ((1 != ibf->count[i]) && (-1 != ibf->count[i])) | 197 | if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) |
198 | continue; | 198 | continue; |
199 | 199 | ||
200 | GNUNET_CRYPTO_hash (&ibf->id_sum[i], sizeof (uint64_t), &key_hash_sha); | 200 | GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha); |
201 | hash = key_hash_sha.bits[0]; | 201 | hash.key_hash_val = key_hash_sha.bits[0]; |
202 | 202 | ||
203 | /* test if the hash matches the key */ | 203 | /* test if the hash matches the key */ |
204 | if (hash != ibf->hash_sum[i]) | 204 | if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) |
205 | continue; | 205 | continue; |
206 | 206 | ||
207 | /* test if key in bucket hits its own location, | 207 | /* test if key in bucket hits its own location, |
208 | * if not, the key hash was subject to collision */ | 208 | * if not, the key hash was subject to collision */ |
209 | hit = GNUNET_NO; | 209 | hit = GNUNET_NO; |
210 | ibf_get_indices (ibf, ibf->id_sum[i], buckets); | 210 | ibf_get_indices (ibf, ibf->key_sum[i], buckets); |
211 | for (j = 0; j < ibf->hash_num; j++) | 211 | for (j = 0; j < ibf->hash_num; j++) |
212 | if (buckets[j] == i) | 212 | if (buckets[j] == i) |
213 | hit = GNUNET_YES; | 213 | hit = GNUNET_YES; |
@@ -216,12 +216,12 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
216 | continue; | 216 | continue; |
217 | 217 | ||
218 | if (NULL != ret_side) | 218 | if (NULL != ret_side) |
219 | *ret_side = ibf->count[i]; | 219 | *ret_side = ibf->count[i].count_val; |
220 | if (NULL != ret_id) | 220 | if (NULL != ret_id) |
221 | *ret_id = ibf->id_sum[i]; | 221 | *ret_id = ibf->key_sum[i]; |
222 | 222 | ||
223 | /* insert on the opposite side, effectively removing the element */ | 223 | /* insert on the opposite side, effectively removing the element */ |
224 | ibf_insert_into (ibf, ibf->id_sum[i], buckets, -ibf->count[i]); | 224 | ibf_insert_into (ibf, ibf->key_sum[i], buckets, -ibf->count[i].count_val); |
225 | 225 | ||
226 | return GNUNET_YES; | 226 | return GNUNET_YES; |
227 | } | 227 | } |
@@ -233,6 +233,128 @@ ibf_decode (struct InvertibleBloomFilter *ibf, | |||
233 | 233 | ||
234 | 234 | ||
235 | /** | 235 | /** |
236 | * Write an ibf. | ||
237 | * | ||
238 | * @param ibf the ibf to write | ||
239 | * @param start with which bucket to start | ||
240 | * @param count how many buckets to write | ||
241 | * @param buf buffer to write the data to, will be updated to point to the | ||
242 | * first byte after the written data | ||
243 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
244 | */ | ||
245 | void | ||
246 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size) | ||
247 | { | ||
248 | struct IBF_Key *key_dst; | ||
249 | struct IBF_KeyHash *key_hash_dst; | ||
250 | struct IBF_Count *count_dst; | ||
251 | |||
252 | /* update size and check for overflow */ | ||
253 | if (NULL != size) | ||
254 | { | ||
255 | size_t old_size; | ||
256 | old_size = *size; | ||
257 | *size = *size - count * IBF_BUCKET_SIZE; | ||
258 | GNUNET_assert (*size < old_size); | ||
259 | } | ||
260 | /* copy keys */ | ||
261 | key_dst = (struct IBF_Key *) *buf; | ||
262 | memcpy (key_dst, ibf->key_sum + start, count * sizeof *key_dst); | ||
263 | key_dst += count; | ||
264 | /* copy key hashes */ | ||
265 | key_hash_dst = (struct IBF_KeyHash *) key_dst; | ||
266 | memcpy (key_hash_dst, ibf->key_hash_sum + start, count * sizeof *key_hash_dst); | ||
267 | key_hash_dst += count; | ||
268 | /* copy counts */ | ||
269 | count_dst = (struct IBF_Count *) key_hash_dst; | ||
270 | memcpy (count_dst, ibf->count + start, count * sizeof *count_dst); | ||
271 | count_dst += count; | ||
272 | /* returned buffer is at the end of written data*/ | ||
273 | *buf = (void *) count_dst; | ||
274 | } | ||
275 | |||
276 | |||
277 | /** | ||
278 | * Read an ibf. | ||
279 | * | ||
280 | * @param buf pointer to the buffer to write to, will point to first | ||
281 | * byte after the written data | ||
282 | * @param size size of the buffer, will be updated | ||
283 | * @param start which bucket to start at | ||
284 | * @param count how many buckets to read | ||
285 | * @param dst ibf to write buckets to | ||
286 | * @return GNUNET_OK on success | ||
287 | */ | ||
288 | int | ||
289 | ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf) | ||
290 | { | ||
291 | struct IBF_Key *key_src; | ||
292 | struct IBF_KeyHash *key_hash_src; | ||
293 | struct IBF_Count *count_src; | ||
294 | |||
295 | /* update size and check for overflow */ | ||
296 | if (NULL != size) | ||
297 | { | ||
298 | size_t old_size; | ||
299 | old_size = *size; | ||
300 | *size = *size - count * IBF_BUCKET_SIZE; | ||
301 | if (*size > old_size) | ||
302 | return GNUNET_SYSERR; | ||
303 | } | ||
304 | /* copy keys */ | ||
305 | key_src = (struct IBF_Key *) *buf; | ||
306 | memcpy (ibf->key_sum + start, key_src, count * sizeof *key_src); | ||
307 | key_src += count; | ||
308 | /* copy key hashes */ | ||
309 | key_hash_src = (struct IBF_KeyHash *) key_src; | ||
310 | memcpy (ibf->key_hash_sum + start, key_hash_src, count * sizeof *key_hash_src); | ||
311 | key_hash_src += count; | ||
312 | /* copy counts */ | ||
313 | count_src = (struct IBF_Count *) key_hash_src; | ||
314 | memcpy (ibf->count + start, count_src, count * sizeof *count_src); | ||
315 | count_src += count; | ||
316 | /* returned buffer is at the end of written data*/ | ||
317 | *buf = (void *) count_src; | ||
318 | return GNUNET_OK; | ||
319 | } | ||
320 | |||
321 | |||
322 | /** | ||
323 | * Write an ibf. | ||
324 | * | ||
325 | * @param ibf the ibf to write | ||
326 | * @param start with which bucket to start | ||
327 | * @param count how many buckets to write | ||
328 | * @param buf buffer to write the data to, will be updated to point to the | ||
329 | * first byte after the written data | ||
330 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
331 | */ | ||
332 | void | ||
333 | ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size) | ||
334 | { | ||
335 | ibf_write_slice (ibf, 0, ibf->size, buf, size); | ||
336 | } | ||
337 | |||
338 | |||
339 | /** | ||
340 | * Read an ibf. | ||
341 | * | ||
342 | * @param buf pointer to the buffer to write to, will point to first | ||
343 | * byte after the written data | ||
344 | * @param size size of the buffer, will be updated | ||
345 | * @param start which bucket to start at | ||
346 | * @param count how many buckets to read | ||
347 | * @param dst ibf to write buckets to | ||
348 | * @return GNUNET_OK on success | ||
349 | */ | ||
350 | int | ||
351 | ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst) | ||
352 | { | ||
353 | return ibf_read_slice (buf, size, 0, dst->size, dst); | ||
354 | } | ||
355 | |||
356 | |||
357 | /** | ||
236 | * Subtract ibf2 from ibf1, storing the result in ibf1. | 358 | * Subtract ibf2 from ibf1, storing the result in ibf1. |
237 | * The two IBF's must have the same parameters size and hash_num. | 359 | * The two IBF's must have the same parameters size and hash_num. |
238 | * | 360 | * |
@@ -250,31 +372,33 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi | |||
250 | 372 | ||
251 | for (i = 0; i < ibf1->size; i++) | 373 | for (i = 0; i < ibf1->size; i++) |
252 | { | 374 | { |
253 | ibf1->count[i] -= ibf2->count[i]; | 375 | ibf1->count[i].count_val -= ibf2->count[i].count_val; |
254 | ibf1->hash_sum[i] ^= ibf2->hash_sum[i]; | 376 | ibf1->key_hash_sum[i].key_hash_val ^= ibf2->key_hash_sum[i].key_hash_val; |
255 | ibf1->id_sum[i] ^= ibf2->id_sum[i]; | 377 | ibf1->key_sum[i].key_val ^= ibf2->key_sum[i].key_val; |
256 | } | 378 | } |
257 | } | 379 | } |
258 | 380 | ||
381 | |||
259 | /** | 382 | /** |
260 | * Create a copy of an IBF, the copy has to be destroyed properly. | 383 | * Create a copy of an IBF, the copy has to be destroyed properly. |
261 | * | 384 | * |
262 | * @param ibf the IBF to copy | 385 | * @param ibf the IBF to copy |
263 | */ | 386 | */ |
264 | struct InvertibleBloomFilter * | 387 | struct InvertibleBloomFilter * |
265 | ibf_dup (struct InvertibleBloomFilter *ibf) | 388 | ibf_dup (const struct InvertibleBloomFilter *ibf) |
266 | { | 389 | { |
267 | struct InvertibleBloomFilter *copy; | 390 | struct InvertibleBloomFilter *copy; |
268 | copy = GNUNET_malloc (sizeof *copy); | 391 | copy = GNUNET_malloc (sizeof *copy); |
269 | copy->hash_num = ibf->hash_num; | 392 | copy->hash_num = ibf->hash_num; |
270 | copy->salt = ibf->salt; | 393 | copy->salt = ibf->salt; |
271 | copy->size = ibf->size; | 394 | copy->size = ibf->size; |
272 | copy->hash_sum = GNUNET_memdup (ibf->hash_sum, ibf->size * sizeof (struct GNUNET_HashCode)); | 395 | copy->key_hash_sum = GNUNET_memdup (ibf->key_hash_sum, ibf->size * sizeof (struct IBF_KeyHash)); |
273 | copy->id_sum = GNUNET_memdup (ibf->id_sum, ibf->size * sizeof (struct GNUNET_HashCode)); | 396 | copy->key_sum = GNUNET_memdup (ibf->key_sum, ibf->size * sizeof (struct IBF_Key)); |
274 | copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (uint8_t)); | 397 | copy->count = GNUNET_memdup (ibf->count, ibf->size * sizeof (struct IBF_Count)); |
275 | return copy; | 398 | return copy; |
276 | } | 399 | } |
277 | 400 | ||
401 | |||
278 | /** | 402 | /** |
279 | * Destroy all resources associated with the invertible bloom filter. | 403 | * Destroy all resources associated with the invertible bloom filter. |
280 | * No more ibf_*-functions may be called on ibf after calling destroy. | 404 | * No more ibf_*-functions may be called on ibf after calling destroy. |
@@ -284,8 +408,9 @@ ibf_dup (struct InvertibleBloomFilter *ibf) | |||
284 | void | 408 | void |
285 | ibf_destroy (struct InvertibleBloomFilter *ibf) | 409 | ibf_destroy (struct InvertibleBloomFilter *ibf) |
286 | { | 410 | { |
287 | GNUNET_free (ibf->hash_sum); | 411 | GNUNET_free (ibf->key_sum); |
288 | GNUNET_free (ibf->id_sum); | 412 | GNUNET_free (ibf->key_hash_sum); |
289 | GNUNET_free (ibf->count); | 413 | GNUNET_free (ibf->count); |
290 | GNUNET_free (ibf); | 414 | GNUNET_free (ibf); |
291 | } | 415 | } |
416 | |||
diff --git a/src/consensus/ibf.h b/src/consensus/ibf.h index 72345d3e1..cafe55c8d 100644 --- a/src/consensus/ibf.h +++ b/src/consensus/ibf.h | |||
@@ -39,11 +39,27 @@ extern "C" | |||
39 | #endif | 39 | #endif |
40 | #endif | 40 | #endif |
41 | 41 | ||
42 | |||
43 | struct IBF_Key | ||
44 | { | ||
45 | uint64_t key_val; | ||
46 | }; | ||
47 | |||
48 | struct IBF_KeyHash | ||
49 | { | ||
50 | uint32_t key_hash_val; | ||
51 | }; | ||
52 | |||
53 | struct IBF_Count | ||
54 | { | ||
55 | int8_t count_val; | ||
56 | }; | ||
57 | |||
42 | /** | 58 | /** |
43 | * Size of one ibf bucket in bytes | 59 | * Size of one ibf bucket in bytes |
44 | */ | 60 | */ |
45 | #define IBF_BUCKET_SIZE (8+4+1) | 61 | #define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \ |
46 | 62 | sizeof (struct IBF_KeyHash)) | |
47 | 63 | ||
48 | /** | 64 | /** |
49 | * Invertible bloom filter (IBF). | 65 | * Invertible bloom filter (IBF). |
@@ -62,7 +78,7 @@ struct InvertibleBloomFilter | |||
62 | * In how many cells do we hash one element? | 78 | * In how many cells do we hash one element? |
63 | * Usually 4 or 3. | 79 | * Usually 4 or 3. |
64 | */ | 80 | */ |
65 | unsigned int hash_num; | 81 | uint8_t hash_num; |
66 | 82 | ||
67 | /** | 83 | /** |
68 | * Salt for mingling hashes | 84 | * Salt for mingling hashes |
@@ -70,30 +86,91 @@ struct InvertibleBloomFilter | |||
70 | uint32_t salt; | 86 | uint32_t salt; |
71 | 87 | ||
72 | /** | 88 | /** |
73 | * xor sums of the elements' hash codes, used to identify the elements. | 89 | * Xor sums of the elements' keys, used to identify the elements. |
90 | * Array of 'size' elements. | ||
74 | */ | 91 | */ |
75 | uint64_t *id_sum; | 92 | struct IBF_Key *key_sum; |
76 | 93 | ||
77 | /** | 94 | /** |
78 | * xor sums of the "hash of the hash". | 95 | * Xor sums of the hashes of the keys of inserted elements. |
96 | * Array of 'size' elements. | ||
79 | */ | 97 | */ |
80 | uint32_t *hash_sum; | 98 | struct IBF_KeyHash *key_hash_sum; |
81 | 99 | ||
82 | /** | 100 | /** |
83 | * How many times has a bucket been hit? | 101 | * How many times has a bucket been hit? |
84 | * Can be negative, as a result of IBF subtraction. | 102 | * Can be negative, as a result of IBF subtraction. |
103 | * Array of 'size' elements. | ||
85 | */ | 104 | */ |
86 | int8_t *count; | 105 | struct IBF_Count *count; |
87 | }; | 106 | }; |
88 | 107 | ||
89 | 108 | ||
90 | /** | 109 | /** |
110 | * Write an ibf. | ||
111 | * | ||
112 | * @param ibf the ibf to write | ||
113 | * @param start with which bucket to start | ||
114 | * @param count how many buckets to write | ||
115 | * @param buf buffer to write the data to, will be updated to point to the | ||
116 | * first byte after the written data | ||
117 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
118 | */ | ||
119 | void | ||
120 | ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void **buf, size_t *size); | ||
121 | |||
122 | |||
123 | /** | ||
124 | * Read an ibf. | ||
125 | * | ||
126 | * @param buf pointer to the buffer to write to, will point to first | ||
127 | * byte after the written data | ||
128 | * @param size size of the buffer, will be updated | ||
129 | * @param start which bucket to start at | ||
130 | * @param count how many buckets to read | ||
131 | * @param dst ibf to write buckets to | ||
132 | * @return GNUNET_OK on success | ||
133 | */ | ||
134 | int | ||
135 | ibf_read_slice (void **buf, size_t *size, uint32_t start, uint32_t count, struct InvertibleBloomFilter *dst); | ||
136 | |||
137 | |||
138 | /** | ||
139 | * Write an ibf. | ||
140 | * | ||
141 | * @param ibf the ibf to write | ||
142 | * @param start with which bucket to start | ||
143 | * @param count how many buckets to write | ||
144 | * @param buf buffer to write the data to, will be updated to point to the | ||
145 | * first byte after the written data | ||
146 | * @param size pointer to the size of the buffer, will be updated, can be NULL | ||
147 | */ | ||
148 | void | ||
149 | ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size); | ||
150 | |||
151 | |||
152 | /** | ||
153 | * Read an ibf. | ||
154 | * | ||
155 | * @param buf pointer to the buffer to write to, will point to first | ||
156 | * byte after the written data | ||
157 | * @param size size of the buffer, will be updated | ||
158 | * @param start which bucket to start at | ||
159 | * @param count how many buckets to read | ||
160 | * @param dst ibf to write buckets to | ||
161 | * @return GNUNET_OK on success | ||
162 | */ | ||
163 | int | ||
164 | ibf_read (void **buf, size_t *size, struct InvertibleBloomFilter *dst); | ||
165 | |||
166 | |||
167 | /** | ||
91 | * Create a key from a hashcode. | 168 | * Create a key from a hashcode. |
92 | * | 169 | * |
93 | * @param hash the hashcode | 170 | * @param hash the hashcode |
94 | * @return a key | 171 | * @return a key |
95 | */ | 172 | */ |
96 | uint64_t | 173 | struct IBF_Key |
97 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); | 174 | ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); |
98 | 175 | ||
99 | 176 | ||
@@ -105,7 +182,7 @@ ibf_key_from_hashcode (const struct GNUNET_HashCode *hash); | |||
105 | * @param dst hashcode to store the result in | 182 | * @param dst hashcode to store the result in |
106 | */ | 183 | */ |
107 | void | 184 | void |
108 | ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst); | 185 | ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst); |
109 | 186 | ||
110 | 187 | ||
111 | /** | 188 | /** |
@@ -118,17 +195,17 @@ ibf_hashcode_from_key (uint64_t key, struct GNUNET_HashCode *dst); | |||
118 | * @return the newly created invertible bloom filter | 195 | * @return the newly created invertible bloom filter |
119 | */ | 196 | */ |
120 | struct InvertibleBloomFilter * | 197 | struct InvertibleBloomFilter * |
121 | ibf_create(uint32_t size, unsigned int hash_num, uint32_t salt); | 198 | ibf_create (uint32_t size, uint8_t hash_num, uint32_t salt); |
122 | 199 | ||
123 | 200 | ||
124 | /** | 201 | /** |
125 | * Insert an element into an IBF. | 202 | * Insert an element into an IBF. |
126 | * | 203 | * |
127 | * @param ibf the IBF | 204 | * @param ibf the IBF |
128 | * @param id the element's hash code | 205 | * @param key the element's hash code |
129 | */ | 206 | */ |
130 | void | 207 | void |
131 | ibf_insert (struct InvertibleBloomFilter *ibf, uint64_t id); | 208 | ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key); |
132 | 209 | ||
133 | 210 | ||
134 | /** | 211 | /** |
@@ -154,7 +231,7 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi | |||
154 | * GNUNET_SYSERR if the decoding has faile | 231 | * GNUNET_SYSERR if the decoding has faile |
155 | */ | 232 | */ |
156 | int | 233 | int |
157 | ibf_decode (struct InvertibleBloomFilter *ibf, int *side, uint64_t *ret_id); | 234 | ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct IBF_Key *ret_key); |
158 | 235 | ||
159 | 236 | ||
160 | /** | 237 | /** |
@@ -163,7 +240,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, int *side, uint64_t *ret_id); | |||
163 | * @param ibf the IBF to copy | 240 | * @param ibf the IBF to copy |
164 | */ | 241 | */ |
165 | struct InvertibleBloomFilter * | 242 | struct InvertibleBloomFilter * |
166 | ibf_dup (struct InvertibleBloomFilter *ibf); | 243 | ibf_dup (const struct InvertibleBloomFilter *ibf); |
167 | 244 | ||
168 | /** | 245 | /** |
169 | * Destroy all resources associated with the invertible bloom filter. | 246 | * Destroy all resources associated with the invertible bloom filter. |