diff options
author | Florian Dold <florian.dold@gmail.com> | 2017-02-23 17:13:39 +0100 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2017-02-23 17:47:40 +0100 |
commit | caf375948ecc718bac6d75f415cc1c8324a9199c (patch) | |
tree | a4afe993f3c5ec837df026b72790e1349bb45387 /src | |
parent | d5af1252b2a83d75fe4a8f6e48e1d01ab027b553 (diff) | |
download | gnunet-caf375948ecc718bac6d75f415cc1c8324a9199c.tar.gz gnunet-caf375948ecc718bac6d75f415cc1c8324a9199c.zip |
implement union via sending whole set
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_protocols.h | 8 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 16 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 351 | ||||
-rw-r--r-- | src/set/test_set.conf | 2 |
4 files changed, 343 insertions, 34 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index a10c0ca5d..f478edd27 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -1800,7 +1800,13 @@ extern "C" | |||
1800 | * based on their sets and the elements we previously sent | 1800 | * based on their sets and the elements we previously sent |
1801 | * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. | 1801 | * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS. |
1802 | */ | 1802 | */ |
1803 | #define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_GET_MISSING 597 | 1803 | #define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE 597 |
1804 | |||
1805 | /** | ||
1806 | * Send a set element, not as response to a demand but because | ||
1807 | * we're sending the full set. | ||
1808 | */ | ||
1809 | #define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT 598 | ||
1804 | 1810 | ||
1805 | 1811 | ||
1806 | /******************************************************************************* | 1812 | /******************************************************************************* |
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index a545e8a06..1072407f1 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c | |||
@@ -1371,6 +1371,10 @@ handle_client_listen (void *cls, | |||
1371 | struct GNUNET_MessageHeader, | 1371 | struct GNUNET_MessageHeader, |
1372 | NULL), | 1372 | NULL), |
1373 | GNUNET_MQ_hd_var_size (p2p_message, | 1373 | GNUNET_MQ_hd_var_size (p2p_message, |
1374 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | ||
1375 | struct GNUNET_MessageHeader, | ||
1376 | NULL), | ||
1377 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1374 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, | 1378 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, |
1375 | struct GNUNET_MessageHeader, | 1379 | struct GNUNET_MessageHeader, |
1376 | NULL), | 1380 | NULL), |
@@ -1379,6 +1383,10 @@ handle_client_listen (void *cls, | |||
1379 | struct GNUNET_MessageHeader, | 1383 | struct GNUNET_MessageHeader, |
1380 | NULL), | 1384 | NULL), |
1381 | GNUNET_MQ_hd_var_size (p2p_message, | 1385 | GNUNET_MQ_hd_var_size (p2p_message, |
1386 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, | ||
1387 | struct GNUNET_MessageHeader, | ||
1388 | NULL), | ||
1389 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1382 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | 1390 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, |
1383 | struct GNUNET_MessageHeader, | 1391 | struct GNUNET_MessageHeader, |
1384 | NULL), | 1392 | NULL), |
@@ -1634,6 +1642,14 @@ handle_client_evaluate (void *cls, | |||
1634 | struct GNUNET_MessageHeader, | 1642 | struct GNUNET_MessageHeader, |
1635 | op), | 1643 | op), |
1636 | GNUNET_MQ_hd_var_size (p2p_message, | 1644 | GNUNET_MQ_hd_var_size (p2p_message, |
1645 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE, | ||
1646 | struct GNUNET_MessageHeader, | ||
1647 | op), | ||
1648 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1649 | GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT, | ||
1650 | struct GNUNET_MessageHeader, | ||
1651 | op), | ||
1652 | GNUNET_MQ_hd_var_size (p2p_message, | ||
1637 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, | 1653 | GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, |
1638 | struct GNUNET_MessageHeader, | 1654 | struct GNUNET_MessageHeader, |
1639 | op), | 1655 | op), |
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 137216ed7..d2dfe049b 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c | |||
@@ -115,14 +115,22 @@ enum UnionOperationPhase | |||
115 | * In the penultimate phase, | 115 | * In the penultimate phase, |
116 | * we wait until all our demands | 116 | * we wait until all our demands |
117 | * are satisfied. Then we send a done | 117 | * are satisfied. Then we send a done |
118 | * message, and wait for another done message.*/ | 118 | * message, and wait for another done message. |
119 | */ | ||
119 | PHASE_FINISH_WAITING, | 120 | PHASE_FINISH_WAITING, |
120 | 121 | ||
121 | /** | 122 | /** |
122 | * In the ultimate phase, we wait until | 123 | * In the ultimate phase, we wait until |
123 | * our demands are satisfied and then | 124 | * our demands are satisfied and then |
124 | * quit (sending another DONE message). */ | 125 | * quit (sending another DONE message). |
125 | PHASE_DONE | 126 | */ |
127 | PHASE_DONE, | ||
128 | |||
129 | /** | ||
130 | * After sending the full set, wait for responses with the elements | ||
131 | * that the local peer is missing. | ||
132 | */ | ||
133 | PHASE_FULL_SENDING, | ||
126 | }; | 134 | }; |
127 | 135 | ||
128 | 136 | ||
@@ -214,6 +222,14 @@ struct KeyEntry | |||
214 | * is #GNUNET_YES. | 222 | * is #GNUNET_YES. |
215 | */ | 223 | */ |
216 | struct ElementEntry *element; | 224 | struct ElementEntry *element; |
225 | |||
226 | /** | ||
227 | * Did we receive this element? | ||
228 | * Even if element->is_foreign is false, we might | ||
229 | * have received the element, so this indicates that | ||
230 | * the other peer has it. | ||
231 | */ | ||
232 | int received; | ||
217 | }; | 233 | }; |
218 | 234 | ||
219 | 235 | ||
@@ -373,6 +389,16 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
373 | 389 | ||
374 | 390 | ||
375 | /** | 391 | /** |
392 | * Context for #op_get_element_iterator | ||
393 | */ | ||
394 | struct GetElementContext | ||
395 | { | ||
396 | struct GNUNET_HashCode hash; | ||
397 | struct KeyEntry *k; | ||
398 | }; | ||
399 | |||
400 | |||
401 | /** | ||
376 | * Iterator over the mapping from IBF keys to element entries. Checks if we | 402 | * Iterator over the mapping from IBF keys to element entries. Checks if we |
377 | * have an element with a given GNUNET_HashCode. | 403 | * have an element with a given GNUNET_HashCode. |
378 | * | 404 | * |
@@ -383,17 +409,20 @@ get_ibf_key (const struct GNUNET_HashCode *src) | |||
383 | * #GNUNET_NO if we've found the element. | 409 | * #GNUNET_NO if we've found the element. |
384 | */ | 410 | */ |
385 | static int | 411 | static int |
386 | op_has_element_iterator (void *cls, | 412 | op_get_element_iterator (void *cls, |
387 | uint32_t key, | 413 | uint32_t key, |
388 | void *value) | 414 | void *value) |
389 | { | 415 | { |
390 | struct GNUNET_HashCode *element_hash = cls; | 416 | struct GetElementContext *ctx = cls; |
391 | struct KeyEntry *k = value; | 417 | struct KeyEntry *k = value; |
392 | 418 | ||
393 | GNUNET_assert (NULL != k); | 419 | GNUNET_assert (NULL != k); |
394 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, | 420 | if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, |
395 | element_hash)) | 421 | &ctx->hash)) |
422 | { | ||
423 | ctx->k = k; | ||
396 | return GNUNET_NO; | 424 | return GNUNET_NO; |
425 | } | ||
397 | return GNUNET_YES; | 426 | return GNUNET_YES; |
398 | } | 427 | } |
399 | 428 | ||
@@ -406,23 +435,29 @@ op_has_element_iterator (void *cls, | |||
406 | * @param element_hash hash of the element to look for | 435 | * @param element_hash hash of the element to look for |
407 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise | 436 | * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise |
408 | */ | 437 | */ |
409 | static int | 438 | static struct KeyEntry * |
410 | op_has_element (struct Operation *op, | 439 | op_get_element (struct Operation *op, |
411 | const struct GNUNET_HashCode *element_hash) | 440 | const struct GNUNET_HashCode *element_hash) |
412 | { | 441 | { |
413 | int ret; | 442 | int ret; |
414 | struct IBF_Key ibf_key; | 443 | struct IBF_Key ibf_key; |
444 | struct GetElementContext ctx = { 0 }; | ||
445 | |||
446 | ctx.hash = *element_hash; | ||
415 | 447 | ||
416 | ibf_key = get_ibf_key (element_hash); | 448 | ibf_key = get_ibf_key (element_hash); |
417 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, | 449 | ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, |
418 | (uint32_t) ibf_key.key_val, | 450 | (uint32_t) ibf_key.key_val, |
419 | op_has_element_iterator, | 451 | op_get_element_iterator, |
420 | (void *) element_hash); | 452 | &ctx); |
421 | 453 | ||
422 | /* was the iteration aborted because we found the element? */ | 454 | /* was the iteration aborted because we found the element? */ |
423 | if (GNUNET_SYSERR == ret) | 455 | if (GNUNET_SYSERR == ret) |
424 | return GNUNET_YES; | 456 | { |
425 | return GNUNET_NO; | 457 | GNUNET_assert (NULL != ctx.k); |
458 | return ctx.k; | ||
459 | } | ||
460 | return NULL; | ||
426 | } | 461 | } |
427 | 462 | ||
428 | 463 | ||
@@ -438,10 +473,12 @@ op_has_element (struct Operation *op, | |||
438 | * | 473 | * |
439 | * @param op the union operation | 474 | * @param op the union operation |
440 | * @param ee the element entry | 475 | * @param ee the element entry |
476 | * @parem received was this element received from the remote peer? | ||
441 | */ | 477 | */ |
442 | static void | 478 | static void |
443 | op_register_element (struct Operation *op, | 479 | op_register_element (struct Operation *op, |
444 | struct ElementEntry *ee) | 480 | struct ElementEntry *ee, |
481 | int received) | ||
445 | { | 482 | { |
446 | struct IBF_Key ibf_key; | 483 | struct IBF_Key ibf_key; |
447 | struct KeyEntry *k; | 484 | struct KeyEntry *k; |
@@ -450,6 +487,7 @@ op_register_element (struct Operation *op, | |||
450 | k = GNUNET_new (struct KeyEntry); | 487 | k = GNUNET_new (struct KeyEntry); |
451 | k->element = ee; | 488 | k->element = ee; |
452 | k->ibf_key = ibf_key; | 489 | k->ibf_key = ibf_key; |
490 | k->received = received; | ||
453 | GNUNET_assert (GNUNET_OK == | 491 | GNUNET_assert (GNUNET_OK == |
454 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, | 492 | GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, |
455 | (uint32_t) ibf_key.key_val, | 493 | (uint32_t) ibf_key.key_val, |
@@ -535,12 +573,30 @@ init_key_to_element_iterator (void *cls, | |||
535 | 573 | ||
536 | GNUNET_assert (GNUNET_NO == ee->remote); | 574 | GNUNET_assert (GNUNET_NO == ee->remote); |
537 | 575 | ||
538 | op_register_element (op, ee); | 576 | op_register_element (op, ee, GNUNET_NO); |
539 | return GNUNET_YES; | 577 | return GNUNET_YES; |
540 | } | 578 | } |
541 | 579 | ||
542 | 580 | ||
543 | /** | 581 | /** |
582 | * Initialize the IBF key to element mapping local to this set | ||
583 | * operation. | ||
584 | * | ||
585 | * @param op the set union operation | ||
586 | */ | ||
587 | static void | ||
588 | initialize_key_to_element (struct Operation *op) | ||
589 | { | ||
590 | unsigned int len; | ||
591 | |||
592 | GNUNET_assert (NULL == op->state->key_to_element); | ||
593 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
594 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
595 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); | ||
596 | } | ||
597 | |||
598 | |||
599 | /** | ||
544 | * Create an ibf with the operation's elements | 600 | * Create an ibf with the operation's elements |
545 | * of the specified size | 601 | * of the specified size |
546 | * | 602 | * |
@@ -552,15 +608,8 @@ static int | |||
552 | prepare_ibf (struct Operation *op, | 608 | prepare_ibf (struct Operation *op, |
553 | uint32_t size) | 609 | uint32_t size) |
554 | { | 610 | { |
555 | if (NULL == op->state->key_to_element) | 611 | GNUNET_assert (NULL != op->state->key_to_element); |
556 | { | ||
557 | unsigned int len; | ||
558 | 612 | ||
559 | len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); | ||
560 | op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
561 | GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
562 | init_key_to_element_iterator, op); | ||
563 | } | ||
564 | if (NULL != op->state->local_ibf) | 613 | if (NULL != op->state->local_ibf) |
565 | ibf_destroy (op->state->local_ibf); | 614 | ibf_destroy (op->state->local_ibf); |
566 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | 615 | op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); |
@@ -709,6 +758,47 @@ get_order_from_difference (unsigned int diff) | |||
709 | 758 | ||
710 | 759 | ||
711 | /** | 760 | /** |
761 | * Send a set element. | ||
762 | * | ||
763 | * @param cls the union operation `struct Operation *` | ||
764 | * @param key unused | ||
765 | * @param value the `struct ElementEntry *` to insert | ||
766 | * into the key-to-element mapping | ||
767 | * @return #GNUNET_YES (to continue iterating) | ||
768 | */ | ||
769 | static int | ||
770 | send_element_iterator (void *cls, | ||
771 | const struct GNUNET_HashCode *key, | ||
772 | void *value) | ||
773 | { | ||
774 | struct Operation *op = cls; | ||
775 | struct GNUNET_SET_ElementMessage *emsg; | ||
776 | struct GNUNET_SET_Element *el = value; | ||
777 | struct GNUNET_MQ_Envelope *ev; | ||
778 | |||
779 | ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | ||
780 | emsg->element_type = htonl (el->element_type); | ||
781 | GNUNET_memcpy (&emsg[1], el->data, el->size); | ||
782 | GNUNET_MQ_send (op->mq, ev); | ||
783 | return GNUNET_YES; | ||
784 | } | ||
785 | |||
786 | |||
787 | static void | ||
788 | send_full_set (struct Operation *op) | ||
789 | { | ||
790 | struct GNUNET_MQ_Envelope *ev; | ||
791 | |||
792 | op->state->phase = PHASE_FULL_SENDING; | ||
793 | |||
794 | (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, | ||
795 | &send_element_iterator, op); | ||
796 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | ||
797 | GNUNET_MQ_send (op->mq, ev); | ||
798 | } | ||
799 | |||
800 | |||
801 | /** | ||
712 | * Handle a strata estimator from a remote peer | 802 | * Handle a strata estimator from a remote peer |
713 | * | 803 | * |
714 | * @param cls the union operation | 804 | * @param cls the union operation |
@@ -776,16 +866,29 @@ handle_p2p_strata_estimator (void *cls, | |||
776 | "got se diff=%d, using ibf size %d\n", | 866 | "got se diff=%d, using ibf size %d\n", |
777 | diff, | 867 | diff, |
778 | 1<<get_order_from_difference (diff)); | 868 | 1<<get_order_from_difference (diff)); |
779 | if (GNUNET_OK != | 869 | |
780 | send_ibf (op, | 870 | if (diff > GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements) / 2) |
781 | get_order_from_difference (diff))) | ||
782 | { | 871 | { |
783 | /* Internal error, best we can do is shut the connection */ | 872 | LOG (GNUNET_ERROR_TYPE_INFO, |
784 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 873 | "Sending full set (diff=%d, own set=%u)\n", |
785 | "Failed to send IBF, closing connection\n"); | 874 | diff, |
786 | fail_union_operation (op); | 875 | GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements)); |
787 | return GNUNET_SYSERR; | 876 | send_full_set (op); |
877 | } | ||
878 | else | ||
879 | { | ||
880 | if (GNUNET_OK != | ||
881 | send_ibf (op, | ||
882 | get_order_from_difference (diff))) | ||
883 | { | ||
884 | /* Internal error, best we can do is shut the connection */ | ||
885 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
886 | "Failed to send IBF, closing connection\n"); | ||
887 | fail_union_operation (op); | ||
888 | return GNUNET_SYSERR; | ||
889 | } | ||
788 | } | 890 | } |
891 | |||
789 | return GNUNET_OK; | 892 | return GNUNET_OK; |
790 | } | 893 | } |
791 | 894 | ||
@@ -1288,7 +1391,9 @@ handle_p2p_elements (void *cls, | |||
1288 | 1391 | ||
1289 | op->state->received_total += 1; | 1392 | op->state->received_total += 1; |
1290 | 1393 | ||
1291 | if (GNUNET_YES == op_has_element (op, &ee->element_hash)) | 1394 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); |
1395 | |||
1396 | if (NULL != ke) | ||
1292 | { | 1397 | { |
1293 | /* Got repeated element. Should not happen since | 1398 | /* Got repeated element. Should not happen since |
1294 | * we track demands. */ | 1399 | * we track demands. */ |
@@ -1296,6 +1401,7 @@ handle_p2p_elements (void *cls, | |||
1296 | "# repeated elements", | 1401 | "# repeated elements", |
1297 | 1, | 1402 | 1, |
1298 | GNUNET_NO); | 1403 | GNUNET_NO); |
1404 | ke->received = GNUNET_YES; | ||
1299 | GNUNET_free (ee); | 1405 | GNUNET_free (ee); |
1300 | } | 1406 | } |
1301 | else | 1407 | else |
@@ -1303,7 +1409,7 @@ handle_p2p_elements (void *cls, | |||
1303 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1409 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1304 | "Registering new element from remote peer\n"); | 1410 | "Registering new element from remote peer\n"); |
1305 | op->state->received_fresh += 1; | 1411 | op->state->received_fresh += 1; |
1306 | op_register_element (op, ee); | 1412 | op_register_element (op, ee, GNUNET_YES); |
1307 | /* only send results immediately if the client wants it */ | 1413 | /* only send results immediately if the client wants it */ |
1308 | switch (op->spec->result_mode) | 1414 | switch (op->spec->result_mode) |
1309 | { | 1415 | { |
@@ -1333,6 +1439,99 @@ handle_p2p_elements (void *cls, | |||
1333 | 1439 | ||
1334 | 1440 | ||
1335 | /** | 1441 | /** |
1442 | * Handle an element message from a remote peer. | ||
1443 | * | ||
1444 | * @param cls the union operation | ||
1445 | * @param mh the message | ||
1446 | */ | ||
1447 | static void | ||
1448 | handle_p2p_full_element (void *cls, | ||
1449 | const struct GNUNET_MessageHeader *mh) | ||
1450 | { | ||
1451 | struct Operation *op = cls; | ||
1452 | struct ElementEntry *ee; | ||
1453 | const struct GNUNET_SET_ElementMessage *emsg; | ||
1454 | uint16_t element_size; | ||
1455 | |||
1456 | if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) | ||
1457 | { | ||
1458 | GNUNET_break_op (0); | ||
1459 | fail_union_operation (op); | ||
1460 | return; | ||
1461 | } | ||
1462 | |||
1463 | emsg = (const struct GNUNET_SET_ElementMessage *) mh; | ||
1464 | |||
1465 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); | ||
1466 | ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); | ||
1467 | GNUNET_memcpy (&ee[1], &emsg[1], element_size); | ||
1468 | ee->element.size = element_size; | ||
1469 | ee->element.data = &ee[1]; | ||
1470 | ee->element.element_type = ntohs (emsg->element_type); | ||
1471 | ee->remote = GNUNET_YES; | ||
1472 | GNUNET_SET_element_hash (&ee->element, &ee->element_hash); | ||
1473 | |||
1474 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1475 | "Got element (full diff, size %u, hash %s) from peer\n", | ||
1476 | (unsigned int) element_size, | ||
1477 | GNUNET_h2s (&ee->element_hash)); | ||
1478 | |||
1479 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1480 | "# received elements", | ||
1481 | 1, | ||
1482 | GNUNET_NO); | ||
1483 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1484 | "# exchanged elements", | ||
1485 | 1, | ||
1486 | GNUNET_NO); | ||
1487 | |||
1488 | op->state->received_total += 1; | ||
1489 | |||
1490 | struct KeyEntry *ke = op_get_element (op, &ee->element_hash); | ||
1491 | |||
1492 | if (NULL != ke) | ||
1493 | { | ||
1494 | /* Got repeated element. Should not happen since | ||
1495 | * we track demands. */ | ||
1496 | GNUNET_STATISTICS_update (_GSS_statistics, | ||
1497 | "# repeated elements", | ||
1498 | 1, | ||
1499 | GNUNET_NO); | ||
1500 | ke->received = GNUNET_YES; | ||
1501 | GNUNET_free (ee); | ||
1502 | } | ||
1503 | else | ||
1504 | { | ||
1505 | LOG (GNUNET_ERROR_TYPE_DEBUG, | ||
1506 | "Registering new element from remote peer\n"); | ||
1507 | op->state->received_fresh += 1; | ||
1508 | op_register_element (op, ee, GNUNET_YES); | ||
1509 | /* only send results immediately if the client wants it */ | ||
1510 | switch (op->spec->result_mode) | ||
1511 | { | ||
1512 | case GNUNET_SET_RESULT_ADDED: | ||
1513 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); | ||
1514 | break; | ||
1515 | case GNUNET_SET_RESULT_SYMMETRIC: | ||
1516 | send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL); | ||
1517 | break; | ||
1518 | default: | ||
1519 | /* Result mode not supported, should have been caught earlier. */ | ||
1520 | GNUNET_break (0); | ||
1521 | break; | ||
1522 | } | ||
1523 | } | ||
1524 | |||
1525 | if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3) | ||
1526 | { | ||
1527 | /* The other peer gave us lots of old elements, there's something wrong. */ | ||
1528 | GNUNET_break_op (0); | ||
1529 | fail_union_operation (op); | ||
1530 | return; | ||
1531 | } | ||
1532 | } | ||
1533 | |||
1534 | /** | ||
1336 | * Send offers (for GNUNET_Hash-es) in response | 1535 | * Send offers (for GNUNET_Hash-es) in response |
1337 | * to inquiries (for IBF_Key-s). | 1536 | * to inquiries (for IBF_Key-s). |
1338 | * | 1537 | * |
@@ -1379,6 +1578,85 @@ handle_p2p_inquiry (void *cls, | |||
1379 | 1578 | ||
1380 | 1579 | ||
1381 | /** | 1580 | /** |
1581 | * Iterator over hash map entries, called to | ||
1582 | * destroy the linked list of colliding ibf key entries. | ||
1583 | * | ||
1584 | * @param cls closure | ||
1585 | * @param key current key code | ||
1586 | * @param value value in the hash map | ||
1587 | * @return #GNUNET_YES if we should continue to iterate, | ||
1588 | * #GNUNET_NO if not. | ||
1589 | */ | ||
1590 | static int | ||
1591 | send_missing_elements_iter (void *cls, | ||
1592 | uint32_t key, | ||
1593 | void *value) | ||
1594 | { | ||
1595 | struct Operation *op = cls; | ||
1596 | struct KeyEntry *ke = value; | ||
1597 | struct GNUNET_MQ_Envelope *ev; | ||
1598 | struct GNUNET_SET_ElementMessage *emsg; | ||
1599 | struct ElementEntry *ee = ke->element; | ||
1600 | |||
1601 | if (GNUNET_YES == ke->received) | ||
1602 | return GNUNET_YES; | ||
1603 | |||
1604 | ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); | ||
1605 | GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); | ||
1606 | emsg->reserved = htons (0); | ||
1607 | emsg->element_type = htons (ee->element.element_type); | ||
1608 | GNUNET_MQ_send (op->mq, ev); | ||
1609 | |||
1610 | return GNUNET_YES; | ||
1611 | } | ||
1612 | |||
1613 | /** | ||
1614 | * Handle a "full done" message. | ||
1615 | * | ||
1616 | * @parem cls closure, a set union operation | ||
1617 | * @param mh the demand message | ||
1618 | */ | ||
1619 | static void | ||
1620 | handle_p2p_full_done (void *cls, | ||
1621 | const struct GNUNET_MessageHeader *mh) | ||
1622 | { | ||
1623 | struct Operation *op = cls; | ||
1624 | |||
1625 | if (PHASE_EXPECT_IBF == op->state->phase) | ||
1626 | { | ||
1627 | struct GNUNET_MQ_Envelope *ev; | ||
1628 | |||
1629 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n"); | ||
1630 | |||
1631 | /* send all the elements that did not come from the remote peer */ | ||
1632 | GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, | ||
1633 | &send_missing_elements_iter, | ||
1634 | op); | ||
1635 | |||
1636 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); | ||
1637 | GNUNET_MQ_send (op->mq, ev); | ||
1638 | op->state->phase = PHASE_DONE; | ||
1639 | |||
1640 | /* we now wait until the other peer shuts the tunnel down*/ | ||
1641 | } | ||
1642 | else if (PHASE_FULL_SENDING == op->state->phase) | ||
1643 | { | ||
1644 | LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n"); | ||
1645 | /* We sent the full set, and got the response for that. We're done. */ | ||
1646 | op->state->phase = PHASE_DONE; | ||
1647 | send_done_and_destroy (op); | ||
1648 | } | ||
1649 | else | ||
1650 | { | ||
1651 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase); | ||
1652 | GNUNET_break_op (0); | ||
1653 | fail_union_operation (op); | ||
1654 | return; | ||
1655 | } | ||
1656 | } | ||
1657 | |||
1658 | |||
1659 | /** | ||
1382 | * Handle a demand by the other peer for elements based on a list | 1660 | * Handle a demand by the other peer for elements based on a list |
1383 | * of GNUNET_HashCode-s. | 1661 | * of GNUNET_HashCode-s. |
1384 | * | 1662 | * |
@@ -1635,6 +1913,8 @@ union_evaluate (struct Operation *op, | |||
1635 | else | 1913 | else |
1636 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 1914 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
1637 | "sent op request without context message\n"); | 1915 | "sent op request without context message\n"); |
1916 | |||
1917 | initialize_key_to_element (op); | ||
1638 | } | 1918 | } |
1639 | 1919 | ||
1640 | 1920 | ||
@@ -1664,6 +1944,7 @@ union_accept (struct Operation *op) | |||
1664 | op->state->se = strata_estimator_dup (op->spec->set->state->se); | 1944 | op->state->se = strata_estimator_dup (op->spec->set->state->se); |
1665 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); | 1945 | op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); |
1666 | op->state->salt_receive = op->state->salt_send = 42; | 1946 | op->state->salt_receive = op->state->salt_send = 42; |
1947 | initialize_key_to_element (op); | ||
1667 | /* kick off the operation */ | 1948 | /* kick off the operation */ |
1668 | send_strata_estimator (op); | 1949 | send_strata_estimator (op); |
1669 | } | 1950 | } |
@@ -1771,6 +2052,9 @@ union_handle_p2p_message (struct Operation *op, | |||
1771 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | 2052 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: |
1772 | handle_p2p_elements (op, mh); | 2053 | handle_p2p_elements (op, mh); |
1773 | break; | 2054 | break; |
2055 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT: | ||
2056 | handle_p2p_full_element (op, mh); | ||
2057 | break; | ||
1774 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: | 2058 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: |
1775 | handle_p2p_inquiry (op, mh); | 2059 | handle_p2p_inquiry (op, mh); |
1776 | break; | 2060 | break; |
@@ -1783,6 +2067,9 @@ union_handle_p2p_message (struct Operation *op, | |||
1783 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: | 2067 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: |
1784 | handle_p2p_demand (op, mh); | 2068 | handle_p2p_demand (op, mh); |
1785 | break; | 2069 | break; |
2070 | case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE: | ||
2071 | handle_p2p_full_done (op, mh); | ||
2072 | break; | ||
1786 | default: | 2073 | default: |
1787 | /* Something wrong with cadet's message handlers? */ | 2074 | /* Something wrong with cadet's message handlers? */ |
1788 | GNUNET_assert (0); | 2075 | GNUNET_assert (0); |
diff --git a/src/set/test_set.conf b/src/set/test_set.conf index 69e7f5c52..30ccbde55 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf | |||
@@ -5,7 +5,7 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-set/ | |||
5 | 5 | ||
6 | [set] | 6 | [set] |
7 | AUTOSTART = YES | 7 | AUTOSTART = YES |
8 | # PREFIX = valgrind | 8 | PREFIX = valgrind |
9 | #PREFIX = valgrind --leak-check=full | 9 | #PREFIX = valgrind --leak-check=full |
10 | #PREFIX = gdbserver :1234 | 10 | #PREFIX = gdbserver :1234 |
11 | OPTIONS = -L INFO | 11 | OPTIONS = -L INFO |