aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:22 +0100
committerChristian Grothoff <christian@grothoff.org>2017-03-13 01:24:34 +0100
commitbf6f552fdefe75425635f66343f98995e2f602f6 (patch)
treeadd6ea146823579a137763b78e89839ff97b3902 /src/set/gnunet-service-set_union.c
parenta9a5994e518ded483edb87513d5197b6539ed4ff (diff)
downloadgnunet-bf6f552fdefe75425635f66343f98995e2f602f6.tar.gz
gnunet-bf6f552fdefe75425635f66343f98995e2f602f6.zip
major clean up and bugfixes of SET
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c363
1 files changed, 196 insertions, 167 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 9eaf12fef..fc7e578e6 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -368,9 +368,10 @@ fail_union_operation (struct Operation *op)
368 "union operation failed\n"); 368 "union operation failed\n");
369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); 369 ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); 370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->spec->client_request_id); 371 msg->request_id = htonl (op->client_request_id);
372 msg->element_type = htons (0); 372 msg->element_type = htons (0);
373 GNUNET_MQ_send (op->spec->set->client_mq, ev); 373 GNUNET_MQ_send (op->set->cs->mq,
374 ev);
374 _GSS_operation_destroy (op, GNUNET_YES); 375 _GSS_operation_destroy (op, GNUNET_YES);
375} 376}
376 377
@@ -401,7 +402,14 @@ get_ibf_key (const struct GNUNET_HashCode *src)
401 */ 402 */
402struct GetElementContext 403struct GetElementContext
403{ 404{
405 /**
406 * FIXME.
407 */
404 struct GNUNET_HashCode hash; 408 struct GNUNET_HashCode hash;
409
410 /**
411 * FIXME.
412 */
405 struct KeyEntry *k; 413 struct KeyEntry *k;
406}; 414};
407 415
@@ -504,6 +512,9 @@ op_register_element (struct Operation *op,
504} 512}
505 513
506 514
515/**
516 * FIXME.
517 */
507static void 518static void
508salt_key (const struct IBF_Key *k_in, 519salt_key (const struct IBF_Key *k_in,
509 uint32_t salt, 520 uint32_t salt,
@@ -517,6 +528,9 @@ salt_key (const struct IBF_Key *k_in,
517} 528}
518 529
519 530
531/**
532 * FIXME.
533 */
520static void 534static void
521unsalt_key (const struct IBF_Key *k_in, 535unsalt_key (const struct IBF_Key *k_in,
522 uint32_t salt, 536 uint32_t salt,
@@ -550,7 +564,9 @@ prepare_ibf_iterator (void *cls,
550 (void *) op, 564 (void *) op,
551 (unsigned long) ke->ibf_key.key_val, 565 (unsigned long) ke->ibf_key.key_val,
552 GNUNET_h2s (&ke->element->element_hash)); 566 GNUNET_h2s (&ke->element->element_hash));
553 salt_key (&ke->ibf_key, op->state->salt_send, &salted_key); 567 salt_key (&ke->ibf_key,
568 op->state->salt_send,
569 &salted_key);
554 ibf_insert (op->state->local_ibf, salted_key); 570 ibf_insert (op->state->local_ibf, salted_key);
555 return GNUNET_YES; 571 return GNUNET_YES;
556} 572}
@@ -576,12 +592,14 @@ init_key_to_element_iterator (void *cls,
576 592
577 /* make sure that the element belongs to the set at the time 593 /* make sure that the element belongs to the set at the time
578 * of creating the operation */ 594 * of creating the operation */
579 if (GNUNET_NO == _GSS_is_element_of_operation (ee, op)) 595 if (GNUNET_NO ==
596 _GSS_is_element_of_operation (ee,
597 op))
580 return GNUNET_YES; 598 return GNUNET_YES;
581
582 GNUNET_assert (GNUNET_NO == ee->remote); 599 GNUNET_assert (GNUNET_NO == ee->remote);
583 600 op_register_element (op,
584 op_register_element (op, ee, GNUNET_NO); 601 ee,
602 GNUNET_NO);
585 return GNUNET_YES; 603 return GNUNET_YES;
586} 604}
587 605
@@ -598,9 +616,11 @@ initialize_key_to_element (struct Operation *op)
598 unsigned int len; 616 unsigned int len;
599 617
600 GNUNET_assert (NULL == op->state->key_to_element); 618 GNUNET_assert (NULL == op->state->key_to_element);
601 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements); 619 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
602 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); 620 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
603 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op); 621 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
622 &init_key_to_element_iterator,
623 op);
604} 624}
605 625
606 626
@@ -707,44 +727,6 @@ send_ibf (struct Operation *op,
707 727
708 728
709/** 729/**
710 * Send a strata estimator to the remote peer.
711 *
712 * @param op the union operation with the remote peer
713 */
714static void
715send_strata_estimator (struct Operation *op)
716{
717 const struct StrataEstimator *se = op->state->se;
718 struct GNUNET_MQ_Envelope *ev;
719 struct StrataEstimatorMessage *strata_msg;
720 char *buf;
721 size_t len;
722 uint16_t type;
723
724 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
725 len = strata_estimator_write (op->state->se,
726 buf);
727 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
728 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
729 else
730 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
731 ev = GNUNET_MQ_msg_extra (strata_msg,
732 len,
733 type);
734 GNUNET_memcpy (&strata_msg[1],
735 buf,
736 len);
737 GNUNET_free (buf);
738 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
739 GNUNET_MQ_send (op->mq,
740 ev);
741 op->state->phase = PHASE_EXPECT_IBF;
742 LOG (GNUNET_ERROR_TYPE_DEBUG,
743 "sent SE, expecting IBF\n");
744}
745
746
747/**
748 * Compute the necessary order of an ibf 730 * Compute the necessary order of an ibf
749 * from the size of the symmetric set difference. 731 * from the size of the symmetric set difference.
750 * 732 *
@@ -777,7 +759,7 @@ get_order_from_difference (unsigned int diff)
777 * @return #GNUNET_YES (to continue iterating) 759 * @return #GNUNET_YES (to continue iterating)
778 */ 760 */
779static int 761static int
780send_element_iterator (void *cls, 762send_full_element_iterator (void *cls,
781 const struct GNUNET_HashCode *key, 763 const struct GNUNET_HashCode *key,
782 void *value) 764 void *value)
783{ 765{
@@ -803,16 +785,23 @@ send_element_iterator (void *cls,
803} 785}
804 786
805 787
788/**
789 * Switch to full set transmission for @a op.
790 *
791 * @param op operation to switch to full set transmission.
792 */
806static void 793static void
807send_full_set (struct Operation *op) 794send_full_set (struct Operation *op)
808{ 795{
809 struct GNUNET_MQ_Envelope *ev; 796 struct GNUNET_MQ_Envelope *ev;
810 797
811 op->state->phase = PHASE_FULL_SENDING; 798 op->state->phase = PHASE_FULL_SENDING;
799 LOG (GNUNET_ERROR_TYPE_INFO,
800 "Dedicing to transmit the full set\n");
812 /* FIXME: use a more memory-friendly way of doing this with an 801 /* FIXME: use a more memory-friendly way of doing this with an
813 iterator, just as we do in the non-full case! */ 802 iterator, just as we do in the non-full case! */
814 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 803 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
815 &send_element_iterator, 804 &send_full_element_iterator,
816 op); 805 op);
817 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 806 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
818 GNUNET_MQ_send (op->mq, 807 GNUNET_MQ_send (op->mq,
@@ -923,15 +912,15 @@ handle_union_p2p_strata_estimator (void *cls,
923 } 912 }
924 } 913 }
925 914
926 if ( (GNUNET_YES == op->spec->byzantine) && 915 if ( (GNUNET_YES == op->byzantine) &&
927 (other_size < op->spec->byzantine_lower_bound) ) 916 (other_size < op->byzantine_lower_bound) )
928 { 917 {
929 GNUNET_break (0); 918 GNUNET_break (0);
930 fail_union_operation (op); 919 fail_union_operation (op);
931 return; 920 return;
932 } 921 }
933 922
934 if ( (GNUNET_YES == op->spec->force_full) || 923 if ( (GNUNET_YES == op->force_full) ||
935 (diff > op->state->initial_size / 4) || 924 (diff > op->state->initial_size / 4) ||
936 (0 == other_size) ) 925 (0 == other_size) )
937 { 926 {
@@ -1058,14 +1047,16 @@ decode_and_send (struct Operation *op)
1058 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); 1047 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1059 1048
1060 if (GNUNET_OK != 1049 if (GNUNET_OK !=
1061 prepare_ibf (op, op->state->remote_ibf->size)) 1050 prepare_ibf (op,
1051 op->state->remote_ibf->size))
1062 { 1052 {
1063 GNUNET_break (0); 1053 GNUNET_break (0);
1064 /* allocation failed */ 1054 /* allocation failed */
1065 return GNUNET_SYSERR; 1055 return GNUNET_SYSERR;
1066 } 1056 }
1067 diff_ibf = ibf_dup (op->state->local_ibf); 1057 diff_ibf = ibf_dup (op->state->local_ibf);
1068 ibf_subtract (diff_ibf, op->state->remote_ibf); 1058 ibf_subtract (diff_ibf,
1059 op->state->remote_ibf);
1069 1060
1070 ibf_destroy (op->state->remote_ibf); 1061 ibf_destroy (op->state->remote_ibf);
1071 op->state->remote_ibf = NULL; 1062 op->state->remote_ibf = NULL;
@@ -1162,8 +1153,12 @@ decode_and_send (struct Operation *op)
1162 if (1 == side) 1153 if (1 == side)
1163 { 1154 {
1164 struct IBF_Key unsalted_key; 1155 struct IBF_Key unsalted_key;
1165 unsalt_key (&key, op->state->salt_receive, &unsalted_key); 1156
1166 send_offers_for_key (op, unsalted_key); 1157 unsalt_key (&key,
1158 op->state->salt_receive,
1159 &unsalted_key);
1160 send_offers_for_key (op,
1161 unsalted_key);
1167 } 1162 }
1168 else if (-1 == side) 1163 else if (-1 == side)
1169 { 1164 {
@@ -1211,7 +1206,7 @@ check_union_p2p_ibf (void *cls,
1211 struct Operation *op = cls; 1206 struct Operation *op = cls;
1212 unsigned int buckets_in_message; 1207 unsigned int buckets_in_message;
1213 1208
1214 if (GNUNET_SET_OPERATION_UNION != op->operation) 1209 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1215 { 1210 {
1216 GNUNET_break_op (0); 1211 GNUNET_break_op (0);
1217 return GNUNET_SYSERR; 1212 return GNUNET_SYSERR;
@@ -1304,6 +1299,8 @@ handle_union_p2p_ibf (void *cls,
1304 else 1299 else
1305 { 1300 {
1306 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT); 1301 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1302 LOG (GNUNET_ERROR_TYPE_INFO,
1303 "Received more of IBF\n");
1307 } 1304 }
1308 GNUNET_assert (NULL != op->state->remote_ibf); 1305 GNUNET_assert (NULL != op->state->remote_ibf);
1309 1306
@@ -1351,7 +1348,7 @@ send_client_element (struct Operation *op,
1351 LOG (GNUNET_ERROR_TYPE_DEBUG, 1348 LOG (GNUNET_ERROR_TYPE_DEBUG,
1352 "sending element (size %u) to client\n", 1349 "sending element (size %u) to client\n",
1353 element->size); 1350 element->size);
1354 GNUNET_assert (0 != op->spec->client_request_id); 1351 GNUNET_assert (0 != op->client_request_id);
1355 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); 1352 ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
1356 if (NULL == ev) 1353 if (NULL == ev)
1357 { 1354 {
@@ -1360,11 +1357,14 @@ send_client_element (struct Operation *op,
1360 return; 1357 return;
1361 } 1358 }
1362 rm->result_status = htons (status); 1359 rm->result_status = htons (status);
1363 rm->request_id = htonl (op->spec->client_request_id); 1360 rm->request_id = htonl (op->client_request_id);
1364 rm->element_type = htons (element->element_type); 1361 rm->element_type = htons (element->element_type);
1365 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1362 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1366 GNUNET_memcpy (&rm[1], element->data, element->size); 1363 GNUNET_memcpy (&rm[1],
1367 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1364 element->data,
1365 element->size);
1366 GNUNET_MQ_send (op->set->cs->mq,
1367 ev);
1368} 1368}
1369 1369
1370 1370
@@ -1381,14 +1381,19 @@ send_done_and_destroy (void *cls)
1381 struct GNUNET_MQ_Envelope *ev; 1381 struct GNUNET_MQ_Envelope *ev;
1382 struct GNUNET_SET_ResultMessage *rm; 1382 struct GNUNET_SET_ResultMessage *rm;
1383 1383
1384 ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); 1384 LOG (GNUNET_ERROR_TYPE_INFO,
1385 rm->request_id = htonl (op->spec->client_request_id); 1385 "Signalling client that union operation is done\n");
1386 ev = GNUNET_MQ_msg (rm,
1387 GNUNET_MESSAGE_TYPE_SET_RESULT);
1388 rm->request_id = htonl (op->client_request_id);
1386 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1389 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1387 rm->element_type = htons (0); 1390 rm->element_type = htons (0);
1388 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element)); 1391 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1389 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1392 GNUNET_MQ_send (op->set->cs->mq,
1393 ev);
1390 /* Will also call the union-specific cancel function. */ 1394 /* Will also call the union-specific cancel function. */
1391 _GSS_operation_destroy (op, GNUNET_YES); 1395 _GSS_operation_destroy (op,
1396 GNUNET_YES);
1392} 1397}
1393 1398
1394 1399
@@ -1415,8 +1420,8 @@ maybe_finish (struct Operation *op)
1415 1420
1416 op->state->phase = PHASE_DONE; 1421 op->state->phase = PHASE_DONE;
1417 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE); 1422 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
1418 GNUNET_MQ_send (op->mq, ev); 1423 GNUNET_MQ_send (op->mq,
1419 1424 ev);
1420 /* We now wait until the other peer closes the channel 1425 /* We now wait until the other peer closes the channel
1421 * after it got all elements from us. */ 1426 * after it got all elements from us. */
1422 } 1427 }
@@ -1447,7 +1452,7 @@ check_union_p2p_elements (void *cls,
1447{ 1452{
1448 struct Operation *op = cls; 1453 struct Operation *op = cls;
1449 1454
1450 if (GNUNET_SET_OPERATION_UNION != op->operation) 1455 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1451 { 1456 {
1452 GNUNET_break_op (0); 1457 GNUNET_break_op (0);
1453 return GNUNET_SYSERR; 1458 return GNUNET_SYSERR;
@@ -1535,7 +1540,7 @@ handle_union_p2p_elements (void *cls,
1535 op->state->received_fresh++; 1540 op->state->received_fresh++;
1536 op_register_element (op, ee, GNUNET_YES); 1541 op_register_element (op, ee, GNUNET_YES);
1537 /* only send results immediately if the client wants it */ 1542 /* only send results immediately if the client wants it */
1538 switch (op->spec->result_mode) 1543 switch (op->result_mode)
1539 { 1544 {
1540 case GNUNET_SET_RESULT_ADDED: 1545 case GNUNET_SET_RESULT_ADDED:
1541 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); 1546 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
@@ -1575,7 +1580,7 @@ check_union_p2p_full_element (void *cls,
1575{ 1580{
1576 struct Operation *op = cls; 1581 struct Operation *op = cls;
1577 1582
1578 if (GNUNET_SET_OPERATION_UNION != op->operation) 1583 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1579 { 1584 {
1580 GNUNET_break_op (0); 1585 GNUNET_break_op (0);
1581 return GNUNET_SYSERR; 1586 return GNUNET_SYSERR;
@@ -1644,7 +1649,7 @@ handle_union_p2p_full_element (void *cls,
1644 op->state->received_fresh++; 1649 op->state->received_fresh++;
1645 op_register_element (op, ee, GNUNET_YES); 1650 op_register_element (op, ee, GNUNET_YES);
1646 /* only send results immediately if the client wants it */ 1651 /* only send results immediately if the client wants it */
1647 switch (op->spec->result_mode) 1652 switch (op->result_mode)
1648 { 1653 {
1649 case GNUNET_SET_RESULT_ADDED: 1654 case GNUNET_SET_RESULT_ADDED:
1650 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK); 1655 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
@@ -1659,7 +1664,7 @@ handle_union_p2p_full_element (void *cls,
1659 } 1664 }
1660 } 1665 }
1661 1666
1662 if ( (GNUNET_YES == op->spec->byzantine) && 1667 if ( (GNUNET_YES == op->byzantine) &&
1663 (op->state->received_total > 384 + op->state->received_fresh * 4) && 1668 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1664 (op->state->received_fresh < op->state->received_total / 6) ) 1669 (op->state->received_fresh < op->state->received_total / 6) )
1665 { 1670 {
@@ -1690,7 +1695,7 @@ check_union_p2p_inquiry (void *cls,
1690 struct Operation *op = cls; 1695 struct Operation *op = cls;
1691 unsigned int num_keys; 1696 unsigned int num_keys;
1692 1697
1693 if (GNUNET_SET_OPERATION_UNION != op->operation) 1698 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1694 { 1699 {
1695 GNUNET_break_op (0); 1700 GNUNET_break_op (0);
1696 return GNUNET_SYSERR; 1701 return GNUNET_SYSERR;
@@ -1727,6 +1732,8 @@ handle_union_p2p_inquiry (void *cls,
1727 const struct IBF_Key *ibf_key; 1732 const struct IBF_Key *ibf_key;
1728 unsigned int num_keys; 1733 unsigned int num_keys;
1729 1734
1735 LOG (GNUNET_ERROR_TYPE_INFO,
1736 "Received union inquiry\n");
1730 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage)) 1737 num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
1731 / sizeof (struct IBF_Key); 1738 / sizeof (struct IBF_Key);
1732 ibf_key = (const struct IBF_Key *) &msg[1]; 1739 ibf_key = (const struct IBF_Key *) &msg[1];
@@ -1734,8 +1741,11 @@ handle_union_p2p_inquiry (void *cls,
1734 { 1741 {
1735 struct IBF_Key unsalted_key; 1742 struct IBF_Key unsalted_key;
1736 1743
1737 unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key); 1744 unsalt_key (ibf_key,
1738 send_offers_for_key (op, unsalted_key); 1745 ntohl (msg->salt),
1746 &unsalted_key);
1747 send_offers_for_key (op,
1748 unsalted_key);
1739 ibf_key++; 1749 ibf_key++;
1740 } 1750 }
1741 GNUNET_CADET_receive_done (op->channel); 1751 GNUNET_CADET_receive_done (op->channel);
@@ -1753,9 +1763,9 @@ handle_union_p2p_inquiry (void *cls,
1753 * #GNUNET_NO if not. 1763 * #GNUNET_NO if not.
1754 */ 1764 */
1755static int 1765static int
1756send_missing_elements_iter (void *cls, 1766send_missing_full_elements_iter (void *cls,
1757 uint32_t key, 1767 uint32_t key,
1758 void *value) 1768 void *value)
1759{ 1769{
1760 struct Operation *op = cls; 1770 struct Operation *op = cls;
1761 struct KeyEntry *ke = value; 1771 struct KeyEntry *ke = value;
@@ -1765,13 +1775,15 @@ send_missing_elements_iter (void *cls,
1765 1775
1766 if (GNUNET_YES == ke->received) 1776 if (GNUNET_YES == ke->received)
1767 return GNUNET_YES; 1777 return GNUNET_YES;
1768 1778 ev = GNUNET_MQ_msg_extra (emsg,
1769 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT); 1779 ee->element.size,
1770 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size); 1780 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1771 emsg->reserved = htons (0); 1781 GNUNET_memcpy (&emsg[1],
1782 ee->element.data,
1783 ee->element.size);
1772 emsg->element_type = htons (ee->element.element_type); 1784 emsg->element_type = htons (ee->element.element_type);
1773 GNUNET_MQ_send (op->mq, ev); 1785 GNUNET_MQ_send (op->mq,
1774 1786 ev);
1775 return GNUNET_YES; 1787 return GNUNET_YES;
1776} 1788}
1777 1789
@@ -1790,7 +1802,7 @@ handle_union_p2p_request_full (void *cls,
1790 1802
1791 LOG (GNUNET_ERROR_TYPE_INFO, 1803 LOG (GNUNET_ERROR_TYPE_INFO,
1792 "Received request for full set transmission\n"); 1804 "Received request for full set transmission\n");
1793 if (GNUNET_SET_OPERATION_UNION != op->operation) 1805 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1794 { 1806 {
1795 GNUNET_break_op (0); 1807 GNUNET_break_op (0);
1796 fail_union_operation (op); 1808 fail_union_operation (op);
@@ -1833,11 +1845,15 @@ handle_union_p2p_full_done (void *cls,
1833 1845
1834 /* send all the elements that did not come from the remote peer */ 1846 /* send all the elements that did not come from the remote peer */
1835 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 1847 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1836 &send_missing_elements_iter, 1848 &send_missing_full_elements_iter,
1837 op); 1849 op);
1838 1850
1839 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE); 1851 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1840 GNUNET_MQ_send (op->mq, ev); 1852 GNUNET_MQ_notify_sent (ev,
1853 &send_done_and_destroy,
1854 op);
1855 GNUNET_MQ_send (op->mq,
1856 ev);
1841 op->state->phase = PHASE_DONE; 1857 op->state->phase = PHASE_DONE;
1842 /* we now wait until the other peer shuts the tunnel down*/ 1858 /* we now wait until the other peer shuts the tunnel down*/
1843 } 1859 }
@@ -1880,7 +1896,7 @@ check_union_p2p_demand (void *cls,
1880 struct Operation *op = cls; 1896 struct Operation *op = cls;
1881 unsigned int num_hashes; 1897 unsigned int num_hashes;
1882 1898
1883 if (GNUNET_SET_OPERATION_UNION != op->operation) 1899 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1884 { 1900 {
1885 GNUNET_break_op (0); 1901 GNUNET_break_op (0);
1886 return GNUNET_SYSERR; 1902 return GNUNET_SYSERR;
@@ -1921,7 +1937,7 @@ handle_union_p2p_demand (void *cls,
1921 num_hashes > 0; 1937 num_hashes > 0;
1922 hash++, num_hashes--) 1938 hash++, num_hashes--)
1923 { 1939 {
1924 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, 1940 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1925 hash); 1941 hash);
1926 if (NULL == ee) 1942 if (NULL == ee)
1927 { 1943 {
@@ -1952,7 +1968,7 @@ handle_union_p2p_demand (void *cls,
1952 1, 1968 1,
1953 GNUNET_NO); 1969 GNUNET_NO);
1954 1970
1955 switch (op->spec->result_mode) 1971 switch (op->result_mode)
1956 { 1972 {
1957 case GNUNET_SET_RESULT_ADDED: 1973 case GNUNET_SET_RESULT_ADDED:
1958 /* Nothing to do. */ 1974 /* Nothing to do. */
@@ -1984,7 +2000,7 @@ check_union_p2p_offer (void *cls,
1984 struct Operation *op = cls; 2000 struct Operation *op = cls;
1985 unsigned int num_hashes; 2001 unsigned int num_hashes;
1986 2002
1987 if (GNUNET_SET_OPERATION_UNION != op->operation) 2003 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1988 { 2004 {
1989 GNUNET_break_op (0); 2005 GNUNET_break_op (0);
1990 return GNUNET_SYSERR; 2006 return GNUNET_SYSERR;
@@ -1998,8 +2014,8 @@ check_union_p2p_offer (void *cls,
1998 } 2014 }
1999 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 2015 num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
2000 / sizeof (struct GNUNET_HashCode); 2016 / sizeof (struct GNUNET_HashCode);
2001 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) 2017 if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader)) !=
2002 != num_hashes * sizeof (struct GNUNET_HashCode)) 2018 num_hashes * sizeof (struct GNUNET_HashCode))
2003 { 2019 {
2004 GNUNET_break_op (0); 2020 GNUNET_break_op (0);
2005 return GNUNET_SYSERR; 2021 return GNUNET_SYSERR;
@@ -2033,7 +2049,7 @@ handle_union_p2p_offer (void *cls,
2033 struct GNUNET_MessageHeader *demands; 2049 struct GNUNET_MessageHeader *demands;
2034 struct GNUNET_MQ_Envelope *ev; 2050 struct GNUNET_MQ_Envelope *ev;
2035 2051
2036 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, 2052 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2037 hash); 2053 hash);
2038 if (NULL != ee) 2054 if (NULL != ee)
2039 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) 2055 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
@@ -2060,7 +2076,9 @@ handle_union_p2p_offer (void *cls,
2060 ev = GNUNET_MQ_msg_header_extra (demands, 2076 ev = GNUNET_MQ_msg_header_extra (demands,
2061 sizeof (struct GNUNET_HashCode), 2077 sizeof (struct GNUNET_HashCode),
2062 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); 2078 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
2063 *(struct GNUNET_HashCode *) &demands[1] = *hash; 2079 GNUNET_memcpy (&demands[1],
2080 hash,
2081 sizeof (struct GNUNET_HashCode));
2064 GNUNET_MQ_send (op->mq, ev); 2082 GNUNET_MQ_send (op->mq, ev);
2065 } 2083 }
2066 GNUNET_CADET_receive_done (op->channel); 2084 GNUNET_CADET_receive_done (op->channel);
@@ -2079,7 +2097,7 @@ handle_union_p2p_done (void *cls,
2079{ 2097{
2080 struct Operation *op = cls; 2098 struct Operation *op = cls;
2081 2099
2082 if (GNUNET_SET_OPERATION_UNION != op->operation) 2100 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2083 { 2101 {
2084 GNUNET_break_op (0); 2102 GNUNET_break_op (0);
2085 fail_union_operation (op); 2103 fail_union_operation (op);
@@ -2134,21 +2152,31 @@ handle_union_p2p_done (void *cls,
2134 * @param opaque_context message to be transmitted to the listener 2152 * @param opaque_context message to be transmitted to the listener
2135 * to convince him to accept, may be NULL 2153 * to convince him to accept, may be NULL
2136 */ 2154 */
2137static void 2155static struct OperationState *
2138union_evaluate (struct Operation *op, 2156union_evaluate (struct Operation *op,
2139 const struct GNUNET_MessageHeader *opaque_context) 2157 const struct GNUNET_MessageHeader *opaque_context)
2140{ 2158{
2159 struct OperationState *state;
2141 struct GNUNET_MQ_Envelope *ev; 2160 struct GNUNET_MQ_Envelope *ev;
2142 struct OperationRequestMessage *msg; 2161 struct OperationRequestMessage *msg;
2143 2162
2144 GNUNET_assert (NULL == op->state); 2163 ev = GNUNET_MQ_msg_nested_mh (msg,
2145 op->state = GNUNET_new (struct OperationState); 2164 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2146 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); 2165 opaque_context);
2166 if (NULL == ev)
2167 {
2168 /* the context message is too large */
2169 GNUNET_break (0);
2170 return NULL;
2171 }
2172 state = GNUNET_new (struct OperationState);
2173 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2174 GNUNET_NO);
2147 /* copy the current generation's strata estimator for this operation */ 2175 /* copy the current generation's strata estimator for this operation */
2148 op->state->se = strata_estimator_dup (op->spec->set->state->se); 2176 state->se = strata_estimator_dup (op->set->state->se);
2149 /* we started the operation, thus we have to send the operation request */ 2177 /* we started the operation, thus we have to send the operation request */
2150 op->state->phase = PHASE_EXPECT_SE; 2178 state->phase = PHASE_EXPECT_SE;
2151 op->state->salt_receive = op->state->salt_send = 42; 2179 state->salt_receive = state->salt_send = 42; // FIXME?????
2152 LOG (GNUNET_ERROR_TYPE_DEBUG, 2180 LOG (GNUNET_ERROR_TYPE_DEBUG,
2153 "Initiating union operation evaluation\n"); 2181 "Initiating union operation evaluation\n");
2154 GNUNET_STATISTICS_update (_GSS_statistics, 2182 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -2159,16 +2187,6 @@ union_evaluate (struct Operation *op,
2159 "# of initiated union operations", 2187 "# of initiated union operations",
2160 1, 2188 1,
2161 GNUNET_NO); 2189 GNUNET_NO);
2162 ev = GNUNET_MQ_msg_nested_mh (msg,
2163 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
2164 opaque_context);
2165 if (NULL == ev)
2166 {
2167 /* the context message is too large */
2168 GNUNET_break (0);
2169 GNUNET_SERVICE_client_drop (op->spec->set->client);
2170 return;
2171 }
2172 msg->operation = htonl (GNUNET_SET_OPERATION_UNION); 2190 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2173 GNUNET_MQ_send (op->mq, 2191 GNUNET_MQ_send (op->mq,
2174 ev); 2192 ev);
@@ -2180,8 +2198,10 @@ union_evaluate (struct Operation *op,
2180 LOG (GNUNET_ERROR_TYPE_DEBUG, 2198 LOG (GNUNET_ERROR_TYPE_DEBUG,
2181 "sent op request without context message\n"); 2199 "sent op request without context message\n");
2182 2200
2201 op->state = state;
2183 initialize_key_to_element (op); 2202 initialize_key_to_element (op);
2184 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); 2203 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2204 return state;
2185} 2205}
2186 2206
2187 2207
@@ -2191,13 +2211,19 @@ union_evaluate (struct Operation *op,
2191 * 2211 *
2192 * @param op operation that will be accepted as a union operation 2212 * @param op operation that will be accepted as a union operation
2193 */ 2213 */
2194static void 2214static struct OperationState *
2195union_accept (struct Operation *op) 2215union_accept (struct Operation *op)
2196{ 2216{
2217 struct OperationState *state;
2218 const struct StrataEstimator *se;
2219 struct GNUNET_MQ_Envelope *ev;
2220 struct StrataEstimatorMessage *strata_msg;
2221 char *buf;
2222 size_t len;
2223 uint16_t type;
2224
2197 LOG (GNUNET_ERROR_TYPE_DEBUG, 2225 LOG (GNUNET_ERROR_TYPE_DEBUG,
2198 "accepting set union operation\n"); 2226 "accepting set union operation\n");
2199 GNUNET_assert (NULL == op->state);
2200
2201 GNUNET_STATISTICS_update (_GSS_statistics, 2227 GNUNET_STATISTICS_update (_GSS_statistics,
2202 "# of accepted union operations", 2228 "# of accepted union operations",
2203 1, 2229 1,
@@ -2207,14 +2233,37 @@ union_accept (struct Operation *op)
2207 1, 2233 1,
2208 GNUNET_NO); 2234 GNUNET_NO);
2209 2235
2210 op->state = GNUNET_new (struct OperationState); 2236 state = GNUNET_new (struct OperationState);
2211 op->state->se = strata_estimator_dup (op->spec->set->state->se); 2237 state->se = strata_estimator_dup (op->set->state->se);
2212 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); 2238 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2213 op->state->salt_receive = op->state->salt_send = 42; 2239 GNUNET_NO);
2240 state->salt_receive = state->salt_send = 42; // FIXME?????
2241 op->state = state;
2214 initialize_key_to_element (op); 2242 initialize_key_to_element (op);
2215 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element); 2243 state->initial_size = GNUNET_CONTAINER_multihashmap32_size (state->key_to_element);
2244
2216 /* kick off the operation */ 2245 /* kick off the operation */
2217 send_strata_estimator (op); 2246 se = state->se;
2247 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
2248 len = strata_estimator_write (se,
2249 buf);
2250 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2251 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
2252 else
2253 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
2254 ev = GNUNET_MQ_msg_extra (strata_msg,
2255 len,
2256 type);
2257 GNUNET_memcpy (&strata_msg[1],
2258 buf,
2259 len);
2260 GNUNET_free (buf);
2261 strata_msg->set_size
2262 = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
2263 GNUNET_MQ_send (op->mq,
2264 ev);
2265 state->phase = PHASE_EXPECT_IBF;
2266 return state;
2218} 2267}
2219 2268
2220 2269
@@ -2254,7 +2303,8 @@ union_set_create (void)
2254 * @param ee the element to add to the set 2303 * @param ee the element to add to the set
2255 */ 2304 */
2256static void 2305static void
2257union_add (struct SetState *set_state, struct ElementEntry *ee) 2306union_add (struct SetState *set_state,
2307 struct ElementEntry *ee)
2258{ 2308{
2259 strata_estimator_insert (set_state->se, 2309 strata_estimator_insert (set_state->se,
2260 get_ibf_key (&ee->element_hash)); 2310 get_ibf_key (&ee->element_hash));
@@ -2269,7 +2319,8 @@ union_add (struct SetState *set_state, struct ElementEntry *ee)
2269 * @param ee set element to remove 2319 * @param ee set element to remove
2270 */ 2320 */
2271static void 2321static void
2272union_remove (struct SetState *set_state, struct ElementEntry *ee) 2322union_remove (struct SetState *set_state,
2323 struct ElementEntry *ee)
2273{ 2324{
2274 strata_estimator_remove (set_state->se, 2325 strata_estimator_remove (set_state->se,
2275 get_ibf_key (&ee->element_hash)); 2326 get_ibf_key (&ee->element_hash));
@@ -2294,61 +2345,39 @@ union_set_destroy (struct SetState *set_state)
2294 2345
2295 2346
2296/** 2347/**
2297 * Handler for peer-disconnects, notifies the client
2298 * about the aborted operation in case the op was not concluded.
2299 *
2300 * @param op the destroyed operation
2301 */
2302static void
2303union_peer_disconnect (struct Operation *op)
2304{
2305 if (PHASE_DONE != op->state->phase)
2306 {
2307 struct GNUNET_MQ_Envelope *ev;
2308 struct GNUNET_SET_ResultMessage *msg;
2309
2310 ev = GNUNET_MQ_msg (msg,
2311 GNUNET_MESSAGE_TYPE_SET_RESULT);
2312 msg->request_id = htonl (op->spec->client_request_id);
2313 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
2314 msg->element_type = htons (0);
2315 GNUNET_MQ_send (op->spec->set->client_mq,
2316 ev);
2317 LOG (GNUNET_ERROR_TYPE_WARNING,
2318 "other peer disconnected prematurely, phase %u\n",
2319 op->state->phase);
2320 _GSS_operation_destroy (op,
2321 GNUNET_YES);
2322 return;
2323 }
2324 // else: the session has already been concluded
2325 LOG (GNUNET_ERROR_TYPE_DEBUG,
2326 "other peer disconnected (finished)\n");
2327 if (GNUNET_NO == op->state->client_done_sent)
2328 send_done_and_destroy (op);
2329}
2330
2331
2332/**
2333 * Copy union-specific set state. 2348 * Copy union-specific set state.
2334 * 2349 *
2335 * @param set source set for copying the union state 2350 * @param state source state for copying the union state
2336 * @return a copy of the union-specific set state 2351 * @return a copy of the union-specific set state
2337 */ 2352 */
2338static struct SetState * 2353static struct SetState *
2339union_copy_state (struct Set *set) 2354union_copy_state (struct SetState *state)
2340{ 2355{
2341 struct SetState *new_state; 2356 struct SetState *new_state;
2342 2357
2358 GNUNET_assert ( (NULL != state) &&
2359 (NULL != state->se) );
2343 new_state = GNUNET_new (struct SetState); 2360 new_state = GNUNET_new (struct SetState);
2344 GNUNET_assert ( (NULL != set->state) && (NULL != set->state->se) ); 2361 new_state->se = strata_estimator_dup (state->se);
2345 new_state->se = strata_estimator_dup (set->state->se);
2346 2362
2347 return new_state; 2363 return new_state;
2348} 2364}
2349 2365
2350 2366
2351/** 2367/**
2368 * Handle case where channel went down for an operation.
2369 *
2370 * @param op operation that lost the channel
2371 */
2372static void
2373union_channel_death (struct Operation *op)
2374{
2375 _GSS_operation_destroy (op,
2376 GNUNET_YES);
2377}
2378
2379
2380/**
2352 * Get the table with implementing functions for 2381 * Get the table with implementing functions for
2353 * set union. 2382 * set union.
2354 * 2383 *
@@ -2364,9 +2393,9 @@ _GSS_union_vt ()
2364 .destroy_set = &union_set_destroy, 2393 .destroy_set = &union_set_destroy,
2365 .evaluate = &union_evaluate, 2394 .evaluate = &union_evaluate,
2366 .accept = &union_accept, 2395 .accept = &union_accept,
2367 .peer_disconnect = &union_peer_disconnect,
2368 .cancel = &union_op_cancel, 2396 .cancel = &union_op_cancel,
2369 .copy_state = &union_copy_state, 2397 .copy_state = &union_copy_state,
2398 .channel_death = &union_channel_death
2370 }; 2399 };
2371 2400
2372 return &union_vt; 2401 return &union_vt;