aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-11-05 00:08:13 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-11-05 00:08:13 +0000
commitca2c7bdfa64a30c0013598f0718dcfe7e6d98b2a (patch)
tree3bedd0e18f88371c2e75bd1953e0bc321629c828 /src/set
parent6c3bf6b3486fd31402ab991f5ddef76bf9cd93c4 (diff)
downloadgnunet-ca2c7bdfa64a30c0013598f0718dcfe7e6d98b2a.tar.gz
gnunet-ca2c7bdfa64a30c0013598f0718dcfe7e6d98b2a.zip
- implemented missing set functionality
- secretsharing api changes
Diffstat (limited to 'src/set')
-rw-r--r--src/set/Makefile.am11
-rw-r--r--src/set/gnunet-service-set.c387
-rw-r--r--src/set/gnunet-service-set.h99
-rw-r--r--src/set/gnunet-service-set_union.c690
-rw-r--r--src/set/ibf.h14
-rw-r--r--src/set/strata_estimator.h49
-rw-r--r--src/set/test_set.conf1
-rw-r--r--src/set/test_set_union_result_full.c255
8 files changed, 1010 insertions, 496 deletions
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index 878ff0cbd..72d3d82a0 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -63,7 +63,7 @@ libgnunetset_la_LDFLAGS = \
63 63
64if HAVE_TESTING 64if HAVE_TESTING
65check_PROGRAMS = \ 65check_PROGRAMS = \
66 test_set_api 66 test_set_api test_set_union_result_full
67endif 67endif
68 68
69if ENABLE_TEST_RUN 69if ENABLE_TEST_RUN
@@ -79,6 +79,15 @@ test_set_api_LDADD = \
79test_set_api_DEPENDENCIES = \ 79test_set_api_DEPENDENCIES = \
80 libgnunetset.la 80 libgnunetset.la
81 81
82test_set_union_result_full_SOURCES = \
83 test_set_union_result_full.c
84test_set_union_result_full_LDADD = \
85 $(top_builddir)/src/util/libgnunetutil.la \
86 $(top_builddir)/src/testing/libgnunettesting.la \
87 $(top_builddir)/src/set/libgnunetset.la
88test_set_union_result_full_DEPENDENCIES = \
89 libgnunetset.la
90
82EXTRA_DIST = \ 91EXTRA_DIST = \
83 test_set.conf 92 test_set.conf
84 93
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 2e951c3f2..7eb3fdb30 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -28,41 +28,20 @@
28 28
29 29
30/** 30/**
31 * Peer that has connected to us, but is not yet evaluating a set operation. 31 * State of an operation where the peer has connected to us, but is not yet
32 * Once the peer has sent a request, and the client has 32 * evaluating a set operation. Once the peer has sent a concrete request, and
33 * accepted or rejected it, this information will be deleted. 33 * the client has accepted or rejected it, this information will be deleted
34 * and replaced by the real set operation state.
34 */ 35 */
35struct Incoming 36struct OperationState
36{ 37{
37 /** 38 /**
38 * Incoming peers are held in a linked list
39 */
40 struct Incoming *next;
41
42 /**
43 * Incoming peers are held in a linked list
44 */
45 struct Incoming *prev;
46
47 /**
48 * Detail information about the operation.
49 * NULL as long as we did not receive the operation
50 * request from the remote peer.
51 */
52 struct OperationSpecification *spec;
53
54 /**
55 * The identity of the requesting peer. Needs to 39 * The identity of the requesting peer. Needs to
56 * be stored here as the op spec might not have been created yet. 40 * be stored here as the op spec might not have been created yet.
57 */ 41 */
58 struct GNUNET_PeerIdentity peer; 42 struct GNUNET_PeerIdentity peer;
59 43
60 /** 44 /**
61 * Tunnel to the peer.
62 */
63 struct GNUNET_MESH_Tunnel *tunnel;
64
65 /**
66 * Unique request id for the request from 45 * Unique request id for the request from
67 * a remote peer, sent to the client, which will 46 * a remote peer, sent to the client, which will
68 * accept or reject the request. 47 * accept or reject the request.
@@ -76,12 +55,6 @@ struct Incoming
76 * after the timeout, it will be disconnected. 55 * after the timeout, it will be disconnected.
77 */ 56 */
78 GNUNET_SCHEDULER_TaskIdentifier timeout_task; 57 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
79
80 /**
81 * Tunnel context, needs to be stored here as a client's accept will change
82 * the tunnel context.
83 */
84 struct TunnelContext *tc;
85}; 58};
86 59
87 60
@@ -160,13 +133,13 @@ static struct Listener *listeners_tail;
160 * Incoming sockets from remote peers are 133 * Incoming sockets from remote peers are
161 * held in a doubly linked list. 134 * held in a doubly linked list.
162 */ 135 */
163static struct Incoming *incoming_head; 136static struct Operation *incoming_head;
164 137
165/** 138/**
166 * Incoming sockets from remote peers are 139 * Incoming sockets from remote peers are
167 * held in a doubly linked list. 140 * held in a doubly linked list.
168 */ 141 */
169static struct Incoming *incoming_tail; 142static struct Operation *incoming_tail;
170 143
171/** 144/**
172 * Counter for allocating unique IDs for clients, 145 * Counter for allocating unique IDs for clients,
@@ -221,14 +194,14 @@ listener_get (struct GNUNET_SERVER_Client *client)
221 * @return the incoming socket associated with the id, 194 * @return the incoming socket associated with the id,
222 * or NULL if there is none 195 * or NULL if there is none
223 */ 196 */
224static struct Incoming * 197static struct Operation *
225get_incoming (uint32_t id) 198get_incoming (uint32_t id)
226{ 199{
227 struct Incoming *incoming; 200 struct Operation *op;
228 201
229 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 202 for (op = incoming_head; NULL != op; op = op)
230 if (incoming->suggest_id == id) 203 if (op->state->suggest_id == id)
231 return incoming; 204 return op;
232 return NULL; 205 return NULL;
233} 206}
234 207
@@ -261,7 +234,8 @@ listener_destroy (struct Listener *listener)
261 234
262 235
263/** 236/**
264 * Iterator over hash map entries. 237 * Iterator over hash map entries to free
238 * element entries.
265 * 239 *
266 * @param cls closure 240 * @param cls closure
267 * @param key current key code 241 * @param key current key code
@@ -283,6 +257,100 @@ destroy_elements_iterator (void *cls,
283 257
284 258
285/** 259/**
260 * Collect and destroy elements that are not needed anymore, because
261 * their lifetime (as determined by their generation) does not overlap with any active
262 * set operation.
263 */
264void
265collect_generation_garbage (struct Set *set)
266{
267 struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
268 struct ElementEntry *ee;
269 struct GNUNET_CONTAINER_MultiHashMap *new_elements;
270 int res;
271 struct Operation *op;
272
273 new_elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
274 iter = GNUNET_CONTAINER_multihashmap_iterator_create (set->elements);
275 while (GNUNET_OK ==
276 (res = GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ee)))
277 {
278 if (GNUNET_NO == ee->removed)
279 goto still_needed;
280 for (op = set->ops_head; NULL != op; op = op->next)
281 if ( (op->generation_created >= ee->generation_added) &&
282 (op->generation_created < ee->generation_removed) )
283 goto still_needed;
284 GNUNET_free (ee);
285 continue;
286still_needed:
287 // we don't expect collisions, thus the replace option
288 GNUNET_CONTAINER_multihashmap_put (new_elements, &ee->element_hash, ee,
289 GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
290 }
291 GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
292 GNUNET_CONTAINER_multihashmap_destroy (set->elements);
293 set->elements = new_elements;
294}
295
296
297/**
298 * Destroy the given operation. Call the implementation-specific cancel function
299 * of the operation. Disconnects from the remote peer.
300 * Does not disconnect the client, as there may be multiple operations per set.
301 *
302 * @param op operation to destroy
303 */
304void
305_GSS_operation_destroy (struct Operation *op)
306{
307 struct Set *set;
308
309 if (NULL == op->vt)
310 return;
311
312 set = op->spec->set;
313
314 GNUNET_assert (GNUNET_NO == op->is_incoming);
315 GNUNET_assert (NULL != op->spec);
316 GNUNET_CONTAINER_DLL_remove (op->spec->set->ops_head,
317 op->spec->set->ops_tail,
318 op);
319
320 op->vt->cancel (op);
321 op->vt = NULL;
322
323 if (NULL != op->spec)
324 {
325 if (NULL != op->spec->context_msg)
326 {
327 GNUNET_free (op->spec->context_msg);
328 op->spec->context_msg = NULL;
329 }
330 GNUNET_free (op->spec);
331 op->spec = NULL;
332 }
333
334 if (NULL != op->mq)
335 {
336 GNUNET_MQ_destroy (op->mq);
337 op->mq = NULL;
338 }
339
340 if (NULL != op->tunnel)
341 {
342 GNUNET_MESH_tunnel_destroy (op->tunnel);
343 op->tunnel = NULL;
344 }
345
346 collect_generation_garbage (set);
347
348 /* We rely on the tunnel end handler to free 'op'. When 'op->tunnel' was NULL,
349 * there was a tunnel end handler that will free 'op' on the call stack. */
350}
351
352
353/**
286 * Destroy a set, and free all resources associated with it. 354 * Destroy a set, and free all resources associated with it.
287 * 355 *
288 * @param set the set to destroy 356 * @param set the set to destroy
@@ -302,6 +370,8 @@ set_destroy (struct Set *set)
302 return; 370 return;
303 } 371 }
304 GNUNET_assert (NULL != set->state); 372 GNUNET_assert (NULL != set->state);
373 while (NULL != set->ops_head)
374 _GSS_operation_destroy (set->ops_head);
305 set->vt->destroy_set (set->state); 375 set->vt->destroy_set (set->state);
306 set->state = NULL; 376 set->state = NULL;
307 if (NULL != set->client_mq) 377 if (NULL != set->client_mq)
@@ -364,25 +434,40 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
364 * @param incoming remote request to destroy 434 * @param incoming remote request to destroy
365 */ 435 */
366static void 436static void
367incoming_destroy (struct Incoming *incoming) 437incoming_destroy (struct Operation *incoming)
368{ 438{
439 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
369 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); 440 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
370 if (GNUNET_SCHEDULER_NO_TASK != incoming->timeout_task) 441 if (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task)
371 { 442 {
372 GNUNET_SCHEDULER_cancel (incoming->timeout_task); 443 GNUNET_SCHEDULER_cancel (incoming->state->timeout_task);
373 incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; 444 incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK;
374 } 445 }
375 if (NULL != incoming->tunnel) 446 GNUNET_free (incoming->state);
376 {
377 struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
378 incoming->tunnel = NULL;
379 GNUNET_MESH_tunnel_destroy (t);
380 return;
381 }
382 GNUNET_free (incoming);
383} 447}
384 448
385 449
450static void
451incoming_retire (struct Operation *incoming)
452{
453 GNUNET_assert (NULL != incoming->spec);
454 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
455 incoming->is_incoming = GNUNET_NO;
456 GNUNET_free (incoming->state);
457 incoming->state = NULL;
458 GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
459}
460
461
462/**
463 * Find a listener that is interested in the given operation type
464 * and application id.
465 *
466 * @param op operation type to look for
467 * @param app_id application id to look for
468 * @return a matching listener, or NULL if no listener matches the
469 * given operation and application id
470 */
386static struct Listener * 471static struct Listener *
387listener_get_by_target (enum GNUNET_SET_OperationType op, 472listener_get_by_target (enum GNUNET_SET_OperationType op,
388 const struct GNUNET_HashCode *app_id) 473 const struct GNUNET_HashCode *app_id)
@@ -409,23 +494,24 @@ listener_get_by_target (enum GNUNET_SET_OperationType op,
409 * @param listener the listener to suggest the request to 494 * @param listener the listener to suggest the request to
410 */ 495 */
411static void 496static void
412incoming_suggest (struct Incoming *incoming, struct Listener *listener) 497incoming_suggest (struct Operation *incoming, struct Listener *listener)
413{ 498{
414 struct GNUNET_MQ_Envelope *mqm; 499 struct GNUNET_MQ_Envelope *mqm;
415 struct GNUNET_SET_RequestMessage *cmsg; 500 struct GNUNET_SET_RequestMessage *cmsg;
416 501
417 GNUNET_assert (0 == incoming->suggest_id);
418 GNUNET_assert (NULL != incoming->spec); 502 GNUNET_assert (NULL != incoming->spec);
419 incoming->suggest_id = suggest_id++; 503 GNUNET_assert (0 == incoming->state->suggest_id);
504 incoming->state->suggest_id = suggest_id++;
420 505
421 GNUNET_SCHEDULER_cancel (incoming->timeout_task); 506 GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != incoming->state->timeout_task);
422 incoming->timeout_task = GNUNET_SCHEDULER_NO_TASK; 507 GNUNET_SCHEDULER_cancel (incoming->state->timeout_task);
508 incoming->state->timeout_task = GNUNET_SCHEDULER_NO_TASK;
423 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, 509 mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
424 incoming->spec->context_msg); 510 incoming->spec->context_msg);
425 GNUNET_assert (NULL != mqm); 511 GNUNET_assert (NULL != mqm);
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n", 512 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id %u\n",
427 incoming->suggest_id); 513 incoming->state->suggest_id);
428 cmsg->accept_id = htonl (incoming->suggest_id); 514 cmsg->accept_id = htonl (incoming->state->suggest_id);
429 cmsg->peer_id = incoming->spec->peer; 515 cmsg->peer_id = incoming->spec->peer;
430 GNUNET_MQ_send (listener->client_mq, mqm); 516 GNUNET_MQ_send (listener->client_mq, mqm);
431} 517}
@@ -441,10 +527,9 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener)
441 * GNUNET_SYSERR to destroy the tunnel 527 * GNUNET_SYSERR to destroy the tunnel
442 */ 528 */
443static int 529static int
444handle_incoming_msg (struct OperationState *op, 530handle_incoming_msg (struct Operation *op,
445 const struct GNUNET_MessageHeader *mh) 531 const struct GNUNET_MessageHeader *mh)
446{ 532{
447 struct Incoming *incoming = (struct Incoming *) op;
448 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; 533 const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
449 struct Listener *listener; 534 struct Listener *listener;
450 struct OperationSpecification *spec; 535 struct OperationSpecification *spec;
@@ -457,7 +542,7 @@ handle_incoming_msg (struct OperationState *op,
457 return GNUNET_SYSERR; 542 return GNUNET_SYSERR;
458 } 543 }
459 544
460 if (NULL != incoming->spec) 545 if (NULL != op->spec)
461 { 546 {
462 /* double operation request */ 547 /* double operation request */
463 GNUNET_break_op (0); 548 GNUNET_break_op (0);
@@ -471,9 +556,9 @@ handle_incoming_msg (struct OperationState *op,
471 spec->operation = ntohl (msg->operation); 556 spec->operation = ntohl (msg->operation);
472 spec->app_id = msg->app_id; 557 spec->app_id = msg->app_id;
473 spec->salt = ntohl (msg->salt); 558 spec->salt = ntohl (msg->salt);
474 spec->peer = incoming->peer; 559 spec->peer = op->state->peer;
475 560
476 incoming->spec = spec; 561 op->spec = spec;
477 562
478 if ( (NULL != spec->context_msg) && 563 if ( (NULL != spec->context_msg) &&
479 (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) 564 (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
@@ -491,11 +576,19 @@ handle_incoming_msg (struct OperationState *op,
491 "no listener matches incoming request, waiting with timeout\n"); 576 "no listener matches incoming request, waiting with timeout\n");
492 return GNUNET_OK; 577 return GNUNET_OK;
493 } 578 }
494 incoming_suggest (incoming, listener); 579 incoming_suggest (op, listener);
495 return GNUNET_OK; 580 return GNUNET_OK;
496} 581}
497 582
498 583
584/**
585 * Send the next element of a set to the set's client. The next element is given by
586 * the set's current hashmap iterator. The set's iterator will be set to NULL if there
587 * are no more elements in the set. The caller must ensure that the set's iterator is
588 * valid.
589 *
590 * @param set set that should send its next element to its client
591 */
499static void 592static void
500send_client_element (struct Set *set) 593send_client_element (struct Set *set)
501{ 594{
@@ -508,6 +601,8 @@ send_client_element (struct Set *set)
508 if (GNUNET_NO == ret) 601 if (GNUNET_NO == ret)
509 { 602 {
510 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE); 603 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
604 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
605 set->iter = NULL;
511 } 606 }
512 else 607 else
513 { 608 {
@@ -588,7 +683,8 @@ handle_client_create (void *cls,
588 switch (ntohs (msg->operation)) 683 switch (ntohs (msg->operation))
589 { 684 {
590 case GNUNET_SET_OPERATION_INTERSECTION: 685 case GNUNET_SET_OPERATION_INTERSECTION:
591// set->vt = _GSS_intersection_vt (); 686 // FIXME: implement intersection vt
687 // set->vt = _GSS_intersection_vt ();
592 break; 688 break;
593 case GNUNET_SET_OPERATION_UNION: 689 case GNUNET_SET_OPERATION_UNION:
594 set->vt = _GSS_union_vt (); 690 set->vt = _GSS_union_vt ();
@@ -623,7 +719,7 @@ handle_client_listen (void *cls,
623{ 719{
624 struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m; 720 struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
625 struct Listener *listener; 721 struct Listener *listener;
626 struct Incoming *incoming; 722 struct Operation *op;
627 723
628 if (NULL != listener_get (client)) 724 if (NULL != listener_get (client))
629 { 725 {
@@ -639,24 +735,26 @@ handle_client_listen (void *cls,
639 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); 735 GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n", 736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app %s)\n",
641 listener->operation, GNUNET_h2s (&listener->app_id)); 737 listener->operation, GNUNET_h2s (&listener->app_id));
642 for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) 738 /* check for incoming requests the listener is interested in */
739 for (op = incoming_head; NULL != op; op = op->next)
643 { 740 {
644 if (NULL == incoming->spec) 741 if (NULL == op->spec)
645 { 742 {
646 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n"); 743 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n");
647 continue; 744 continue;
648 } 745 }
649 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n", 746 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, suggest: %u)\n",
650 incoming->spec->operation, GNUNET_h2s (&incoming->spec->app_id), incoming->suggest_id); 747 op->spec->operation, GNUNET_h2s (&op->spec->app_id), op->state->suggest_id);
651 748
652 if (0 != incoming->suggest_id) 749 /* don't consider the incoming request if it has been already suggested to a listener */
750 if (0 != op->state->suggest_id)
653 continue; 751 continue;
654 if (listener->operation != incoming->spec->operation) 752 if (listener->operation != op->spec->operation)
655 continue; 753 continue;
656 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) 754 if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &op->spec->app_id))
657 continue; 755 continue;
658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n"); 756 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n");
659 incoming_suggest (incoming, listener); 757 incoming_suggest (op, listener);
660 } 758 }
661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n"); 759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n");
662 GNUNET_SERVER_receive_done (client, GNUNET_OK); 760 GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -676,8 +774,9 @@ handle_client_reject (void *cls,
676 struct GNUNET_SERVER_Client *client, 774 struct GNUNET_SERVER_Client *client,
677 const struct GNUNET_MessageHeader *m) 775 const struct GNUNET_MessageHeader *m)
678{ 776{
679 struct Incoming *incoming; 777 struct Operation *incoming;
680 const struct GNUNET_SET_AcceptRejectMessage *msg; 778 const struct GNUNET_SET_AcceptRejectMessage *msg;
779 struct GNUNET_MESH_Tunnel *tunnel;
681 780
682 msg = (const struct GNUNET_SET_AcceptRejectMessage *) m; 781 msg = (const struct GNUNET_SET_AcceptRejectMessage *) m;
683 GNUNET_break (0 == ntohl (msg->request_id)); 782 GNUNET_break (0 == ntohl (msg->request_id));
@@ -689,13 +788,17 @@ handle_client_reject (void *cls,
689 return; 788 return;
690 } 789 }
691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n"); 790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n");
692 GNUNET_MESH_tunnel_destroy (incoming->tunnel); 791 /* set the incoming's tunnel to NULL so that we don't accidentally destroy
792 * the tunnel again. */
793 tunnel = incoming->tunnel;
794 incoming->tunnel = NULL;
795 GNUNET_MESH_tunnel_destroy (tunnel);
693 GNUNET_SERVER_receive_done (client, GNUNET_OK); 796 GNUNET_SERVER_receive_done (client, GNUNET_OK);
694} 797}
695 798
696 799
697/** 800/**
698 * Called when a client wants to add an element to a 801 * Called when a client wants to add/remove an element to/from a
699 * set it inhabits. 802 * set it inhabits.
700 * 803 *
701 * @param cls unused 804 * @param cls unused
@@ -784,10 +887,9 @@ handle_client_evaluate (void *cls,
784 const struct GNUNET_MessageHeader *m) 887 const struct GNUNET_MessageHeader *m)
785{ 888{
786 struct Set *set; 889 struct Set *set;
787 struct TunnelContext *tc;
788 struct GNUNET_MESH_Tunnel *tunnel;
789 struct GNUNET_SET_EvaluateMessage *msg; 890 struct GNUNET_SET_EvaluateMessage *msg;
790 struct OperationSpecification *spec; 891 struct OperationSpecification *spec;
892 struct Operation *op;
791 893
792 set = set_get (client); 894 set = set_get (client);
793 if (NULL == set) 895 if (NULL == set)
@@ -798,7 +900,6 @@ handle_client_evaluate (void *cls,
798 } 900 }
799 901
800 msg = (struct GNUNET_SET_EvaluateMessage *) m; 902 msg = (struct GNUNET_SET_EvaluateMessage *) m;
801 tc = GNUNET_new (struct TunnelContext);
802 spec = GNUNET_new (struct OperationSpecification); 903 spec = GNUNET_new (struct OperationSpecification);
803 spec->operation = set->operation; 904 spec->operation = set->operation;
804 spec->app_id = msg->app_id; 905 spec->app_id = msg->app_id;
@@ -811,13 +912,20 @@ handle_client_evaluate (void *cls,
811 if (NULL != spec->context_msg) 912 if (NULL != spec->context_msg)
812 spec->context_msg = GNUNET_copy_message (spec->context_msg); 913 spec->context_msg = GNUNET_copy_message (spec->context_msg);
813 914
814 tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, 915 op = GNUNET_new (struct Operation);
815 GNUNET_APPLICATION_TYPE_SET, 916 op->spec = spec;
816 GNUNET_YES, 917 op->generation_created = set->current_generation++;
817 GNUNET_YES); 918 op->vt = set->vt;
919 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
818 920
819 set->vt->evaluate (spec, tunnel, tc); 921 op->tunnel = GNUNET_MESH_tunnel_create (mesh, op, &msg->target_peer,
922 GNUNET_APPLICATION_TYPE_SET,
923 GNUNET_YES,
924 GNUNET_YES);
820 925
926 op->mq = GNUNET_MESH_mq_create (op->tunnel);
927
928 set->vt->evaluate (op);
821 GNUNET_SERVER_receive_done (client, GNUNET_OK); 929 GNUNET_SERVER_receive_done (client, GNUNET_OK);
822} 930}
823 931
@@ -857,8 +965,8 @@ handle_client_iter_ack (void *cls,
857 965
858 966
859/** 967/**
860 * Handle a request from the client to accept 968 * Handle a request from the client to
861 * a set operation that came from a remote peer. 969 * cancel a running set operation.
862 * 970 *
863 * @param cls unused 971 * @param cls unused
864 * @param client the client 972 * @param client the client
@@ -872,6 +980,8 @@ handle_client_cancel (void *cls,
872 const struct GNUNET_SET_CancelMessage *msg = 980 const struct GNUNET_SET_CancelMessage *msg =
873 (const struct GNUNET_SET_CancelMessage *) mh; 981 (const struct GNUNET_SET_CancelMessage *) mh;
874 struct Set *set; 982 struct Set *set;
983 struct Operation *op;
984 int found;
875 985
876 set = set_get (client); 986 set = set_get (client);
877 if (NULL == set) 987 if (NULL == set)
@@ -880,8 +990,24 @@ handle_client_cancel (void *cls,
880 GNUNET_SERVER_client_disconnect (client); 990 GNUNET_SERVER_client_disconnect (client);
881 return; 991 return;
882 } 992 }
883 /* FIXME: maybe cancel should return success/error code? */ 993 found = GNUNET_NO;
884 set->vt->cancel (set->state, ntohl (msg->request_id)); 994 for (op = set->ops_head; NULL != op; op = op->next)
995 {
996 if (op->spec->client_request_id == msg->request_id)
997 {
998 found = GNUNET_YES;
999 break;
1000 }
1001 }
1002
1003 if (GNUNET_NO == found)
1004 {
1005 GNUNET_break (0);
1006 GNUNET_SERVER_client_disconnect (client);
1007 return;
1008 }
1009
1010 _GSS_operation_destroy (op);
885} 1011}
886 1012
887 1013
@@ -899,20 +1025,22 @@ handle_client_accept (void *cls,
899 const struct GNUNET_MessageHeader *mh) 1025 const struct GNUNET_MessageHeader *mh)
900{ 1026{
901 struct Set *set; 1027 struct Set *set;
902 struct Incoming *incoming;
903 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; 1028 struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
1029 struct Operation *op;
904 1030
905 incoming = get_incoming (ntohl (msg->accept_reject_id)); 1031 op = get_incoming (ntohl (msg->accept_reject_id));
906
907 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id));
908 1032
909 if (NULL == incoming) 1033 if (NULL == op)
910 { 1034 {
911 GNUNET_break (0); 1035 GNUNET_break (0);
912 GNUNET_SERVER_client_disconnect (client); 1036 GNUNET_SERVER_client_disconnect (client);
913 return; 1037 return;
914 } 1038 }
915 1039
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl (msg->accept_reject_id));
1041
1042 GNUNET_assert (GNUNET_YES == op->is_incoming);
1043
916 set = set_get (client); 1044 set = set_get (client);
917 1045
918 if (NULL == set) 1046 if (NULL == set)
@@ -922,13 +1050,21 @@ handle_client_accept (void *cls,
922 return; 1050 return;
923 } 1051 }
924 1052
925 incoming->spec->set = set; 1053 op->spec->set = set;
926 incoming->spec->client_request_id = ntohl (msg->request_id); 1054
927 incoming->spec->result_mode = ntohs (msg->result_mode); 1055 incoming_retire (op);
928 set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc); 1056
929 /* tunnel ownership goes to operation */ 1057 GNUNET_assert (NULL != op->spec->set);
930 incoming->tunnel = NULL; 1058 GNUNET_assert (NULL != op->spec->set->vt);
931 incoming_destroy (incoming); 1059
1060 GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, op);
1061
1062 op->spec->client_request_id = ntohl (msg->request_id);
1063 op->spec->result_mode = ntohs (msg->result_mode);
1064 op->generation_created = set->current_generation++;
1065 op->vt = op->spec->set->vt;
1066 GNUNET_assert (NULL != op->vt->accept);
1067 set->vt->accept (op);
932 GNUNET_SERVER_receive_done (client, GNUNET_OK); 1068 GNUNET_SERVER_receive_done (client, GNUNET_OK);
933} 1069}
934 1070
@@ -952,9 +1088,8 @@ shutdown_task (void *cls,
952 while (NULL != sets_head) 1088 while (NULL != sets_head)
953 set_destroy (sets_head); 1089 set_destroy (sets_head);
954 1090
955 1091 /* it's important to destroy mesh at the end, as all tunnels
956 /* it's important to destroy mesh at the end, as tunnels 1092 * must be destroyed before the mesh handle! */
957 * must be destroyed first! */
958 if (NULL != mesh) 1093 if (NULL != mesh)
959 { 1094 {
960 GNUNET_MESH_disconnect (mesh); 1095 GNUNET_MESH_disconnect (mesh);
@@ -966,7 +1101,8 @@ shutdown_task (void *cls,
966 1101
967 1102
968/** 1103/**
969 * Signature of the main function of a task. 1104 * Handle an incoming peer timeout, that is, disconnect a peer if
1105 * has not requested an operation for some amount of time.
970 * 1106 *
971 * @param cls closure 1107 * @param cls closure
972 * @param tc context information (why was this task triggered now) 1108 * @param tc context information (why was this task triggered now)
@@ -975,7 +1111,9 @@ static void
975incoming_timeout_cb (void *cls, 1111incoming_timeout_cb (void *cls,
976 const struct GNUNET_SCHEDULER_TaskContext *tc) 1112 const struct GNUNET_SCHEDULER_TaskContext *tc)
977{ 1113{
978 struct Incoming *incoming = cls; 1114 struct Operation *incoming = cls;
1115
1116 GNUNET_assert (GNUNET_YES == incoming->is_incoming);
979 1117
980 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) 1118 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
981 return; 1119 return;
@@ -986,13 +1124,12 @@ incoming_timeout_cb (void *cls,
986 1124
987 1125
988static void 1126static void
989handle_incoming_disconnect (struct OperationState *op_state) 1127handle_incoming_disconnect (struct Operation *op)
990{ 1128{
991 struct Incoming *incoming = (struct Incoming *) op_state; 1129 if (NULL == op->tunnel)
992 if (NULL == incoming->tunnel)
993 return; 1130 return;
994 1131
995 incoming_destroy (incoming); 1132 incoming_destroy (op);
996} 1133}
997 1134
998 1135
@@ -1017,7 +1154,7 @@ tunnel_new_cb (void *cls,
1017 const struct GNUNET_PeerIdentity *initiator, 1154 const struct GNUNET_PeerIdentity *initiator,
1018 uint32_t port) 1155 uint32_t port)
1019{ 1156{
1020 struct Incoming *incoming; 1157 struct Operation *incoming;
1021 static const struct SetVT incoming_vt = { 1158 static const struct SetVT incoming_vt = {
1022 .msg_handler = handle_incoming_msg, 1159 .msg_handler = handle_incoming_msg,
1023 .peer_disconnect = handle_incoming_disconnect 1160 .peer_disconnect = handle_incoming_disconnect
@@ -1026,17 +1163,18 @@ tunnel_new_cb (void *cls,
1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n"); 1163 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n");
1027 1164
1028 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); 1165 GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
1029 incoming = GNUNET_new (struct Incoming); 1166 incoming = GNUNET_new (struct Operation);
1030 incoming->peer = *initiator; 1167 incoming->is_incoming = GNUNET_YES;
1168 incoming->state = GNUNET_new (struct OperationState);
1169 incoming->state->peer = *initiator;
1031 incoming->tunnel = tunnel; 1170 incoming->tunnel = tunnel;
1032 incoming->tc = GNUNET_new (struct TunnelContext);; 1171 incoming->mq = GNUNET_MESH_mq_create (incoming->tunnel);
1033 incoming->tc->vt = &incoming_vt; 1172 incoming->vt = &incoming_vt;
1034 incoming->tc->op = (struct OperationState *) incoming; 1173 incoming->state->timeout_task =
1035 incoming->timeout_task =
1036 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); 1174 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming);
1037 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); 1175 GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
1038 1176
1039 return incoming->tc; 1177 return incoming;
1040} 1178}
1041 1179
1042 1180
@@ -1055,9 +1193,14 @@ static void
1055tunnel_end_cb (void *cls, 1193tunnel_end_cb (void *cls,
1056 const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) 1194 const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx)
1057{ 1195{
1058 struct TunnelContext *ctx = tunnel_ctx; 1196 struct Operation *op = tunnel_ctx;
1197
1198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tunnel end cb called\n");
1199
1200 op->tunnel = NULL;
1059 1201
1060 ctx->vt->peer_disconnect (ctx->op); 1202 if (NULL != op->vt)
1203 op->vt->peer_disconnect (op);
1061 /* mesh will never call us with the context again! */ 1204 /* mesh will never call us with the context again! */
1062 GNUNET_free (tunnel_ctx); 1205 GNUNET_free (tunnel_ctx);
1063} 1206}
@@ -1085,14 +1228,14 @@ dispatch_p2p_message (void *cls,
1085 void **tunnel_ctx, 1228 void **tunnel_ctx,
1086 const struct GNUNET_MessageHeader *message) 1229 const struct GNUNET_MessageHeader *message)
1087{ 1230{
1088 struct TunnelContext *tc = *tunnel_ctx; 1231 struct Operation *op = *tunnel_ctx;
1089 int ret; 1232 int ret;
1090 1233
1091 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n", 1234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n",
1092 ntohs (message->type)); 1235 ntohs (message->type));
1093 /* do this before the handler, as the handler might kill the tunnel */ 1236 /* do this before the handler, as the handler might kill the tunnel */
1094 GNUNET_MESH_receive_done (tunnel); 1237 GNUNET_MESH_receive_done (tunnel);
1095 ret = tc->vt->msg_handler (tc->op, message); 1238 ret = op->vt->msg_handler (op, message);
1096 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n", 1239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n",
1097 ntohs (message->type)); 1240 ntohs (message->type));
1098 return ret; 1241 return ret;
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 7c2363e9f..7a2c5ba8d 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -55,8 +55,8 @@ struct OperationState;
55 55
56/* forward declarations */ 56/* forward declarations */
57struct Set; 57struct Set;
58struct TunnelContext;
59struct ElementEntry; 58struct ElementEntry;
59struct Operation;
60 60
61 61
62/** 62/**
@@ -135,7 +135,7 @@ typedef void (*AddRemoveImpl) (struct SetState *state, struct ElementEntry *ee);
135 * 135 *
136 * @param op the set operation, contains implementation-specific data 136 * @param op the set operation, contains implementation-specific data
137 */ 137 */
138typedef void (*PeerDisconnectImpl) (struct OperationState *op); 138typedef void (*PeerDisconnectImpl) (struct Operation *op);
139 139
140 140
141/** 141/**
@@ -151,13 +151,9 @@ typedef void (*DestroySetImpl) (struct SetState *state);
151 * Signature of functions that implement the creation of set operations 151 * Signature of functions that implement the creation of set operations
152 * (currently evaluate and accept). 152 * (currently evaluate and accept).
153 * 153 *
154 * @param spec specification of the set operation to be created 154 * @param op operation that is created, should be initialized by the implementation
155 * @param tunnel the tunnel with the other peer
156 * @param tc tunnel context
157 */ 155 */
158typedef void (*OpCreateImpl) (struct OperationSpecification *spec, 156typedef void (*OpCreateImpl) (struct Operation *op);
159 struct GNUNET_MESH_Tunnel *tunnel,
160 struct TunnelContext *tc);
161 157
162 158
163/** 159/**
@@ -169,11 +165,10 @@ typedef void (*OpCreateImpl) (struct OperationSpecification *spec,
169 * @return GNUNET_OK on success, GNUNET_SYSERR to 165 * @return GNUNET_OK on success, GNUNET_SYSERR to
170 * destroy the operation and the tunnel 166 * destroy the operation and the tunnel
171 */ 167 */
172typedef int (*MsgHandlerImpl) (struct OperationState *op, 168typedef int (*MsgHandlerImpl) (struct Operation *op,
173 const struct GNUNET_MessageHeader *msg); 169 const struct GNUNET_MessageHeader *msg);
174 170
175typedef void (*CancelImpl) (struct SetState *set, 171typedef void (*CancelImpl) (struct Operation *op);
176 uint32_t request_id);
177 172
178 173
179/** 174/**
@@ -263,6 +258,7 @@ struct ElementEntry
263 258
264 /** 259 /**
265 * Hash of the element. 260 * Hash of the element.
261 * For set union:
266 * Will be used to derive the different IBF keys 262 * Will be used to derive the different IBF keys
267 * for different salts. 263 * for different salts.
268 */ 264 */
@@ -294,6 +290,63 @@ struct ElementEntry
294}; 290};
295 291
296 292
293struct Operation
294{
295 /**
296 * V-Table for the operation belonging
297 * to the tunnel contest.
298 */
299 const struct SetVT *vt;
300
301 /**
302 * Tunnel to the peer.
303 */
304 struct GNUNET_MESH_Tunnel *tunnel;
305
306 /**
307 * Message queue for the tunnel.
308 */
309 struct GNUNET_MQ_Handle *mq;
310
311 /**
312 * GNUNET_YES if this is not a "real" set operation yet, and we still
313 * need to wait for the other peer to give us more details.
314 */
315 int is_incoming;
316
317 /**
318 * Generation in which the operation handle
319 * was created.
320 */
321 unsigned int generation_created;
322
323 /**
324 * Detail information about the set operation,
325 * including the set to use.
326 * When 'spec' is NULL, the operation is not yet entirely
327 * initialized.
328 */
329 struct OperationSpecification *spec;
330
331 /**
332 * Operation-specific operation state.
333 */
334 struct OperationState *state;
335
336 /**
337 * Evaluate operations are held in
338 * a linked list.
339 */
340 struct Operation *next;
341
342 /**
343 * Evaluate operations are held in
344 * a linked list.
345 */
346 struct Operation *prev;
347};
348
349
297/** 350/**
298 * A set that supports a specific operation 351 * A set that supports a specific operation
299 * with other peers. 352 * with other peers.
@@ -353,28 +406,25 @@ struct Set
353 * previously executed operations on this set 406 * previously executed operations on this set
354 */ 407 */
355 unsigned int current_generation; 408 unsigned int current_generation;
356};
357 409
358
359/**
360 * Information about a tunnel we are connected to.
361 * Used as tunnel context with mesh.
362 */
363struct TunnelContext
364{
365 /** 410 /**
366 * V-Table for the operation belonging 411 * Evaluate operations are held in
367 * to the tunnel contest. 412 * a linked list.
368 */ 413 */
369 const struct SetVT *vt; 414 struct Operation *ops_head;
370 415
371 /** 416 /**
372 * Implementation-specific operation state. 417 * Evaluate operations are held in
418 * a linked list.
373 */ 419 */
374 struct OperationState *op; 420 struct Operation *ops_tail;
375}; 421};
376 422
377 423
424void
425_GSS_operation_destroy (struct Operation *op);
426
427
378/** 428/**
379 * Get the table with implementing functions for 429 * Get the table with implementing functions for
380 * set union. 430 * set union.
@@ -382,6 +432,7 @@ struct TunnelContext
382const struct SetVT * 432const struct SetVT *
383_GSS_union_vt (void); 433_GSS_union_vt (void);
384 434
435
385/** 436/**
386 * Get the table with implementing functions for 437 * Get the table with implementing functions for
387 * set intersection. 438 * set intersection.
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 33a36d260..6ad985bcb 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -112,12 +112,6 @@ struct OperationState
112 struct GNUNET_MESH_Tunnel *tunnel; 112 struct GNUNET_MESH_Tunnel *tunnel;
113 113
114 /** 114 /**
115 * Detail information about the set operation,
116 * including the set to use.
117 */
118 struct OperationSpecification *spec;
119
120 /**
121 * Message queue for the peer. 115 * Message queue for the peer.
122 */ 116 */
123 struct GNUNET_MQ_Handle *mq; 117 struct GNUNET_MQ_Handle *mq;
@@ -151,33 +145,14 @@ struct OperationState
151 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; 145 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
152 146
153 /** 147 /**
154 * Current state of the operation. 148 * Iterator for sending elements on the key to element mapping to the client.
155 */
156 enum UnionOperationPhase phase;
157
158 /**
159 * Generation in which the operation handle
160 * was created.
161 */
162 unsigned int generation_created;
163
164 /**
165 * Set state of the set that this operation
166 * belongs to.
167 */ 149 */
168 struct Set *set; 150 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
169 151
170 /** 152 /**
171 * Evaluate operations are held in 153 * Current state of the operation.
172 * a linked list.
173 */ 154 */
174 struct OperationState *next; 155 enum UnionOperationPhase phase;
175
176 /**
177 * Evaluate operations are held in
178 * a linked list.
179 */
180 struct OperationState *prev;
181 156
182 /** 157 /**
183 * Did we send the client that we are done? 158 * Did we send the client that we are done?
@@ -198,13 +173,13 @@ struct KeyEntry
198 struct IBF_Key ibf_key; 173 struct IBF_Key ibf_key;
199 174
200 /** 175 /**
201 * The actual element associated with the key 176 * The actual element associated with the key.
202 */ 177 */
203 struct ElementEntry *element; 178 struct ElementEntry *element;
204 179
205 /** 180 /**
206 * Element that collides with this element 181 * Element that collides with this element
207 * on the ibf key 182 * on the ibf key. All colliding entries must have the same ibf key.
208 */ 183 */
209 struct KeyEntry *next_colliding; 184 struct KeyEntry *next_colliding;
210}; 185};
@@ -226,7 +201,7 @@ struct SendElementClosure
226 * Operation for which the elements 201 * Operation for which the elements
227 * should be sent. 202 * should be sent.
228 */ 203 */
229 struct OperationState *eo; 204 struct Operation *op;
230}; 205};
231 206
232 207
@@ -242,18 +217,6 @@ struct SetState
242 * salt=0. 217 * salt=0.
243 */ 218 */
244 struct StrataEstimator *se; 219 struct StrataEstimator *se;
245
246 /**
247 * Evaluate operations are held in
248 * a linked list.
249 */
250 struct OperationState *ops_head;
251
252 /**
253 * Evaluate operations are held in
254 * a linked list.
255 */
256 struct OperationState *ops_tail;
257}; 220};
258 221
259 222
@@ -263,9 +226,9 @@ struct SetState
263 * @param cls closure 226 * @param cls closure
264 * @param key current key code 227 * @param key current key code
265 * @param value value in the hash map 228 * @param value value in the hash map
266 * @return GNUNET_YES if we should continue to 229 * @return #GNUNET_YES if we should continue to
267 * iterate, 230 * iterate,
268 * GNUNET_NO if not. 231 * #GNUNET_NO if not.
269 */ 232 */
270static int 233static int
271destroy_key_to_element_iter (void *cls, 234destroy_key_to_element_iter (void *cls,
@@ -290,65 +253,40 @@ destroy_key_to_element_iter (void *cls,
290 253
291 254
292/** 255/**
293 * Destroy a union operation, and free all resources 256 * Destroy the union operation. Only things specific to the union operation are destroyed.
294 * associated with it. 257 *
295 * 258 * @param op union operation to destroy
296 * @param eo the union operation to destroy
297 */ 259 */
298static void 260static void
299union_operation_destroy (struct OperationState *eo) 261union_op_cancel (struct Operation *op)
300{ 262{
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); 263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
302 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head, 264 /* check if the op was canceled twice */
303 eo->set->state->ops_tail, 265 GNUNET_assert (NULL != op->state);
304 eo); 266 if (NULL != op->state->remote_ibf)
305 if (NULL != eo->mq)
306 {
307 GNUNET_MQ_destroy (eo->mq);
308 eo->mq = NULL;
309 }
310 if (NULL != eo->tunnel)
311 {
312 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
313 eo->tunnel = NULL;
314 GNUNET_MESH_tunnel_destroy (t);
315 }
316 if (NULL != eo->remote_ibf)
317 { 267 {
318 ibf_destroy (eo->remote_ibf); 268 ibf_destroy (op->state->remote_ibf);
319 eo->remote_ibf = NULL; 269 op->state->remote_ibf = NULL;
320 } 270 }
321 if (NULL != eo->local_ibf) 271 if (NULL != op->state->local_ibf)
322 { 272 {
323 ibf_destroy (eo->local_ibf); 273 ibf_destroy (op->state->local_ibf);
324 eo->local_ibf = NULL; 274 op->state->local_ibf = NULL;
325 } 275 }
326 if (NULL != eo->se) 276 if (NULL != op->state->se)
327 { 277 {
328 strata_estimator_destroy (eo->se); 278 strata_estimator_destroy (op->state->se);
329 eo->se = NULL; 279 op->state->se = NULL;
330 } 280 }
331 if (NULL != eo->key_to_element) 281 if (NULL != op->state->key_to_element)
332 { 282 {
333 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); 283 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
334 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); 284 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
335 eo->key_to_element = NULL; 285 op->state->key_to_element = NULL;
336 } 286 }
337 if (NULL != eo->spec) 287 GNUNET_free (op->state);
338 { 288 op->state = NULL;
339 if (NULL != eo->spec->context_msg)
340 {
341 GNUNET_free (eo->spec->context_msg);
342 eo->spec->context_msg = NULL;
343 }
344 GNUNET_free (eo->spec);
345 eo->spec = NULL;
346 }
347 GNUNET_free (eo);
348
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); 289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
350
351 /* FIXME: do a garbage collection of the set generations */
352} 290}
353 291
354 292
@@ -356,20 +294,22 @@ union_operation_destroy (struct OperationState *eo)
356 * Inform the client that the union operation has failed, 294 * Inform the client that the union operation has failed,
357 * and proceed to destroy the evaluate operation. 295 * and proceed to destroy the evaluate operation.
358 * 296 *
359 * @param eo the union operation to fail 297 * @param op the union operation to fail
360 */ 298 */
361static void 299static void
362fail_union_operation (struct OperationState *eo) 300fail_union_operation (struct Operation *op)
363{ 301{
364 struct GNUNET_MQ_Envelope *ev; 302 struct GNUNET_MQ_Envelope *ev;
365 struct GNUNET_SET_ResultMessage *msg; 303 struct GNUNET_SET_ResultMessage *msg;
366 304
305 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
306
367 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 307 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
368 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 308 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
369 msg->request_id = htonl (eo->spec->client_request_id); 309 msg->request_id = htonl (op->spec->client_request_id);
370 msg->element_type = htons (0); 310 msg->element_type = htons (0);
371 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 311 GNUNET_MQ_send (op->spec->set->client_mq, ev);
372 union_operation_destroy (eo); 312 _GSS_operation_destroy (op);
373} 313}
374 314
375 315
@@ -382,7 +322,7 @@ fail_union_operation (struct OperationState *eo)
382 * @return the derived IBF key 322 * @return the derived IBF key
383 */ 323 */
384static struct IBF_Key 324static struct IBF_Key
385get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) 325get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
386{ 326{
387 struct IBF_Key key; 327 struct IBF_Key key;
388 328
@@ -398,40 +338,39 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
398/** 338/**
399 * Send a request for the evaluate operation to a remote peer 339 * Send a request for the evaluate operation to a remote peer
400 * 340 *
401 * @param eo operation with the other peer 341 * @param op operation with the other peer
402 */ 342 */
403static void 343static void
404send_operation_request (struct OperationState *eo) 344send_operation_request (struct Operation *op)
405{ 345{
406 struct GNUNET_MQ_Envelope *ev; 346 struct GNUNET_MQ_Envelope *ev;
407 struct OperationRequestMessage *msg; 347 struct OperationRequestMessage *msg;
408 348
409 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 349 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
410 eo->spec->context_msg); 350 op->spec->context_msg);
411 351
412 if (NULL == ev) 352 if (NULL == ev)
413 { 353 {
414 /* the context message is too large */ 354 /* the context message is too large */
415 GNUNET_break (0); 355 GNUNET_break (0);
416 GNUNET_SERVER_client_disconnect (eo->spec->set->client); 356 GNUNET_SERVER_client_disconnect (op->spec->set->client);
417 return; 357 return;
418 } 358 }
419 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 359 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
420 msg->app_id = eo->spec->app_id; 360 msg->app_id = op->spec->app_id;
421 msg->salt = htonl (eo->spec->salt); 361 msg->salt = htonl (op->spec->salt);
422 GNUNET_MQ_send (eo->mq, ev); 362 GNUNET_MQ_send (op->mq, ev);
423 363
424 if (NULL != eo->spec->context_msg) 364 if (NULL != op->spec->context_msg)
425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); 365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
426 else 366 else
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); 367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
428 368
429 if (NULL != eo->spec->context_msg) 369 if (NULL != op->spec->context_msg)
430 { 370 {
431 GNUNET_free (eo->spec->context_msg); 371 GNUNET_free (op->spec->context_msg);
432 eo->spec->context_msg = NULL; 372 op->spec->context_msg = NULL;
433 } 373 }
434
435} 374}
436 375
437 376
@@ -442,34 +381,89 @@ send_operation_request (struct OperationState *eo)
442 * @param cls closure 381 * @param cls closure
443 * @param key current key code 382 * @param key current key code
444 * @param value value in the hash map 383 * @param value value in the hash map
445 * @return GNUNET_YES if we should continue to 384 * @return #GNUNET_YES if we should continue to
446 * iterate, 385 * iterate,
447 * GNUNET_NO if not. 386 * #GNUNET_NO if not.
448 */ 387 */
449static int 388static int
450op_register_element_iterator (void *cls, 389op_register_element_iterator (void *cls,
451 uint32_t key, 390 uint32_t key,
452 void *value) 391 void *value)
453{ 392{
454 struct KeyEntry *const new_k = cls; 393 struct KeyEntry *const new_k = cls;
455 struct KeyEntry *old_k = value; 394 struct KeyEntry *old_k = value;
456 395
457 GNUNET_assert (NULL != old_k); 396 GNUNET_assert (NULL != old_k);
458 do 397 /* check if our ibf key collides with the ibf key in the existing entry */
398 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
459 { 399 {
460 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) 400 /* insert the the new key in the collision chain */
461 { 401 new_k->next_colliding = old_k->next_colliding;
462 new_k->next_colliding = old_k->next_colliding; 402 old_k->next_colliding = new_k;
463 old_k->next_colliding = new_k; 403 /* signal to the caller that we were able to insert into a colliding bucket */
404 return GNUNET_NO;
405 }
406 return GNUNET_YES;
407}
408
409
410/**
411 * Iterator to create the mapping between ibf keys
412 * and element entries.
413 *
414 * @param cls closure
415 * @param key current key code
416 * @param value value in the hash map
417 * @return #GNUNET_YES if we should continue to
418 * iterate,
419 * #GNUNET_NO if not.
420 */
421static int
422op_has_element_iterator (void *cls,
423 uint32_t key,
424 void *value)
425{
426 struct GNUNET_HashCode *element_hash = cls;
427 struct KeyEntry *k = value;
428
429 GNUNET_assert (NULL != k);
430 while (NULL != k)
431 {
432 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
464 return GNUNET_NO; 433 return GNUNET_NO;
465 } 434 k = k->next_colliding;
466 old_k = old_k->next_colliding; 435 }
467 } while (NULL != old_k);
468 return GNUNET_YES; 436 return GNUNET_YES;
469} 437}
470 438
471 439
472/** 440/**
441 * Determine whether the given element is already in the operation's element
442 * set.
443 *
444 * @param op operation that should be tested for 'element_hash'
445 * @param element_hash hash of the element to look for
446 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
447 */
448static int
449op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
450{
451 int ret;
452 struct IBF_Key ibf_key;
453
454 ibf_key = get_ibf_key (element_hash, op->spec->salt);
455 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
456 (uint32_t) ibf_key.key_val,
457 op_has_element_iterator, (void *) element_hash);
458
459 /* was the iteration aborted because we found the element? */
460 if (GNUNET_SYSERR == ret)
461 return GNUNET_YES;
462 return GNUNET_NO;
463}
464
465
466/**
473 * Insert an element into the union operation's 467 * Insert an element into the union operation's
474 * key-to-element mapping. Takes ownership of 'ee'. 468 * key-to-element mapping. Takes ownership of 'ee'.
475 * Note that this does not insert the element in the set, 469 * Note that this does not insert the element in the set,
@@ -477,21 +471,21 @@ op_register_element_iterator (void *cls,
477 * This is done to speed up re-tried operations, if some elements 471 * This is done to speed up re-tried operations, if some elements
478 * were transmitted, and then the IBF fails to decode. 472 * were transmitted, and then the IBF fails to decode.
479 * 473 *
480 * @param eo the union operation 474 * @param op the union operation
481 * @param ee the element entry 475 * @param ee the element entry
482 */ 476 */
483static void 477static void
484op_register_element (struct OperationState *eo, struct ElementEntry *ee) 478op_register_element (struct Operation *op, struct ElementEntry *ee)
485{ 479{
486 int ret; 480 int ret;
487 struct IBF_Key ibf_key; 481 struct IBF_Key ibf_key;
488 struct KeyEntry *k; 482 struct KeyEntry *k;
489 483
490 ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); 484 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
491 k = GNUNET_new (struct KeyEntry); 485 k = GNUNET_new (struct KeyEntry);
492 k->element = ee; 486 k->element = ee;
493 k->ibf_key = ibf_key; 487 k->ibf_key = ibf_key;
494 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 488 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
495 (uint32_t) ibf_key.key_val, 489 (uint32_t) ibf_key.key_val,
496 op_register_element_iterator, k); 490 op_register_element_iterator, k);
497 491
@@ -499,7 +493,7 @@ op_register_element (struct OperationState *eo, struct ElementEntry *ee)
499 if (GNUNET_SYSERR == ret) 493 if (GNUNET_SYSERR == ret)
500 return; 494 return;
501 495
502 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 496 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
503 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 497 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
504} 498}
505 499
@@ -542,19 +536,19 @@ init_key_to_element_iterator (void *cls,
542 const struct GNUNET_HashCode *key, 536 const struct GNUNET_HashCode *key,
543 void *value) 537 void *value)
544{ 538{
545 struct OperationState *eo = cls; 539 struct Operation *op = cls;
546 struct ElementEntry *e = value; 540 struct ElementEntry *e = value;
547 541
548 /* make sure that the element belongs to the set at the time 542 /* make sure that the element belongs to the set at the time
549 * of creating the operation */ 543 * of creating the operation */
550 if ( (e->generation_added > eo->generation_created) || 544 if ( (e->generation_added > op->generation_created) ||
551 ( (GNUNET_YES == e->removed) && 545 ( (GNUNET_YES == e->removed) &&
552 (e->generation_removed < eo->generation_created))) 546 (e->generation_removed < op->generation_created)))
553 return GNUNET_YES; 547 return GNUNET_YES;
554 548
555 GNUNET_assert (GNUNET_NO == e->remote); 549 GNUNET_assert (GNUNET_NO == e->remote);
556 550
557 op_register_element (eo, e); 551 op_register_element (op, e);
558 return GNUNET_YES; 552 return GNUNET_YES;
559} 553}
560 554
@@ -563,45 +557,45 @@ init_key_to_element_iterator (void *cls,
563 * Create an ibf with the operation's elements 557 * Create an ibf with the operation's elements
564 * of the specified size 558 * of the specified size
565 * 559 *
566 * @param eo the union operation 560 * @param op the union operation
567 * @param size size of the ibf to create 561 * @param size size of the ibf to create
568 */ 562 */
569static void 563static void
570prepare_ibf (struct OperationState *eo, uint16_t size) 564prepare_ibf (struct Operation *op, uint16_t size)
571{ 565{
572 if (NULL == eo->key_to_element) 566 if (NULL == op->state->key_to_element)
573 { 567 {
574 unsigned int len; 568 unsigned int len;
575 len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); 569 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
576 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 570 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
577 GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, 571 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
578 init_key_to_element_iterator, eo); 572 init_key_to_element_iterator, op);
579 } 573 }
580 if (NULL != eo->local_ibf) 574 if (NULL != op->state->local_ibf)
581 ibf_destroy (eo->local_ibf); 575 ibf_destroy (op->state->local_ibf);
582 eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 576 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
583 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, 577 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
584 prepare_ibf_iterator, eo->local_ibf); 578 prepare_ibf_iterator, op->state->local_ibf);
585} 579}
586 580
587 581
588/** 582/**
589 * Send an ibf of appropriate size. 583 * Send an ibf of appropriate size.
590 * 584 *
591 * @param eo the union operation 585 * @param op the union operation
592 * @param ibf_order order of the ibf to send, size=2^order 586 * @param ibf_order order of the ibf to send, size=2^order
593 */ 587 */
594static void 588static void
595send_ibf (struct OperationState *eo, uint16_t ibf_order) 589send_ibf (struct Operation *op, uint16_t ibf_order)
596{ 590{
597 unsigned int buckets_sent = 0; 591 unsigned int buckets_sent = 0;
598 struct InvertibleBloomFilter *ibf; 592 struct InvertibleBloomFilter *ibf;
599 593
600 prepare_ibf (eo, 1<<ibf_order); 594 prepare_ibf (op, 1<<ibf_order);
601 595
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); 596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
603 597
604 ibf = eo->local_ibf; 598 ibf = op->state->local_ibf;
605 599
606 while (buckets_sent < (1 << ibf_order)) 600 while (buckets_sent < (1 << ibf_order))
607 { 601 {
@@ -624,20 +618,20 @@ send_ibf (struct OperationState *eo, uint16_t ibf_order)
624 buckets_sent += buckets_in_message; 618 buckets_sent += buckets_in_message;
625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", 619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
626 buckets_in_message, buckets_sent, 1<<ibf_order); 620 buckets_in_message, buckets_sent, 1<<ibf_order);
627 GNUNET_MQ_send (eo->mq, ev); 621 GNUNET_MQ_send (op->mq, ev);
628 } 622 }
629 623
630 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; 624 op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
631} 625}
632 626
633 627
634/** 628/**
635 * Send a strata estimator to the remote peer. 629 * Send a strata estimator to the remote peer.
636 * 630 *
637 * @param eo the union operation with the remote peer 631 * @param op the union operation with the remote peer
638 */ 632 */
639static void 633static void
640send_strata_estimator (struct OperationState *eo) 634send_strata_estimator (struct Operation *op)
641{ 635{
642 struct GNUNET_MQ_Envelope *ev; 636 struct GNUNET_MQ_Envelope *ev;
643 struct GNUNET_MessageHeader *strata_msg; 637 struct GNUNET_MessageHeader *strata_msg;
@@ -645,9 +639,9 @@ send_strata_estimator (struct OperationState *eo)
645 ev = GNUNET_MQ_msg_header_extra (strata_msg, 639 ev = GNUNET_MQ_msg_header_extra (strata_msg,
646 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 640 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
647 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 641 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
648 strata_estimator_write (eo->set->state->se, &strata_msg[1]); 642 strata_estimator_write (op->state->se, &strata_msg[1]);
649 GNUNET_MQ_send (eo->mq, ev); 643 GNUNET_MQ_send (op->mq, ev);
650 eo->phase = PHASE_EXPECT_IBF; 644 op->state->phase = PHASE_EXPECT_IBF;
651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); 645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
652} 646}
653 647
@@ -682,27 +676,27 @@ get_order_from_difference (unsigned int diff)
682static void 676static void
683handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) 677handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
684{ 678{
685 struct OperationState *eo = cls; 679 struct Operation *op = cls;
686 struct StrataEstimator *remote_se; 680 struct StrataEstimator *remote_se;
687 int diff; 681 int diff;
688 682
689 if (eo->phase != PHASE_EXPECT_SE) 683 if (op->state->phase != PHASE_EXPECT_SE)
690 { 684 {
691 fail_union_operation (eo); 685 fail_union_operation (op);
692 GNUNET_break (0); 686 GNUNET_break (0);
693 return; 687 return;
694 } 688 }
695 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, 689 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
696 SE_IBF_HASH_NUM); 690 SE_IBF_HASH_NUM);
697 strata_estimator_read (&mh[1], remote_se); 691 strata_estimator_read (&mh[1], remote_se);
698 GNUNET_assert (NULL != eo->se); 692 GNUNET_assert (NULL != op->state->se);
699 diff = strata_estimator_difference (remote_se, eo->se); 693 diff = strata_estimator_difference (remote_se, op->state->se);
700 strata_estimator_destroy (remote_se); 694 strata_estimator_destroy (remote_se);
701 strata_estimator_destroy (eo->se); 695 strata_estimator_destroy (op->state->se);
702 eo->se = NULL; 696 op->state->se = NULL;
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", 697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
704 diff, 1<<get_order_from_difference (diff)); 698 diff, 1<<get_order_from_difference (diff));
705 send_ibf (eo, get_order_from_difference (diff)); 699 send_ibf (op, get_order_from_difference (diff));
706} 700}
707 701
708 702
@@ -721,7 +715,7 @@ send_element_iterator (void *cls,
721{ 715{
722 struct SendElementClosure *sec = cls; 716 struct SendElementClosure *sec = cls;
723 struct IBF_Key ibf_key = sec->ibf_key; 717 struct IBF_Key ibf_key = sec->ibf_key;
724 struct OperationState *eo = sec->eo; 718 struct Operation *op = sec->op;
725 struct KeyEntry *ke = value; 719 struct KeyEntry *ke = value;
726 720
727 if (ke->ibf_key.key_val != ibf_key.key_val) 721 if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -743,7 +737,7 @@ send_element_iterator (void *cls,
743 memcpy (&mh[1], element->data, element->size); 737 memcpy (&mh[1], element->data, element->size);
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", 738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
745 GNUNET_h2s (&ke->element->element_hash)); 739 GNUNET_h2s (&ke->element->element_hash));
746 GNUNET_MQ_send (eo->mq, ev); 740 GNUNET_MQ_send (op->mq, ev);
747 ke = ke->next_colliding; 741 ke = ke->next_colliding;
748 } 742 }
749 return GNUNET_NO; 743 return GNUNET_NO;
@@ -753,17 +747,17 @@ send_element_iterator (void *cls,
753 * Send all elements that have the specified IBF key 747 * Send all elements that have the specified IBF key
754 * to the remote peer of the union operation 748 * to the remote peer of the union operation
755 * 749 *
756 * @param eo union operation 750 * @param op union operation
757 * @param ibf_key IBF key of interest 751 * @param ibf_key IBF key of interest
758 */ 752 */
759static void 753static void
760send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) 754send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
761{ 755{
762 struct SendElementClosure send_cls; 756 struct SendElementClosure send_cls;
763 757
764 send_cls.ibf_key = ibf_key; 758 send_cls.ibf_key = ibf_key;
765 send_cls.eo = eo; 759 send_cls.op = op;
766 GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, 760 GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val,
767 &send_element_iterator, &send_cls); 761 &send_element_iterator, &send_cls);
768} 762}
769 763
@@ -772,10 +766,10 @@ send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
772 * Decode which elements are missing on each side, and 766 * Decode which elements are missing on each side, and
773 * send the appropriate elemens and requests 767 * send the appropriate elemens and requests
774 * 768 *
775 * @param eo union operation 769 * @param op union operation
776 */ 770 */
777static void 771static void
778decode_and_send (struct OperationState *eo) 772decode_and_send (struct Operation *op)
779{ 773{
780 struct IBF_Key key; 774 struct IBF_Key key;
781 struct IBF_Key last_key; 775 struct IBF_Key last_key;
@@ -783,14 +777,14 @@ decode_and_send (struct OperationState *eo)
783 unsigned int num_decoded; 777 unsigned int num_decoded;
784 struct InvertibleBloomFilter *diff_ibf; 778 struct InvertibleBloomFilter *diff_ibf;
785 779
786 GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); 780 GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
787 781
788 prepare_ibf (eo, eo->remote_ibf->size); 782 prepare_ibf (op, op->state->remote_ibf->size);
789 diff_ibf = ibf_dup (eo->local_ibf); 783 diff_ibf = ibf_dup (op->state->local_ibf);
790 ibf_subtract (diff_ibf, eo->remote_ibf); 784 ibf_subtract (diff_ibf, op->state->remote_ibf);
791 785
792 ibf_destroy (eo->remote_ibf); 786 ibf_destroy (op->state->remote_ibf);
793 eo->remote_ibf = NULL; 787 op->state->remote_ibf = NULL;
794 788
795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); 789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
796 790
@@ -829,7 +823,7 @@ decode_and_send (struct OperationState *eo)
829 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 823 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
830 "decoding failed, sending larger ibf (size %u)\n", 824 "decoding failed, sending larger ibf (size %u)\n",
831 1<<next_order); 825 1<<next_order);
832 send_ibf (eo, next_order); 826 send_ibf (op, next_order);
833 } 827 }
834 else 828 else
835 { 829 {
@@ -844,28 +838,26 @@ decode_and_send (struct OperationState *eo)
844 838
845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); 839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
846 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 840 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
847 GNUNET_MQ_send (eo->mq, ev); 841 GNUNET_MQ_send (op->mq, ev);
848 break; 842 break;
849 } 843 }
850 if (1 == side) 844 if (1 == side)
851 { 845 {
852 send_elements_for_key (eo, key); 846 send_elements_for_key (op, key);
853 } 847 }
854 else if (-1 == side) 848 else if (-1 == side)
855 { 849 {
856 struct GNUNET_MQ_Envelope *ev; 850 struct GNUNET_MQ_Envelope *ev;
857 struct GNUNET_MessageHeader *msg; 851 struct GNUNET_MessageHeader *msg;
858 852
859 /* FIXME: before sending the request, check if we may just have the element */ 853 /* It may be nice to merge multiple requests, but with mesh's corking it is not worth
860 /* FIXME: merge multiple requests */ 854 * the effort additional complexity. */
861 /* FIXME: remember somewhere that we already requested the element,
862 * so that we don't request it again with the next ibf if decoding fails */
863 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 855 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
864 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 856 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
865 857
866 *(struct IBF_Key *) &msg[1] = key; 858 *(struct IBF_Key *) &msg[1] = key;
867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); 859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
868 GNUNET_MQ_send (eo->mq, ev); 860 GNUNET_MQ_send (op->mq, ev);
869 } 861 }
870 else 862 else
871 { 863 {
@@ -885,32 +877,32 @@ decode_and_send (struct OperationState *eo)
885static void 877static void
886handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) 878handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
887{ 879{
888 struct OperationState *eo = cls; 880 struct Operation *op = cls;
889 struct IBFMessage *msg = (struct IBFMessage *) mh; 881 struct IBFMessage *msg = (struct IBFMessage *) mh;
890 unsigned int buckets_in_message; 882 unsigned int buckets_in_message;
891 883
892 if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || 884 if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
893 (eo->phase == PHASE_EXPECT_IBF) ) 885 (op->state->phase == PHASE_EXPECT_IBF) )
894 { 886 {
895 eo->phase = PHASE_EXPECT_IBF_CONT; 887 op->state->phase = PHASE_EXPECT_IBF_CONT;
896 GNUNET_assert (NULL == eo->remote_ibf); 888 GNUNET_assert (NULL == op->state->remote_ibf);
897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); 889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
898 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 890 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
899 eo->ibf_buckets_received = 0; 891 op->state->ibf_buckets_received = 0;
900 if (0 != ntohs (msg->offset)) 892 if (0 != ntohs (msg->offset))
901 { 893 {
902 GNUNET_break (0); 894 GNUNET_break (0);
903 fail_union_operation (eo); 895 fail_union_operation (op);
904 return; 896 return;
905 } 897 }
906 } 898 }
907 else if (eo->phase == PHASE_EXPECT_IBF_CONT) 899 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
908 { 900 {
909 if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || 901 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
910 (1<<msg->order != eo->remote_ibf->size) ) 902 (1<<msg->order != op->state->remote_ibf->size) )
911 { 903 {
912 GNUNET_break (0); 904 GNUNET_break (0);
913 fail_union_operation (eo); 905 fail_union_operation (op);
914 return; 906 return;
915 } 907 }
916 } 908 }
@@ -920,25 +912,25 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
920 if (0 == buckets_in_message) 912 if (0 == buckets_in_message)
921 { 913 {
922 GNUNET_break_op (0); 914 GNUNET_break_op (0);
923 fail_union_operation (eo); 915 fail_union_operation (op);
924 return; 916 return;
925 } 917 }
926 918
927 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) 919 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
928 { 920 {
929 GNUNET_break (0); 921 GNUNET_break (0);
930 fail_union_operation (eo); 922 fail_union_operation (op);
931 return; 923 return;
932 } 924 }
933 925
934 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); 926 ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
935 eo->ibf_buckets_received += buckets_in_message; 927 op->state->ibf_buckets_received += buckets_in_message;
936 928
937 if (eo->ibf_buckets_received == eo->remote_ibf->size) 929 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
938 { 930 {
939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); 931 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
940 eo->phase = PHASE_EXPECT_ELEMENTS; 932 op->state->phase = PHASE_EXPECT_ELEMENTS;
941 decode_and_send (eo); 933 decode_and_send (op);
942 } 934 }
943} 935}
944 936
@@ -947,18 +939,18 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
947 * Send a result message to the client indicating 939 * Send a result message to the client indicating
948 * that there is a new element. 940 * that there is a new element.
949 * 941 *
950 * @param eo union operation 942 * @param op union operation
951 * @param element element to send 943 * @param element element to send
952 */ 944 */
953static void 945static void
954send_client_element (struct OperationState *eo, 946send_client_element (struct Operation *op,
955 struct GNUNET_SET_Element *element) 947 struct GNUNET_SET_Element *element)
956{ 948{
957 struct GNUNET_MQ_Envelope *ev; 949 struct GNUNET_MQ_Envelope *ev;
958 struct GNUNET_SET_ResultMessage *rm; 950 struct GNUNET_SET_ResultMessage *rm;
959 951
960 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); 952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
961 GNUNET_assert (0 != eo->spec->client_request_id); 953 GNUNET_assert (0 != op->spec->client_request_id);
962 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 954 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
963 if (NULL == ev) 955 if (NULL == ev)
964 { 956 {
@@ -967,38 +959,112 @@ send_client_element (struct OperationState *eo,
967 return; 959 return;
968 } 960 }
969 rm->result_status = htons (GNUNET_SET_STATUS_OK); 961 rm->result_status = htons (GNUNET_SET_STATUS_OK);
970 rm->request_id = htonl (eo->spec->client_request_id); 962 rm->request_id = htonl (op->spec->client_request_id);
971 rm->element_type = element->type; 963 rm->element_type = element->type;
972 memcpy (&rm[1], element->data, element->size); 964 memcpy (&rm[1], element->data, element->size);
973 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 965 GNUNET_MQ_send (op->spec->set->client_mq, ev);
974} 966}
975 967
976 968
977/** 969/**
978 * Send a result message to the client indicating 970 * Signal to the client that the operation has finished and
979 * that the operation is over. 971 * destroy the operation.
980 * After the result done message has been sent to the client,
981 * destroy the evaluate operation.
982 * 972 *
983 * @param eo union operation 973 * @param cls operation to destroy
984 */ 974 */
985static void 975static void
986send_client_done_and_destroy (struct OperationState *eo) 976send_done_and_destroy (void *cls)
987{ 977{
978 struct Operation *op = cls;
988 struct GNUNET_MQ_Envelope *ev; 979 struct GNUNET_MQ_Envelope *ev;
989 struct GNUNET_SET_ResultMessage *rm; 980 struct GNUNET_SET_ResultMessage *rm;
990
991 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
992
993 eo->client_done_sent = GNUNET_YES;
994
995 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 981 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996 rm->request_id = htonl (eo->spec->client_request_id); 982 rm->request_id = htonl (op->spec->client_request_id);
997 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 983 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
998 rm->element_type = htons (0); 984 rm->element_type = htons (0);
999 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 985 GNUNET_MQ_send (op->spec->set->client_mq, ev);
986 _GSS_operation_destroy (op);
987}
988
989
990/**
991 * Send all remaining elements in the full result iterator.
992 *
993 * @param cls operation
994 */
995static void
996send_remaining_elements (void *cls)
997{
998 struct Operation *op = cls;
999 struct KeyEntry *ke;
1000 int res;
1001
1002 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
1003 res = GNUNET_NO;
1004 if (GNUNET_NO == res)
1005 {
1006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
1007 send_done_and_destroy (op);
1008 return;
1009 }
1010
1011 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
1012
1013 while (1)
1014 {
1015 struct GNUNET_MQ_Envelope *ev;
1016 struct GNUNET_SET_ResultMessage *rm;
1017 struct GNUNET_SET_Element *element;
1018 element = &ke->element->element;
1019
1020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
1021 GNUNET_assert (0 != op->spec->client_request_id);
1022 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1023 if (NULL == ev)
1024 {
1025 GNUNET_MQ_discard (ev);
1026 GNUNET_break (0);
1027 continue;
1028 }
1029 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1030 rm->request_id = htonl (op->spec->client_request_id);
1031 rm->element_type = element->type;
1032 memcpy (&rm[1], element->data, element->size);
1033 if (ke->next_colliding == NULL)
1034 {
1035 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1036 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1037 break;
1038 }
1039 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1040 ke = ke->next_colliding;
1041 }
1042}
1000 1043
1001 union_operation_destroy (eo); 1044
1045/**
1046 * Send a result message to the client indicating
1047 * that the operation is over.
1048 * After the result done message has been sent to the client,
1049 * destroy the evaluate operation.
1050 *
1051 * @param op union operation
1052 */
1053static void
1054finish_and_destroy (struct Operation *op)
1055{
1056 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1057
1058 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1059 {
1060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1061 GNUNET_assert (NULL == op->state->full_result_iter);
1062 op->state->full_result_iter =
1063 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1064 send_remaining_elements (op);
1065 return;
1066 }
1067 send_done_and_destroy (op);
1002} 1068}
1003 1069
1004 1070
@@ -1011,16 +1077,16 @@ send_client_done_and_destroy (struct OperationState *eo)
1011static void 1077static void
1012handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) 1078handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1013{ 1079{
1014 struct OperationState *eo = cls; 1080 struct Operation *op = cls;
1015 struct ElementEntry *ee; 1081 struct ElementEntry *ee;
1016 uint16_t element_size; 1082 uint16_t element_size;
1017 1083
1018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); 1084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1019 1085
1020 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && 1086 if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1021 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) 1087 (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1022 { 1088 {
1023 fail_union_operation (eo); 1089 fail_union_operation (op);
1024 GNUNET_break (0); 1090 GNUNET_break (0);
1025 return; 1091 return;
1026 } 1092 }
@@ -1032,12 +1098,17 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1032 ee->remote = GNUNET_YES; 1098 ee->remote = GNUNET_YES;
1033 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); 1099 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1034 1100
1035 /* FIXME: see if the element has already been inserted! */ 1101 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1102 {
1103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
1104 GNUNET_free (ee);
1105 return;
1106 }
1036 1107
1037 op_register_element (eo, ee); 1108 op_register_element (op, ee);
1038 /* only send results immediately if the client wants it */ 1109 /* only send results immediately if the client wants it */
1039 if (GNUNET_SET_RESULT_ADDED == eo->spec->result_mode) 1110 if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1040 send_client_element (eo, &ee->element); 1111 send_client_element (op, &ee->element);
1041} 1112}
1042 1113
1043 1114
@@ -1050,15 +1121,15 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1050static void 1121static void
1051handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) 1122handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1052{ 1123{
1053 struct OperationState *eo = cls; 1124 struct Operation *op = cls;
1054 struct IBF_Key *ibf_key; 1125 struct IBF_Key *ibf_key;
1055 unsigned int num_keys; 1126 unsigned int num_keys;
1056 1127
1057 /* look up elements and send them */ 1128 /* look up elements and send them */
1058 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1129 if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1059 { 1130 {
1060 GNUNET_break (0); 1131 GNUNET_break (0);
1061 fail_union_operation (eo); 1132 fail_union_operation (op);
1062 return; 1133 return;
1063 } 1134 }
1064 1135
@@ -1067,14 +1138,14 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1067 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) 1138 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1068 { 1139 {
1069 GNUNET_break (0); 1140 GNUNET_break (0);
1070 fail_union_operation (eo); 1141 fail_union_operation (op);
1071 return; 1142 return;
1072 } 1143 }
1073 1144
1074 ibf_key = (struct IBF_Key *) &mh[1]; 1145 ibf_key = (struct IBF_Key *) &mh[1];
1075 while (0 != num_keys--) 1146 while (0 != num_keys--)
1076 { 1147 {
1077 send_elements_for_key (eo, *ibf_key); 1148 send_elements_for_key (op, *ibf_key);
1078 ibf_key++; 1149 ibf_key++;
1079 } 1150 }
1080} 1151}
@@ -1089,28 +1160,28 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1089static void 1160static void
1090handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) 1161handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1091{ 1162{
1092 struct OperationState *eo = cls; 1163 struct Operation *op = cls;
1093 struct GNUNET_MQ_Envelope *ev; 1164 struct GNUNET_MQ_Envelope *ev;
1094 1165
1095 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1166 if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1096 { 1167 {
1097 /* we got all requests, but still have to send our elements as response */ 1168 /* we got all requests, but still have to send our elements as response */
1098 1169
1099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); 1170 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1100 eo->phase = PHASE_FINISHED; 1171 op->state->phase = PHASE_FINISHED;
1101 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1172 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1102 GNUNET_MQ_send (eo->mq, ev); 1173 GNUNET_MQ_send (op->mq, ev);
1103 return; 1174 return;
1104 } 1175 }
1105 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1176 if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1106 { 1177 {
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); 1178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1108 eo->phase = PHASE_FINISHED; 1179 op->state->phase = PHASE_FINISHED;
1109 send_client_done_and_destroy (eo); 1180 finish_and_destroy (op);
1110 return; 1181 return;
1111 } 1182 }
1112 GNUNET_break (0); 1183 GNUNET_break (0);
1113 fail_union_operation (eo); 1184 fail_union_operation (op);
1114} 1185}
1115 1186
1116 1187
@@ -1118,78 +1189,34 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1118 * Evaluate a union operation with 1189 * Evaluate a union operation with
1119 * a remote peer. 1190 * a remote peer.
1120 * 1191 *
1121 * @param spec specification of the operation the evaluate 1192 * @param op operation to evaluate
1122 * @param tunnel tunnel already connected to the partner peer
1123 * @param tc tunnel context, passed here so all new incoming
1124 * messages are directly going to the union operations
1125 * @return a handle to the operation
1126 */ 1193 */
1127static void 1194static void
1128union_evaluate (struct OperationSpecification *spec, 1195union_evaluate (struct Operation *op)
1129 struct GNUNET_MESH_Tunnel *tunnel,
1130 struct TunnelContext *tc)
1131{ 1196{
1132 struct OperationState *eo; 1197 op->state = GNUNET_new (struct OperationState);
1133 1198 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1134 eo = GNUNET_new (struct OperationState);
1135 tc->vt = _GSS_union_vt ();
1136 tc->op = eo;
1137 eo->se = strata_estimator_dup (spec->set->state->se);
1138 eo->generation_created = spec->set->current_generation++;
1139 eo->set = spec->set;
1140 eo->spec = spec;
1141 eo->tunnel = tunnel;
1142 eo->tunnel = tunnel;
1143 eo->mq = GNUNET_MESH_mq_create (tunnel);
1144
1145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146 "evaluating union operation, (app %s)\n",
1147 GNUNET_h2s (&eo->spec->app_id));
1148
1149 /* we started the operation, thus we have to send the operation request */ 1199 /* we started the operation, thus we have to send the operation request */
1150 eo->phase = PHASE_EXPECT_SE; 1200 op->state->phase = PHASE_EXPECT_SE;
1151 1201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation");
1152 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, 1202 send_operation_request (op);
1153 eo->set->state->ops_tail,
1154 eo);
1155
1156 send_operation_request (eo);
1157} 1203}
1158 1204
1159 1205
1160/** 1206/**
1161 * Accept an union operation request from a remote peer 1207 * Accept an union operation request from a remote peer.
1208 * Only initializes the private operation state.
1162 * 1209 *
1163 * @param spec all necessary information about the operation 1210 * @param op operation that will be accepted as a union operation
1164 * @param tunnel open tunnel to the partner's peer
1165 * @param tc tunnel context, passed here so all new incoming
1166 * messages are directly going to the union operations
1167 * @return operation
1168 */ 1211 */
1169static void 1212static void
1170union_accept (struct OperationSpecification *spec, 1213union_accept (struct Operation *op)
1171 struct GNUNET_MESH_Tunnel *tunnel,
1172 struct TunnelContext *tc)
1173{ 1214{
1174 struct OperationState *eo;
1175
1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); 1215 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1177 1216 op->state = GNUNET_new (struct OperationState);
1178 eo = GNUNET_new (struct OperationState); 1217 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1179 tc->vt = _GSS_union_vt ();
1180 tc->op = eo;
1181 eo->set = spec->set;
1182 eo->generation_created = eo->set->current_generation++;
1183 eo->spec = spec;
1184 eo->tunnel = tunnel;
1185 eo->mq = GNUNET_MESH_mq_create (tunnel);
1186 eo->se = strata_estimator_dup (eo->set->state->se);
1187 /* transfer ownership of mq and socket from incoming to eo */
1188 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1189 eo->set->state->ops_tail,
1190 eo);
1191 /* kick off the operation */ 1218 /* kick off the operation */
1192 send_strata_estimator (eo); 1219 send_strata_estimator (op);
1193} 1220}
1194 1221
1195 1222
@@ -1240,17 +1267,13 @@ union_remove (struct SetState *set_state, struct ElementEntry *ee)
1240 1267
1241 1268
1242/** 1269/**
1243 * Destroy a set that supports the union operation 1270 * Destroy a set that supports the union operation.
1244 * 1271 *
1245 * @param set_state the set to destroy 1272 * @param set_state the set to destroy
1246 */ 1273 */
1247static void 1274static void
1248union_set_destroy (struct SetState *set_state) 1275union_set_destroy (struct SetState *set_state)
1249{ 1276{
1250 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1251 /* important to destroy operations before the rest of the set */
1252 while (NULL != set_state->ops_head)
1253 union_operation_destroy (set_state->ops_head);
1254 if (NULL != set_state->se) 1277 if (NULL != set_state->se)
1255 { 1278 {
1256 strata_estimator_destroy (set_state->se); 1279 strata_estimator_destroy (set_state->se);
@@ -1263,13 +1286,13 @@ union_set_destroy (struct SetState *set_state)
1263/** 1286/**
1264 * Dispatch messages for a union operation. 1287 * Dispatch messages for a union operation.
1265 * 1288 *
1266 * @param eo the state of the union evaluate operation 1289 * @param op the state of the union evaluate operation
1267 * @param mh the received message 1290 * @param mh the received message
1268 * @return GNUNET_SYSERR if the tunnel should be disconnected, 1291 * @return GNUNET_SYSERR if the tunnel should be disconnected,
1269 * GNUNET_OK otherwise 1292 * GNUNET_OK otherwise
1270 */ 1293 */
1271int 1294int
1272union_handle_p2p_message (struct OperationState *eo, 1295union_handle_p2p_message (struct Operation *op,
1273 const struct GNUNET_MessageHeader *mh) 1296 const struct GNUNET_MessageHeader *mh)
1274{ 1297{
1275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", 1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
@@ -1277,19 +1300,19 @@ union_handle_p2p_message (struct OperationState *eo,
1277 switch (ntohs (mh->type)) 1300 switch (ntohs (mh->type))
1278 { 1301 {
1279 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: 1302 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1280 handle_p2p_ibf (eo, mh); 1303 handle_p2p_ibf (op, mh);
1281 break; 1304 break;
1282 case GNUNET_MESSAGE_TYPE_SET_P2P_SE: 1305 case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1283 handle_p2p_strata_estimator (eo, mh); 1306 handle_p2p_strata_estimator (op, mh);
1284 break; 1307 break;
1285 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 1308 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1286 handle_p2p_elements (eo, mh); 1309 handle_p2p_elements (op, mh);
1287 break; 1310 break;
1288 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: 1311 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1289 handle_p2p_element_requests (eo, mh); 1312 handle_p2p_element_requests (op, mh);
1290 break; 1313 break;
1291 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: 1314 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1292 handle_p2p_done (eo, mh); 1315 handle_p2p_done (op, mh);
1293 break; 1316 break;
1294 default: 1317 default:
1295 /* something wrong with mesh's message handlers? */ 1318 /* something wrong with mesh's message handlers? */
@@ -1300,18 +1323,9 @@ union_handle_p2p_message (struct OperationState *eo,
1300 1323
1301 1324
1302static void 1325static void
1303union_peer_disconnect (struct OperationState *op) 1326union_peer_disconnect (struct Operation *op)
1304{ 1327{
1305 /* Are we already disconnected? */ 1328 if (PHASE_FINISHED != op->state->phase)
1306 if (NULL == op->tunnel)
1307 return;
1308 op->tunnel = NULL;
1309 if (NULL != op->mq)
1310 {
1311 GNUNET_MQ_destroy (op->mq);
1312 op->mq = NULL;
1313 }
1314 if (PHASE_FINISHED != op->phase)
1315 { 1329 {
1316 struct GNUNET_MQ_Envelope *ev; 1330 struct GNUNET_MQ_Envelope *ev;
1317 struct GNUNET_SET_ResultMessage *msg; 1331 struct GNUNET_SET_ResultMessage *msg;
@@ -1322,34 +1336,12 @@ union_peer_disconnect (struct OperationState *op)
1322 msg->element_type = htons (0); 1336 msg->element_type = htons (0);
1323 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1337 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1324 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); 1338 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1325 union_operation_destroy (op); 1339 _GSS_operation_destroy (op);
1326 return; 1340 return;
1327 } 1341 }
1328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); 1342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1329 if (GNUNET_NO == op->client_done_sent) 1343 if (GNUNET_NO == op->state->client_done_sent)
1330 send_client_done_and_destroy (op); 1344 finish_and_destroy (op);
1331}
1332
1333
1334static void
1335union_op_cancel (struct SetState *set_state, uint32_t op_id)
1336{
1337 struct OperationState *op_state;
1338 int found = GNUNET_NO;
1339 for (op_state = set_state->ops_head; NULL != op_state; op_state = op_state->next)
1340 {
1341 if (op_state->spec->client_request_id == op_id)
1342 {
1343 found = GNUNET_YES;
1344 break;
1345 }
1346 }
1347 if (GNUNET_NO == found)
1348 {
1349 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "canceling non-existing operation\n");
1350 return;
1351 }
1352 union_operation_destroy (op_state);
1353} 1345}
1354 1346
1355 1347
diff --git a/src/set/ibf.h b/src/set/ibf.h
index 407d14f64..c62ecac43 100644
--- a/src/set/ibf.h
+++ b/src/set/ibf.h
@@ -39,27 +39,40 @@ extern "C"
39#endif 39#endif
40 40
41 41
42/**
43 * Keys that can be inserted into and removed from an IBF.
44 */
42struct IBF_Key 45struct IBF_Key
43{ 46{
44 uint64_t key_val; 47 uint64_t key_val;
45}; 48};
46 49
50
51/**
52 * Hash of an IBF key.
53 */
47struct IBF_KeyHash 54struct IBF_KeyHash
48{ 55{
49 uint32_t key_hash_val; 56 uint32_t key_hash_val;
50}; 57};
51 58
59
60/**
61 * Type of the count field of IBF buckets.
62 */
52struct IBF_Count 63struct IBF_Count
53{ 64{
54 int8_t count_val; 65 int8_t count_val;
55}; 66};
56 67
68
57/** 69/**
58 * Size of one ibf bucket in bytes 70 * Size of one ibf bucket in bytes
59 */ 71 */
60#define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \ 72#define IBF_BUCKET_SIZE (sizeof (struct IBF_Count) + sizeof (struct IBF_Key) + \
61 sizeof (struct IBF_KeyHash)) 73 sizeof (struct IBF_KeyHash))
62 74
75
63/** 76/**
64 * Invertible bloom filter (IBF). 77 * Invertible bloom filter (IBF).
65 * 78 *
@@ -212,6 +225,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *re
212struct InvertibleBloomFilter * 225struct InvertibleBloomFilter *
213ibf_dup (const struct InvertibleBloomFilter *ibf); 226ibf_dup (const struct InvertibleBloomFilter *ibf);
214 227
228
215/** 229/**
216 * Destroy all resources associated with the invertible bloom filter. 230 * Destroy all resources associated with the invertible bloom filter.
217 * No more ibf_*-functions may be called on ibf after calling destroy. 231 * No more ibf_*-functions may be called on ibf after calling destroy.
diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h
index abc39f25a..9de5598bd 100644
--- a/src/set/strata_estimator.h
+++ b/src/set/strata_estimator.h
@@ -40,6 +40,9 @@ extern "C"
40#endif 40#endif
41 41
42 42
43/**
44 * A handle to a strata estimator.
45 */
43struct StrataEstimator 46struct StrataEstimator
44{ 47{
45 struct InvertibleBloomFilter **strata; 48 struct InvertibleBloomFilter **strata;
@@ -48,31 +51,77 @@ struct StrataEstimator
48}; 51};
49 52
50 53
54/**
55 * Write the given strata estimator to the buffer.
56 *
57 * @param se strata estimator to serialize
58 * @param buf buffer to write to, must be of appropriate size
59 */
51void 60void
52strata_estimator_write (const struct StrataEstimator *se, void *buf); 61strata_estimator_write (const struct StrataEstimator *se, void *buf);
53 62
54 63
64/**
65 * Read strata from the buffer into the given strata
66 * estimator. The strata estimator must already be allocated.
67 *
68 * @param buf buffer to read from
69 * @param se strata estimator to write to
70 */
55void 71void
56strata_estimator_read (const void *buf, struct StrataEstimator *se); 72strata_estimator_read (const void *buf, struct StrataEstimator *se);
57 73
58 74
75/**
76 * Create a new strata estimator with the given parameters.
77 *
78 * @param strata_count number of stratas, that is, number of ibfs in the estimator
79 * @param ibf_size size of each ibf stratum
80 * @param ibf_hashnum hashnum parameter of each ibf
81 * @return a freshly allocated, empty strata estimator
82 */
59struct StrataEstimator * 83struct StrataEstimator *
60strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum); 84strata_estimator_create (unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum);
61 85
62 86
87/**
88 * Get an estimation of the symmetric difference of the elements
89 * contained in both strata estimators.
90 *
91 * @param se1 first strata estimator
92 * @param se2 second strata estimator
93 * @return abs(|se1| - |se2|)
94 */
63unsigned int 95unsigned int
64strata_estimator_difference (const struct StrataEstimator *se1, 96strata_estimator_difference (const struct StrataEstimator *se1,
65 const struct StrataEstimator *se2); 97 const struct StrataEstimator *se2);
66 98
67 99
100/**
101 * Add a key to the strata estimator.
102 *
103 * @param se strata estimator to add the key to
104 * @param key key to add
105 */
68void 106void
69strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key); 107strata_estimator_insert (struct StrataEstimator *se, struct IBF_Key key);
70 108
71 109
110/**
111 * Remove a key from the strata estimator.
112 *
113 * @param se strata estimator to remove the key from
114 * @param key key to remove
115 */
72void 116void
73strata_estimator_remove (struct StrataEstimator *se, struct IBF_Key key); 117strata_estimator_remove (struct StrataEstimator *se, struct IBF_Key key);
74 118
75 119
120/**
121 * Destroy a strata estimator, free all of its resources.
122 *
123 * @param se strata estimator to destroy.
124 */
76void 125void
77strata_estimator_destroy (struct StrataEstimator *se); 126strata_estimator_destroy (struct StrataEstimator *se);
78 127
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index 0d8ef692d..d59c425c4 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -8,6 +8,7 @@ PORT = 2106
8HOSTNAME = localhost 8HOSTNAME = localhost
9BINARY = gnunet-service-set 9BINARY = gnunet-service-set
10#PREFIX = valgrind 10#PREFIX = valgrind
11#PREFIX = valgrind -v --leak-check=full
11#PREFIX = gdbserver :1234 12#PREFIX = gdbserver :1234
12ACCEPT_FROM = 127.0.0.1; 13ACCEPT_FROM = 127.0.0.1;
13ACCEPT_FROM6 = ::1; 14ACCEPT_FROM6 = ::1;
diff --git a/src/set/test_set_union_result_full.c b/src/set/test_set_union_result_full.c
new file mode 100644
index 000000000..e54332b8b
--- /dev/null
+++ b/src/set/test_set_union_result_full.c
@@ -0,0 +1,255 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file set/test_set_union_result_full.c
23 * @brief testcase for full result mode of the union set operation
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_testing_lib.h"
28#include "gnunet_set_service.h"
29
30
31static struct GNUNET_PeerIdentity local_id;
32
33static struct GNUNET_HashCode app_id;
34static struct GNUNET_SET_Handle *set1;
35static struct GNUNET_SET_Handle *set2;
36static struct GNUNET_SET_ListenHandle *listen_handle;
37const static struct GNUNET_CONFIGURATION_Handle *config;
38
39static int iter_count;
40
41
42static void
43result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element,
44 enum GNUNET_SET_Status status)
45{
46 switch (status)
47 {
48 case GNUNET_SET_STATUS_OK:
49 printf ("set 1: got element\n");
50 break;
51 case GNUNET_SET_STATUS_FAILURE:
52 printf ("set 1: failure\n");
53 break;
54 case GNUNET_SET_STATUS_DONE:
55 printf ("set 1: done\n");
56 GNUNET_SET_destroy (set1);
57 break;
58 default:
59 GNUNET_assert (0);
60 }
61}
62
63
64static void
65result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element,
66 enum GNUNET_SET_Status status)
67{
68 switch (status)
69 {
70 case GNUNET_SET_STATUS_OK:
71 printf ("set 2: got element\n");
72 break;
73 case GNUNET_SET_STATUS_FAILURE:
74 printf ("set 2: failure\n");
75 break;
76 case GNUNET_SET_STATUS_DONE:
77 printf ("set 2: done\n");
78 GNUNET_SET_destroy (set2);
79 break;
80 default:
81 GNUNET_assert (0);
82 }
83}
84
85
86static void
87listen_cb (void *cls,
88 const struct GNUNET_PeerIdentity *other_peer,
89 const struct GNUNET_MessageHeader *context_msg,
90 struct GNUNET_SET_Request *request)
91{
92 struct GNUNET_SET_OperationHandle *oh;
93
94 GNUNET_assert (NULL != context_msg);
95
96 GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_TEST);
97
98 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
99 GNUNET_SET_listen_cancel (listen_handle);
100
101 oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_FULL, result_cb_set2, NULL);
102 GNUNET_SET_commit (oh, set2);
103}
104
105
106/**
107 * Start the set operation.
108 *
109 * @param cls closure, unused
110 */
111static void
112start (void *cls)
113{
114 struct GNUNET_SET_OperationHandle *oh;
115 struct GNUNET_MessageHeader context_msg;
116
117 context_msg.size = htons (sizeof context_msg);
118 context_msg.type = htons (GNUNET_MESSAGE_TYPE_TEST);
119
120 listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
121 &app_id, listen_cb, NULL);
122 oh = GNUNET_SET_prepare (&local_id, &app_id, &context_msg, 42,
123 GNUNET_SET_RESULT_FULL,
124 result_cb_set1, NULL);
125 GNUNET_SET_commit (oh, set1);
126}
127
128
129/**
130 * Initialize the second set, continue
131 *
132 * @param cls closure, unused
133 */
134static void
135init_set2 (void *cls)
136{
137 struct GNUNET_SET_Element element;
138
139 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n");
140
141 element.type = 0;
142
143 element.data = "hello";
144 element.size = strlen(element.data);
145 GNUNET_SET_add_element (set2, &element, NULL, NULL);
146 element.data = "quux";
147 element.size = strlen(element.data);
148 GNUNET_SET_add_element (set2, &element, NULL, NULL);
149 element.data = "baz";
150 element.size = strlen(element.data);
151 GNUNET_SET_add_element (set2, &element, start, NULL);
152}
153
154
155/**
156 * Initialize the first set, continue.
157 */
158static void
159init_set1 (void)
160{
161 struct GNUNET_SET_Element element;
162
163 element.type = 0;
164
165 element.data = "hello";
166 element.size = strlen(element.data);
167 GNUNET_SET_add_element (set1, &element, NULL, NULL);
168 element.data = "bar";
169 element.size = strlen(element.data);
170 GNUNET_SET_add_element (set1, &element, init_set2, NULL);
171
172 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n");
173}
174
175
176static int
177iter_cb (void *cls,
178 const struct GNUNET_SET_Element *element)
179{
180 if (NULL == element)
181 {
182 GNUNET_assert (iter_count == 3);
183 GNUNET_SET_destroy (cls);
184 return GNUNET_YES;
185 }
186 printf ("iter: got element\n");
187 iter_count++;
188 return GNUNET_YES;
189}
190
191
192static void
193test_iter ()
194{
195 struct GNUNET_SET_Element element;
196 struct GNUNET_SET_Handle *iter_set;
197
198 iter_set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
199
200 element.type = 0;
201
202 element.data = "hello";
203 element.size = strlen(element.data);
204 GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
205 element.data = "bar";
206 element.size = strlen(element.data);
207 GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
208 element.data = "quux";
209 element.size = strlen(element.data);
210 GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
211
212 GNUNET_SET_iterate (iter_set, iter_cb, iter_set);
213}
214
215
216/**
217 * Signature of the 'main' function for a (single-peer) testcase that
218 * is run using 'GNUNET_TESTING_peer_run'.
219 *
220 * @param cls closure
221 * @param cfg configuration of the peer that was started
222 * @param peer identity of the peer that was created
223 */
224static void
225run (void *cls,
226 const struct GNUNET_CONFIGURATION_Handle *cfg,
227 struct GNUNET_TESTING_Peer *peer)
228{
229 config = cfg;
230 GNUNET_CRYPTO_get_peer_identity (cfg, &local_id);
231 printf ("my id (from CRYPTO): %s\n", GNUNET_i2s (&local_id));
232 GNUNET_TESTING_peer_get_identity (peer, &local_id);
233 printf ("my id (from TESTING): %s\n", GNUNET_i2s (&local_id));
234
235 test_iter ();
236
237 set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
238 set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
239 GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
240
241 /* test the real set reconciliation */
242 init_set1 ();
243}
244
245int
246main (int argc, char **argv)
247{
248 int ret;
249
250 ret = GNUNET_TESTING_peer_run ("test_set_api",
251 "test_set.conf",
252 &run, NULL);
253 return ret;
254}
255