aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-11-07 15:45:14 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-11-07 15:45:14 +0000
commit7961175bdde4e1efe2140e04caa6e600d41bcf90 (patch)
tree1d7fd0decda1cc2a135e883887caf918e6944ee3 /src/set
parent5e4d5baa9bb49dd2a0e9b15f0687b6609ceaa52e (diff)
downloadgnunet-7961175bdde4e1efe2140e04caa6e600d41bcf90.tar.gz
gnunet-7961175bdde4e1efe2140e04caa6e600d41bcf90.zip
added bloomfilter message struct
renamed set-union specific message types added set-intersection specific message type added intersection cancel API implementation added intersection create API implementation added intersection destroy API implementation added intersection peer_disconnect API implementation added intersection accept API imlementation stub added many prototypes for intersection operation handlers
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c4
-rw-r--r--src/set/gnunet-service-set_intersection.c526
-rw-r--r--src/set/gnunet-service-set_union.c8
-rw-r--r--src/set/set_protocol.h29
4 files changed, 218 insertions, 349 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 4f09aa8ac..7338059fa 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1318,11 +1318,11 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
1318 }; 1318 };
1319 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { 1319 static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
1320 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, 1320 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
1321 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, 1321 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
1322 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, 1322 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
1323 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, 1323 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
1324 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, 1324 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
1325 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0}, 1325 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
1326 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, 1326 {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
1327 {NULL, 0, 0} 1327 {NULL, 0, 0}
1328 }; 1328 };
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 221fb74a3..b46044c35 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -30,50 +30,17 @@
30#include "set_protocol.h" 30#include "set_protocol.h"
31#include <gcrypt.h> 31#include <gcrypt.h>
32 32
33
34/**
35 * Number of IBFs in a strata estimator.
36 */
37#define SE_STRATA_COUNT 32
38/**
39 * Size of the IBFs in the strata estimator.
40 */
41#define SE_IBF_SIZE 80
42/**
43 * hash num parameter for the difference digests and strata estimators
44 */
45#define SE_IBF_HASH_NUM 4
46
47/**
48 * Number of buckets that can be transmitted in one message.
49 */
50#define MAX_BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE)
51
52/**
53 * The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
54 * Choose this value so that computing the IBF is still cheaper
55 * than transmitting all values.
56 */
57#define MAX_IBF_ORDER (16)
58
59/**
60 * Number of buckets used in the ibf per estimated
61 * difference.
62 */
63#define IBF_ALPHA 4
64
65
66/** 33/**
67 * Current phase we are in for a intersection operation. 34 * Current phase we are in for a intersection operation.
68 */ 35 */
69enum IntersectionOperationPhase 36enum IntersectionOperationPhase
70{ 37{
71 /** 38 /**
72 * We sent the request message, and expect a BF 39 * We get our tunnel but received no message as of now
73 */ 40 */
74 PHASE_EXPECT_INITIAL, 41 PHASE_EXPECT_INITIAL,
75 /** 42 /**
76 * We sent the request message, and expect a BF 43 * We expect a BF + the number of the other peers elements
77 */ 44 */
78 PHASE_BF_EXCHANGE, 45 PHASE_BF_EXCHANGE,
79 /** 46 /**
@@ -109,12 +76,12 @@ struct OperationState
109 /** 76 /**
110 * The bf we currently receive 77 * The bf we currently receive
111 */ 78 */
112 struct BloomFilter *remote_bf; 79 struct GNUNET_CONTAINER_BloomFilter *remote_bf;
113 80
114 /** 81 /**
115 * BF of the set's element. 82 * BF of the set's element.
116 */ 83 */
117 struct BloomFilter *local_bf; 84 struct GNUNET_CONTAINER_BloomFilter *local_bf;
118 85
119 /** 86 /**
120 * Current state of the operation. 87 * Current state of the operation.
@@ -132,8 +99,18 @@ struct OperationState
132 * belongs to. 99 * belongs to.
133 */ 100 */
134 struct Set *set; 101 struct Set *set;
102
103 /**
104 * Maps element-id-hashes to 'elements in our set'.
105 */
106 struct GNUNET_CONTAINER_MultiHashMap *contained_elements;
135 107
136 /** 108 /**
109 * Iterator for sending elements on the key to element mapping to the client.
110 */
111 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
112
113 /**
137 * Evaluate operations are held in 114 * Evaluate operations are held in
138 * a linked list. 115 * a linked list.
139 */ 116 */
@@ -153,63 +130,11 @@ struct OperationState
153 130
154 131
155/** 132/**
156 * The key entry is used to associate an ibf key with
157 * an element.
158 */
159struct KeyEntry
160{
161 /**
162 * IBF key for the entry, derived from the current salt.
163 */
164 struct IBF_Key ibf_key;
165
166 /**
167 * The actual element associated with the key
168 */
169 struct ElementEntry *element;
170
171 /**
172 * Element that collides with this element
173 * on the ibf key
174 */
175 struct KeyEntry *next_colliding;
176};
177
178
179/**
180 * Used as a closure for sending elements
181 * with a specific IBF key.
182 */
183struct SendElementClosure
184{
185 /**
186 * The IBF key whose matching elements should be
187 * sent.
188 */
189 struct IBF_Key ibf_key;
190
191 /**
192 * Operation for which the elements
193 * should be sent.
194 */
195 struct OperationState *eo;
196};
197
198
199/**
200 * Extra state required for efficient set intersection. 133 * Extra state required for efficient set intersection.
201 */ 134 */
202struct SetState 135struct SetState
203{ 136{
204 /** 137 /**
205 * The strata estimator is only generated once for
206 * each set.
207 * The IBF keys are derived from the element hashes with
208 * salt=0.
209 */
210 struct StrataEstimator *se;
211
212 /**
213 * Evaluate operations are held in 138 * Evaluate operations are held in
214 * a linked list. 139 * a linked list.
215 */ 140 */
@@ -224,38 +149,6 @@ struct SetState
224 149
225 150
226/** 151/**
227 * Iterator over hash map entries.
228 *
229 * @param cls closure
230 * @param key current key code
231 * @param value value in the hash map
232 * @return GNUNET_YES if we should continue to
233 * iterate,
234 * GNUNET_NO if not.
235 */
236static int
237destroy_key_to_element_iter (void *cls,
238 uint32_t key,
239 void *value)
240{
241 struct KeyEntry *k = value;
242
243 while (NULL != k)
244 {
245 struct KeyEntry *k_tmp = k;
246 k = k->next_colliding;
247 if (GNUNET_YES == k_tmp->element->remote)
248 {
249 GNUNET_free (k_tmp->element);
250 k_tmp->element = NULL;
251 }
252 GNUNET_free (k_tmp);
253 }
254 return GNUNET_YES;
255}
256
257
258/**
259 * Destroy a intersection operation, and free all resources 152 * Destroy a intersection operation, and free all resources
260 * associated with it. 153 * associated with it.
261 * 154 *
@@ -320,171 +213,47 @@ fail_intersection_operation (struct OperationState *eo)
320 213
321 214
322/** 215/**
323 * Derive the IBF key from a hash code and
324 * a salt.
325 *
326 * @param src the hash code
327 * @param salt salt to use
328 * @return the derived IBF key
329 */
330static struct IBF_Key
331get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
332{
333 struct IBF_Key key;
334
335 GNUNET_CRYPTO_hkdf (&key, sizeof (key),
336 GCRY_MD_SHA512, GCRY_MD_SHA256,
337 src, sizeof *src,
338 &salt, sizeof (salt),
339 NULL, 0);
340 return key;
341}
342
343
344/**
345 * Send a request for the evaluate operation to a remote peer 216 * Send a request for the evaluate operation to a remote peer
346 * 217 *
347 * @param eo operation with the other peer 218 * @param eo operation with the other peer
348 */ 219 */
349static void 220static void
350send_operation_request (struct OperationState *eo) 221send_operation_request (struct Operation *op)
351{ 222{
352 struct GNUNET_MQ_Envelope *ev; 223 struct GNUNET_MQ_Envelope *ev;
353 struct OperationRequestMessage *msg; 224 struct OperationRequestMessage *msg;
354 225
355 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 226 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
356 eo->spec->context_msg); 227 op->spec->context_msg);
357 228
358 if (NULL == ev) 229 if (NULL == ev)
359 { 230 {
360 /* the context message is too large */ 231 /* the context message is too large */
361 GNUNET_break (0); 232 GNUNET_break (0);
362 GNUNET_SERVER_client_disconnect (eo->spec->set->client); 233 GNUNET_SERVER_client_disconnect (op->spec->set->client);
363 return; 234 return;
364 } 235 }
365 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 236 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
366 msg->app_id = eo->spec->app_id; 237 msg->app_id = op->spec->app_id;
367 msg->salt = htonl (eo->spec->salt); 238 msg->salt = htonl (op->spec->salt);
368 GNUNET_MQ_send (eo->mq, ev); 239 GNUNET_MQ_send (op->mq, ev);
369 240
370 if (NULL != eo->spec->context_msg) 241 if (NULL != op->spec->context_msg)
371 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); 242 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
372 else 243 else
373 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); 244 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
374 245
375 if (NULL != eo->spec->context_msg) 246 if (NULL != op->spec->context_msg)
376 { 247 {
377 GNUNET_free (eo->spec->context_msg); 248 GNUNET_free (op->spec->context_msg);
378 eo->spec->context_msg = NULL; 249 op->spec->context_msg = NULL;
379 } 250 }
380 251
381} 252}
382 253
383 254
384/** 255/**
385 * Iterator to create the mapping between ibf keys 256 * Handle an BF message from a remote peer.
386 * and element entries.
387 *
388 * @param cls closure
389 * @param key current key code
390 * @param value value in the hash map
391 * @return GNUNET_YES if we should continue to
392 * iterate,
393 * GNUNET_NO if not.
394 */
395static int
396op_register_element_iterator (void *cls,
397 uint32_t key,
398 void *value)
399{
400 struct KeyEntry *const new_k = cls;
401 struct KeyEntry *old_k = value;
402
403 GNUNET_assert (NULL != old_k);
404 do
405 {
406 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
407 {
408 new_k->next_colliding = old_k->next_colliding;
409 old_k->next_colliding = new_k;
410 return GNUNET_NO;
411 }
412 old_k = old_k->next_colliding;
413 } while (NULL != old_k);
414 return GNUNET_YES;
415}
416
417
418/**
419 * Insert an element into the intersection operation's
420 * key-to-element mapping. Takes ownership of 'ee'.
421 * Note that this does not insert the element in the set,
422 * only in the operation's key-element mapping.
423 * This is done to speed up re-tried operations, if some elements
424 * were transmitted, and then the IBF fails to decode.
425 *
426 * @param eo the intersection operation
427 * @param ee the element entry
428 */
429static void
430op_register_element (struct OperationState *eo, struct ElementEntry *ee)
431{
432 int ret;
433 struct IBF_Key ibf_key;
434 struct KeyEntry *k;
435
436 ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
437 k = GNUNET_new (struct KeyEntry);
438 k->element = ee;
439 k->ibf_key = ibf_key;
440 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
441 (uint32_t) ibf_key.key_val,
442 op_register_element_iterator, k);
443
444 /* was the element inserted into a colliding bucket? */
445 if (GNUNET_SYSERR == ret)
446 return;
447
448 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
449 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
450}
451
452
453
454/**
455 * Iterator for initializing the
456 * key-to-element mapping of a intersection operation
457 *
458 * @param cls the intersection operation
459 * @param key unised
460 * @param value the element entry to insert
461 * into the key-to-element mapping
462 * @return GNUNET_YES to continue iterating,
463 * GNUNET_NO to stop
464 */
465static int
466init_key_to_element_iterator (void *cls,
467 const struct GNUNET_HashCode *key,
468 void *value)
469{
470 struct OperationState *eo = cls;
471 struct ElementEntry *e = value;
472
473 /* make sure that the element belongs to the set at the time
474 * of creating the operation */
475 if ( (e->generation_added > eo->generation_created) ||
476 ( (GNUNET_YES == e->removed) &&
477 (e->generation_removed < eo->generation_created)))
478 return GNUNET_YES;
479
480 GNUNET_assert (GNUNET_NO == e->remote);
481
482 op_register_element (eo, e);
483 return GNUNET_YES;
484}
485
486/**
487 * Handle an IBF message from a remote peer.
488 * 257 *
489 * @param cls the intersection operation 258 * @param cls the intersection operation
490 * @param mh the header of the message 259 * @param mh the header of the message
@@ -579,6 +348,57 @@ send_client_done_and_destroy (struct OperationState *eo)
579 intersection_operation_destroy (eo); 348 intersection_operation_destroy (eo);
580} 349}
581 350
351/**
352 * Send a bloomfilter to our peer.
353 * that the operation is over.
354 * After the result done message has been sent to the client,
355 * destroy the evaluate operation.
356 *
357 * @param eo intersection operation
358 */
359static void
360send_bloomfilter (struct OperationState *eo){
361 //get number of all elements still in the set
362
363 // send the bloomfilter
364 unsigned int buckets_sent = 0;
365 struct BloomFilter *bf;
366 //TODO:
367 // add all our elements to the bloomfilter
368 // create new bloomfilter for all our elements & count elements
369 //GNUNET_CONTAINER_multihashmap32_remove
370 //eo->local_bf = GNUNET_CONTAINER_multihashmap32_iterate(eo->set->elements, add);
371
372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n", 1<<ibf_order);
373
374 bf = eo->local_bf;
375
376 while (buckets_sent < (1 << bf_order))
377 {
378 unsigned int buckets_in_message;
379 struct GNUNET_MQ_Envelope *ev;
380 struct IBFMessage *msg;
381
382 buckets_in_message = (1 << bf_order) - buckets_sent;
383 /* limit to maximum */
384 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
385 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
386
387 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
388 GNUNET_MESSAGE_TYPE_SET_P2P_BF);
389 msg->reserved = 0;
390 msg->order = bf_order;
391 msg->offset = htons (buckets_sent);
392 ibf_write_slice (ibf, buckets_sent,
393 buckets_in_message, &msg[1]);
394 buckets_sent += buckets_in_message;
395 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
396 buckets_in_message, buckets_sent, 1<<ibf_order);
397 GNUNET_MQ_send (eo->mq, ev);
398 }
399
400 eo->phase = PHASE_EXPECT_BF;
401}
582 402
583/** 403/**
584 * Handle a done message from a remote peer 404 * Handle a done message from a remote peer
@@ -615,78 +435,46 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
615 435
616 436
617/** 437/**
618 * Evaluate a intersection operation with 438 * Evaluate a union operation with
619 * a remote peer. 439 * a remote peer.
620 * 440 *
621 * @param spec specification of the operation the evaluate 441 * @param op operation to evaluate
622 * @param tunnel tunnel already connected to the partner peer
623 * @param tc tunnel context, passed here so all new incoming
624 * messages are directly going to the intersection operations
625 * @return a handle to the operation
626 */ 442 */
627static void 443static void
628intersection_evaluate (struct OperationSpecification *spec, 444intersection_evaluate (struct Operation *op)
629 struct GNUNET_MESH_Tunnel *tunnel,
630 struct TunnelContext *tc)
631{ 445{
632 struct OperationState *eo; 446 op->state = GNUNET_new (struct OperationState);
633
634 eo = GNUNET_new (struct OperationState);
635 tc->vt = _GSS_intersection_vt ();
636 tc->op = eo;
637 eo->generation_created = spec->set->current_generation++;
638 eo->set = spec->set;
639 eo->spec = spec;
640 eo->tunnel = tunnel;
641 eo->mq = GNUNET_MESH_mq_create (tunnel);
642
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644 "evaluating intersection operation, (app %s)\n",
645 GNUNET_h2s (&eo->spec->app_id));
646
647 /* we started the operation, thus we have to send the operation request */ 447 /* we started the operation, thus we have to send the operation request */
648 eo->phase = PHASE_EXPECT_SE; 448 op->state->phase = PHASE_BF_EXCHANGE;
649 449 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
650 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, 450 send_operation_request (op);
651 eo->set->state->ops_tail,
652 eo);
653
654 send_initial_bloomfilter (eo);
655} 451}
656 452
657 453
658/** 454/**
659 * Accept an intersection operation request from a remote peer 455 * Accept an union operation request from a remote peer.
456 * Only initializes the private operation state.
660 * 457 *
661 * @param spec all necessary information about the operation 458 * @param op operation that will be accepted as a union operation
662 * @param tunnel open tunnel to the partner's peer
663 * @param tc tunnel context, passed here so all new incoming
664 * messages are directly going to the intersection operations
665 * @return operation
666 */ 459 */
667static void 460static void
668intersection_accept (struct OperationSpecification *spec, 461intersection_accept (struct Operation *op)
669 struct GNUNET_MESH_Tunnel *tunnel,
670 struct TunnelContext *tc)
671{ 462{
672 struct OperationState *eo; 463 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
673 464 op->state = GNUNET_new (struct OperationState);
674 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set intersection operation\n"); 465
675 466 op->state->contained_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
676 eo = GNUNET_new (struct OperationState); 467
677 tc->vt = _GSS_intersection_vt (); 468 if (NULL != op->state->remote_bf){
678 tc->op = eo; 469 // run the set through the remote bloomfilter
679 eo->set = spec->set; 470 ;
680 eo->generation_created = eo->set->current_generation++; 471 }
681 eo->spec = spec; 472
682 eo->tunnel = tunnel; 473 //
683 eo->mq = GNUNET_MESH_mq_create (tunnel); 474 op->state->local_bf;
684 /* transfer ownership of mq and socket from incoming to eo */ 475
685 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
686 eo->set->state->ops_tail,
687 eo);
688 /* kick off the operation */ 476 /* kick off the operation */
689 send_bloomfilter (eo); 477 send_bloomfilter (op);
690} 478}
691 479
692 480
@@ -703,8 +491,6 @@ intersection_set_create (void)
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n"); 491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
704 492
705 set_state = GNUNET_new (struct SetState); 493 set_state = GNUNET_new (struct SetState);
706
707 //TODO: actually create that thing
708 494
709 return set_state; 495 return set_state;
710} 496}
@@ -719,7 +505,7 @@ intersection_set_create (void)
719static void 505static void
720intersection_add (struct SetState *set_state, struct ElementEntry *ee) 506intersection_add (struct SetState *set_state, struct ElementEntry *ee)
721{ 507{
722 //TODO 508 //nothing to do here
723} 509}
724 510
725 511
@@ -731,15 +517,6 @@ intersection_add (struct SetState *set_state, struct ElementEntry *ee)
731static void 517static void
732intersection_set_destroy (struct SetState *set_state) 518intersection_set_destroy (struct SetState *set_state)
733{ 519{
734 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection set\n");
735 /* important to destroy operations before the rest of the set */
736 while (NULL != set_state->ops_head)
737 intersection_operation_destroy (set_state->ops_head);
738 if (NULL != set_state->se)
739 {
740 //TODO: actually destroy that thing
741 set_state->se = NULL;
742 }
743 GNUNET_free (set_state); 520 GNUNET_free (set_state);
744} 521}
745 522
@@ -753,7 +530,7 @@ intersection_set_destroy (struct SetState *set_state)
753static void 530static void
754intersection_remove (struct SetState *set_state, struct ElementEntry *element) 531intersection_remove (struct SetState *set_state, struct ElementEntry *element)
755{ 532{
756 //TODO 533 //nothing to do here
757} 534}
758 535
759 536
@@ -773,7 +550,10 @@ intersection_handle_p2p_message (struct OperationState *eo,
773 ntohs (mh->type), ntohs (mh->size)); 550 ntohs (mh->type), ntohs (mh->size));
774 switch (ntohs (mh->type)) 551 switch (ntohs (mh->type))
775 { 552 {
776 case GNUNET_MESSAGE_TYPE_SET_P2P_BF: 553 /* this message handler is not active until after we received an
554 * operation request message, thus the ops request is not handled here
555 */
556 case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
777 handle_p2p_bf (eo, mh); 557 handle_p2p_bf (eo, mh);
778 break; 558 break;
779 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: 559 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
@@ -786,20 +566,55 @@ intersection_handle_p2p_message (struct OperationState *eo,
786 return GNUNET_OK; 566 return GNUNET_OK;
787} 567}
788 568
569/**
570 * Signal to the client that the operation has finished and
571 * destroy the operation.
572 *
573 * @param cls operation to destroy
574 */
575static void
576send_done_and_destroy (void *cls)
577{
578 struct Operation *op = cls;
579 struct GNUNET_MQ_Envelope *ev;
580 struct GNUNET_SET_ResultMessage *rm;
581 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
582 rm->request_id = htonl (op->spec->client_request_id);
583 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
584 rm->element_type = htons (0);
585 GNUNET_MQ_send (op->spec->set->client_mq, ev);
586 _GSS_operation_destroy (op);
587}
789 588
589/**
590 * Send a result message to the client indicating
591 * that the operation is over.
592 * After the result done message has been sent to the client,
593 * destroy the evaluate operation.
594 *
595 * @param op union operation
596 */
790static void 597static void
791intersection_peer_disconnect (struct OperationState *op) 598finish_and_destroy (struct Operation *op)
792{ 599{
793 /* Are we already disconnected? */ 600 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
794 if (NULL == op->tunnel) 601
795 return; 602 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
796 op->tunnel = NULL;
797 if (NULL != op->mq)
798 { 603 {
799 GNUNET_MQ_destroy (op->mq); 604 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
800 op->mq = NULL; 605 GNUNET_assert (NULL == op->state->full_result_iter);
606 op->state->full_result_iter =
607 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->contained_elements);
608 return;
801 } 609 }
802 if (PHASE_FINISHED != op->phase) 610 send_done_and_destroy (op);
611}
612
613
614static void
615intersection_peer_disconnect (struct Operation *op)
616{
617 if (PHASE_FINISHED != op->state->phase)
803 { 618 {
804 struct GNUNET_MQ_Envelope *ev; 619 struct GNUNET_MQ_Envelope *ev;
805 struct GNUNET_SET_ResultMessage *msg; 620 struct GNUNET_SET_ResultMessage *msg;
@@ -810,22 +625,47 @@ intersection_peer_disconnect (struct OperationState *op)
810 msg->element_type = htons (0); 625 msg->element_type = htons (0);
811 GNUNET_MQ_send (op->spec->set->client_mq, ev); 626 GNUNET_MQ_send (op->spec->set->client_mq, ev);
812 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); 627 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
813 intersection_operation_destroy (op); 628 _GSS_operation_destroy (op);
814 return; 629 return;
815 } 630 }
816 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); 631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
817 if (GNUNET_NO == op->client_done_sent) 632 if (GNUNET_NO == op->state->client_done_sent)
818 send_client_done_and_destroy (op); 633 finish_and_destroy (op);
819} 634}
820 635
821 636
637/**
638 * Destroy the union operation. Only things specific to the union operation are destroyed.
639 *
640 * @param op union operation to destroy
641 */
822static void 642static void
823intersection_op_cancel (struct SetState *set_state, uint32_t op_id) 643intersection_op_cancel (struct Operation *op)
824{ 644{
825 /* FIXME: implement */ 645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op\n");
646 /* check if the op was canceled twice */
647 GNUNET_assert (NULL != op->state);
648 if (NULL != op->state->remote_bf)
649 {
650 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
651 op->state->remote_bf = NULL;
652 }
653 if (NULL != op->state->local_bf)
654 {
655 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
656 op->state->local_bf = NULL;
657 }
658 if (NULL != op->state->contained_elements)
659 {
660 // no need to free the elements, they are still part of the set
661 GNUNET_CONTAINER_multihashmap_destroy (op->state->contained_elements);
662 op->state->contained_elements = NULL;
663 }
664 GNUNET_free (op->state);
665 op->state = NULL;
666 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying intersection op done\n");
826} 667}
827 668
828
829const struct SetVT * 669const struct SetVT *
830_GSS_intersection_vt () 670_GSS_intersection_vt ()
831{ 671{
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 6ad985bcb..6bd86c5b5 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -609,7 +609,7 @@ send_ibf (struct Operation *op, uint16_t ibf_order)
609 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 609 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
610 610
611 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, 611 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
612 GNUNET_MESSAGE_TYPE_SET_P2P_IBF); 612 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF);
613 msg->reserved = 0; 613 msg->reserved = 0;
614 msg->order = ibf_order; 614 msg->order = ibf_order;
615 msg->offset = htons (buckets_sent); 615 msg->offset = htons (buckets_sent);
@@ -638,7 +638,7 @@ send_strata_estimator (struct Operation *op)
638 638
639 ev = GNUNET_MQ_msg_header_extra (strata_msg, 639 ev = GNUNET_MQ_msg_header_extra (strata_msg,
640 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 640 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
641 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 641 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE);
642 strata_estimator_write (op->state->se, &strata_msg[1]); 642 strata_estimator_write (op->state->se, &strata_msg[1]);
643 GNUNET_MQ_send (op->mq, ev); 643 GNUNET_MQ_send (op->mq, ev);
644 op->state->phase = PHASE_EXPECT_IBF; 644 op->state->phase = PHASE_EXPECT_IBF;
@@ -1299,10 +1299,10 @@ union_handle_p2p_message (struct Operation *op,
1299 ntohs (mh->type), ntohs (mh->size)); 1299 ntohs (mh->type), ntohs (mh->size));
1300 switch (ntohs (mh->type)) 1300 switch (ntohs (mh->type))
1301 { 1301 {
1302 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: 1302 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1303 handle_p2p_ibf (op, mh); 1303 handle_p2p_ibf (op, mh);
1304 break; 1304 break;
1305 case GNUNET_MESSAGE_TYPE_SET_P2P_SE: 1305 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1306 handle_p2p_strata_estimator (op, mh); 1306 handle_p2p_strata_estimator (op, mh);
1307 break; 1307 break;
1308 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 1308 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h
index e4755bbcc..21b7358d6 100644
--- a/src/set/set_protocol.h
+++ b/src/set/set_protocol.h
@@ -88,6 +88,35 @@ struct IBFMessage
88 /* rest: strata */ 88 /* rest: strata */
89}; 89};
90 90
91struct BFMessage
92{
93 /**
94 * Type: GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
95 */
96 struct GNUNET_MessageHeader header;
97
98 /**
99 * Padding, must be 0.
100 */
101 uint8_t reserved;
102
103 /**
104 * Offset of the bloomfilter in the rest of the message
105 */
106 uint16_t offset GNUNET_PACKED;
107
108 /**
109 * mutator used with this bloomfilter.
110 */
111 uint64_t my_element_count;
112
113 /**
114 * mutator used with this bloomfilter.
115 */
116 uint32_t my_mutator;
117
118 /* rest: bloomfilter */
119};
91 120
92GNUNET_NETWORK_STRUCT_END 121GNUNET_NETWORK_STRUCT_END
93 122