aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-11-20 16:20:26 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-11-20 16:20:26 +0000
commit225260a0a885e09a417395a5c9de7b6da549f290 (patch)
tree4859d85f24bc17a4197ea2465cc4034102626124 /src/set
parent52a5e73ced4c456e5d6951158844f047048bd4e0 (diff)
downloadgnunet-225260a0a885e09a417395a5c9de7b6da549f290.tar.gz
gnunet-225260a0a885e09a417395a5c9de7b6da549f290.zip
added intersection to set's automake script
renamed shared variable in set-service's common structure reordered code in intersection i assume: finished reworking intersection, still misses dynamic bloomfilter size re-added struct SetState to intersection, which keeps track of the newest (and only this!) element count in the set's hashmap. removed iterator over the set for counting (see line above) a bugfix here and there
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am2
-rw-r--r--src/set/gnunet-service-set.c2
-rw-r--r--src/set/gnunet-service-set.h2
-rw-r--r--src/set/gnunet-service-set_intersection.c324
4 files changed, 162 insertions, 168 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 72d3d82a0..a2ca25f94 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -45,12 +45,14 @@ gnunet_set_ibf_profiler_LDADD = \
45gnunet_service_set_SOURCES = \ 45gnunet_service_set_SOURCES = \
46 gnunet-service-set.c \ 46 gnunet-service-set.c \
47 gnunet-service-set_union.c \ 47 gnunet-service-set_union.c \
48 gnunet-service-set_intersection.c \
48 ibf.c \ 49 ibf.c \
49 strata_estimator.c 50 strata_estimator.c
50gnunet_service_set_LDADD = \ 51gnunet_service_set_LDADD = \
51 $(top_builddir)/src/util/libgnunetutil.la \ 52 $(top_builddir)/src/util/libgnunetutil.la \
52 $(top_builddir)/src/core/libgnunetcore.la \ 53 $(top_builddir)/src/core/libgnunetcore.la \
53 $(top_builddir)/src/mesh/libgnunetmesh.la \ 54 $(top_builddir)/src/mesh/libgnunetmesh.la \
55 $(top_builddir)/src/block/libgnunetblock.la \
54 $(GN_LIBINTL) 56 $(GN_LIBINTL)
55 57
56libgnunetset_la_SOURCES = \ 58libgnunetset_la_SOURCES = \
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 490a3e5ce..3c5e80606 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -567,7 +567,7 @@ handle_incoming_msg (struct Operation *op,
567 spec->app_id = msg->app_id; 567 spec->app_id = msg->app_id;
568 spec->salt = ntohl (msg->salt); 568 spec->salt = ntohl (msg->salt);
569 spec->peer = op->state->peer; 569 spec->peer = op->state->peer;
570 spec->element_count = ntohl (msg->element_count); 570 spec->remote_element_count = ntohl (msg->element_count);
571 571
572 op->spec = spec; 572 op->spec = spec;
573 573
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index af3aa7287..d752fc5cb 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -93,7 +93,7 @@ struct OperationSpecification
93 /** 93 /**
94 * Remote peers element count 94 * Remote peers element count
95 */ 95 */
96 uint32_t element_count; 96 uint32_t remote_element_count;
97 97
98 /** 98 /**
99 * ID used to identify an operation between service and client 99 * ID used to identify an operation between service and client
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 8631dd4a7..ce0e5c612 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -26,7 +26,7 @@
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet-service-set.h" 28#include "gnunet-service-set.h"
29#include "strata_estimator.h" 29#include "gnunet_block_lib.h"
30#include "set_protocol.h" 30#include "set_protocol.h"
31#include <gcrypt.h> 31#include <gcrypt.h>
32 32
@@ -96,12 +96,12 @@ struct OperationState
96 /** 96 /**
97 * Current element count contained within contained_elements 97 * Current element count contained within contained_elements
98 */ 98 */
99 uint32_t my_elements_count; 99 uint32_t my_element_count;
100 100
101 /** 101 /**
102 * Iterator for sending elements on the key to element mapping to the client. 102 * Iterator for sending elements on the key to element mapping to the client.
103 */ 103 */
104 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter; 104 struct GNUNET_CONTAINER_MultiHashMapIterator *full_result_iter;
105 105
106 /** 106 /**
107 * Evaluate operations are held in 107 * Evaluate operations are held in
@@ -123,6 +123,18 @@ struct OperationState
123 123
124 124
125/** 125/**
126 * Extra state required for efficient set intersection.
127 */
128struct SetState
129{
130 /**
131 * Number of currently valid elements in the set which have not been removed
132 */
133 uint32_t current_set_element_count;
134};
135
136
137/**
126 * Alice's version: 138 * Alice's version:
127 * 139 *
128 * fills the contained-elements hashmap with all relevant 140 * fills the contained-elements hashmap with all relevant
@@ -154,7 +166,7 @@ iterator_initialization_by_alice (void *cls,
154 &mutated_hash)) 166 &mutated_hash))
155 return GNUNET_YES; 167 return GNUNET_YES;
156 168
157 op->state->my_elements_count++; 169 op->state->my_element_count++;
158 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements, 170 GNUNET_CONTAINER_multihashmap_put (op->state->my_elements,
159 &ee->element_hash, ee, 171 &ee->element_hash, ee,
160 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 172 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
@@ -203,33 +215,6 @@ iterator_initialization (void *cls,
203 return GNUNET_YES; 215 return GNUNET_YES;
204} 216}
205 217
206/**
207 * Counts all valid elements in the hashmap
208 * (the ones that are valid in our generation)
209 *
210 * @param cls closure
211 * @param key current key code
212 * @param value value in the hash map
213 * @return #GNUNET_YES if we should continue to
214 * iterate,
215 * #GNUNET_NO if not.
216 */
217static int
218iterator_element_count (void *cls,
219 const struct GNUNET_HashCode *key,
220 void *value){
221 struct ElementEntry *ee = value;
222 struct Operation *op = cls;
223
224 //only consider this element, if it is valid for us
225 if ((op->generation_created >= ee->generation_removed)
226 || (op->generation_created < ee->generation_added))
227 return GNUNET_YES;
228
229 op->state->my_elements_count++;
230
231 return GNUNET_YES;
232}
233 218
234/** 219/**
235 * removes element from a hashmap if it is not contained within the 220 * removes element from a hashmap if it is not contained within the
@@ -254,7 +239,7 @@ iterator_bf_round (void *cls,
254 239
255 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf, 240 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (op->state->remote_bf,
256 &mutated_hash)){ 241 &mutated_hash)){
257 op->state->my_elements_count--; 242 op->state->my_element_count--;
258 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements, 243 GNUNET_CONTAINER_multihashmap_remove (op->state->my_elements,
259 &ee->element_hash, 244 &ee->element_hash,
260 ee); 245 ee);
@@ -295,26 +280,6 @@ fail_intersection_operation (struct Operation *op)
295} 280}
296 281
297 282
298
299/**
300 * Inform the peer that this operation is complete.
301 *
302 * @param eo the intersection operation to fail
303 */
304static void
305send_peer_done (struct Operation *op)
306{
307 struct GNUNET_MQ_Envelope *ev;
308
309 op->state->phase = PHASE_FINISHED;
310 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
311 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
312 op->state->local_bf = NULL;
313
314 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
315 GNUNET_MQ_send (op->mq, ev);
316}
317
318/** 283/**
319 * Send a request for the evaluate operation to a remote peer 284 * Send a request for the evaluate operation to a remote peer
320 * 285 *
@@ -339,7 +304,7 @@ send_operation_request (struct Operation *op)
339 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION); 304 msg->operation = htonl (GNUNET_SET_OPERATION_INTERSECTION);
340 msg->app_id = op->spec->app_id; 305 msg->app_id = op->spec->app_id;
341 msg->salt = htonl (op->spec->salt); 306 msg->salt = htonl (op->spec->salt);
342 msg->element_count = htonl(op->state->my_elements); 307 msg->element_count = htonl(op->state->my_element_count);
343 308
344 GNUNET_MQ_send (op->mq, ev); 309 GNUNET_MQ_send (op->mq, ev);
345 310
@@ -357,6 +322,121 @@ send_operation_request (struct Operation *op)
357} 322}
358 323
359/** 324/**
325 * Send a bloomfilter to our peer.
326 * that the operation is over.
327 * After the result done message has been sent to the client,
328 * destroy the evaluate operation.
329 *
330 * @param eo intersection operation
331 */
332static void
333send_bloomfilter (struct Operation *op)
334{
335 struct GNUNET_MQ_Envelope *ev;
336 struct BFMessage *msg;
337 uint32_t bf_size;
338
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n");
340
341 // send our bloomfilter
342 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
343
344 ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
345 msg->reserved = 0;
346 msg->sender_element_count = htonl (op->state->my_element_count);
347 msg->bloomfilter_length = htonl (bf_size);
348 msg->sender_mutator = htonl (op->spec->salt);
349 GNUNET_assert (GNUNET_SYSERR !=
350 GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
351 (char *) &msg[1],
352 bf_size));
353 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
354 op->state->local_bf = NULL;
355 GNUNET_MQ_send (op->mq, ev);
356}
357
358
359/**
360 * Signal to the client that the operation has finished and
361 * destroy the operation.
362 *
363 * @param cls operation to destroy
364 */
365static void
366send_client_done_and_destroy (void *cls)
367{
368 struct Operation *op = cls;
369 struct GNUNET_MQ_Envelope *ev;
370 struct GNUNET_SET_ResultMessage *rm;
371 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
372 rm->request_id = htonl (op->spec->client_request_id);
373 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
374 rm->element_type = htons (0);
375 GNUNET_MQ_send (op->spec->set->client_mq, ev);
376 _GSS_operation_destroy (op);
377}
378
379
380/**
381 * Send all elements in the full result iterator.
382 *
383 * @param cls operation
384 */
385static void
386send_remaining_elements (void *cls)
387{
388 struct Operation *op = cls;
389 struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
390 struct GNUNET_MQ_Envelope *ev;
391 struct GNUNET_SET_ResultMessage *rm;
392 struct GNUNET_SET_Element *element;
393 int res;
394
395 res = GNUNET_CONTAINER_multihashmap_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
396 if (GNUNET_NO == res) {
397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
398 send_client_done_and_destroy (op);
399 return;
400 }
401
402 element = &remaining->element;
403 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
404 GNUNET_assert (0 != op->spec->client_request_id);
405
406 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
407 GNUNET_assert (NULL != ev);
408
409 rm->result_status = htons (GNUNET_SET_STATUS_OK);
410 rm->request_id = htonl (op->spec->client_request_id);
411 rm->element_type = element->type;
412 memcpy (&rm[1], element->data, element->size);
413
414 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
415 GNUNET_MQ_send (op->spec->set->client_mq, ev);
416}
417
418
419/**
420 * Inform the peer that this operation is complete.
421 *
422 * @param eo the intersection operation to fail
423 */
424static void
425send_peer_done (struct Operation *op)
426{
427 struct GNUNET_MQ_Envelope *ev;
428
429 op->state->phase = PHASE_FINISHED;
430 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Intersection succeeded, sending DONE\n");
431 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
432 op->state->local_bf = NULL;
433
434 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
435 GNUNET_MQ_send (op->mq, ev);
436}
437
438
439/**
360 * Handle an BF message from a remote peer. 440 * Handle an BF message from a remote peer.
361 * 441 *
362 * @param cls the intersection operation 442 * @param cls the intersection operation
@@ -366,13 +446,14 @@ static void
366handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) 446handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
367{ 447{
368 struct Operation *op = cls; 448 struct Operation *op = cls;
369 struct BFMessage *msg = (struct BFMessage *) mh; 449 const struct BFMessage *msg = (const struct BFMessage *) mh;
370 uint32_t old_count; 450 uint32_t old_elements;
451 uint32_t peer_elements;
371 452
372 old_count = op->state->my_elements_count; 453 old_elements = op->state->my_element_count;
373 op->spec->salt = ntohl (msg->sender_mutator); 454 op->spec->salt = ntohl (msg->sender_mutator);
374 455
375 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init (&msg[1], 456 op->state->remote_bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &msg[1],
376 BLOOMFILTER_SIZE, 457 BLOOMFILTER_SIZE,
377 ntohl (msg->bloomfilter_length)); 458 ntohl (msg->bloomfilter_length));
378 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 459 op->state->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
@@ -382,7 +463,7 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
382 { 463 {
383 case PHASE_INITIAL: 464 case PHASE_INITIAL:
384 // If we are ot our first msg 465 // If we are ot our first msg
385 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_elements_count, GNUNET_YES); 466 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count, GNUNET_YES);
386 467
387 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements, 468 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
388 &iterator_initialization_by_alice, 469 &iterator_initialization_by_alice,
@@ -407,8 +488,10 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
407 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); 488 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
408 op->state->remote_bf = NULL; 489 op->state->remote_bf = NULL;
409 490
491 peer_elements = ntohl(msg->sender_element_count);
410 if ((op->state->phase == PHASE_MAYBE_FINISHED) 492 if ((op->state->phase == PHASE_MAYBE_FINISHED)
411 && (old_count == op->state->my_elements_count)){ 493 && (old_elements == op->state->my_element_count)
494 && (op->state->my_element_count == peer_elements)){
412 // In the last round we though we were finished, we now know this is correct 495 // In the last round we though we were finished, we now know this is correct
413 send_peer_done(op); 496 send_peer_done(op);
414 return; 497 return;
@@ -417,7 +500,7 @@ handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh)
417 op->state->phase = PHASE_BF_EXCHANGE; 500 op->state->phase = PHASE_BF_EXCHANGE;
418 // maybe we are finished, but we do one more round to make certain 501 // maybe we are finished, but we do one more round to make certain
419 // we don't have false positives ... 502 // we don't have false positives ...
420 if (op->state->my_elements_count == ntohl (msg->sender_element_count)) 503 if (op->state->my_element_count == peer_elements)
421 op->state->phase = PHASE_MAYBE_FINISHED; 504 op->state->phase = PHASE_MAYBE_FINISHED;
422 505
423 send_bloomfilter (op); 506 send_bloomfilter (op);
@@ -435,11 +518,10 @@ handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
435{ 518{
436 struct Operation *op = cls; 519 struct Operation *op = cls;
437 struct BFMessage *msg = (struct BFMessage *) mh; 520 struct BFMessage *msg = (struct BFMessage *) mh;
438 uint32_t remote_element_count;
439 521
440 remote_element_count = ntohl(msg->sender_element_count); 522 op->spec->remote_element_count = ntohl(msg->sender_element_count);
441 if ((op->state->phase == PHASE_INITIAL) 523 if ((op->state->phase != PHASE_INITIAL)
442 || (op->state->my_elements_count > remote_element_count)){ 524 || (op->state->my_element_count > op->spec->remote_element_count)){
443 GNUNET_break_op (0); 525 GNUNET_break_op (0);
444 fail_intersection_operation(op); 526 fail_intersection_operation(op);
445 } 527 }
@@ -457,7 +539,7 @@ handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
457 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf); 539 GNUNET_CONTAINER_bloomfilter_free (op->state->remote_bf);
458 op->state->remote_bf = NULL; 540 op->state->remote_bf = NULL;
459 541
460 if (op->state->my_elements_count == ntohl (msg->sender_element_count)) 542 if (op->state->my_element_count == ntohl (msg->sender_element_count))
461 op->state->phase = PHASE_MAYBE_FINISHED; 543 op->state->phase = PHASE_MAYBE_FINISHED;
462 544
463 send_bloomfilter (op); 545 send_bloomfilter (op);
@@ -465,40 +547,6 @@ handle_p2p_element_info (void *cls, const struct GNUNET_MessageHeader *mh)
465 547
466 548
467/** 549/**
468 * Send a bloomfilter to our peer.
469 * that the operation is over.
470 * After the result done message has been sent to the client,
471 * destroy the evaluate operation.
472 *
473 * @param eo intersection operation
474 */
475static void
476send_bloomfilter (struct Operation *op)
477{
478 struct GNUNET_MQ_Envelope *ev;
479 struct BFMessage *msg;
480 uint32_t bf_size;
481
482 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending bf of size %u\n",);
483
484 // send our bloomfilter
485 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (op->state->local_bf);
486
487 ev = GNUNET_MQ_msg_extra (msg, bf_size, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF);
488 msg->reserved = 0;
489 msg->sender_element_count = htonl (op->state->my_elements_count);
490 msg->bloomfilter_length = htonl (bf_size);
491 msg->sender_mutator = htonl (op->spec->salt);
492 GNUNET_assert (GNUNET_SYSERR !=
493 GNUNET_CONTAINER_bloomfilter_get_raw_data (op->state->local_bf,
494 &msg->sender_bf_data,
495 BLOOMFILTER_SIZE));
496 GNUNET_CONTAINER_bloomfilter_free (op->state->local_bf);
497 op->state->local_bf = NULL;
498 GNUNET_MQ_send (op->mq, ev);
499}
500
501/**
502 * Send our element to the peer, in case our element count is lower than his 550 * Send our element to the peer, in case our element count is lower than his
503 * 551 *
504 * @param eo intersection operation 552 * @param eo intersection operation
@@ -514,7 +562,7 @@ send_element_count (struct Operation *op)
514 // just send our element count, as the other peer must start 562 // just send our element count, as the other peer must start
515 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO); 563 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO);
516 msg->reserved = 0; 564 msg->reserved = 0;
517 msg->sender_element_count = htonl (op->state->my_elements_count); 565 msg->sender_element_count = htonl (op->state->my_element_count);
518 msg->bloomfilter_length = htonl (0); 566 msg->bloomfilter_length = htonl (0);
519 msg->sender_mutator = htonl (0); 567 msg->sender_mutator = htonl (0);
520 568
@@ -538,7 +586,7 @@ finish_and_destroy (struct Operation *op)
538 { 586 {
539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n"); 587 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
540 op->state->full_result_iter = 588 op->state->full_result_iter =
541 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->my_elements); 589 GNUNET_CONTAINER_multihashmap_iterator_create (op->state->my_elements);
542 send_remaining_elements (op); 590 send_remaining_elements (op);
543 return; 591 return;
544 } 592 }
@@ -555,7 +603,6 @@ static void
555handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) 603handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
556{ 604{
557 struct Operation *op = cls; 605 struct Operation *op = cls;
558 struct GNUNET_MQ_Envelope *ev;
559 606
560 if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){ 607 if ((op->state->phase = PHASE_FINISHED) || (op->state->phase = PHASE_MAYBE_FINISHED)){
561 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); 608 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
@@ -582,9 +629,7 @@ intersection_evaluate (struct Operation *op)
582 /* we started the operation, thus we have to send the operation request */ 629 /* we started the operation, thus we have to send the operation request */
583 op->state->phase = PHASE_INITIAL; 630 op->state->phase = PHASE_INITIAL;
584 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); 631 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
585 GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements, 632 op->state->my_element_count = op->spec->set->state->current_set_element_count;
586 &iterator_element_count,
587 op);
588 633
589 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation"); 634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating intersection operation");
590 send_operation_request (op); 635 send_operation_request (op);
@@ -602,11 +647,10 @@ intersection_accept (struct Operation *op)
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); 647 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
603 op->state = GNUNET_new (struct OperationState); 648 op->state = GNUNET_new (struct OperationState);
604 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES); 649 op->state->my_elements = GNUNET_CONTAINER_multihashmap_create(1, GNUNET_YES);
605 GNUNET_CONTAINER_multihashmap_iterate(op->spec->set->elements, 650 op->state->my_element_count = op->spec->set->state->current_set_element_count;
606 &iterator_element_count, 651
607 op);
608 // if Alice (the peer) has more elements than Bob (us), she should start 652 // if Alice (the peer) has more elements than Bob (us), she should start
609 if (op->spec->element_count < op->state->my_elements_count){ 653 if (op->spec->remote_element_count < op->state->my_element_count){
610 op->state->phase = PHASE_INITIAL; 654 op->state->phase = PHASE_INITIAL;
611 send_element_count(op); 655 send_element_count(op);
612 return; 656 return;
@@ -636,6 +680,7 @@ intersection_set_create (void)
636 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n"); 680 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n");
637 681
638 set_state = GNUNET_new (struct SetState); 682 set_state = GNUNET_new (struct SetState);
683 set_state->current_set_element_count = 0;
639 684
640 return set_state; 685 return set_state;
641} 686}
@@ -650,7 +695,8 @@ intersection_set_create (void)
650static void 695static void
651intersection_add (struct SetState *set_state, struct ElementEntry *ee) 696intersection_add (struct SetState *set_state, struct ElementEntry *ee)
652{ 697{
653 //nothing to do here 698 GNUNET_assert(0 < set_state->current_set_element_count);
699 set_state->current_set_element_count++;
654} 700}
655 701
656 702
@@ -675,6 +721,8 @@ intersection_set_destroy (struct SetState *set_state)
675static void 721static void
676intersection_remove (struct SetState *set_state, struct ElementEntry *element) 722intersection_remove (struct SetState *set_state, struct ElementEntry *element)
677{ 723{
724 GNUNET_assert(0 < set_state->current_set_element_count);
725 set_state->current_set_element_count--;
678 //nothing to do here 726 //nothing to do here
679} 727}
680 728
@@ -714,62 +762,6 @@ intersection_handle_p2p_message (struct Operation *op,
714 return GNUNET_OK; 762 return GNUNET_OK;
715} 763}
716 764
717/**
718 * Signal to the client that the operation has finished and
719 * destroy the operation.
720 *
721 * @param cls operation to destroy
722 */
723static void
724send_client_done_and_destroy (void *cls)
725{
726 struct Operation *op = cls;
727 struct GNUNET_MQ_Envelope *ev;
728 struct GNUNET_SET_ResultMessage *rm;
729 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
730 rm->request_id = htonl (op->spec->client_request_id);
731 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
732 rm->element_type = htons (0);
733 GNUNET_MQ_send (op->spec->set->client_mq, ev);
734 _GSS_operation_destroy (op);
735}
736/**
737 * Send all elements in the full result iterator.
738 *
739 * @param cls operation
740 */
741static void
742send_remaining_elements (void *cls)
743{
744 struct Operation *op = cls;
745 struct ElementEntry *remaining; //TODO rework this, key entry does not exist here
746 struct GNUNET_MQ_Envelope *ev;
747 struct GNUNET_SET_ResultMessage *rm;
748 struct GNUNET_SET_Element *element;
749 int res;
750
751 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &remaining);
752 if (GNUNET_NO == res) {
753 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
754 send_client_done_and_destroy (op);
755 return;
756 }
757
758 element = &remaining->element;
759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
760 GNUNET_assert (0 != op->spec->client_request_id);
761
762 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
763 GNUNET_assert (NULL != ev);
764
765 rm->result_status = htons (GNUNET_SET_STATUS_OK);
766 rm->request_id = htonl (op->spec->client_request_id);
767 rm->element_type = element->type;
768 memcpy (&rm[1], element->data, element->size);
769
770 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
771 GNUNET_MQ_send (op->spec->set->client_mq, ev);
772}
773 765
774/** 766/**
775 * handler for peer-disconnects, notifies the client about the aborted operation 767 * handler for peer-disconnects, notifies the client about the aborted operation