aboutsummaryrefslogtreecommitdiff
path: root/src/set
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-10-21 14:02:39 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-10-21 14:02:39 +0000
commit15389f2525da19c32e040ac1d32d3473b43456df (patch)
tree3f985987b66303bc93b22de0f29efd329f1c9e59 /src/set
parent1d58e25444716142e5a76367e6bf22ba3e036aea (diff)
downloadgnunet-15389f2525da19c32e040ac1d32d3473b43456df.tar.gz
gnunet-15389f2525da19c32e040ac1d32d3473b43456df.zip
re-synced set intersection code with union as a starting point
added create for set-intersection
Diffstat (limited to 'src/set')
-rw-r--r--src/set/gnunet-service-set.c2
-rw-r--r--src/set/gnunet-service-set.h7
-rw-r--r--src/set/gnunet-service-set_intersection.c805
3 files changed, 389 insertions, 425 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 99deb3b23..a5f132b41 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -583,7 +583,7 @@ handle_client_create (void *cls,
583 switch (ntohs (msg->operation)) 583 switch (ntohs (msg->operation))
584 { 584 {
585 case GNUNET_SET_OPERATION_INTERSECTION: 585 case GNUNET_SET_OPERATION_INTERSECTION:
586 // FIXME 586 set->vt = _GSS_intersection_vt ();
587 break; 587 break;
588 case GNUNET_SET_OPERATION_UNION: 588 case GNUNET_SET_OPERATION_UNION:
589 set->vt = _GSS_union_vt (); 589 set->vt = _GSS_union_vt ();
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index f26ff3fc3..e69f2a09a 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -377,5 +377,12 @@ struct TunnelContext
377const struct SetVT * 377const struct SetVT *
378_GSS_union_vt (void); 378_GSS_union_vt (void);
379 379
380/**
381 * Get the table with implementing functions for
382 * set intersection.
383 */
384const struct SetVT *
385_GSS_intersection_vt (void);
386
380 387
381#endif 388#endif
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 3082f94b5..7e8bd9cbf 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -21,13 +21,11 @@
21/** 21/**
22 * @file set/gnunet-service-set_intersection.c 22 * @file set/gnunet-service-set_intersection.c
23 * @brief two-peer set intersection 23 * @brief two-peer set intersection
24 * @author Christian M. Fuchs 24 * @author Christian Fuchs
25 */ 25 */
26 26#include "platform.h"
27 27#include "gnunet_util_lib.h"
28#include "gnunet-service-set.h" 28#include "gnunet-service-set.h"
29#include "gnunet_container_lib.h"
30#include "gnunet_crypto_lib.h"
31#include "ibf.h" 29#include "ibf.h"
32#include "strata_estimator.h" 30#include "strata_estimator.h"
33#include "set_protocol.h" 31#include "set_protocol.h"
@@ -45,7 +43,7 @@
45/** 43/**
46 * hash num parameter for the difference digests and strata estimators 44 * hash num parameter for the difference digests and strata estimators
47 */ 45 */
48#define SE_IBF_HASH_NUM 3 46#define SE_IBF_HASH_NUM 4
49 47
50/** 48/**
51 * Number of buckets that can be transmitted in one message. 49 * Number of buckets that can be transmitted in one message.
@@ -59,9 +57,15 @@
59 */ 57 */
60#define MAX_IBF_ORDER (16) 58#define MAX_IBF_ORDER (16)
61 59
60/**
61 * Number of buckets used in the ibf per estimated
62 * difference.
63 */
64#define IBF_ALPHA 4
65
62 66
63/** 67/**
64 * Current phase we are in for a union operation 68 * Current phase we are in for a union operation.
65 */ 69 */
66enum IntersectionOperationPhase 70enum IntersectionOperationPhase
67{ 71{
@@ -100,39 +104,23 @@ enum IntersectionOperationPhase
100 * State of an evaluate operation 104 * State of an evaluate operation
101 * with another peer. 105 * with another peer.
102 */ 106 */
103struct IntersectionEvaluateOperation 107struct OperationState
104{ 108{
105 /** 109 /**
106 * Local set the operation is evaluated on. 110 * Tunnel to the remote peer.
107 */ 111 */
108 struct Set *set; 112 struct GNUNET_MESH_Tunnel *tunnel;
109
110 /**
111 * Peer with the remote set
112 */
113 struct GNUNET_PeerIdentity peer;
114
115 /**
116 * Application-specific identifier
117 */
118 struct GNUNET_HashCode app_id;
119
120 /**
121 * Context message, given to us
122 * by the client, may be NULL.
123 */
124 struct GNUNET_MessageHeader *context_msg;
125 113
126 /** 114 /**
127 * Tunnel to the other peer. 115 * Detail information about the set operation,
116 * including the set to use.
128 */ 117 */
129 struct GNUNET_MESH_Tunnel *tunnel; 118 struct OperationSpecification *spec;
130 119
131 /** 120 /**
132 * Request ID to multiplex set operations to 121 * Message queue for the peer.
133 * the client inhabiting the set.
134 */ 122 */
135 uint32_t request_id; 123 struct GNUNET_MQ_Handle *mq;
136 124
137 /** 125 /**
138 * Number of ibf buckets received 126 * Number of ibf buckets received
@@ -157,6 +145,8 @@ struct IntersectionEvaluateOperation
157 145
158 /** 146 /**
159 * Maps IBF-Keys (specific to the current salt) to elements. 147 * Maps IBF-Keys (specific to the current salt) to elements.
148 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
149 * Colliding IBF-Keys are linked.
160 */ 150 */
161 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; 151 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
162 152
@@ -166,80 +156,39 @@ struct IntersectionEvaluateOperation
166 enum IntersectionOperationPhase phase; 156 enum IntersectionOperationPhase phase;
167 157
168 /** 158 /**
169 * Salt to use for this operation.
170 */
171 uint16_t salt;
172
173 /**
174 * Generation in which the operation handle 159 * Generation in which the operation handle
175 * was created. 160 * was created.
176 */ 161 */
177 unsigned int generation_created; 162 unsigned int generation_created;
178 163
179 /** 164 /**
180 * Evaluate operations are held in 165 * Set state of the set that this operation
181 * a linked list. 166 * belongs to.
182 */ 167 */
183 struct IntersectionEvaluateOperation *next; 168 struct Set *set;
184 169
185 /** 170 /**
186 * Evaluate operations are held in 171 * Evaluate operations are held in
187 * a linked list. 172 * a linked list.
188 */ 173 */
189 struct IntersectionEvaluateOperation *prev; 174 struct OperationState *next;
190};
191
192 175
193/** 176 /**
194 * Information about the element in a set. 177 * Evaluate operations are held in
195 * All elements are stored in a hash-table 178 * a linked list.
196 * from their hash-code to their 'struct Element', 179 */
197 * so that the remove and add operations are reasonably 180 struct OperationState *prev;
198 * fast.
199 */
200struct ElementEntry
201{
202 /**
203 * The actual element. The data for the element
204 * should be allocated at the end of this struct.
205 */
206 struct GNUNET_SET_Element element;
207
208 /**
209 * Hash of the element.
210 * Will be used to derive the different IBF keys
211 * for different salts.
212 */
213 struct GNUNET_HashCode element_hash;
214
215 /**
216 * Generation the element was added by the client.
217 * Operations of earlier generations will not consider the element.
218 */
219 unsigned int generation_added;
220
221 /**
222 * GNUNET_YES if the element has been removed in some generation.
223 */
224 int removed;
225
226 /**
227 * Generation the element was removed by the client.
228 * Operations of later generations will not consider the element.
229 * Only valid if is_removed is GNUNET_YES.
230 */
231 unsigned int generation_removed;
232 181
233 /** 182 /**
234 * GNUNET_YES if the element is a remote element, and does not belong 183 * Did we send the client that we are done?
235 * to the operation's set.
236 */ 184 */
237 int remote; 185 int client_done_sent;
238}; 186};
239 187
240 188
241/** 189/**
242 * Entries in the key-to-element map of the union set. 190 * The key entry is used to associate an ibf key with
191 * an element.
243 */ 192 */
244struct KeyEntry 193struct KeyEntry
245{ 194{
@@ -260,6 +209,7 @@ struct KeyEntry
260 struct KeyEntry *next_colliding; 209 struct KeyEntry *next_colliding;
261}; 210};
262 211
212
263/** 213/**
264 * Used as a closure for sending elements 214 * Used as a closure for sending elements
265 * with a specific IBF key. 215 * with a specific IBF key.
@@ -276,14 +226,14 @@ struct SendElementClosure
276 * Operation for which the elements 226 * Operation for which the elements
277 * should be sent. 227 * should be sent.
278 */ 228 */
279 struct IntersectionEvaluateOperation *eo; 229 struct OperationState *eo;
280}; 230};
281 231
282 232
283/** 233/**
284 * Extra state required for efficient set union. 234 * Extra state required for efficient set union.
285 */ 235 */
286struct IntersectionState 236struct SetState
287{ 237{
288 /** 238 /**
289 * The strata estimator is only generated once for 239 * The strata estimator is only generated once for
@@ -294,70 +244,19 @@ struct IntersectionState
294 struct StrataEstimator *se; 244 struct StrataEstimator *se;
295 245
296 /** 246 /**
297 * Maps 'struct GNUNET_HashCode' to 'struct ElementEntry'.
298 */
299 struct GNUNET_CONTAINER_MultiHashMap *elements;
300
301 /**
302 * Evaluate operations are held in 247 * Evaluate operations are held in
303 * a linked list. 248 * a linked list.
304 */ 249 */
305 struct IntersectionEvaluateOperation *ops_head; 250 struct OperationState *ops_head;
306 251
307 /** 252 /**
308 * Evaluate operations are held in 253 * Evaluate operations are held in
309 * a linked list. 254 * a linked list.
310 */ 255 */
311 struct IntersectionEvaluateOperation *ops_tail; 256 struct OperationState *ops_tail;
312
313 /**
314 * Current generation, that is, number of
315 * previously executed operations on this set
316 */
317 unsigned int current_generation;
318}; 257};
319 258
320 259
321
322/**
323 * Iterator over hash map entries.
324 *
325 * @param cls closure
326 * @param key current key code
327 * @param value value in the hash map
328 * @return GNUNET_YES if we should continue to
329 * iterate,
330 * GNUNET_NO if not.
331 */
332static int
333destroy_elements_iterator (void *cls,
334 const struct GNUNET_HashCode * key,
335 void *value)
336{
337 struct ElementEntry *ee = value;
338
339 GNUNET_free (ee);
340 return GNUNET_YES;
341}
342
343
344/**
345 * Destroy the elements belonging to a union set.
346 *
347 * @param us union state that contains the elements
348 */
349static void
350destroy_elements (struct IntersectionState *us)
351{
352 if (NULL == us->elements)
353 return;
354 GNUNET_CONTAINER_multihashmap_iterate (us->elements, destroy_elements_iterator, NULL);
355 GNUNET_CONTAINER_multihashmap_destroy (us->elements);
356 us->elements = NULL;
357}
358
359
360
361/** 260/**
362 * Iterator over hash map entries. 261 * Iterator over hash map entries.
363 * 262 *
@@ -379,6 +278,11 @@ destroy_key_to_element_iter (void *cls,
379 { 278 {
380 struct KeyEntry *k_tmp = k; 279 struct KeyEntry *k_tmp = k;
381 k = k->next_colliding; 280 k = k->next_colliding;
281 if (GNUNET_YES == k_tmp->element->remote)
282 {
283 GNUNET_free (k_tmp->element);
284 k_tmp->element = NULL;
285 }
382 GNUNET_free (k_tmp); 286 GNUNET_free (k_tmp);
383 } 287 }
384 return GNUNET_YES; 288 return GNUNET_YES;
@@ -391,18 +295,24 @@ destroy_key_to_element_iter (void *cls,
391 * 295 *
392 * @param eo the union operation to destroy 296 * @param eo the union operation to destroy
393 */ 297 */
394void 298static void
395_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) 299intersection_operation_destroy (struct OperationState *eo)
396{ 300{
397 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); 301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
398 302 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head,
303 eo->set->state->ops_tail,
304 eo);
305 if (NULL != eo->mq)
306 {
307 GNUNET_MQ_destroy (eo->mq);
308 eo->mq = NULL;
309 }
399 if (NULL != eo->tunnel) 310 if (NULL != eo->tunnel)
400 { 311 {
401 GNUNET_MESH_tunnel_destroy (eo->tunnel); 312 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
402 /* wait for the final destruction by the tunnel cleaner */ 313 eo->tunnel = NULL;
403 return; 314 GNUNET_MESH_tunnel_destroy (t);
404 } 315 }
405
406 if (NULL != eo->remote_ibf) 316 if (NULL != eo->remote_ibf)
407 { 317 {
408 ibf_destroy (eo->remote_ibf); 318 ibf_destroy (eo->remote_ibf);
@@ -424,13 +334,19 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
424 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); 334 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
425 eo->key_to_element = NULL; 335 eo->key_to_element = NULL;
426 } 336 }
427 337 if (NULL != eo->spec)
428 GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, 338 {
429 eo->set->state.u->ops_tail, 339 if (NULL != eo->spec->context_msg)
430 eo); 340 {
341 GNUNET_free (eo->spec->context_msg);
342 eo->spec->context_msg = NULL;
343 }
344 GNUNET_free (eo->spec);
345 eo->spec = NULL;
346 }
431 GNUNET_free (eo); 347 GNUNET_free (eo);
432 348
433 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); 349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
434 350
435 /* FIXME: do a garbage collection of the set generations */ 351 /* FIXME: do a garbage collection of the set generations */
436} 352}
@@ -443,16 +359,17 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
443 * @param eo the union operation to fail 359 * @param eo the union operation to fail
444 */ 360 */
445static void 361static void
446fail_union_operation (struct UnionEvaluateOperation *eo) 362fail_intersection_operation (struct OperationState *eo)
447{ 363{
448 struct GNUNET_MQ_Envelope *mqm; 364 struct GNUNET_MQ_Envelope *ev;
449 struct GNUNET_SET_ResultMessage *msg; 365 struct GNUNET_SET_ResultMessage *msg;
450 366
451 mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 367 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
452 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 368 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
453 msg->request_id = htonl (eo->request_id); 369 msg->request_id = htonl (eo->spec->client_request_id);
454 GNUNET_MQ_send (eo->set->client_mq, mqm); 370 msg->element_type = htons (0);
455 _GSS_union_operation_destroy (eo); 371 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
372 union_operation_destroy (eo);
456} 373}
457 374
458 375
@@ -484,31 +401,37 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
484 * @param eo operation with the other peer 401 * @param eo operation with the other peer
485 */ 402 */
486static void 403static void
487send_operation_request (struct IntersectionEvaluateOperation *eo) 404send_operation_request (struct OperationState *eo)
488{ 405{
489 struct GNUNET_MQ_Envelope *mqm; 406 struct GNUNET_MQ_Envelope *ev;
490 struct OperationRequestMessage *msg; 407 struct OperationRequestMessage *msg;
491 408
492 mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); 409 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
410 eo->spec->context_msg);
493 411
494 if (NULL == mqm) 412 if (NULL == ev)
495 { 413 {
496 /* the context message is too large */ 414 /* the context message is too large */
497 GNUNET_break (0); 415 GNUNET_break (0);
498 GNUNET_SERVER_client_disconnect (eo->set->client); 416 GNUNET_SERVER_client_disconnect (eo->spec->set->client);
499 return; 417 return;
500 } 418 }
501 msg->operation = htons (GNUNET_SET_OPERATION_INTERSECTION); 419 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
502 msg->app_id = eo->app_id; 420 msg->app_id = eo->spec->app_id;
503 GNUNET_MQ_send (eo->tc->mq, mqm); 421 msg->salt = htonl (eo->spec->salt);
422 GNUNET_MQ_send (eo->mq, ev);
423
424 if (NULL != eo->spec->context_msg)
425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
426 else
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
504 428
505 if (NULL != eo->context_msg) 429 if (NULL != eo->spec->context_msg)
506 { 430 {
507 GNUNET_free (eo->context_msg); 431 GNUNET_free (eo->spec->context_msg);
508 eo->context_msg = NULL; 432 eo->spec->context_msg = NULL;
509 } 433 }
510 434
511 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
512} 435}
513 436
514 437
@@ -524,7 +447,7 @@ send_operation_request (struct IntersectionEvaluateOperation *eo)
524 * GNUNET_NO if not. 447 * GNUNET_NO if not.
525 */ 448 */
526static int 449static int
527insert_element_iterator (void *cls, 450op_register_element_iterator (void *cls,
528 uint32_t key, 451 uint32_t key,
529 void *value) 452 void *value)
530{ 453{
@@ -548,32 +471,36 @@ insert_element_iterator (void *cls,
548 471
549/** 472/**
550 * Insert an element into the union operation's 473 * Insert an element into the union operation's
551 * key-to-element mapping 474 * key-to-element mapping. Takes ownership of 'ee'.
475 * Note that this does not insert the element in the set,
476 * only in the operation's key-element mapping.
477 * This is done to speed up re-tried operations, if some elements
478 * were transmitted, and then the IBF fails to decode.
552 * 479 *
553 * @param eo the union operation 480 * @param eo the union operation
554 * @param ee the element entry 481 * @param ee the element entry
555 */ 482 */
556static void 483static void
557insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) 484op_register_element (struct OperationState *eo, struct ElementEntry *ee)
558{ 485{
559 int ret; 486 int ret;
560 struct IBF_Key ibf_key; 487 struct IBF_Key ibf_key;
561 struct KeyEntry *k; 488 struct KeyEntry *k;
562 489
563 ibf_key = get_ibf_key (&ee->element_hash, eo->salt); 490 ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
564 k = GNUNET_new (struct KeyEntry); 491 k = GNUNET_new (struct KeyEntry);
565 k->element = ee; 492 k->element = ee;
566 k->ibf_key = ibf_key; 493 k->ibf_key = ibf_key;
567 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 494 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
568 (uint32_t) ibf_key.key_val, 495 (uint32_t) ibf_key.key_val,
569 insert_element_iterator, k); 496 op_register_element_iterator, k);
570 497
571 /* was the element inserted into a colliding bucket? */ 498 /* was the element inserted into a colliding bucket? */
572 if (GNUNET_SYSERR == ret) 499 if (GNUNET_SYSERR == ret)
573 return; 500 return;
574 501
575 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 502 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
576 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 503 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
577} 504}
578 505
579 506
@@ -592,6 +519,8 @@ prepare_ibf_iterator (void *cls,
592 struct InvertibleBloomFilter *ibf = cls; 519 struct InvertibleBloomFilter *ibf = cls;
593 struct KeyEntry *ke = value; 520 struct KeyEntry *ke = value;
594 521
522 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val);
523
595 ibf_insert (ibf, ke->ibf_key); 524 ibf_insert (ibf, ke->ibf_key);
596 return GNUNET_YES; 525 return GNUNET_YES;
597} 526}
@@ -605,13 +534,15 @@ prepare_ibf_iterator (void *cls,
605 * @param key unised 534 * @param key unised
606 * @param value the element entry to insert 535 * @param value the element entry to insert
607 * into the key-to-element mapping 536 * into the key-to-element mapping
537 * @return GNUNET_YES to continue iterating,
538 * GNUNET_NO to stop
608 */ 539 */
609static int 540static int
610init_key_to_element_iterator (void *cls, 541init_key_to_element_iterator (void *cls,
611 const struct GNUNET_HashCode *key, 542 const struct GNUNET_HashCode *key,
612 void *value) 543 void *value)
613{ 544{
614 struct UnionEvaluateOperation *eo = cls; 545 struct OperationState *eo = cls;
615 struct ElementEntry *e = value; 546 struct ElementEntry *e = value;
616 547
617 /* make sure that the element belongs to the set at the time 548 /* make sure that the element belongs to the set at the time
@@ -621,7 +552,9 @@ init_key_to_element_iterator (void *cls,
621 (e->generation_removed < eo->generation_created))) 552 (e->generation_removed < eo->generation_created)))
622 return GNUNET_YES; 553 return GNUNET_YES;
623 554
624 insert_element (eo, e); 555 GNUNET_assert (GNUNET_NO == e->remote);
556
557 op_register_element (eo, e);
625 return GNUNET_YES; 558 return GNUNET_YES;
626} 559}
627 560
@@ -634,15 +567,15 @@ init_key_to_element_iterator (void *cls,
634 * @param size size of the ibf to create 567 * @param size size of the ibf to create
635 */ 568 */
636static void 569static void
637prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) 570prepare_ibf (struct OperationState *eo, uint16_t size)
638{ 571{
639 if (NULL == eo->key_to_element) 572 if (NULL == eo->key_to_element)
640 { 573 {
641 unsigned int len; 574 unsigned int len;
642 len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); 575 len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements);
643 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 576 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
644 GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, 577 GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements,
645 init_key_to_element_iterator, eo); 578 init_key_to_element_iterator, eo);
646 } 579 }
647 if (NULL != eo->local_ibf) 580 if (NULL != eo->local_ibf)
648 ibf_destroy (eo->local_ibf); 581 ibf_destroy (eo->local_ibf);
@@ -659,21 +592,21 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
659 * @param ibf_order order of the ibf to send, size=2^order 592 * @param ibf_order order of the ibf to send, size=2^order
660 */ 593 */
661static void 594static void
662send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) 595send_ibf (struct OperationState *eo, uint16_t ibf_order)
663{ 596{
664 unsigned int buckets_sent = 0; 597 unsigned int buckets_sent = 0;
665 struct InvertibleBloomFilter *ibf; 598 struct InvertibleBloomFilter *ibf;
666 599
667 prepare_ibf (eo, 1<<ibf_order); 600 prepare_ibf (eo, 1<<ibf_order);
668 601
669 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 1<<ibf_order); 602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
670 603
671 ibf = eo->local_ibf; 604 ibf = eo->local_ibf;
672 605
673 while (buckets_sent < (1 << ibf_order)) 606 while (buckets_sent < (1 << ibf_order))
674 { 607 {
675 unsigned int buckets_in_message; 608 unsigned int buckets_in_message;
676 struct GNUNET_MQ_Envelope *mqm; 609 struct GNUNET_MQ_Envelope *ev;
677 struct IBFMessage *msg; 610 struct IBFMessage *msg;
678 611
679 buckets_in_message = (1 << ibf_order) - buckets_sent; 612 buckets_in_message = (1 << ibf_order) - buckets_sent;
@@ -681,14 +614,17 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
681 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) 614 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
682 buckets_in_message = MAX_BUCKETS_PER_MESSAGE; 615 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
683 616
684 mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, 617 ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
685 GNUNET_MESSAGE_TYPE_SET_P2P_IBF); 618 GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
619 msg->reserved = 0;
686 msg->order = ibf_order; 620 msg->order = ibf_order;
687 msg->offset = htons (buckets_sent); 621 msg->offset = htons (buckets_sent);
688 ibf_write_slice (ibf, buckets_sent, 622 ibf_write_slice (ibf, buckets_sent,
689 buckets_in_message, &msg[1]); 623 buckets_in_message, &msg[1]);
690 buckets_sent += buckets_in_message; 624 buckets_sent += buckets_in_message;
691 GNUNET_MQ_send (eo->tc->mq, mqm); 625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
626 buckets_in_message, buckets_sent, 1<<ibf_order);
627 GNUNET_MQ_send (eo->mq, ev);
692 } 628 }
693 629
694 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; 630 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
@@ -701,17 +637,18 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
701 * @param eo the union operation with the remote peer 637 * @param eo the union operation with the remote peer
702 */ 638 */
703static void 639static void
704send_strata_estimator (struct IntersectionEvaluateOperation *eo) 640send_strata_estimator (struct OperationState *eo)
705{ 641{
706 struct GNUNET_MQ_Envelope *mqm; 642 struct GNUNET_MQ_Envelope *ev;
707 struct GNUNET_MessageHeader *strata_msg; 643 struct GNUNET_MessageHeader *strata_msg;
708 644
709 mqm = GNUNET_MQ_msg_header_extra (strata_msg, 645 ev = GNUNET_MQ_msg_header_extra (strata_msg,
710 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 646 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
711 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 647 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
712 strata_estimator_write (eo->set->state.i->se, &strata_msg[1]); 648 strata_estimator_write (eo->set->state->se, &strata_msg[1]);
713 GNUNET_MQ_send (eo->tc->mq, mqm); 649 GNUNET_MQ_send (eo->mq, ev);
714 eo->phase = PHASE_EXPECT_IBF; 650 eo->phase = PHASE_EXPECT_IBF;
651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
715} 652}
716 653
717 654
@@ -728,7 +665,7 @@ get_order_from_difference (unsigned int diff)
728 unsigned int ibf_order; 665 unsigned int ibf_order;
729 666
730 ibf_order = 2; 667 ibf_order = 2;
731 while ((1<<ibf_order) < (2 * diff)) 668 while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM)
732 ibf_order++; 669 ibf_order++;
733 if (ibf_order > MAX_IBF_ORDER) 670 if (ibf_order > MAX_IBF_ORDER)
734 ibf_order = MAX_IBF_ORDER; 671 ibf_order = MAX_IBF_ORDER;
@@ -745,11 +682,10 @@ get_order_from_difference (unsigned int diff)
745static void 682static void
746handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) 683handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
747{ 684{
748 struct UnionEvaluateOperation *eo = cls; 685 struct OperationState *eo = cls;
749 struct StrataEstimator *remote_se; 686 struct StrataEstimator *remote_se;
750 int diff; 687 int diff;
751 688
752
753 if (eo->phase != PHASE_EXPECT_SE) 689 if (eo->phase != PHASE_EXPECT_SE)
754 { 690 {
755 fail_union_operation (eo); 691 fail_union_operation (eo);
@@ -761,10 +697,11 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
761 strata_estimator_read (&mh[1], remote_se); 697 strata_estimator_read (&mh[1], remote_se);
762 GNUNET_assert (NULL != eo->se); 698 GNUNET_assert (NULL != eo->se);
763 diff = strata_estimator_difference (remote_se, eo->se); 699 diff = strata_estimator_difference (remote_se, eo->se);
764 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
765 strata_estimator_destroy (remote_se); 700 strata_estimator_destroy (remote_se);
766 strata_estimator_destroy (eo->se); 701 strata_estimator_destroy (eo->se);
767 eo->se = NULL; 702 eo->se = NULL;
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
704 diff, 1<<get_order_from_difference (diff));
768 send_ibf (eo, get_order_from_difference (diff)); 705 send_ibf (eo, get_order_from_difference (diff));
769} 706}
770 707
@@ -784,7 +721,7 @@ send_element_iterator (void *cls,
784{ 721{
785 struct SendElementClosure *sec = cls; 722 struct SendElementClosure *sec = cls;
786 struct IBF_Key ibf_key = sec->ibf_key; 723 struct IBF_Key ibf_key = sec->ibf_key;
787 struct UnionEvaluateOperation *eo = sec->eo; 724 struct OperationState *eo = sec->eo;
788 struct KeyEntry *ke = value; 725 struct KeyEntry *ke = value;
789 726
790 if (ke->ibf_key.key_val != ibf_key.key_val) 727 if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -792,20 +729,21 @@ send_element_iterator (void *cls,
792 while (NULL != ke) 729 while (NULL != ke)
793 { 730 {
794 const struct GNUNET_SET_Element *const element = &ke->element->element; 731 const struct GNUNET_SET_Element *const element = &ke->element->element;
795 struct GNUNET_MQ_Envelope *mqm; 732 struct GNUNET_MQ_Envelope *ev;
796 struct GNUNET_MessageHeader *mh; 733 struct GNUNET_MessageHeader *mh;
797 734
798 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); 735 GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
799 mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); 736 ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
800 if (NULL == mqm) 737 if (NULL == ev)
801 { 738 {
802 /* element too large */ 739 /* element too large */
803 GNUNET_break (0); 740 GNUNET_break (0);
804 continue; 741 continue;
805 } 742 }
806 memcpy (&mh[1], element->data, element->size); 743 memcpy (&mh[1], element->data, element->size);
807 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); 744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
808 GNUNET_MQ_send (eo->tc->mq, mqm); 745 GNUNET_h2s (&ke->element->element_hash));
746 GNUNET_MQ_send (eo->mq, ev);
809 ke = ke->next_colliding; 747 ke = ke->next_colliding;
810 } 748 }
811 return GNUNET_NO; 749 return GNUNET_NO;
@@ -819,7 +757,7 @@ send_element_iterator (void *cls,
819 * @param ibf_key IBF key of interest 757 * @param ibf_key IBF key of interest
820 */ 758 */
821static void 759static void
822send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key) 760send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
823{ 761{
824 struct SendElementClosure send_cls; 762 struct SendElementClosure send_cls;
825 763
@@ -837,10 +775,12 @@ send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key ibf_key
837 * @param eo union operation 775 * @param eo union operation
838 */ 776 */
839static void 777static void
840decode_and_send (struct UnionEvaluateOperation *eo) 778decode_and_send (struct OperationState *eo)
841{ 779{
842 struct IBF_Key key; 780 struct IBF_Key key;
781 struct IBF_Key last_key;
843 int side; 782 int side;
783 unsigned int num_decoded;
844 struct InvertibleBloomFilter *diff_ibf; 784 struct InvertibleBloomFilter *diff_ibf;
845 785
846 GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); 786 GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase);
@@ -849,12 +789,35 @@ decode_and_send (struct UnionEvaluateOperation *eo)
849 diff_ibf = ibf_dup (eo->local_ibf); 789 diff_ibf = ibf_dup (eo->local_ibf);
850 ibf_subtract (diff_ibf, eo->remote_ibf); 790 ibf_subtract (diff_ibf, eo->remote_ibf);
851 791
792 ibf_destroy (eo->remote_ibf);
793 eo->remote_ibf = NULL;
794
795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
796
797 num_decoded = 0;
798 last_key.key_val = 0;
799
852 while (1) 800 while (1)
853 { 801 {
854 int res; 802 int res;
803 int cycle_detected = GNUNET_NO;
804
805 last_key = key;
855 806
856 res = ibf_decode (diff_ibf, &side, &key); 807 res = ibf_decode (diff_ibf, &side, &key);
857 if (GNUNET_SYSERR == res) 808 if (res == GNUNET_OK)
809 {
810 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
811 key.key_val);
812 num_decoded += 1;
813 if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val))
814 {
815 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n",
816 num_decoded, diff_ibf->size);
817 cycle_detected = GNUNET_YES;
818 }
819 }
820 if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
858 { 821 {
859 int next_order; 822 int next_order;
860 next_order = 0; 823 next_order = 0;
@@ -877,28 +840,36 @@ decode_and_send (struct UnionEvaluateOperation *eo)
877 } 840 }
878 if (GNUNET_NO == res) 841 if (GNUNET_NO == res)
879 { 842 {
880 struct GNUNET_MQ_Envelope *mqm; 843 struct GNUNET_MQ_Envelope *ev;
881 844
882 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); 845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
883 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 846 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
884 GNUNET_MQ_send (eo->tc->mq, mqm); 847 GNUNET_MQ_send (eo->mq, ev);
885 break; 848 break;
886 } 849 }
887 if (1 == side) 850 if (1 == side)
888 { 851 {
889 send_elements_for_key (eo, key); 852 send_elements_for_key (eo, key);
890 } 853 }
891 else 854 else if (-1 == side)
892 { 855 {
893 struct GNUNET_MQ_Envelope *mqm; 856 struct GNUNET_MQ_Envelope *ev;
894 struct GNUNET_MessageHeader *msg; 857 struct GNUNET_MessageHeader *msg;
895 858
896 /* FIXME: before sending the request, check if we may just have the element */ 859 /* FIXME: before sending the request, check if we may just have the element */
897 /* FIXME: merge multiple requests */ 860 /* FIXME: merge multiple requests */
898 mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 861 /* FIXME: remember somewhere that we already requested the element,
862 * so that we don't request it again with the next ibf if decoding fails */
863 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
899 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 864 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
865
900 *(struct IBF_Key *) &msg[1] = key; 866 *(struct IBF_Key *) &msg[1] = key;
901 GNUNET_MQ_send (eo->tc->mq, mqm); 867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
868 GNUNET_MQ_send (eo->mq, ev);
869 }
870 else
871 {
872 GNUNET_assert (0);
902 } 873 }
903 } 874 }
904 ibf_destroy (diff_ibf); 875 ibf_destroy (diff_ibf);
@@ -914,7 +885,7 @@ decode_and_send (struct UnionEvaluateOperation *eo)
914static void 885static void
915handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) 886handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
916{ 887{
917 struct UnionEvaluateOperation *eo = cls; 888 struct OperationState *eo = cls;
918 struct IBFMessage *msg = (struct IBFMessage *) mh; 889 struct IBFMessage *msg = (struct IBFMessage *) mh;
919 unsigned int buckets_in_message; 890 unsigned int buckets_in_message;
920 891
@@ -923,12 +894,14 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
923 { 894 {
924 eo->phase = PHASE_EXPECT_IBF_CONT; 895 eo->phase = PHASE_EXPECT_IBF_CONT;
925 GNUNET_assert (NULL == eo->remote_ibf); 896 GNUNET_assert (NULL == eo->remote_ibf);
926 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 1<<msg->order); 897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
927 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 898 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
899 eo->ibf_buckets_received = 0;
928 if (0 != ntohs (msg->offset)) 900 if (0 != ntohs (msg->offset))
929 { 901 {
930 GNUNET_break (0); 902 GNUNET_break (0);
931 fail_union_operation (eo); 903 fail_union_operation (eo);
904 return;
932 } 905 }
933 } 906 }
934 else if (eo->phase == PHASE_EXPECT_IBF_CONT) 907 else if (eo->phase == PHASE_EXPECT_IBF_CONT)
@@ -944,6 +917,13 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
944 917
945 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; 918 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
946 919
920 if (0 == buckets_in_message)
921 {
922 GNUNET_break_op (0);
923 fail_union_operation (eo);
924 return;
925 }
926
947 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) 927 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
948 { 928 {
949 GNUNET_break (0); 929 GNUNET_break (0);
@@ -956,8 +936,7 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
956 936
957 if (eo->ibf_buckets_received == eo->remote_ibf->size) 937 if (eo->ibf_buckets_received == eo->remote_ibf->size)
958 { 938 {
959 939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
960 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
961 eo->phase = PHASE_EXPECT_ELEMENTS; 940 eo->phase = PHASE_EXPECT_ELEMENTS;
962 decode_and_send (eo); 941 decode_and_send (eo);
963 } 942 }
@@ -972,24 +951,26 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
972 * @param element element to send 951 * @param element element to send
973 */ 952 */
974static void 953static void
975send_client_element (struct UnionEvaluateOperation *eo, 954send_client_element (struct OperationState *eo,
976 struct GNUNET_SET_Element *element) 955 struct GNUNET_SET_Element *element)
977{ 956{
978 struct GNUNET_MQ_Envelope *mqm; 957 struct GNUNET_MQ_Envelope *ev;
979 struct GNUNET_SET_ResultMessage *rm; 958 struct GNUNET_SET_ResultMessage *rm;
980 959
981 GNUNET_assert (0 != eo->request_id); 960 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
982 mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 961 GNUNET_assert (0 != eo->spec->client_request_id);
983 if (NULL == mqm) 962 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
963 if (NULL == ev)
984 { 964 {
985 GNUNET_MQ_discard (mqm); 965 GNUNET_MQ_discard (ev);
986 GNUNET_break (0); 966 GNUNET_break (0);
987 return; 967 return;
988 } 968 }
989 rm->result_status = htons (GNUNET_SET_STATUS_OK); 969 rm->result_status = htons (GNUNET_SET_STATUS_OK);
990 rm->request_id = htonl (eo->request_id); 970 rm->request_id = htonl (eo->spec->client_request_id);
971 rm->element_type = element->type;
991 memcpy (&rm[1], element->data, element->size); 972 memcpy (&rm[1], element->data, element->size);
992 GNUNET_MQ_send (eo->set->client_mq, mqm); 973 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
993} 974}
994 975
995 976
@@ -1002,17 +983,22 @@ send_client_element (struct UnionEvaluateOperation *eo,
1002 * @param eo union operation 983 * @param eo union operation
1003 */ 984 */
1004static void 985static void
1005send_client_done_and_destroy (struct UnionEvaluateOperation *eo) 986send_client_done_and_destroy (struct OperationState *eo)
1006{ 987{
1007 struct GNUNET_MQ_Envelope *mqm; 988 struct GNUNET_MQ_Envelope *ev;
1008 struct GNUNET_SET_ResultMessage *rm; 989 struct GNUNET_SET_ResultMessage *rm;
1009 990
1010 GNUNET_assert (0 != eo->request_id); 991 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
1011 mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 992
1012 rm->request_id = htonl (eo->request_id); 993 eo->client_done_sent = GNUNET_YES;
994
995 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996 rm->request_id = htonl (eo->spec->client_request_id);
1013 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 997 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1014 GNUNET_MQ_send (eo->set->client_mq, mqm); 998 rm->element_type = htons (0);
999 GNUNET_MQ_send (eo->spec->set->client_mq, ev);
1015 1000
1001 union_operation_destroy (eo);
1016} 1002}
1017 1003
1018 1004
@@ -1025,11 +1011,11 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
1025static void 1011static void
1026handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) 1012handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1027{ 1013{
1028 struct UnionEvaluateOperation *eo = cls; 1014 struct OperationState *eo = cls;
1029 struct ElementEntry *ee; 1015 struct ElementEntry *ee;
1030 uint16_t element_size; 1016 uint16_t element_size;
1031 1017
1032 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n"); 1018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1033 1019
1034 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && 1020 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
1035 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) 1021 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
@@ -1041,13 +1027,15 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1041 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); 1027 element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
1042 ee = GNUNET_malloc (sizeof *ee + element_size); 1028 ee = GNUNET_malloc (sizeof *ee + element_size);
1043 memcpy (&ee[1], &mh[1], element_size); 1029 memcpy (&ee[1], &mh[1], element_size);
1030 ee->element.size = element_size;
1044 ee->element.data = &ee[1]; 1031 ee->element.data = &ee[1];
1045 ee->remote = GNUNET_YES; 1032 ee->remote = GNUNET_YES;
1033 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1046 1034
1047 insert_element (eo, ee); 1035 /* FIXME: see if the element has already been inserted! */
1048 send_client_element (eo, &ee->element);
1049 1036
1050 GNUNET_free (ee); 1037 op_register_element (eo, ee);
1038 send_client_element (eo, &ee->element);
1051} 1039}
1052 1040
1053 1041
@@ -1060,7 +1048,7 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1060static void 1048static void
1061handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) 1049handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1062{ 1050{
1063 struct UnionEvaluateOperation *eo = cls; 1051 struct OperationState *eo = cls;
1064 struct IBF_Key *ibf_key; 1052 struct IBF_Key *ibf_key;
1065 unsigned int num_keys; 1053 unsigned int num_keys;
1066 1054
@@ -1091,20 +1079,6 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1091 1079
1092 1080
1093/** 1081/**
1094 * Callback used for notifications
1095 *
1096 * @param cls closure
1097 */
1098static void
1099peer_done_sent_cb (void *cls)
1100{
1101 struct UnionEvaluateOperation *eo = cls;
1102
1103 send_client_done_and_destroy (eo);
1104}
1105
1106
1107/**
1108 * Handle a done message from a remote peer 1082 * Handle a done message from a remote peer
1109 * 1083 *
1110 * @param cls the union operation 1084 * @param cls the union operation
@@ -1113,23 +1087,22 @@ peer_done_sent_cb (void *cls)
1113static void 1087static void
1114handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) 1088handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1115{ 1089{
1116 struct UnionEvaluateOperation *eo = cls; 1090 struct OperationState *eo = cls;
1091 struct GNUNET_MQ_Envelope *ev;
1117 1092
1118 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1093 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1119 { 1094 {
1120 /* we got all requests, but still have to send our elements as response */ 1095 /* we got all requests, but still have to send our elements as response */
1121 struct GNUNET_MQ_Envelope *mqm;
1122 1096
1123 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); 1097 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1124 eo->phase = PHASE_FINISHED; 1098 eo->phase = PHASE_FINISHED;
1125 mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1099 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1126 GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); 1100 GNUNET_MQ_send (eo->mq, ev);
1127 GNUNET_MQ_send (eo->tc->mq, mqm);
1128 return; 1101 return;
1129 } 1102 }
1130 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1103 if (eo->phase == PHASE_EXPECT_ELEMENTS)
1131 { 1104 {
1132 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n"); 1105 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1133 eo->phase = PHASE_FINISHED; 1106 eo->phase = PHASE_FINISHED;
1134 send_client_done_and_destroy (eo); 1107 send_client_done_and_destroy (eo);
1135 return; 1108 return;
@@ -1143,45 +1116,38 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1143 * Evaluate a union operation with 1116 * Evaluate a union operation with
1144 * a remote peer. 1117 * a remote peer.
1145 * 1118 *
1146 * @param m the evaluate request message from the client 1119 * @param spec specification of the operation the evaluate
1147 * @param set the set to evaluate the operation with 1120 * @param tunnel tunnel already connected to the partner peer
1121 * @param tc tunnel context, passed here so all new incoming
1122 * messages are directly going to the union operations
1123 * @return a handle to the operation
1148 */ 1124 */
1149void 1125static void
1150_GSS_intersection_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) 1126intersection_evaluate (struct OperationSpecification *spec,
1127 struct GNUNET_MESH_Tunnel *tunnel,
1128 struct TunnelContext *tc)
1151{ 1129{
1152 struct IntersectionEvaluateOperation *eo; 1130 struct OperationState *eo;
1153 struct GNUNET_MessageHeader *context_msg; 1131
1154 1132 eo = GNUNET_new (struct OperationState);
1155 eo = GNUNET_new (struct IntersectionEvaluateOperation); 1133 tc->vt = _GSS_union_vt ();
1156 eo->peer = m->target_peer; 1134 tc->op = eo;
1157 eo->set = set; 1135 eo->se = strata_estimator_dup (spec->set->state->se);
1158 eo->request_id = htonl (m->request_id); 1136 eo->generation_created = spec->set->current_generation++;
1159 GNUNET_assert (0 != eo->request_id); 1137 eo->set = spec->set;
1160 eo->se = strata_estimator_dup (set->state.i->se); 1138 eo->spec = spec;
1161 eo->salt = ntohs (m->salt); 1139 eo->tunnel = tunnel;
1162 eo->app_id = m->app_id; 1140 eo->mq = GNUNET_MESH_mq_create (tunnel);
1163 1141
1164 context_msg = GNUNET_MQ_extract_nested_mh (m); 1142 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165 if (NULL != context_msg) 1143 "evaluating union operation, (app %s)\n",
1166 { 1144 GNUNET_h2s (&eo->spec->app_id));
1167 eo->context_msg = GNUNET_copy_message (context_msg);
1168 }
1169
1170 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1171 "evaluating intersection operation, (app %s)\n",
1172 GNUNET_h2s (&eo->app_id));
1173 1145
1174 eo->tc = GNUNET_new (struct TunnelContext);
1175 eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
1176 GNUNET_APPLICATION_TYPE_SET);
1177 GNUNET_assert (NULL != eo->tc->tunnel);
1178 eo->tc->peer = eo->peer;
1179 eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
1180 /* we started the operation, thus we have to send the operation request */ 1146 /* we started the operation, thus we have to send the operation request */
1181 eo->phase = PHASE_EXPECT_SE; 1147 eo->phase = PHASE_EXPECT_SE;
1182 1148
1183 GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head, 1149 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1184 eo->set->state.i->ops_tail, 1150 eo->set->state->ops_tail,
1185 eo); 1151 eo);
1186 1152
1187 send_operation_request (eo); 1153 send_operation_request (eo);
@@ -1191,29 +1157,33 @@ _GSS_intersection_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *se
1191/** 1157/**
1192 * Accept an union operation request from a remote peer 1158 * Accept an union operation request from a remote peer
1193 * 1159 *
1194 * @param m the accept message from the client 1160 * @param spec all necessary information about the operation
1195 * @param set the set of the client 1161 * @param tunnel open tunnel to the partner's peer
1196 * @param incoming information about the requesting remote peer 1162 * @param tc tunnel context, passed here so all new incoming
1163 * messages are directly going to the union operations
1164 * @return operation
1197 */ 1165 */
1198void 1166static void
1199_GSS_intersection_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, 1167intersection_accept (struct OperationSpecification *spec,
1200 struct Incoming *incoming) 1168 struct GNUNET_MESH_Tunnel *tunnel,
1169 struct TunnelContext *tc)
1201{ 1170{
1202 struct IntersectionEvaluateOperation *eo; 1171 struct OperationState *eo;
1203 1172
1204 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); 1173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1205 1174
1206 eo = GNUNET_new (struct IntersectionEvaluateOperation); 1175 eo = GNUNET_new (struct OperationState);
1207 eo->tc = incoming->tc; 1176 tc->vt = _GSS_union_vt ();
1208 eo->generation_created = set->state.i->current_generation++; 1177 tc->op = eo;
1209 eo->set = set; 1178 eo->set = spec->set;
1210 eo->salt = ntohs (incoming->salt); 1179 eo->generation_created = eo->set->current_generation++;
1211 GNUNET_assert (0 != ntohl (m->request_id)); 1180 eo->spec = spec;
1212 eo->request_id = ntohl (m->request_id); 1181 eo->tunnel = tunnel;
1213 eo->se = strata_estimator_dup (set->state.i->se); 1182 eo->mq = GNUNET_MESH_mq_create (tunnel);
1183 eo->se = strata_estimator_dup (eo->set->state->se);
1214 /* transfer ownership of mq and socket from incoming to eo */ 1184 /* transfer ownership of mq and socket from incoming to eo */
1215 GNUNET_CONTAINER_DLL_insert (eo->set->state.i->ops_head, 1185 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1216 eo->set->state.i->ops_tail, 1186 eo->set->state->ops_tail,
1217 eo); 1187 eo);
1218 /* kick off the operation */ 1188 /* kick off the operation */
1219 send_strata_estimator (eo); 1189 send_strata_estimator (eo);
@@ -1221,156 +1191,86 @@ _GSS_intersection_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *
1221 1191
1222 1192
1223/** 1193/**
1224 * Create a new set supporting the intersection operation 1194 * Create a new set supporting the union operation
1225 * 1195 *
1226 * @return the newly created set 1196 * @return the newly created set
1227 */ 1197 */
1228struct Set * 1198static struct SetState *
1229_GSS_intersection_set_create (void) 1199intersection_set_create (void)
1230{ 1200{
1231 struct Set *set; 1201 struct SetState *set_state;
1232 1202
1233 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "intersection set created\n"); 1203 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
1234 1204
1235 set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct IntersectionState)); 1205 set_state = GNUNET_new (struct SetState);
1236 set->state.i = (struct IntersectionState *) &set[1]; 1206 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1237 set->operation = GNUNET_SET_OPERATION_INTERSECTION;
1238 /* keys of the hash map are stored in the element entrys, thus we do not
1239 * want the hash map to copy them */
1240 set->state.i->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
1241 set->state.i->se = strata_estimator_create (SE_STRATA_COUNT,
1242 SE_IBF_SIZE, SE_IBF_HASH_NUM); 1207 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1243 return set; 1208 return set_state;
1244} 1209}
1245 1210
1246 1211
1247/** 1212/**
1248 * Add the element from the given element message to the set. 1213 * Add the element from the given element message to the set.
1249 * 1214 *
1250 * @param m message with the element 1215 * @param set_state state of the set want to add to
1251 * @param set set to add the element to 1216 * @param ee the element to add to the set
1252 */ 1217 */
1253void 1218static void
1254_GSS_intersection_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) 1219intersection_add (struct SetState *set_state, struct ElementEntry *ee)
1255{ 1220{
1256 struct ElementEntry *ee; 1221 strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
1257 struct ElementEntry *ee_dup;
1258 uint16_t element_size;
1259
1260 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
1261
1262 GNUNET_assert (GNUNET_SET_OPERATION_INTERSECTION == set->operation);
1263 element_size = ntohs (m->header.size) - sizeof *m;
1264 ee = GNUNET_malloc (element_size + sizeof *ee);
1265 ee->element.size = element_size;
1266 memcpy (&ee[1], &m[1], element_size);
1267 ee->element.data = &ee[1];
1268 ee->generation_added = set->state.i->current_generation;
1269 GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
1270 ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &ee->element_hash);
1271 if (NULL != ee_dup)
1272 {
1273 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
1274 GNUNET_free (ee);
1275 return;
1276 }
1277 GNUNET_CONTAINER_multihashmap_put (set->state.i->elements, &ee->element_hash, ee,
1278 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1279 strata_estimator_insert (set->state.i->se, get_ibf_key (&ee->element_hash, 0));
1280} 1222}
1281 1223
1282 1224
1283/** 1225/**
1284 * Destroy a set that supports the union operation 1226 * Destroy a set that supports the union operation
1285 * 1227 *
1286 * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION 1228 * @param set_state the set to destroy
1287 */ 1229 */
1288void 1230static void
1289_GSS_union_set_destroy (struct Set *set) 1231intersection_set_destroy (struct SetState *set_state)
1290{ 1232{
1291 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation); 1233 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1292 if (NULL != set->client) 1234 /* important to destroy operations before the rest of the set */
1293 { 1235 while (NULL != set_state->ops_head)
1294 GNUNET_SERVER_client_drop (set->client); 1236 union_operation_destroy (set_state->ops_head);
1295 set->client = NULL; 1237 if (NULL != set_state->se)
1296 }
1297 if (NULL != set->client_mq)
1298 {
1299 GNUNET_MQ_destroy (set->client_mq);
1300 set->client_mq = NULL;
1301 }
1302
1303 if (NULL != set->state.u->se)
1304 {
1305 strata_estimator_destroy (set->state.u->se);
1306 set->state.u->se = NULL;
1307 }
1308
1309 destroy_elements (set->state.u);
1310
1311 while (NULL != set->state.u->ops_head)
1312 { 1238 {
1313 _GSS_union_operation_destroy (set->state.u->ops_head); 1239 strata_estimator_destroy (set_state->se);
1240 set_state->se = NULL;
1314 } 1241 }
1242 GNUNET_free (set_state);
1315} 1243}
1316 1244
1245
1317/** 1246/**
1318 * Remove the element given in the element message from the set. 1247 * Remove the element given in the element message from the set.
1319 * Only marks the element as removed, so that older set operations can still exchange it. 1248 * Only marks the element as removed, so that older set operations can still exchange it.
1320 * 1249 *
1321 * @param m message with the element 1250 * @param set_state state of the set to remove from
1322 * @param set set to remove the element from 1251 * @param element set element to remove
1323 */ 1252 */
1324void 1253static void
1325_GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) 1254intersection_remove (struct SetState *set_state, struct ElementEntry *element)
1326{ 1255{
1327 struct GNUNET_HashCode hash; 1256 /* FIXME: remove from strata estimator */
1328 struct ElementEntry *ee;
1329
1330 GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
1331 GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
1332 ee = GNUNET_CONTAINER_multihashmap_get (set->state.i->elements, &hash);
1333 if (NULL == ee)
1334 {
1335 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove non-existing element\n");
1336 return;
1337 }
1338 if (GNUNET_YES == ee->removed)
1339 {
1340 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove element twice\n");
1341 return;
1342 }
1343 ee->removed = GNUNET_YES;
1344 ee->generation_removed = set->state.i->current_generation;
1345} 1257}
1346 1258
1347 1259
1348/** 1260/**
1349 * Dispatch messages for a union operation. 1261 * Dispatch messages for a union operation.
1350 * 1262 *
1351 * @param cls closure 1263 * @param eo the state of the union evaluate operation
1352 * @param tunnel mesh tunnel 1264 * @param mh the received message
1353 * @param tunnel_ctx tunnel context 1265 * @return GNUNET_SYSERR if the tunnel should be disconnected,
1354 * @param mh message to process 1266 * GNUNET_OK otherwise
1355 * @return ???
1356 */ 1267 */
1357int 1268int
1358_GSS_union_handle_p2p_message (void *cls, 1269intersection_handle_p2p_message (struct OperationState *eo,
1359 struct GNUNET_MESH_Tunnel *tunnel, 1270 const struct GNUNET_MessageHeader *mh)
1360 void **tunnel_ctx,
1361 const struct GNUNET_MessageHeader *mh)
1362{ 1271{
1363 struct TunnelContext *tc = *tunnel_ctx; 1272 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
1364 struct UnionEvaluateOperation *eo; 1273 ntohs (mh->type), ntohs (mh->size));
1365
1366 if (CONTEXT_OPERATION_UNION != tc->type)
1367 {
1368 /* never kill mesh */
1369 return GNUNET_OK;
1370 }
1371
1372 eo = tc->data;
1373
1374 switch (ntohs (mh->type)) 1274 switch (ntohs (mh->type))
1375 { 1275 {
1376 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: 1276 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1392,6 +1292,63 @@ _GSS_union_handle_p2p_message (void *cls,
1392 /* something wrong with mesh's message handlers? */ 1292 /* something wrong with mesh's message handlers? */
1393 GNUNET_assert (0); 1293 GNUNET_assert (0);
1394 } 1294 }
1395 /* never kill mesh! */
1396 return GNUNET_OK; 1295 return GNUNET_OK;
1397} 1296}
1297
1298
1299static void
1300intersection_peer_disconnect (struct OperationState *op)
1301{
1302 /* Are we already disconnected? */
1303 if (NULL == op->tunnel)
1304 return;
1305 op->tunnel = NULL;
1306 if (NULL != op->mq)
1307 {
1308 GNUNET_MQ_destroy (op->mq);
1309 op->mq = NULL;
1310 }
1311 if (PHASE_FINISHED != op->phase)
1312 {
1313 struct GNUNET_MQ_Envelope *ev;
1314 struct GNUNET_SET_ResultMessage *msg;
1315
1316 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
1317 msg->request_id = htonl (op->spec->client_request_id);
1318 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1319 msg->element_type = htons (0);
1320 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1321 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1322 union_operation_destroy (op);
1323 return;
1324 }
1325 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1326 if (GNUNET_NO == op->client_done_sent)
1327 send_client_done_and_destroy (op);
1328}
1329
1330
1331static void
1332intersection_op_cancel (struct SetState *set_state, uint32_t op_id)
1333{
1334 /* FIXME: implement */
1335}
1336
1337
1338const struct SetVT *
1339_GSS_intersection_vt ()
1340{
1341 static const struct SetVT union_vt = {
1342 .create = &union_set_create,
1343 .msg_handler = &union_handle_p2p_message,
1344 .add = &union_add,
1345 .remove = &union_remove,
1346 .destroy_set = &union_set_destroy,
1347 .evaluate = &union_evaluate,
1348 .accept = &union_accept,
1349 .peer_disconnect = &union_peer_disconnect,
1350 .cancel = &union_op_cancel,
1351 };
1352
1353 return &union_vt;
1354}