diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-11-07 15:45:14 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-11-07 15:45:14 +0000 |
commit | 7961175bdde4e1efe2140e04caa6e600d41bcf90 (patch) | |
tree | 1d7fd0decda1cc2a135e883887caf918e6944ee3 /src/set | |
parent | 5e4d5baa9bb49dd2a0e9b15f0687b6609ceaa52e (diff) | |
download | gnunet-7961175bdde4e1efe2140e04caa6e600d41bcf90.tar.gz gnunet-7961175bdde4e1efe2140e04caa6e600d41bcf90.zip |
added bloomfilter message struct
renamed set-union specific message types
added set-intersection specific message type
added intersection cancel API implementation
added intersection create API implementation
added intersection destroy API implementation
added intersection peer_disconnect API implementation
added intersection accept API imlementation stub
added many prototypes for intersection operation handlers
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set.c | 4 | ||||
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 526 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 8 | ||||
-rw-r--r-- | src/set/set_protocol.h | 29 |
4 files changed, 218 insertions, 349 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 4f09aa8ac..7338059fa 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -1318,11 +1318,11 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, | |||
1318 | }; | 1318 | }; |
1319 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { | 1319 | static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { |
1320 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, | 1320 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, |
1321 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, | 1321 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, |
1322 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, | 1322 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, |
1323 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, | 1323 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, |
1324 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, | 1324 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, |
1325 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0}, | 1325 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, |
1326 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, | 1326 | {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, |
1327 | {NULL, 0, 0} | 1327 | {NULL, 0, 0} |
1328 | }; | 1328 | }; |
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index 221fb74a3..b46044c35 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -30,50 +30,17 @@ | |||
30 | #include "set_protocol.h" | 30 | #include "set_protocol.h" |
31 | #include <gcrypt.h> | 31 | #include <gcrypt.h> |
32 | 32 | ||
33 | |||
34 | /** | ||
35 | * Number of IBFs in a strata estimator. | ||
36 | */ | ||
37 | #define SE_STRATA_COUNT 32 | ||
38 | /** | ||
39 | * Size of the IBFs in the strata estimator. | ||
40 | */ | ||
41 | #define SE_IBF_SIZE 80 | ||
42 | /** | ||
43 | * hash num parameter for the difference digests and strata estimators | ||
44 | */ | ||
45 | #define SE_IBF_HASH_NUM 4 | ||
46 | |||
47 | /** | ||
48 | * Number of buckets that can be transmitted in one message. | ||
49 | */ | ||
50 | #define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) | ||
51 | |||
52 | /** | ||
53 | * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER). | ||
54 | * Choose this value so that computing the IBF is still cheaper | ||
55 | * than transmitting all values. | ||
56 | */ | ||
57 | #define MAX_IBF_ORDER (16) | ||
58 | |||
59 | /** | ||
60 | * Number of buckets used in the ibf per estimated | ||
61 | * difference. | ||
62 | */ | ||
63 | #define IBF_ALPHA 4 | ||
64 | |||
65 | |||
66 | /** | 33 | /** |
67 | * Current phase we are in for a intersection operation. | 34 | * Current phase we are in for a intersection operation. |
68 | */ | 35 | */ |
69 | enum IntersectionOperationPhase | 36 | enum IntersectionOperationPhase |
70 | { | 37 | { |
71 | /** | 38 | /** |
72 | * We sent the request message, and expect a BF | 39 | * We get our tunnel but received no message as of now |
73 | */ | 40 | */ |
74 | PHASE_EXPECT_INITIAL, | 41 | PHASE_EXPECT_INITIAL, |
75 | /** | 42 | /** |
76 | * We sent the request message, and expect a BF | 43 | * We expect a BF + the number of the other peers elements |
77 | */ | 44 | */ |
78 | PHASE_BF_EXCHANGE, | 45 | PHASE_BF_EXCHANGE, |
79 | /** | 46 | /** |
@@ -109,12 +76,12 @@ struct OperationState | |||
109 | /** | 76 | /** |
110 | * The bf we currently receive | 77 | * The bf we currently receive |
111 | */ | 78 | */ |
112 | struct BloomFilter *remote_bf; | 79 | struct GNUNET_CONTAINER_BloomFilter *remote_bf; |
113 | 80 | ||
114 | /** | 81 | /** |
115 | * BF of the set's element. | 82 | * BF of the set's element. |
116 | */ | 83 | */ |
117 | struct BloomFilter *local_bf; | 84 | struct GNUNET_CONTAINER_BloomFilter *local_bf; |
118 | 85 | ||
119 | /** | 86 | /** |
120 | * Current state of the operation. | 87 | * Current state of the operation. |
@@ -132,8 +99,18 @@ struct OperationState | |||
132 | * belongs to. | 99 | * belongs to. |
133 | */ | 100 | */ |
134 | struct Set *set; | 101 | struct Set *set; |
102 | |||
103 | /** | ||
104 | * Maps element-id-hashes to 'elements in our set'. | ||
105 | */ | ||
106 | struct GNUNET_CONTAINER_MultiHashMap *contained_elements; | ||
135 | 107 | ||
136 | /** | 108 | /** |
109 | * Iterator for sending elements on the key to element mapping to the client. | ||
110 | */ | ||
111 | struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; | ||
112 | |||
113 | /** | ||
137 | * Evaluate operations are held in | 114 | * Evaluate operations are held in |
138 | * a linked list. | 115 | * a linked list. |
139 | */ | 116 | */ |
@@ -153,63 +130,11 @@ struct OperationState | |||
153 | 130 | ||
154 | 131 | ||
155 | /** | 132 | /** |
156 | * The key entry is used to associate an ibf key with | ||
157 | * an element. | ||
158 | */ | ||
159 | struct KeyEntry | ||
160 | { | ||
161 | /** | ||
162 | * IBF key for the entry, derived from the current salt. | ||
163 | */ | ||
164 | struct IBF_Key ibf_key; | ||
165 | |||
166 | /** | ||
167 | * The actual element associated with the key | ||
168 | */ | ||
169 | struct ElementEntry *element; | ||
170 | |||
171 | /** | ||
172 | * Element that collides with this element | ||
173 | * on the ibf key | ||
174 | */ | ||
175 | struct KeyEntry *next_colliding; | ||
176 | }; | ||
177 | |||
178 | |||
179 | /** | ||
180 | * Used as a closure for sending elements | ||
181 | * with a specific IBF key. | ||
182 | */ | ||
183 | struct SendElementClosure | ||
184 | { | ||
185 | /** | ||
186 | * The IBF key whose matching elements should be | ||
187 | * sent. | ||
188 | */ | ||
189 | struct IBF_Key ibf_key; | ||
190 | |||
191 | /** | ||
192 | * Operation for which the elements | ||
193 | * should be sent. | ||
194 | */ | ||
195 | struct OperationState *eo; | ||
196 | }; | ||
197 | |||
198 | |||
199 | /** | ||
200 | * Extra state required for efficient set intersection. | 133 | * Extra state required for efficient set intersection. |
201 | */ | 134 | */ |
202 | struct SetState | 135 | struct SetState |
203 | { | 136 | { |
204 | /** | 137 | /** |
205 | * The strata estimator is only generated once for | ||
206 | * each set. | ||
207 | * The IBF keys are derived from the element hashes with | ||
208 | * salt=0. | ||
209 | */ | ||
210 | struct StrataEstimator *se; | ||
211 | |||
212 | /** | ||
213 | * Evaluate operations are held in | 138 | * Evaluate operations are held in |
214 | * a linked list. | 139 | * a linked list. |
215 | */ | 140 | */ |
@@ -224,38 +149,6 @@ struct SetState | |||
224 | 149 | ||
225 | 150 | ||
226 | /** | 151 | /** |
227 | * Iterator over hash map entries. | ||
228 | * | ||
229 | * @param cls closure | ||
230 | * @param key current key code | ||
231 | * @param value value in the hash map | ||
232 | * @return GNUNET_YES if we should continue to | ||
233 | * iterate, | ||
234 | * GNUNET_NO if not. | ||
235 | */ | ||
236 | static int | ||
237 | destroy_key_to_element_iter (void *cls, | ||
238 | uint32_t key, | ||
239 | void *value) | ||
240 | { | ||
241 | struct KeyEntry *k = value; | ||
242 | |||
243 | while (NULL != k) | ||
244 | { | ||
245 | struct KeyEntry *k_tmp = k; | ||
246 | k = k->next_colliding; | ||
247 | if (GNUNET_YES == k_tmp->element->remote) | ||
248 | { | ||
249 | GNUNET_free (k_tmp->element); | ||
250 | k_tmp->element = NULL; | ||
251 | } | ||
252 | GNUNET_free (k_tmp); | ||
253 | } | ||
254 | return GNUNET_YES; | ||
255 | } | ||
256 | |||
257 | |||
258 | /** | ||
259 | * Destroy a intersection operation, and free all resources | 152 | * Destroy a intersection operation, and free all resources |
260 | * associated with it. | 153 | * associated with it. |
261 | * | 154 | * |
@@ -320,171 +213,47 @@ fail_intersection_operation (struct OperationState *eo) | |||
320 | 213 | ||
321 | 214 | ||
322 | /** | 215 | /** |
323 | * Derive the IBF key from a hash code and | ||
324 | * a salt. | ||
325 | * | ||
326 | * @param src the hash code | ||
327 | * @param salt salt to use | ||
328 | * @return the derived IBF key | ||
329 | */ | ||
330 | static struct IBF_Key | ||
331 | get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) | ||
332 | { | ||
333 | struct IBF_Key key; | ||
334 | |||
335 | GNUNET_CRYPTO_hkdf (&key, sizeof (key), | ||
336 | GCRY_MD_SHA512, GCRY_MD_SHA256, | ||
337 | src, sizeof *src, | ||
338 | &salt, sizeof (salt), | ||
339 | NULL, 0); | ||
340 | return key; | ||
341 | } | ||
342 | |||
343 | |||
344 | /** | ||
345 | * Send a request for the evaluate operation to a remote peer | 216 | * Send a request for the evaluate operation to a remote peer |
346 | * | 217 | * |
347 | * @param eo operation with the other peer | 218 | * @param eo operation with the other peer |
348 | */ | 219 | */ |
349 | static void | 220 | static void |
350 | send_operation_request (struct OperationState *eo) | 221 | send_operation_request (struct Operation *op) |
351 | { | 222 | { |
352 | struct GNUNET_MQ_Envelope *ev; | 223 | struct GNUNET_MQ_Envelope *ev; |
353 | struct OperationRequestMessage *msg; | 224 | struct OperationRequestMessage *msg; |
354 | 225 | ||
355 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, | 226 | ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, |
356 | eo->spec->context_msg); | 227 | op->spec->context_msg); |
357 | 228 | ||
358 | if (NULL == ev) | 229 | if (NULL == ev) |
359 | { | 230 | { |
360 | /* the context message is too large */ | 231 | /* the context message is too large */ |
361 | GNUNET_break (0); | 232 | GNUNET_break (0); |
362 | GNUNET_SERVER_client_disconnect (eo->spec->set->client); | 233 | GNUNET_SERVER_client_disconnect (op->spec->set->client); |
363 | return; | 234 | return; |
364 | } | 235 | } |
365 | msg->operation = htonl (GNUNET_SET_OPERATION_UNION); | 236 | msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); |
366 | msg->app_id = eo->spec->app_id; | 237 | msg->app_id = op->spec->app_id; |
367 | msg->salt = htonl (eo->spec->salt); | 238 | msg->salt = htonl (op->spec->salt); |
368 | GNUNET_MQ_send (eo->mq, ev); | 239 | GNUNET_MQ_send (op->mq, ev); |
369 | 240 | ||
370 | if (NULL != eo->spec->context_msg) | 241 | if (NULL != op->spec->context_msg) |
371 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); | 242 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); |
372 | else | 243 | else |
373 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); | 244 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); |
374 | 245 | ||
375 | if (NULL != eo->spec->context_msg) | 246 | if (NULL != op->spec->context_msg) |
376 | { | 247 | { |
377 | GNUNET_free (eo->spec->context_msg); | 248 | GNUNET_free (op->spec->context_msg); |
378 | eo->spec->context_msg = NULL; | 249 | op->spec->context_msg = NULL; |
379 | } | 250 | } |
380 | 251 | ||
381 | } | 252 | } |
382 | 253 | ||
383 | 254 | ||
384 | /** | 255 | /** |
385 | * Iterator to create the mapping between ibf keys | 256 | * Handle an BF message from a remote peer. |
386 | * and element entries. | ||
387 | * | ||
388 | * @param cls closure | ||
389 | * @param key current key code | ||
390 | * @param value value in the hash map | ||
391 | * @return GNUNET_YES if we should continue to | ||
392 | * iterate, | ||
393 | * GNUNET_NO if not. | ||
394 | */ | ||
395 | static int | ||
396 | op_register_element_iterator (void *cls, | ||
397 | uint32_t key, | ||
398 | void *value) | ||
399 | { | ||
400 | struct KeyEntry *const new_k = cls; | ||
401 | struct KeyEntry *old_k = value; | ||
402 | |||
403 | GNUNET_assert (NULL != old_k); | ||
404 | do | ||
405 | { | ||
406 | if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) | ||
407 | { | ||
408 | new_k->next_colliding = old_k->next_colliding; | ||
409 | old_k->next_colliding = new_k; | ||
410 | return GNUNET_NO; | ||
411 | } | ||
412 | old_k = old_k->next_colliding; | ||
413 | } while (NULL != old_k); | ||
414 | return GNUNET_YES; | ||
415 | } | ||
416 | |||
417 | |||
418 | /** | ||
419 | * Insert an element into the intersection operation's | ||
420 | * key-to-element mapping. Takes ownership of 'ee'. | ||
421 | * Note that this does not insert the element in the set, | ||
422 | * only in the operation's key-element mapping. | ||
423 | * This is done to speed up re-tried operations, if some elements | ||
424 | * were transmitted, and then the IBF fails to decode. | ||
425 | * | ||
426 | * @param eo the intersection operation | ||
427 | * @param ee the element entry | ||
428 | */ | ||
429 | static void | ||
430 | op_register_element (struct OperationState *eo, struct ElementEntry *ee) | ||
431 | { | ||
432 | int ret; | ||
433 | struct IBF_Key ibf_key; | ||
434 | struct KeyEntry *k; | ||
435 | |||
436 | ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); | ||
437 | k = GNUNET_new (struct KeyEntry); | ||
438 | k->element = ee; | ||
439 | k->ibf_key = ibf_key; | ||
440 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, | ||
441 | (uint32_t) ibf_key.key_val, | ||
442 | op_register_element_iterator, k); | ||
443 | |||
444 | /* was the element inserted into a colliding bucket? */ | ||
445 | if (GNUNET_SYSERR == ret) | ||
446 | return; | ||
447 | |||
448 | GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, | ||
449 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
450 | } | ||
451 | |||
452 | |||
453 | |||
454 | /** | ||
455 | * Iterator for initializing the | ||
456 | * key-to-element mapping of a intersection operation | ||
457 | * | ||
458 | * @param cls the intersection operation | ||
459 | * @param key unised | ||
460 | * @param value the element entry to insert | ||
461 | * into the key-to-element mapping | ||
462 | * @return GNUNET_YES to continue iterating, | ||
463 | * GNUNET_NO to stop | ||
464 | */ | ||
465 | static int | ||
466 | init_key_to_element_iterator (void *cls, | ||
467 | const struct GNUNET_HashCode *key, | ||
468 | void *value) | ||
469 | { | ||
470 | struct OperationState *eo = cls; | ||
471 | struct ElementEntry *e = value; | ||
472 | |||
473 | /* make sure that the element belongs to the set at the time | ||
474 | * of creating the operation */ | ||
475 | if ( (e->generation_added > eo->generation_created) || | ||
476 | ( (GNUNET_YES == e->removed) && | ||
477 | (e->generation_removed < eo->generation_created))) | ||
478 | return GNUNET_YES; | ||
479 | |||
480 | GNUNET_assert (GNUNET_NO == e->remote); | ||
481 | |||
482 | op_register_element (eo, e); | ||
483 | return GNUNET_YES; | ||
484 | } | ||
485 | |||
486 | /** | ||
487 | * Handle an IBF message from a remote peer. | ||
488 | * | 257 | * |
489 | * @param cls the intersection operation | 258 | * @param cls the intersection operation |
490 | * @param mh the header of the message | 259 | * @param mh the header of the message |
@@ -579,6 +348,57 @@ send_client_done_and_destroy (struct OperationState *eo) | |||
579 | intersection_operation_destroy (eo); | 348 | intersection_operation_destroy (eo); |
580 | } | 349 | } |
581 | 350 | ||
351 | /** | ||
352 | * Send a bloomfilter to our peer. | ||
353 | * that the operation is over. | ||
354 | * After the result done message has been sent to the client, | ||
355 | * destroy the evaluate operation. | ||
356 | * | ||
357 | * @param eo intersection operation | ||
358 | */ | ||
359 | static void | ||
360 | send_bloomfilter (struct OperationState *eo){ | ||
361 | //get number of all elements still in the set | ||
362 | |||
363 | // send the bloomfilter | ||
364 | unsigned int buckets_sent = 0; | ||
365 | struct BloomFilter *bf; | ||
366 | //TODO: | ||
367 | // add all our elements to the bloomfilter | ||
368 | // create new bloomfilter for all our elements & count elements | ||
369 | //GNUNET_CONTAINER_multihashmap32_remove | ||
370 | //eo->local_bf = GNUNET_CONTAINER_multihashmap32_iterate(eo->set->elements, add); | ||
371 | |||
372 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n", 1<<ibf_order); | ||
373 | |||
374 | bf = eo->local_bf; | ||
375 | |||
376 | while (buckets_sent < (1 << bf_order)) | ||
377 | { | ||
378 | unsigned int buckets_in_message; | ||
379 | struct GNUNET_MQ_Envelope *ev; | ||
380 | struct IBFMessage *msg; | ||
381 | |||
382 | buckets_in_message = (1 << bf_order) - buckets_sent; | ||
383 | /* limit to maximum */ | ||
384 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | ||
385 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | ||
386 | |||
387 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | ||
388 | GNUNET_MESSAGE_TYPE_SET_P2P_BF); | ||
389 | msg->reserved = 0; | ||
390 | msg->order = bf_order; | ||
391 | msg->offset = htons (buckets_sent); | ||
392 | ibf_write_slice (ibf, buckets_sent, | ||
393 | buckets_in_message, &msg[1]); | ||
394 | buckets_sent += buckets_in_message; | ||
395 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", | ||
396 | buckets_in_message, buckets_sent, 1<<ibf_order); | ||
397 | GNUNET_MQ_send (eo->mq, ev); | ||
398 | } | ||
399 | |||
400 | eo->phase = PHASE_EXPECT_BF; | ||
401 | } | ||
582 | 402 | ||
583 | /** | 403 | /** |
584 | * Handle a done message from a remote peer | 404 | * Handle a done message from a remote peer |
@@ -615,78 +435,46 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) | |||
615 | 435 | ||
616 | 436 | ||
617 | /** | 437 | /** |
618 | * Evaluate a intersection operation with | 438 | * Evaluate a union operation with |
619 | * a remote peer. | 439 | * a remote peer. |
620 | * | 440 | * |
621 | * @param spec specification of the operation the evaluate | 441 | * @param op operation to evaluate |
622 | * @param tunnel tunnel already connected to the partner peer | ||
623 | * @param tc tunnel context, passed here so all new incoming | ||
624 | * messages are directly going to the intersection operations | ||
625 | * @return a handle to the operation | ||
626 | */ | 442 | */ |
627 | static void | 443 | static void |
628 | intersection_evaluate (struct OperationSpecification *spec, | 444 | intersection_evaluate (struct Operation *op) |
629 | struct GNUNET_MESH_Tunnel *tunnel, | ||
630 | struct TunnelContext *tc) | ||
631 | { | 445 | { |
632 | struct OperationState *eo; | 446 | op->state = GNUNET_new (struct OperationState); |
633 | |||
634 | eo = GNUNET_new (struct OperationState); | ||
635 | tc->vt = _GSS_intersection_vt (); | ||
636 | tc->op = eo; | ||
637 | eo->generation_created = spec->set->current_generation++; | ||
638 | eo->set = spec->set; | ||
639 | eo->spec = spec; | ||
640 | eo->tunnel = tunnel; | ||
641 | eo->mq = GNUNET_MESH_mq_create (tunnel); | ||
642 | |||
643 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
644 | "evaluating intersection operation, (app %s)\n", | ||
645 | GNUNET_h2s (&eo->spec->app_id)); | ||
646 | |||
647 | /* we started the operation, thus we have to send the operation request */ | 447 | /* we started the operation, thus we have to send the operation request */ |
648 | eo->phase = PHASE_EXPECT_SE; | 448 | op->state->phase = PHASE_BF_EXCHANGE; |
649 | 449 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation"); | |
650 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | 450 | send_operation_request (op); |
651 | eo->set->state->ops_tail, | ||
652 | eo); | ||
653 | |||
654 | send_initial_bloomfilter (eo); | ||
655 | } | 451 | } |
656 | 452 | ||
657 | 453 | ||
658 | /** | 454 | /** |
659 | * Accept an intersection operation request from a remote peer | 455 | * Accept an union operation request from a remote peer. |
456 | * Only initializes the private operation state. | ||
660 | * | 457 | * |
661 | * @param spec all necessary information about the operation | 458 | * @param op operation that will be accepted as a union operation |
662 | * @param tunnel open tunnel to the partner's peer | ||
663 | * @param tc tunnel context, passed here so all new incoming | ||
664 | * messages are directly going to the intersection operations | ||
665 | * @return operation | ||
666 | */ | 459 | */ |
667 | static void | 460 | static void |
668 | intersection_accept (struct OperationSpecification *spec, | 461 | intersection_accept (struct Operation *op) |
669 | struct GNUNET_MESH_Tunnel *tunnel, | ||
670 | struct TunnelContext *tc) | ||
671 | { | 462 | { |
672 | struct OperationState *eo; | 463 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); |
673 | 464 | op->state = GNUNET_new (struct OperationState); | |
674 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set intersection operation\n"); | 465 | |
675 | 466 | op->state->contained_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); | |
676 | eo = GNUNET_new (struct OperationState); | 467 | |
677 | tc->vt = _GSS_intersection_vt (); | 468 | if (NULL != op->state->remote_bf){ |
678 | tc->op = eo; | 469 | // run the set through the remote bloomfilter |
679 | eo->set = spec->set; | 470 | ; |
680 | eo->generation_created = eo->set->current_generation++; | 471 | } |
681 | eo->spec = spec; | 472 | |
682 | eo->tunnel = tunnel; | 473 | // |
683 | eo->mq = GNUNET_MESH_mq_create (tunnel); | 474 | op->state->local_bf; |
684 | /* transfer ownership of mq and socket from incoming to eo */ | 475 | |
685 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | ||
686 | eo->set->state->ops_tail, | ||
687 | eo); | ||
688 | /* kick off the operation */ | 476 | /* kick off the operation */ |
689 | send_bloomfilter (eo); | 477 | send_bloomfilter (op); |
690 | } | 478 | } |
691 | 479 | ||
692 | 480 | ||
@@ -703,8 +491,6 @@ intersection_set_create (void) | |||
703 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n"); | 491 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n"); |
704 | 492 | ||
705 | set_state = GNUNET_new (struct SetState); | 493 | set_state = GNUNET_new (struct SetState); |
706 | |||
707 | //TODO: actually create that thing | ||
708 | 494 | ||
709 | return set_state; | 495 | return set_state; |
710 | } | 496 | } |
@@ -719,7 +505,7 @@ intersection_set_create (void) | |||
719 | static void | 505 | static void |
720 | intersection_add (struct SetState *set_state, struct ElementEntry *ee) | 506 | intersection_add (struct SetState *set_state, struct ElementEntry *ee) |
721 | { | 507 | { |
722 | //TODO | 508 | //nothing to do here |
723 | } | 509 | } |
724 | 510 | ||
725 | 511 | ||
@@ -731,15 +517,6 @@ intersection_add (struct SetState *set_state, struct ElementEntry *ee) | |||
731 | static void | 517 | static void |
732 | intersection_set_destroy (struct SetState *set_state) | 518 | intersection_set_destroy (struct SetState *set_state) |
733 | { | 519 | { |
734 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection set\n"); | ||
735 | /* important to destroy operations before the rest of the set */ | ||
736 | while (NULL != set_state->ops_head) | ||
737 | intersection_operation_destroy (set_state->ops_head); | ||
738 | if (NULL != set_state->se) | ||
739 | { | ||
740 | //TODO: actually destroy that thing | ||
741 | set_state->se = NULL; | ||
742 | } | ||
743 | GNUNET_free (set_state); | 520 | GNUNET_free (set_state); |
744 | } | 521 | } |
745 | 522 | ||
@@ -753,7 +530,7 @@ intersection_set_destroy (struct SetState *set_state) | |||
753 | static void | 530 | static void |
754 | intersection_remove (struct SetState *set_state, struct ElementEntry *element) | 531 | intersection_remove (struct SetState *set_state, struct ElementEntry *element) |
755 | { | 532 | { |
756 | //TODO | 533 | //nothing to do here |
757 | } | 534 | } |
758 | 535 | ||
759 | 536 | ||
@@ -773,7 +550,10 @@ intersection_handle_p2p_message (struct OperationState *eo, | |||
773 | ntohs (mh->type), ntohs (mh->size)); | 550 | ntohs (mh->type), ntohs (mh->size)); |
774 | switch (ntohs (mh->type)) | 551 | switch (ntohs (mh->type)) |
775 | { | 552 | { |
776 | case GNUNET_MESSAGE_TYPE_SET_P2P_BF: | 553 | /* this message handler is not active until after we received an |
554 | * operation request message, thus the ops request is not handled here | ||
555 | */ | ||
556 | case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF: | ||
777 | handle_p2p_bf (eo, mh); | 557 | handle_p2p_bf (eo, mh); |
778 | break; | 558 | break; |
779 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | 559 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: |
@@ -786,20 +566,55 @@ intersection_handle_p2p_message (struct OperationState *eo, | |||
786 | return GNUNET_OK; | 566 | return GNUNET_OK; |
787 | } | 567 | } |
788 | 568 | ||
569 | /** | ||
570 | * Signal to the client that the operation has finished and | ||
571 | * destroy the operation. | ||
572 | * | ||
573 | * @param cls operation to destroy | ||
574 | */ | ||
575 | static void | ||
576 | send_done_and_destroy (void *cls) | ||
577 | { | ||
578 | struct Operation *op = cls; | ||
579 | struct GNUNET_MQ_Envelope *ev; | ||
580 | struct GNUNET_SET_ResultMessage *rm; | ||
581 | ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); | ||
582 | rm->request_id = htonl (op->spec->client_request_id); | ||
583 | rm->result_status = htons (GNUNET_SET_STATUS_DONE); | ||
584 | rm->element_type = htons (0); | ||
585 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | ||
586 | _GSS_operation_destroy (op); | ||
587 | } | ||
789 | 588 | ||
589 | /** | ||
590 | * Send a result message to the client indicating | ||
591 | * that the operation is over. | ||
592 | * After the result done message has been sent to the client, | ||
593 | * destroy the evaluate operation. | ||
594 | * | ||
595 | * @param op union operation | ||
596 | */ | ||
790 | static void | 597 | static void |
791 | intersection_peer_disconnect (struct OperationState *op) | 598 | finish_and_destroy (struct Operation *op) |
792 | { | 599 | { |
793 | /* Are we already disconnected? */ | 600 | GNUNET_assert (GNUNET_NO == op->state->client_done_sent); |
794 | if (NULL == op->tunnel) | 601 | |
795 | return; | 602 | if (GNUNET_SET_RESULT_FULL == op->spec->result_mode) |
796 | op->tunnel = NULL; | ||
797 | if (NULL != op->mq) | ||
798 | { | 603 | { |
799 | GNUNET_MQ_destroy (op->mq); | 604 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); |
800 | op->mq = NULL; | 605 | GNUNET_assert (NULL == op->state->full_result_iter); |
606 | op->state->full_result_iter = | ||
607 | GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->contained_elements); | ||
608 | return; | ||
801 | } | 609 | } |
802 | if (PHASE_FINISHED != op->phase) | 610 | send_done_and_destroy (op); |
611 | } | ||
612 | |||
613 | |||
614 | static void | ||
615 | intersection_peer_disconnect (struct Operation *op) | ||
616 | { | ||
617 | if (PHASE_FINISHED != op->state->phase) | ||
803 | { | 618 | { |
804 | struct GNUNET_MQ_Envelope *ev; | 619 | struct GNUNET_MQ_Envelope *ev; |
805 | struct GNUNET_SET_ResultMessage *msg; | 620 | struct GNUNET_SET_ResultMessage *msg; |
@@ -810,22 +625,47 @@ intersection_peer_disconnect (struct OperationState *op) | |||
810 | msg->element_type = htons (0); | 625 | msg->element_type = htons (0); |
811 | GNUNET_MQ_send (op->spec->set->client_mq, ev); | 626 | GNUNET_MQ_send (op->spec->set->client_mq, ev); |
812 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); | 627 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); |
813 | intersection_operation_destroy (op); | 628 | _GSS_operation_destroy (op); |
814 | return; | 629 | return; |
815 | } | 630 | } |
816 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); | 631 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); |
817 | if (GNUNET_NO == op->client_done_sent) | 632 | if (GNUNET_NO == op->state->client_done_sent) |
818 | send_client_done_and_destroy (op); | 633 | finish_and_destroy (op); |
819 | } | 634 | } |
820 | 635 | ||
821 | 636 | ||
637 | /** | ||
638 | * Destroy the union operation. Only things specific to the union operation are destroyed. | ||
639 | * | ||
640 | * @param op union operation to destroy | ||
641 | */ | ||
822 | static void | 642 | static void |
823 | intersection_op_cancel (struct SetState *set_state, uint32_t op_id) | 643 | intersection_op_cancel (struct Operation *op) |
824 | { | 644 | { |
825 | /* FIXME: implement */ | 645 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n"); |
646 | /* check if the op was canceled twice */ | ||
647 | GNUNET_assert (NULL != op->state); | ||
648 | if (NULL != op->state->remote_bf) | ||
649 | { | ||
650 | GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); | ||
651 | op->state->remote_bf = NULL; | ||
652 | } | ||
653 | if (NULL != op->state->local_bf) | ||
654 | { | ||
655 | GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf); | ||
656 | op->state->local_bf = NULL; | ||
657 | } | ||
658 | if (NULL != op->state->contained_elements) | ||
659 | { | ||
660 | // no need to free the elements, they are still part of the set | ||
661 | GNUNET_CONTAINER_multihashmap_destroy (op->state->contained_elements); | ||
662 | op->state->contained_elements = NULL; | ||
663 | } | ||
664 | GNUNET_free (op->state); | ||
665 | op->state = NULL; | ||
666 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n"); | ||
826 | } | 667 | } |
827 | 668 | ||
828 | |||
829 | const struct SetVT * | 669 | const struct SetVT * |
830 | _GSS_intersection_vt () | 670 | _GSS_intersection_vt () |
831 | { | 671 | { |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 6ad985bcb..6bd86c5b5 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -609,7 +609,7 @@ send_ibf (struct Operation *op, uint16_t ibf_order) | |||
609 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | 609 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; |
610 | 610 | ||
611 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | 611 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, |
612 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); | 612 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF); |
613 | msg->reserved = 0; | 613 | msg->reserved = 0; |
614 | msg->order = ibf_order; | 614 | msg->order = ibf_order; |
615 | msg->offset = htons (buckets_sent); | 615 | msg->offset = htons (buckets_sent); |
@@ -638,7 +638,7 @@ send_strata_estimator (struct Operation *op) | |||
638 | 638 | ||
639 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | 639 | ev = GNUNET_MQ_msg_header_extra (strata_msg, |
640 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | 640 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, |
641 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | 641 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE); |
642 | strata_estimator_write (op->state->se, &strata_msg[1]); | 642 | strata_estimator_write (op->state->se, &strata_msg[1]); |
643 | GNUNET_MQ_send (op->mq, ev); | 643 | GNUNET_MQ_send (op->mq, ev); |
644 | op->state->phase = PHASE_EXPECT_IBF; | 644 | op->state->phase = PHASE_EXPECT_IBF; |
@@ -1299,10 +1299,10 @@ union_handle_p2p_message (struct Operation *op, | |||
1299 | ntohs (mh->type), ntohs (mh->size)); | 1299 | ntohs (mh->type), ntohs (mh->size)); |
1300 | switch (ntohs (mh->type)) | 1300 | switch (ntohs (mh->type)) |
1301 | { | 1301 | { |
1302 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 1302 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: |
1303 | handle_p2p_ibf (op, mh); | 1303 | handle_p2p_ibf (op, mh); |
1304 | break; | 1304 | break; |
1305 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: | 1305 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: |
1306 | handle_p2p_strata_estimator (op, mh); | 1306 | handle_p2p_strata_estimator (op, mh); |
1307 | break; | 1307 | break; |
1308 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 1308 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h index e4755bbcc..21b7358d6 100644 --- a/src/set/set_protocol.h +++ b/src/set/set_protocol.h | |||
@@ -88,6 +88,35 @@ struct IBFMessage | |||
88 | /* rest: strata */ | 88 | /* rest: strata */ |
89 | }; | 89 | }; |
90 | 90 | ||
91 | struct BFMessage | ||
92 | { | ||
93 | /** | ||
94 | * Type: GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF | ||
95 | */ | ||
96 | struct GNUNET_MessageHeader header; | ||
97 | |||
98 | /** | ||
99 | * Padding, must be 0. | ||
100 | */ | ||
101 | uint8_t reserved; | ||
102 | |||
103 | /** | ||
104 | * Offset of the bloomfilter in the rest of the message | ||
105 | */ | ||
106 | uint16_t offset GNUNET_PACKED; | ||
107 | |||
108 | /** | ||
109 | * mutator used with this bloomfilter. | ||
110 | */ | ||
111 | uint64_t my_element_count; | ||
112 | |||
113 | /** | ||
114 | * mutator used with this bloomfilter. | ||
115 | */ | ||
116 | uint32_t my_mutator; | ||
117 | |||
118 | /* rest: bloomfilter */ | ||
119 | }; | ||
91 | 120 | ||
92 | GNUNET_NETWORK_STRUCT_END | 121 | GNUNET_NETWORK_STRUCT_END |
93 | 122 | ||