aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c690
1 files changed, 341 insertions, 349 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 33a36d260..6ad985bcb 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -112,12 +112,6 @@ struct OperationState
112 struct GNUNET_MESH_Tunnel *tunnel; 112 struct GNUNET_MESH_Tunnel *tunnel;
113 113
114 /** 114 /**
115 * Detail information about the set operation,
116 * including the set to use.
117 */
118 struct OperationSpecification *spec;
119
120 /**
121 * Message queue for the peer. 115 * Message queue for the peer.
122 */ 116 */
123 struct GNUNET_MQ_Handle *mq; 117 struct GNUNET_MQ_Handle *mq;
@@ -151,33 +145,14 @@ struct OperationState
151 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; 145 struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
152 146
153 /** 147 /**
154 * Current state of the operation. 148 * Iterator for sending elements on the key to element mapping to the client.
155 */
156 enum UnionOperationPhase phase;
157
158 /**
159 * Generation in which the operation handle
160 * was created.
161 */
162 unsigned int generation_created;
163
164 /**
165 * Set state of the set that this operation
166 * belongs to.
167 */ 149 */
168 struct Set *set; 150 struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
169 151
170 /** 152 /**
171 * Evaluate operations are held in 153 * Current state of the operation.
172 * a linked list.
173 */ 154 */
174 struct OperationState *next; 155 enum UnionOperationPhase phase;
175
176 /**
177 * Evaluate operations are held in
178 * a linked list.
179 */
180 struct OperationState *prev;
181 156
182 /** 157 /**
183 * Did we send the client that we are done? 158 * Did we send the client that we are done?
@@ -198,13 +173,13 @@ struct KeyEntry
198 struct IBF_Key ibf_key; 173 struct IBF_Key ibf_key;
199 174
200 /** 175 /**
201 * The actual element associated with the key 176 * The actual element associated with the key.
202 */ 177 */
203 struct ElementEntry *element; 178 struct ElementEntry *element;
204 179
205 /** 180 /**
206 * Element that collides with this element 181 * Element that collides with this element
207 * on the ibf key 182 * on the ibf key. All colliding entries must have the same ibf key.
208 */ 183 */
209 struct KeyEntry *next_colliding; 184 struct KeyEntry *next_colliding;
210}; 185};
@@ -226,7 +201,7 @@ struct SendElementClosure
226 * Operation for which the elements 201 * Operation for which the elements
227 * should be sent. 202 * should be sent.
228 */ 203 */
229 struct OperationState *eo; 204 struct Operation *op;
230}; 205};
231 206
232 207
@@ -242,18 +217,6 @@ struct SetState
242 * salt=0. 217 * salt=0.
243 */ 218 */
244 struct StrataEstimator *se; 219 struct StrataEstimator *se;
245
246 /**
247 * Evaluate operations are held in
248 * a linked list.
249 */
250 struct OperationState *ops_head;
251
252 /**
253 * Evaluate operations are held in
254 * a linked list.
255 */
256 struct OperationState *ops_tail;
257}; 220};
258 221
259 222
@@ -263,9 +226,9 @@ struct SetState
263 * @param cls closure 226 * @param cls closure
264 * @param key current key code 227 * @param key current key code
265 * @param value value in the hash map 228 * @param value value in the hash map
266 * @return GNUNET_YES if we should continue to 229 * @return #GNUNET_YES if we should continue to
267 * iterate, 230 * iterate,
268 * GNUNET_NO if not. 231 * #GNUNET_NO if not.
269 */ 232 */
270static int 233static int
271destroy_key_to_element_iter (void *cls, 234destroy_key_to_element_iter (void *cls,
@@ -290,65 +253,40 @@ destroy_key_to_element_iter (void *cls,
290 253
291 254
292/** 255/**
293 * Destroy a union operation, and free all resources 256 * Destroy the union operation. Only things specific to the union operation are destroyed.
294 * associated with it. 257 *
295 * 258 * @param op union operation to destroy
296 * @param eo the union operation to destroy
297 */ 259 */
298static void 260static void
299union_operation_destroy (struct OperationState *eo) 261union_op_cancel (struct Operation *op)
300{ 262{
301 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n"); 263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
302 GNUNET_CONTAINER_DLL_remove (eo->set->state->ops_head, 264 /* check if the op was canceled twice */
303 eo->set->state->ops_tail, 265 GNUNET_assert (NULL != op->state);
304 eo); 266 if (NULL != op->state->remote_ibf)
305 if (NULL != eo->mq)
306 {
307 GNUNET_MQ_destroy (eo->mq);
308 eo->mq = NULL;
309 }
310 if (NULL != eo->tunnel)
311 {
312 struct GNUNET_MESH_Tunnel *t = eo->tunnel;
313 eo->tunnel = NULL;
314 GNUNET_MESH_tunnel_destroy (t);
315 }
316 if (NULL != eo->remote_ibf)
317 { 267 {
318 ibf_destroy (eo->remote_ibf); 268 ibf_destroy (op->state->remote_ibf);
319 eo->remote_ibf = NULL; 269 op->state->remote_ibf = NULL;
320 } 270 }
321 if (NULL != eo->local_ibf) 271 if (NULL != op->state->local_ibf)
322 { 272 {
323 ibf_destroy (eo->local_ibf); 273 ibf_destroy (op->state->local_ibf);
324 eo->local_ibf = NULL; 274 op->state->local_ibf = NULL;
325 } 275 }
326 if (NULL != eo->se) 276 if (NULL != op->state->se)
327 { 277 {
328 strata_estimator_destroy (eo->se); 278 strata_estimator_destroy (op->state->se);
329 eo->se = NULL; 279 op->state->se = NULL;
330 } 280 }
331 if (NULL != eo->key_to_element) 281 if (NULL != op->state->key_to_element)
332 { 282 {
333 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); 283 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, destroy_key_to_element_iter, NULL);
334 GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); 284 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
335 eo->key_to_element = NULL; 285 op->state->key_to_element = NULL;
336 } 286 }
337 if (NULL != eo->spec) 287 GNUNET_free (op->state);
338 { 288 op->state = NULL;
339 if (NULL != eo->spec->context_msg)
340 {
341 GNUNET_free (eo->spec->context_msg);
342 eo->spec->context_msg = NULL;
343 }
344 GNUNET_free (eo->spec);
345 eo->spec = NULL;
346 }
347 GNUNET_free (eo);
348
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n"); 289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
350
351 /* FIXME: do a garbage collection of the set generations */
352} 290}
353 291
354 292
@@ -356,20 +294,22 @@ union_operation_destroy (struct OperationState *eo)
356 * Inform the client that the union operation has failed, 294 * Inform the client that the union operation has failed,
357 * and proceed to destroy the evaluate operation. 295 * and proceed to destroy the evaluate operation.
358 * 296 *
359 * @param eo the union operation to fail 297 * @param op the union operation to fail
360 */ 298 */
361static void 299static void
362fail_union_operation (struct OperationState *eo) 300fail_union_operation (struct Operation *op)
363{ 301{
364 struct GNUNET_MQ_Envelope *ev; 302 struct GNUNET_MQ_Envelope *ev;
365 struct GNUNET_SET_ResultMessage *msg; 303 struct GNUNET_SET_ResultMessage *msg;
366 304
305 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "union operation failed\n");
306
367 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 307 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
368 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 308 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
369 msg->request_id = htonl (eo->spec->client_request_id); 309 msg->request_id = htonl (op->spec->client_request_id);
370 msg->element_type = htons (0); 310 msg->element_type = htons (0);
371 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 311 GNUNET_MQ_send (op->spec->set->client_mq, ev);
372 union_operation_destroy (eo); 312 _GSS_operation_destroy (op);
373} 313}
374 314
375 315
@@ -382,7 +322,7 @@ fail_union_operation (struct OperationState *eo)
382 * @return the derived IBF key 322 * @return the derived IBF key
383 */ 323 */
384static struct IBF_Key 324static struct IBF_Key
385get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) 325get_ibf_key (const struct GNUNET_HashCode *src, uint16_t salt)
386{ 326{
387 struct IBF_Key key; 327 struct IBF_Key key;
388 328
@@ -398,40 +338,39 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt)
398/** 338/**
399 * Send a request for the evaluate operation to a remote peer 339 * Send a request for the evaluate operation to a remote peer
400 * 340 *
401 * @param eo operation with the other peer 341 * @param op operation with the other peer
402 */ 342 */
403static void 343static void
404send_operation_request (struct OperationState *eo) 344send_operation_request (struct Operation *op)
405{ 345{
406 struct GNUNET_MQ_Envelope *ev; 346 struct GNUNET_MQ_Envelope *ev;
407 struct OperationRequestMessage *msg; 347 struct OperationRequestMessage *msg;
408 348
409 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 349 ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
410 eo->spec->context_msg); 350 op->spec->context_msg);
411 351
412 if (NULL == ev) 352 if (NULL == ev)
413 { 353 {
414 /* the context message is too large */ 354 /* the context message is too large */
415 GNUNET_break (0); 355 GNUNET_break (0);
416 GNUNET_SERVER_client_disconnect (eo->spec->set->client); 356 GNUNET_SERVER_client_disconnect (op->spec->set->client);
417 return; 357 return;
418 } 358 }
419 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 359 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
420 msg->app_id = eo->spec->app_id; 360 msg->app_id = op->spec->app_id;
421 msg->salt = htonl (eo->spec->salt); 361 msg->salt = htonl (op->spec->salt);
422 GNUNET_MQ_send (eo->mq, ev); 362 GNUNET_MQ_send (op->mq, ev);
423 363
424 if (NULL != eo->spec->context_msg) 364 if (NULL != op->spec->context_msg)
425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n"); 365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request with context message\n");
426 else 366 else
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n"); 367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request without context message\n");
428 368
429 if (NULL != eo->spec->context_msg) 369 if (NULL != op->spec->context_msg)
430 { 370 {
431 GNUNET_free (eo->spec->context_msg); 371 GNUNET_free (op->spec->context_msg);
432 eo->spec->context_msg = NULL; 372 op->spec->context_msg = NULL;
433 } 373 }
434
435} 374}
436 375
437 376
@@ -442,34 +381,89 @@ send_operation_request (struct OperationState *eo)
442 * @param cls closure 381 * @param cls closure
443 * @param key current key code 382 * @param key current key code
444 * @param value value in the hash map 383 * @param value value in the hash map
445 * @return GNUNET_YES if we should continue to 384 * @return #GNUNET_YES if we should continue to
446 * iterate, 385 * iterate,
447 * GNUNET_NO if not. 386 * #GNUNET_NO if not.
448 */ 387 */
449static int 388static int
450op_register_element_iterator (void *cls, 389op_register_element_iterator (void *cls,
451 uint32_t key, 390 uint32_t key,
452 void *value) 391 void *value)
453{ 392{
454 struct KeyEntry *const new_k = cls; 393 struct KeyEntry *const new_k = cls;
455 struct KeyEntry *old_k = value; 394 struct KeyEntry *old_k = value;
456 395
457 GNUNET_assert (NULL != old_k); 396 GNUNET_assert (NULL != old_k);
458 do 397 /* check if our ibf key collides with the ibf key in the existing entry */
398 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
459 { 399 {
460 if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) 400 /* insert the the new key in the collision chain */
461 { 401 new_k->next_colliding = old_k->next_colliding;
462 new_k->next_colliding = old_k->next_colliding; 402 old_k->next_colliding = new_k;
463 old_k->next_colliding = new_k; 403 /* signal to the caller that we were able to insert into a colliding bucket */
404 return GNUNET_NO;
405 }
406 return GNUNET_YES;
407}
408
409
410/**
411 * Iterator to create the mapping between ibf keys
412 * and element entries.
413 *
414 * @param cls closure
415 * @param key current key code
416 * @param value value in the hash map
417 * @return #GNUNET_YES if we should continue to
418 * iterate,
419 * #GNUNET_NO if not.
420 */
421static int
422op_has_element_iterator (void *cls,
423 uint32_t key,
424 void *value)
425{
426 struct GNUNET_HashCode *element_hash = cls;
427 struct KeyEntry *k = value;
428
429 GNUNET_assert (NULL != k);
430 while (NULL != k)
431 {
432 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, element_hash))
464 return GNUNET_NO; 433 return GNUNET_NO;
465 } 434 k = k->next_colliding;
466 old_k = old_k->next_colliding; 435 }
467 } while (NULL != old_k);
468 return GNUNET_YES; 436 return GNUNET_YES;
469} 437}
470 438
471 439
472/** 440/**
441 * Determine whether the given element is already in the operation's element
442 * set.
443 *
444 * @param op operation that should be tested for 'element_hash'
445 * @param element_hash hash of the element to look for
446 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
447 */
448static int
449op_has_element (struct Operation *op, const struct GNUNET_HashCode *element_hash)
450{
451 int ret;
452 struct IBF_Key ibf_key;
453
454 ibf_key = get_ibf_key (element_hash, op->spec->salt);
455 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
456 (uint32_t) ibf_key.key_val,
457 op_has_element_iterator, (void *) element_hash);
458
459 /* was the iteration aborted because we found the element? */
460 if (GNUNET_SYSERR == ret)
461 return GNUNET_YES;
462 return GNUNET_NO;
463}
464
465
466/**
473 * Insert an element into the union operation's 467 * Insert an element into the union operation's
474 * key-to-element mapping. Takes ownership of 'ee'. 468 * key-to-element mapping. Takes ownership of 'ee'.
475 * Note that this does not insert the element in the set, 469 * Note that this does not insert the element in the set,
@@ -477,21 +471,21 @@ op_register_element_iterator (void *cls,
477 * This is done to speed up re-tried operations, if some elements 471 * This is done to speed up re-tried operations, if some elements
478 * were transmitted, and then the IBF fails to decode. 472 * were transmitted, and then the IBF fails to decode.
479 * 473 *
480 * @param eo the union operation 474 * @param op the union operation
481 * @param ee the element entry 475 * @param ee the element entry
482 */ 476 */
483static void 477static void
484op_register_element (struct OperationState *eo, struct ElementEntry *ee) 478op_register_element (struct Operation *op, struct ElementEntry *ee)
485{ 479{
486 int ret; 480 int ret;
487 struct IBF_Key ibf_key; 481 struct IBF_Key ibf_key;
488 struct KeyEntry *k; 482 struct KeyEntry *k;
489 483
490 ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); 484 ibf_key = get_ibf_key (&ee->element_hash, op->spec->salt);
491 k = GNUNET_new (struct KeyEntry); 485 k = GNUNET_new (struct KeyEntry);
492 k->element = ee; 486 k->element = ee;
493 k->ibf_key = ibf_key; 487 k->ibf_key = ibf_key;
494 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, 488 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
495 (uint32_t) ibf_key.key_val, 489 (uint32_t) ibf_key.key_val,
496 op_register_element_iterator, k); 490 op_register_element_iterator, k);
497 491
@@ -499,7 +493,7 @@ op_register_element (struct OperationState *eo, struct ElementEntry *ee)
499 if (GNUNET_SYSERR == ret) 493 if (GNUNET_SYSERR == ret)
500 return; 494 return;
501 495
502 GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, 496 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, (uint32_t) ibf_key.key_val, k,
503 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 497 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
504} 498}
505 499
@@ -542,19 +536,19 @@ init_key_to_element_iterator (void *cls,
542 const struct GNUNET_HashCode *key, 536 const struct GNUNET_HashCode *key,
543 void *value) 537 void *value)
544{ 538{
545 struct OperationState *eo = cls; 539 struct Operation *op = cls;
546 struct ElementEntry *e = value; 540 struct ElementEntry *e = value;
547 541
548 /* make sure that the element belongs to the set at the time 542 /* make sure that the element belongs to the set at the time
549 * of creating the operation */ 543 * of creating the operation */
550 if ( (e->generation_added > eo->generation_created) || 544 if ( (e->generation_added > op->generation_created) ||
551 ( (GNUNET_YES == e->removed) && 545 ( (GNUNET_YES == e->removed) &&
552 (e->generation_removed < eo->generation_created))) 546 (e->generation_removed < op->generation_created)))
553 return GNUNET_YES; 547 return GNUNET_YES;
554 548
555 GNUNET_assert (GNUNET_NO == e->remote); 549 GNUNET_assert (GNUNET_NO == e->remote);
556 550
557 op_register_element (eo, e); 551 op_register_element (op, e);
558 return GNUNET_YES; 552 return GNUNET_YES;
559} 553}
560 554
@@ -563,45 +557,45 @@ init_key_to_element_iterator (void *cls,
563 * Create an ibf with the operation's elements 557 * Create an ibf with the operation's elements
564 * of the specified size 558 * of the specified size
565 * 559 *
566 * @param eo the union operation 560 * @param op the union operation
567 * @param size size of the ibf to create 561 * @param size size of the ibf to create
568 */ 562 */
569static void 563static void
570prepare_ibf (struct OperationState *eo, uint16_t size) 564prepare_ibf (struct Operation *op, uint16_t size)
571{ 565{
572 if (NULL == eo->key_to_element) 566 if (NULL == op->state->key_to_element)
573 { 567 {
574 unsigned int len; 568 unsigned int len;
575 len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); 569 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->elements);
576 eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 570 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
577 GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, 571 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->elements,
578 init_key_to_element_iterator, eo); 572 init_key_to_element_iterator, op);
579 } 573 }
580 if (NULL != eo->local_ibf) 574 if (NULL != op->state->local_ibf)
581 ibf_destroy (eo->local_ibf); 575 ibf_destroy (op->state->local_ibf);
582 eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 576 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
583 GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, 577 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
584 prepare_ibf_iterator, eo->local_ibf); 578 prepare_ibf_iterator, op->state->local_ibf);
585} 579}
586 580
587 581
588/** 582/**
589 * Send an ibf of appropriate size. 583 * Send an ibf of appropriate size.
590 * 584 *
591 * @param eo the union operation 585 * @param op the union operation
592 * @param ibf_order order of the ibf to send, size=2^order 586 * @param ibf_order order of the ibf to send, size=2^order
593 */ 587 */
594static void 588static void
595send_ibf (struct OperationState *eo, uint16_t ibf_order) 589send_ibf (struct Operation *op, uint16_t ibf_order)
596{ 590{
597 unsigned int buckets_sent = 0; 591 unsigned int buckets_sent = 0;
598 struct InvertibleBloomFilter *ibf; 592 struct InvertibleBloomFilter *ibf;
599 593
600 prepare_ibf (eo, 1<<ibf_order); 594 prepare_ibf (op, 1<<ibf_order);
601 595
602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); 596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order);
603 597
604 ibf = eo->local_ibf; 598 ibf = op->state->local_ibf;
605 599
606 while (buckets_sent < (1 << ibf_order)) 600 while (buckets_sent < (1 << ibf_order))
607 { 601 {
@@ -624,20 +618,20 @@ send_ibf (struct OperationState *eo, uint16_t ibf_order)
624 buckets_sent += buckets_in_message; 618 buckets_sent += buckets_in_message;
625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", 619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
626 buckets_in_message, buckets_sent, 1<<ibf_order); 620 buckets_in_message, buckets_sent, 1<<ibf_order);
627 GNUNET_MQ_send (eo->mq, ev); 621 GNUNET_MQ_send (op->mq, ev);
628 } 622 }
629 623
630 eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; 624 op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
631} 625}
632 626
633 627
634/** 628/**
635 * Send a strata estimator to the remote peer. 629 * Send a strata estimator to the remote peer.
636 * 630 *
637 * @param eo the union operation with the remote peer 631 * @param op the union operation with the remote peer
638 */ 632 */
639static void 633static void
640send_strata_estimator (struct OperationState *eo) 634send_strata_estimator (struct Operation *op)
641{ 635{
642 struct GNUNET_MQ_Envelope *ev; 636 struct GNUNET_MQ_Envelope *ev;
643 struct GNUNET_MessageHeader *strata_msg; 637 struct GNUNET_MessageHeader *strata_msg;
@@ -645,9 +639,9 @@ send_strata_estimator (struct OperationState *eo)
645 ev = GNUNET_MQ_msg_header_extra (strata_msg, 639 ev = GNUNET_MQ_msg_header_extra (strata_msg,
646 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 640 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE,
647 GNUNET_MESSAGE_TYPE_SET_P2P_SE); 641 GNUNET_MESSAGE_TYPE_SET_P2P_SE);
648 strata_estimator_write (eo->set->state->se, &strata_msg[1]); 642 strata_estimator_write (op->state->se, &strata_msg[1]);
649 GNUNET_MQ_send (eo->mq, ev); 643 GNUNET_MQ_send (op->mq, ev);
650 eo->phase = PHASE_EXPECT_IBF; 644 op->state->phase = PHASE_EXPECT_IBF;
651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); 645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
652} 646}
653 647
@@ -682,27 +676,27 @@ get_order_from_difference (unsigned int diff)
682static void 676static void
683handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) 677handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
684{ 678{
685 struct OperationState *eo = cls; 679 struct Operation *op = cls;
686 struct StrataEstimator *remote_se; 680 struct StrataEstimator *remote_se;
687 int diff; 681 int diff;
688 682
689 if (eo->phase != PHASE_EXPECT_SE) 683 if (op->state->phase != PHASE_EXPECT_SE)
690 { 684 {
691 fail_union_operation (eo); 685 fail_union_operation (op);
692 GNUNET_break (0); 686 GNUNET_break (0);
693 return; 687 return;
694 } 688 }
695 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, 689 remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE,
696 SE_IBF_HASH_NUM); 690 SE_IBF_HASH_NUM);
697 strata_estimator_read (&mh[1], remote_se); 691 strata_estimator_read (&mh[1], remote_se);
698 GNUNET_assert (NULL != eo->se); 692 GNUNET_assert (NULL != op->state->se);
699 diff = strata_estimator_difference (remote_se, eo->se); 693 diff = strata_estimator_difference (remote_se, op->state->se);
700 strata_estimator_destroy (remote_se); 694 strata_estimator_destroy (remote_se);
701 strata_estimator_destroy (eo->se); 695 strata_estimator_destroy (op->state->se);
702 eo->se = NULL; 696 op->state->se = NULL;
703 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", 697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n",
704 diff, 1<<get_order_from_difference (diff)); 698 diff, 1<<get_order_from_difference (diff));
705 send_ibf (eo, get_order_from_difference (diff)); 699 send_ibf (op, get_order_from_difference (diff));
706} 700}
707 701
708 702
@@ -721,7 +715,7 @@ send_element_iterator (void *cls,
721{ 715{
722 struct SendElementClosure *sec = cls; 716 struct SendElementClosure *sec = cls;
723 struct IBF_Key ibf_key = sec->ibf_key; 717 struct IBF_Key ibf_key = sec->ibf_key;
724 struct OperationState *eo = sec->eo; 718 struct Operation *op = sec->op;
725 struct KeyEntry *ke = value; 719 struct KeyEntry *ke = value;
726 720
727 if (ke->ibf_key.key_val != ibf_key.key_val) 721 if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -743,7 +737,7 @@ send_element_iterator (void *cls,
743 memcpy (&mh[1], element->data, element->size); 737 memcpy (&mh[1], element->data, element->size);
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", 738 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n",
745 GNUNET_h2s (&ke->element->element_hash)); 739 GNUNET_h2s (&ke->element->element_hash));
746 GNUNET_MQ_send (eo->mq, ev); 740 GNUNET_MQ_send (op->mq, ev);
747 ke = ke->next_colliding; 741 ke = ke->next_colliding;
748 } 742 }
749 return GNUNET_NO; 743 return GNUNET_NO;
@@ -753,17 +747,17 @@ send_element_iterator (void *cls,
753 * Send all elements that have the specified IBF key 747 * Send all elements that have the specified IBF key
754 * to the remote peer of the union operation 748 * to the remote peer of the union operation
755 * 749 *
756 * @param eo union operation 750 * @param op union operation
757 * @param ibf_key IBF key of interest 751 * @param ibf_key IBF key of interest
758 */ 752 */
759static void 753static void
760send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) 754send_elements_for_key (struct Operation *op, struct IBF_Key ibf_key)
761{ 755{
762 struct SendElementClosure send_cls; 756 struct SendElementClosure send_cls;
763 757
764 send_cls.ibf_key = ibf_key; 758 send_cls.ibf_key = ibf_key;
765 send_cls.eo = eo; 759 send_cls.op = op;
766 GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, 760 GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, (uint32_t) ibf_key.key_val,
767 &send_element_iterator, &send_cls); 761 &send_element_iterator, &send_cls);
768} 762}
769 763
@@ -772,10 +766,10 @@ send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
772 * Decode which elements are missing on each side, and 766 * Decode which elements are missing on each side, and
773 * send the appropriate elemens and requests 767 * send the appropriate elemens and requests
774 * 768 *
775 * @param eo union operation 769 * @param op union operation
776 */ 770 */
777static void 771static void
778decode_and_send (struct OperationState *eo) 772decode_and_send (struct Operation *op)
779{ 773{
780 struct IBF_Key key; 774 struct IBF_Key key;
781 struct IBF_Key last_key; 775 struct IBF_Key last_key;
@@ -783,14 +777,14 @@ decode_and_send (struct OperationState *eo)
783 unsigned int num_decoded; 777 unsigned int num_decoded;
784 struct InvertibleBloomFilter *diff_ibf; 778 struct InvertibleBloomFilter *diff_ibf;
785 779
786 GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); 780 GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
787 781
788 prepare_ibf (eo, eo->remote_ibf->size); 782 prepare_ibf (op, op->state->remote_ibf->size);
789 diff_ibf = ibf_dup (eo->local_ibf); 783 diff_ibf = ibf_dup (op->state->local_ibf);
790 ibf_subtract (diff_ibf, eo->remote_ibf); 784 ibf_subtract (diff_ibf, op->state->remote_ibf);
791 785
792 ibf_destroy (eo->remote_ibf); 786 ibf_destroy (op->state->remote_ibf);
793 eo->remote_ibf = NULL; 787 op->state->remote_ibf = NULL;
794 788
795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); 789 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size);
796 790
@@ -829,7 +823,7 @@ decode_and_send (struct OperationState *eo)
829 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 823 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
830 "decoding failed, sending larger ibf (size %u)\n", 824 "decoding failed, sending larger ibf (size %u)\n",
831 1<<next_order); 825 1<<next_order);
832 send_ibf (eo, next_order); 826 send_ibf (op, next_order);
833 } 827 }
834 else 828 else
835 { 829 {
@@ -844,28 +838,26 @@ decode_and_send (struct OperationState *eo)
844 838
845 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); 839 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n");
846 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 840 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
847 GNUNET_MQ_send (eo->mq, ev); 841 GNUNET_MQ_send (op->mq, ev);
848 break; 842 break;
849 } 843 }
850 if (1 == side) 844 if (1 == side)
851 { 845 {
852 send_elements_for_key (eo, key); 846 send_elements_for_key (op, key);
853 } 847 }
854 else if (-1 == side) 848 else if (-1 == side)
855 { 849 {
856 struct GNUNET_MQ_Envelope *ev; 850 struct GNUNET_MQ_Envelope *ev;
857 struct GNUNET_MessageHeader *msg; 851 struct GNUNET_MessageHeader *msg;
858 852
859 /* FIXME: before sending the request, check if we may just have the element */ 853 /* It may be nice to merge multiple requests, but with mesh's corking it is not worth
860 /* FIXME: merge multiple requests */ 854 * the effort additional complexity. */
861 /* FIXME: remember somewhere that we already requested the element,
862 * so that we don't request it again with the next ibf if decoding fails */
863 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), 855 ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
864 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); 856 GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
865 857
866 *(struct IBF_Key *) &msg[1] = key; 858 *(struct IBF_Key *) &msg[1] = key;
867 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); 859 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n");
868 GNUNET_MQ_send (eo->mq, ev); 860 GNUNET_MQ_send (op->mq, ev);
869 } 861 }
870 else 862 else
871 { 863 {
@@ -885,32 +877,32 @@ decode_and_send (struct OperationState *eo)
885static void 877static void
886handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) 878handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
887{ 879{
888 struct OperationState *eo = cls; 880 struct Operation *op = cls;
889 struct IBFMessage *msg = (struct IBFMessage *) mh; 881 struct IBFMessage *msg = (struct IBFMessage *) mh;
890 unsigned int buckets_in_message; 882 unsigned int buckets_in_message;
891 883
892 if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || 884 if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
893 (eo->phase == PHASE_EXPECT_IBF) ) 885 (op->state->phase == PHASE_EXPECT_IBF) )
894 { 886 {
895 eo->phase = PHASE_EXPECT_IBF_CONT; 887 op->state->phase = PHASE_EXPECT_IBF_CONT;
896 GNUNET_assert (NULL == eo->remote_ibf); 888 GNUNET_assert (NULL == op->state->remote_ibf);
897 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); 889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order);
898 eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 890 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
899 eo->ibf_buckets_received = 0; 891 op->state->ibf_buckets_received = 0;
900 if (0 != ntohs (msg->offset)) 892 if (0 != ntohs (msg->offset))
901 { 893 {
902 GNUNET_break (0); 894 GNUNET_break (0);
903 fail_union_operation (eo); 895 fail_union_operation (op);
904 return; 896 return;
905 } 897 }
906 } 898 }
907 else if (eo->phase == PHASE_EXPECT_IBF_CONT) 899 else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
908 { 900 {
909 if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || 901 if ( (ntohs (msg->offset) != op->state->ibf_buckets_received) ||
910 (1<<msg->order != eo->remote_ibf->size) ) 902 (1<<msg->order != op->state->remote_ibf->size) )
911 { 903 {
912 GNUNET_break (0); 904 GNUNET_break (0);
913 fail_union_operation (eo); 905 fail_union_operation (op);
914 return; 906 return;
915 } 907 }
916 } 908 }
@@ -920,25 +912,25 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
920 if (0 == buckets_in_message) 912 if (0 == buckets_in_message)
921 { 913 {
922 GNUNET_break_op (0); 914 GNUNET_break_op (0);
923 fail_union_operation (eo); 915 fail_union_operation (op);
924 return; 916 return;
925 } 917 }
926 918
927 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) 919 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
928 { 920 {
929 GNUNET_break (0); 921 GNUNET_break (0);
930 fail_union_operation (eo); 922 fail_union_operation (op);
931 return; 923 return;
932 } 924 }
933 925
934 ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); 926 ibf_read_slice (&msg[1], op->state->ibf_buckets_received, buckets_in_message, op->state->remote_ibf);
935 eo->ibf_buckets_received += buckets_in_message; 927 op->state->ibf_buckets_received += buckets_in_message;
936 928
937 if (eo->ibf_buckets_received == eo->remote_ibf->size) 929 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
938 { 930 {
939 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); 931 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
940 eo->phase = PHASE_EXPECT_ELEMENTS; 932 op->state->phase = PHASE_EXPECT_ELEMENTS;
941 decode_and_send (eo); 933 decode_and_send (op);
942 } 934 }
943} 935}
944 936
@@ -947,18 +939,18 @@ handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
947 * Send a result message to the client indicating 939 * Send a result message to the client indicating
948 * that there is a new element. 940 * that there is a new element.
949 * 941 *
950 * @param eo union operation 942 * @param op union operation
951 * @param element element to send 943 * @param element element to send
952 */ 944 */
953static void 945static void
954send_client_element (struct OperationState *eo, 946send_client_element (struct Operation *op,
955 struct GNUNET_SET_Element *element) 947 struct GNUNET_SET_Element *element)
956{ 948{
957 struct GNUNET_MQ_Envelope *ev; 949 struct GNUNET_MQ_Envelope *ev;
958 struct GNUNET_SET_ResultMessage *rm; 950 struct GNUNET_SET_ResultMessage *rm;
959 951
960 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size); 952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client\n", element->size);
961 GNUNET_assert (0 != eo->spec->client_request_id); 953 GNUNET_assert (0 != op->spec->client_request_id);
962 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 954 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
963 if (NULL == ev) 955 if (NULL == ev)
964 { 956 {
@@ -967,38 +959,112 @@ send_client_element (struct OperationState *eo,
967 return; 959 return;
968 } 960 }
969 rm->result_status = htons (GNUNET_SET_STATUS_OK); 961 rm->result_status = htons (GNUNET_SET_STATUS_OK);
970 rm->request_id = htonl (eo->spec->client_request_id); 962 rm->request_id = htonl (op->spec->client_request_id);
971 rm->element_type = element->type; 963 rm->element_type = element->type;
972 memcpy (&rm[1], element->data, element->size); 964 memcpy (&rm[1], element->data, element->size);
973 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 965 GNUNET_MQ_send (op->spec->set->client_mq, ev);
974} 966}
975 967
976 968
977/** 969/**
978 * Send a result message to the client indicating 970 * Signal to the client that the operation has finished and
979 * that the operation is over. 971 * destroy the operation.
980 * After the result done message has been sent to the client,
981 * destroy the evaluate operation.
982 * 972 *
983 * @param eo union operation 973 * @param cls operation to destroy
984 */ 974 */
985static void 975static void
986send_client_done_and_destroy (struct OperationState *eo) 976send_done_and_destroy (void *cls)
987{ 977{
978 struct Operation *op = cls;
988 struct GNUNET_MQ_Envelope *ev; 979 struct GNUNET_MQ_Envelope *ev;
989 struct GNUNET_SET_ResultMessage *rm; 980 struct GNUNET_SET_ResultMessage *rm;
990
991 GNUNET_assert (GNUNET_NO == eo->client_done_sent);
992
993 eo->client_done_sent = GNUNET_YES;
994
995 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 981 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
996 rm->request_id = htonl (eo->spec->client_request_id); 982 rm->request_id = htonl (op->spec->client_request_id);
997 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 983 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
998 rm->element_type = htons (0); 984 rm->element_type = htons (0);
999 GNUNET_MQ_send (eo->spec->set->client_mq, ev); 985 GNUNET_MQ_send (op->spec->set->client_mq, ev);
986 _GSS_operation_destroy (op);
987}
988
989
990/**
991 * Send all remaining elements in the full result iterator.
992 *
993 * @param cls operation
994 */
995static void
996send_remaining_elements (void *cls)
997{
998 struct Operation *op = cls;
999 struct KeyEntry *ke;
1000 int res;
1001
1002 res = GNUNET_CONTAINER_multihashmap32_iterator_next (op->state->full_result_iter, NULL, (const void **) &ke);
1003 res = GNUNET_NO;
1004 if (GNUNET_NO == res)
1005 {
1006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending done and destroy because iterator ran out\n");
1007 send_done_and_destroy (op);
1008 return;
1009 }
1010
1011 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending elements from key entry\n");
1012
1013 while (1)
1014 {
1015 struct GNUNET_MQ_Envelope *ev;
1016 struct GNUNET_SET_ResultMessage *rm;
1017 struct GNUNET_SET_Element *element;
1018 element = &ke->element->element;
1019
1020 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (size %u) to client (full set)\n", element->size);
1021 GNUNET_assert (0 != op->spec->client_request_id);
1022 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1023 if (NULL == ev)
1024 {
1025 GNUNET_MQ_discard (ev);
1026 GNUNET_break (0);
1027 continue;
1028 }
1029 rm->result_status = htons (GNUNET_SET_STATUS_OK);
1030 rm->request_id = htonl (op->spec->client_request_id);
1031 rm->element_type = element->type;
1032 memcpy (&rm[1], element->data, element->size);
1033 if (ke->next_colliding == NULL)
1034 {
1035 GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
1036 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1037 break;
1038 }
1039 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1040 ke = ke->next_colliding;
1041 }
1042}
1000 1043
1001 union_operation_destroy (eo); 1044
1045/**
1046 * Send a result message to the client indicating
1047 * that the operation is over.
1048 * After the result done message has been sent to the client,
1049 * destroy the evaluate operation.
1050 *
1051 * @param op union operation
1052 */
1053static void
1054finish_and_destroy (struct Operation *op)
1055{
1056 GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
1057
1058 if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
1059 {
1060 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending full result set\n");
1061 GNUNET_assert (NULL == op->state->full_result_iter);
1062 op->state->full_result_iter =
1063 GNUNET_CONTAINER_multihashmap32_iterator_create (op->state->key_to_element);
1064 send_remaining_elements (op);
1065 return;
1066 }
1067 send_done_and_destroy (op);
1002} 1068}
1003 1069
1004 1070
@@ -1011,16 +1077,16 @@ send_client_done_and_destroy (struct OperationState *eo)
1011static void 1077static void
1012handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) 1078handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1013{ 1079{
1014 struct OperationState *eo = cls; 1080 struct Operation *op = cls;
1015 struct ElementEntry *ee; 1081 struct ElementEntry *ee;
1016 uint16_t element_size; 1082 uint16_t element_size;
1017 1083
1018 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); 1084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
1019 1085
1020 if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && 1086 if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
1021 (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) 1087 (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
1022 { 1088 {
1023 fail_union_operation (eo); 1089 fail_union_operation (op);
1024 GNUNET_break (0); 1090 GNUNET_break (0);
1025 return; 1091 return;
1026 } 1092 }
@@ -1032,12 +1098,17 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1032 ee->remote = GNUNET_YES; 1098 ee->remote = GNUNET_YES;
1033 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); 1099 GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
1034 1100
1035 /* FIXME: see if the element has already been inserted! */ 1101 if (GNUNET_YES == op_has_element (op, &ee->element_hash))
1102 {
1103 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got existing element from peer\n");
1104 GNUNET_free (ee);
1105 return;
1106 }
1036 1107
1037 op_register_element (eo, ee); 1108 op_register_element (op, ee);
1038 /* only send results immediately if the client wants it */ 1109 /* only send results immediately if the client wants it */
1039 if (GNUNET_SET_RESULT_ADDED == eo->spec->result_mode) 1110 if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
1040 send_client_element (eo, &ee->element); 1111 send_client_element (op, &ee->element);
1041} 1112}
1042 1113
1043 1114
@@ -1050,15 +1121,15 @@ handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
1050static void 1121static void
1051handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) 1122handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1052{ 1123{
1053 struct OperationState *eo = cls; 1124 struct Operation *op = cls;
1054 struct IBF_Key *ibf_key; 1125 struct IBF_Key *ibf_key;
1055 unsigned int num_keys; 1126 unsigned int num_keys;
1056 1127
1057 /* look up elements and send them */ 1128 /* look up elements and send them */
1058 if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1129 if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1059 { 1130 {
1060 GNUNET_break (0); 1131 GNUNET_break (0);
1061 fail_union_operation (eo); 1132 fail_union_operation (op);
1062 return; 1133 return;
1063 } 1134 }
1064 1135
@@ -1067,14 +1138,14 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1067 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) 1138 if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key))
1068 { 1139 {
1069 GNUNET_break (0); 1140 GNUNET_break (0);
1070 fail_union_operation (eo); 1141 fail_union_operation (op);
1071 return; 1142 return;
1072 } 1143 }
1073 1144
1074 ibf_key = (struct IBF_Key *) &mh[1]; 1145 ibf_key = (struct IBF_Key *) &mh[1];
1075 while (0 != num_keys--) 1146 while (0 != num_keys--)
1076 { 1147 {
1077 send_elements_for_key (eo, *ibf_key); 1148 send_elements_for_key (op, *ibf_key);
1078 ibf_key++; 1149 ibf_key++;
1079 } 1150 }
1080} 1151}
@@ -1089,28 +1160,28 @@ handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
1089static void 1160static void
1090handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) 1161handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1091{ 1162{
1092 struct OperationState *eo = cls; 1163 struct Operation *op = cls;
1093 struct GNUNET_MQ_Envelope *ev; 1164 struct GNUNET_MQ_Envelope *ev;
1094 1165
1095 if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) 1166 if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
1096 { 1167 {
1097 /* we got all requests, but still have to send our elements as response */ 1168 /* we got all requests, but still have to send our elements as response */
1098 1169
1099 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n"); 1170 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after elements\n");
1100 eo->phase = PHASE_FINISHED; 1171 op->state->phase = PHASE_FINISHED;
1101 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); 1172 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
1102 GNUNET_MQ_send (eo->mq, ev); 1173 GNUNET_MQ_send (op->mq, ev);
1103 return; 1174 return;
1104 } 1175 }
1105 if (eo->phase == PHASE_EXPECT_ELEMENTS) 1176 if (op->state->phase == PHASE_EXPECT_ELEMENTS)
1106 { 1177 {
1107 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n"); 1178 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
1108 eo->phase = PHASE_FINISHED; 1179 op->state->phase = PHASE_FINISHED;
1109 send_client_done_and_destroy (eo); 1180 finish_and_destroy (op);
1110 return; 1181 return;
1111 } 1182 }
1112 GNUNET_break (0); 1183 GNUNET_break (0);
1113 fail_union_operation (eo); 1184 fail_union_operation (op);
1114} 1185}
1115 1186
1116 1187
@@ -1118,78 +1189,34 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
1118 * Evaluate a union operation with 1189 * Evaluate a union operation with
1119 * a remote peer. 1190 * a remote peer.
1120 * 1191 *
1121 * @param spec specification of the operation the evaluate 1192 * @param op operation to evaluate
1122 * @param tunnel tunnel already connected to the partner peer
1123 * @param tc tunnel context, passed here so all new incoming
1124 * messages are directly going to the union operations
1125 * @return a handle to the operation
1126 */ 1193 */
1127static void 1194static void
1128union_evaluate (struct OperationSpecification *spec, 1195union_evaluate (struct Operation *op)
1129 struct GNUNET_MESH_Tunnel *tunnel,
1130 struct TunnelContext *tc)
1131{ 1196{
1132 struct OperationState *eo; 1197 op->state = GNUNET_new (struct OperationState);
1133 1198 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1134 eo = GNUNET_new (struct OperationState);
1135 tc->vt = _GSS_union_vt ();
1136 tc->op = eo;
1137 eo->se = strata_estimator_dup (spec->set->state->se);
1138 eo->generation_created = spec->set->current_generation++;
1139 eo->set = spec->set;
1140 eo->spec = spec;
1141 eo->tunnel = tunnel;
1142 eo->tunnel = tunnel;
1143 eo->mq = GNUNET_MESH_mq_create (tunnel);
1144
1145 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146 "evaluating union operation, (app %s)\n",
1147 GNUNET_h2s (&eo->spec->app_id));
1148
1149 /* we started the operation, thus we have to send the operation request */ 1199 /* we started the operation, thus we have to send the operation request */
1150 eo->phase = PHASE_EXPECT_SE; 1200 op->state->phase = PHASE_EXPECT_SE;
1151 1201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "evaluating union operation");
1152 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, 1202 send_operation_request (op);
1153 eo->set->state->ops_tail,
1154 eo);
1155
1156 send_operation_request (eo);
1157} 1203}
1158 1204
1159 1205
1160/** 1206/**
1161 * Accept an union operation request from a remote peer 1207 * Accept an union operation request from a remote peer.
1208 * Only initializes the private operation state.
1162 * 1209 *
1163 * @param spec all necessary information about the operation 1210 * @param op operation that will be accepted as a union operation
1164 * @param tunnel open tunnel to the partner's peer
1165 * @param tc tunnel context, passed here so all new incoming
1166 * messages are directly going to the union operations
1167 * @return operation
1168 */ 1211 */
1169static void 1212static void
1170union_accept (struct OperationSpecification *spec, 1213union_accept (struct Operation *op)
1171 struct GNUNET_MESH_Tunnel *tunnel,
1172 struct TunnelContext *tc)
1173{ 1214{
1174 struct OperationState *eo;
1175
1176 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n"); 1215 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
1177 1216 op->state = GNUNET_new (struct OperationState);
1178 eo = GNUNET_new (struct OperationState); 1217 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1179 tc->vt = _GSS_union_vt ();
1180 tc->op = eo;
1181 eo->set = spec->set;
1182 eo->generation_created = eo->set->current_generation++;
1183 eo->spec = spec;
1184 eo->tunnel = tunnel;
1185 eo->mq = GNUNET_MESH_mq_create (tunnel);
1186 eo->se = strata_estimator_dup (eo->set->state->se);
1187 /* transfer ownership of mq and socket from incoming to eo */
1188 GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head,
1189 eo->set->state->ops_tail,
1190 eo);
1191 /* kick off the operation */ 1218 /* kick off the operation */
1192 send_strata_estimator (eo); 1219 send_strata_estimator (op);
1193} 1220}
1194 1221
1195 1222
@@ -1240,17 +1267,13 @@ union_remove (struct SetState *set_state, struct ElementEntry *ee)
1240 1267
1241 1268
1242/** 1269/**
1243 * Destroy a set that supports the union operation 1270 * Destroy a set that supports the union operation.
1244 * 1271 *
1245 * @param set_state the set to destroy 1272 * @param set_state the set to destroy
1246 */ 1273 */
1247static void 1274static void
1248union_set_destroy (struct SetState *set_state) 1275union_set_destroy (struct SetState *set_state)
1249{ 1276{
1250 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
1251 /* important to destroy operations before the rest of the set */
1252 while (NULL != set_state->ops_head)
1253 union_operation_destroy (set_state->ops_head);
1254 if (NULL != set_state->se) 1277 if (NULL != set_state->se)
1255 { 1278 {
1256 strata_estimator_destroy (set_state->se); 1279 strata_estimator_destroy (set_state->se);
@@ -1263,13 +1286,13 @@ union_set_destroy (struct SetState *set_state)
1263/** 1286/**
1264 * Dispatch messages for a union operation. 1287 * Dispatch messages for a union operation.
1265 * 1288 *
1266 * @param eo the state of the union evaluate operation 1289 * @param op the state of the union evaluate operation
1267 * @param mh the received message 1290 * @param mh the received message
1268 * @return GNUNET_SYSERR if the tunnel should be disconnected, 1291 * @return GNUNET_SYSERR if the tunnel should be disconnected,
1269 * GNUNET_OK otherwise 1292 * GNUNET_OK otherwise
1270 */ 1293 */
1271int 1294int
1272union_handle_p2p_message (struct OperationState *eo, 1295union_handle_p2p_message (struct Operation *op,
1273 const struct GNUNET_MessageHeader *mh) 1296 const struct GNUNET_MessageHeader *mh)
1274{ 1297{
1275 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n", 1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
@@ -1277,19 +1300,19 @@ union_handle_p2p_message (struct OperationState *eo,
1277 switch (ntohs (mh->type)) 1300 switch (ntohs (mh->type))
1278 { 1301 {
1279 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: 1302 case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
1280 handle_p2p_ibf (eo, mh); 1303 handle_p2p_ibf (op, mh);
1281 break; 1304 break;
1282 case GNUNET_MESSAGE_TYPE_SET_P2P_SE: 1305 case GNUNET_MESSAGE_TYPE_SET_P2P_SE:
1283 handle_p2p_strata_estimator (eo, mh); 1306 handle_p2p_strata_estimator (op, mh);
1284 break; 1307 break;
1285 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 1308 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1286 handle_p2p_elements (eo, mh); 1309 handle_p2p_elements (op, mh);
1287 break; 1310 break;
1288 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: 1311 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
1289 handle_p2p_element_requests (eo, mh); 1312 handle_p2p_element_requests (op, mh);
1290 break; 1313 break;
1291 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: 1314 case GNUNET_MESSAGE_TYPE_SET_P2P_DONE:
1292 handle_p2p_done (eo, mh); 1315 handle_p2p_done (op, mh);
1293 break; 1316 break;
1294 default: 1317 default:
1295 /* something wrong with mesh's message handlers? */ 1318 /* something wrong with mesh's message handlers? */
@@ -1300,18 +1323,9 @@ union_handle_p2p_message (struct OperationState *eo,
1300 1323
1301 1324
1302static void 1325static void
1303union_peer_disconnect (struct OperationState *op) 1326union_peer_disconnect (struct Operation *op)
1304{ 1327{
1305 /* Are we already disconnected? */ 1328 if (PHASE_FINISHED != op->state->phase)
1306 if (NULL == op->tunnel)
1307 return;
1308 op->tunnel = NULL;
1309 if (NULL != op->mq)
1310 {
1311 GNUNET_MQ_destroy (op->mq);
1312 op->mq = NULL;
1313 }
1314 if (PHASE_FINISHED != op->phase)
1315 { 1329 {
1316 struct GNUNET_MQ_Envelope *ev; 1330 struct GNUNET_MQ_Envelope *ev;
1317 struct GNUNET_SET_ResultMessage *msg; 1331 struct GNUNET_SET_ResultMessage *msg;
@@ -1322,34 +1336,12 @@ union_peer_disconnect (struct OperationState *op)
1322 msg->element_type = htons (0); 1336 msg->element_type = htons (0);
1323 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1337 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1324 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n"); 1338 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected prematurely\n");
1325 union_operation_destroy (op); 1339 _GSS_operation_destroy (op);
1326 return; 1340 return;
1327 } 1341 }
1328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n"); 1342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected (finished)\n");
1329 if (GNUNET_NO == op->client_done_sent) 1343 if (GNUNET_NO == op->state->client_done_sent)
1330 send_client_done_and_destroy (op); 1344 finish_and_destroy (op);
1331}
1332
1333
1334static void
1335union_op_cancel (struct SetState *set_state, uint32_t op_id)
1336{
1337 struct OperationState *op_state;
1338 int found = GNUNET_NO;
1339 for (op_state = set_state->ops_head; NULL != op_state; op_state = op_state->next)
1340 {
1341 if (op_state->spec->client_request_id == op_id)
1342 {
1343 found = GNUNET_YES;
1344 break;
1345 }
1346 }
1347 if (GNUNET_NO == found)
1348 {
1349 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "canceling non-existing operation\n");
1350 return;
1351 }
1352 union_operation_destroy (op_state);
1353} 1345}
1354 1346
1355 1347