aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-08 11:30:19 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-08 11:30:19 +0000
commitae53a8d1d043cbd5967fb01316a32685c7a8af9b (patch)
treea8d791aed993ac8691f68ee6152a6ffb14a88ee3 /src/set/gnunet-service-set_union.c
parente105a542a8ddb22184712bfe565aa734f1d0349d (diff)
downloadgnunet-ae53a8d1d043cbd5967fb01316a32685c7a8af9b.tar.gz
gnunet-ae53a8d1d043cbd5967fb01316a32685c7a8af9b.zip
add logic for strata compression
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c170
1 files changed, 140 insertions, 30 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 1c5dfe3fb..bdcd4352e 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -39,10 +39,12 @@
39 * Number of IBFs in a strata estimator. 39 * Number of IBFs in a strata estimator.
40 */ 40 */
41#define SE_STRATA_COUNT 32 41#define SE_STRATA_COUNT 32
42
42/** 43/**
43 * Size of the IBFs in the strata estimator. 44 * Size of the IBFs in the strata estimator.
44 */ 45 */
45#define SE_IBF_SIZE 80 46#define SE_IBF_SIZE 80
47
46/** 48/**
47 * The hash num parameter for the difference digests and strata estimators. 49 * The hash num parameter for the difference digests and strata estimators.
48 */ 50 */
@@ -119,7 +121,7 @@ enum UnionOperationPhase
119 * In the ultimate phase, we wait until 121 * In the ultimate phase, we wait until
120 * our demands are satisfied and then 122 * our demands are satisfied and then
121 * quit (sending another DONE message). */ 123 * quit (sending another DONE message). */
122 PHASE_DONE, 124 PHASE_DONE
123}; 125};
124 126
125 127
@@ -216,7 +218,7 @@ struct SendElementClosure
216/** 218/**
217 * Extra state required for efficient set union. 219 * Extra state required for efficient set union.
218 */ 220 */
219 struct SetState 221struct SetState
220{ 222{
221 /** 223 /**
222 * The strata estimator is only generated once for 224 * The strata estimator is only generated once for
@@ -500,8 +502,9 @@ init_key_to_element_iterator (void *cls,
500 * 502 *
501 * @param op the union operation 503 * @param op the union operation
502 * @param size size of the ibf to create 504 * @param size size of the ibf to create
505 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
503 */ 506 */
504static void 507static int
505prepare_ibf (struct Operation *op, 508prepare_ibf (struct Operation *op,
506 uint32_t size) 509 uint32_t size)
507{ 510{
@@ -517,9 +520,16 @@ prepare_ibf (struct Operation *op,
517 if (NULL != op->state->local_ibf) 520 if (NULL != op->state->local_ibf)
518 ibf_destroy (op->state->local_ibf); 521 ibf_destroy (op->state->local_ibf);
519 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 522 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
523 if (NULL == op->state->local_ibf)
524 {
525 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526 "Failed to allocate local IBF\n");
527 return GNUNET_SYSERR;
528 }
520 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 529 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
521 &prepare_ibf_iterator, 530 &prepare_ibf_iterator,
522 op); 531 op);
532 return GNUNET_OK;
523} 533}
524 534
525 535
@@ -530,15 +540,21 @@ prepare_ibf (struct Operation *op,
530 * 540 *
531 * @param op the union operation 541 * @param op the union operation
532 * @param ibf_order order of the ibf to send, size=2^order 542 * @param ibf_order order of the ibf to send, size=2^order
543 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
533 */ 544 */
534static void 545static int
535send_ibf (struct Operation *op, 546send_ibf (struct Operation *op,
536 uint16_t ibf_order) 547 uint16_t ibf_order)
537{ 548{
538 unsigned int buckets_sent = 0; 549 unsigned int buckets_sent = 0;
539 struct InvertibleBloomFilter *ibf; 550 struct InvertibleBloomFilter *ibf;
540 551
541 prepare_ibf (op, 1<<ibf_order); 552 if (GNUNET_OK !=
553 prepare_ibf (op, 1<<ibf_order))
554 {
555 /* allocation failed */
556 return GNUNET_SYSERR;
557 }
542 558
543 LOG (GNUNET_ERROR_TYPE_DEBUG, 559 LOG (GNUNET_ERROR_TYPE_DEBUG,
544 "sending ibf of size %u\n", 560 "sending ibf of size %u\n",
@@ -583,6 +599,7 @@ send_ibf (struct Operation *op,
583 /* The other peer must decode the IBF, so 599 /* The other peer must decode the IBF, so
584 * we're passive. */ 600 * we're passive. */
585 op->state->phase = PHASE_INVENTORY_PASSIVE; 601 op->state->phase = PHASE_INVENTORY_PASSIVE;
602 return GNUNET_OK;
586} 603}
587 604
588 605
@@ -594,13 +611,27 @@ send_ibf (struct Operation *op,
594static void 611static void
595send_strata_estimator (struct Operation *op) 612send_strata_estimator (struct Operation *op)
596{ 613{
614 const struct StrataEstimator *se = op->state->se;
597 struct GNUNET_MQ_Envelope *ev; 615 struct GNUNET_MQ_Envelope *ev;
598 struct GNUNET_MessageHeader *strata_msg; 616 struct GNUNET_MessageHeader *strata_msg;
599 617 char *buf;
618 size_t len;
619 uint16_t type;
620
621 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
622 len = strata_estimator_write (op->state->se,
623 buf);
624 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
625 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
626 else
627 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
600 ev = GNUNET_MQ_msg_header_extra (strata_msg, 628 ev = GNUNET_MQ_msg_header_extra (strata_msg,
601 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 629 len,
602 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE); 630 type);
603 strata_estimator_write (op->state->se, &strata_msg[1]); 631 memcpy (&strata_msg[1],
632 buf,
633 len);
634 GNUNET_free (buf);
604 GNUNET_MQ_send (op->mq, 635 GNUNET_MQ_send (op->mq,
605 ev); 636 ev);
606 op->state->phase = PHASE_EXPECT_IBF; 637 op->state->phase = PHASE_EXPECT_IBF;
@@ -636,16 +667,19 @@ get_order_from_difference (unsigned int diff)
636 * 667 *
637 * @param cls the union operation 668 * @param cls the union operation
638 * @param mh the message 669 * @param mh the message
670 * @param is_compressed #GNUNET_YES if the estimator is compressed
639 * @return #GNUNET_SYSERR if the tunnel should be disconnected, 671 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
640 * #GNUNET_OK otherwise 672 * #GNUNET_OK otherwise
641 */ 673 */
642static int 674static int
643handle_p2p_strata_estimator (void *cls, 675handle_p2p_strata_estimator (void *cls,
644 const struct GNUNET_MessageHeader *mh) 676 const struct GNUNET_MessageHeader *mh,
677 int is_compressed)
645{ 678{
646 struct Operation *op = cls; 679 struct Operation *op = cls;
647 struct StrataEstimator *remote_se; 680 struct StrataEstimator *remote_se;
648 int diff; 681 int diff;
682 size_t len;
649 683
650 if (op->state->phase != PHASE_EXPECT_SE) 684 if (op->state->phase != PHASE_EXPECT_SE)
651 { 685 {
@@ -653,9 +687,10 @@ handle_p2p_strata_estimator (void *cls,
653 GNUNET_break (0); 687 GNUNET_break (0);
654 return GNUNET_SYSERR; 688 return GNUNET_SYSERR;
655 } 689 }
656 if (ntohs (mh->size) != 690 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
657 SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE + 691 if ( (GNUNET_NO == is_compressed) &&
658 sizeof (struct GNUNET_MessageHeader)) 692 (len !=
693 SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
659 { 694 {
660 fail_union_operation (op); 695 fail_union_operation (op);
661 GNUNET_break (0); 696 GNUNET_break (0);
@@ -664,7 +699,22 @@ handle_p2p_strata_estimator (void *cls,
664 remote_se = strata_estimator_create (SE_STRATA_COUNT, 699 remote_se = strata_estimator_create (SE_STRATA_COUNT,
665 SE_IBF_SIZE, 700 SE_IBF_SIZE,
666 SE_IBF_HASH_NUM); 701 SE_IBF_HASH_NUM);
667 strata_estimator_read (&mh[1], remote_se); 702 if (NULL == remote_se)
703 {
704 /* insufficient resources, fail */
705 fail_union_operation (op);
706 return GNUNET_SYSERR;
707 }
708 if (GNUNET_OK !=
709 strata_estimator_read (&mh[1],
710 len,
711 is_compressed,
712 remote_se))
713 {
714 /* decompression failed */
715 fail_union_operation (op);
716 return GNUNET_SYSERR;
717 }
668 GNUNET_assert (NULL != op->state->se); 718 GNUNET_assert (NULL != op->state->se);
669 diff = strata_estimator_difference (remote_se, 719 diff = strata_estimator_difference (remote_se,
670 op->state->se); 720 op->state->se);
@@ -675,8 +725,16 @@ handle_p2p_strata_estimator (void *cls,
675 "got se diff=%d, using ibf size %d\n", 725 "got se diff=%d, using ibf size %d\n",
676 diff, 726 diff,
677 1<<get_order_from_difference (diff)); 727 1<<get_order_from_difference (diff));
678 send_ibf (op, 728 if (GNUNET_OK !=
679 get_order_from_difference (diff)); 729 send_ibf (op,
730 get_order_from_difference (diff)))
731 {
732 /* Internal error, best we can do is shut the connection */
733 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
734 "Failed to send IBF, closing connection\n");
735 fail_union_operation (op);
736 return GNUNET_SYSERR;
737 }
680 return GNUNET_OK; 738 return GNUNET_OK;
681} 739}
682 740
@@ -744,8 +802,9 @@ send_offers_for_key (struct Operation *op,
744 * send the appropriate offers and inquiries. 802 * send the appropriate offers and inquiries.
745 * 803 *
746 * @param op union operation 804 * @param op union operation
805 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
747 */ 806 */
748static void 807static int
749decode_and_send (struct Operation *op) 808decode_and_send (struct Operation *op)
750{ 809{
751 struct IBF_Key key; 810 struct IBF_Key key;
@@ -756,7 +815,12 @@ decode_and_send (struct Operation *op)
756 815
757 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); 816 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
758 817
759 prepare_ibf (op, op->state->remote_ibf->size); 818 if (GNUNET_OK !=
819 prepare_ibf (op, op->state->remote_ibf->size))
820 {
821 /* allocation failed */
822 return GNUNET_SYSERR;
823 }
760 diff_ibf = ibf_dup (op->state->local_ibf); 824 diff_ibf = ibf_dup (op->state->local_ibf);
761 ibf_subtract (diff_ibf, op->state->remote_ibf); 825 ibf_subtract (diff_ibf, op->state->remote_ibf);
762 826
@@ -811,7 +875,16 @@ decode_and_send (struct Operation *op)
811 "# of IBF retries", 875 "# of IBF retries",
812 1, 876 1,
813 GNUNET_NO); 877 GNUNET_NO);
814 send_ibf (op, next_order); 878 if (GNUNET_OK !=
879 send_ibf (op, next_order))
880 {
881 /* Internal error, best we can do is shut the connection */
882 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
883 "Failed to send IBF, closing connection\n");
884 fail_union_operation (op);
885 ibf_destroy (diff_ibf);
886 return GNUNET_SYSERR;
887 }
815 } 888 }
816 else 889 else
817 { 890 {
@@ -822,6 +895,9 @@ decode_and_send (struct Operation *op)
822 // XXX: Send the whole set, element-by-element 895 // XXX: Send the whole set, element-by-element
823 LOG (GNUNET_ERROR_TYPE_ERROR, 896 LOG (GNUNET_ERROR_TYPE_ERROR,
824 "set union failed: reached ibf limit\n"); 897 "set union failed: reached ibf limit\n");
898 fail_union_operation (op);
899 ibf_destroy (diff_ibf);
900 return GNUNET_SYSERR;
825 } 901 }
826 break; 902 break;
827 } 903 }
@@ -867,6 +943,7 @@ decode_and_send (struct Operation *op)
867 } 943 }
868 } 944 }
869 ibf_destroy (diff_ibf); 945 ibf_destroy (diff_ibf);
946 return GNUNET_OK;
870} 947}
871 948
872 949
@@ -905,6 +982,13 @@ handle_p2p_ibf (void *cls,
905 "Creating new ibf of size %u\n", 982 "Creating new ibf of size %u\n",
906 1 << msg->order); 983 1 << msg->order);
907 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 984 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
985 if (NULL == op->state->remote_ibf)
986 {
987 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
988 "Failed to parse remote IBF, closing connection\n");
989 fail_union_operation (op);
990 return GNUNET_SYSERR;
991 }
908 op->state->ibf_buckets_received = 0; 992 op->state->ibf_buckets_received = 0;
909 if (0 != ntohs (msg->offset)) 993 if (0 != ntohs (msg->offset))
910 { 994 {
@@ -957,7 +1041,14 @@ handle_p2p_ibf (void *cls,
957 LOG (GNUNET_ERROR_TYPE_DEBUG, 1041 LOG (GNUNET_ERROR_TYPE_DEBUG,
958 "received full ibf\n"); 1042 "received full ibf\n");
959 op->state->phase = PHASE_INVENTORY_ACTIVE; 1043 op->state->phase = PHASE_INVENTORY_ACTIVE;
960 decode_and_send (op); 1044 if (GNUNET_OK !=
1045 decode_and_send (op))
1046 {
1047 /* Internal error, best we can do is shut down */
1048 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1049 "Failed to decode IBF, closing connection\n");
1050 return GNUNET_SYSERR;
1051 }
961 } 1052 }
962 return GNUNET_OK; 1053 return GNUNET_OK;
963} 1054}
@@ -1080,7 +1171,6 @@ handle_p2p_elements (void *cls,
1080 fail_union_operation (op); 1171 fail_union_operation (op);
1081 return; 1172 return;
1082 } 1173 }
1083
1084 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) 1174 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1085 { 1175 {
1086 GNUNET_break_op (0); 1176 GNUNET_break_op (0);
@@ -1088,7 +1178,7 @@ handle_p2p_elements (void *cls,
1088 return; 1178 return;
1089 } 1179 }
1090 1180
1091 emsg = (struct GNUNET_SET_ElementMessage *) mh; 1181 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1092 1182
1093 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); 1183 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1094 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1184 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
@@ -1099,7 +1189,10 @@ handle_p2p_elements (void *cls,
1099 ee->remote = GNUNET_YES; 1189 ee->remote = GNUNET_YES;
1100 GNUNET_SET_element_hash (&ee->element, &ee->element_hash); 1190 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1101 1191
1102 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, NULL)) 1192 if (GNUNET_NO ==
1193 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1194 &ee->element_hash,
1195 NULL))
1103 { 1196 {
1104 /* We got something we didn't demand, since it's not in our map. */ 1197 /* We got something we didn't demand, since it's not in our map. */
1105 GNUNET_break_op (0); 1198 GNUNET_break_op (0);
@@ -1194,10 +1287,12 @@ handle_p2p_inquiry (void *cls,
1194} 1287}
1195 1288
1196 1289
1197 1290/**
1291 * FIXME
1292 */
1198static void 1293static void
1199handle_p2p_demand (void *cls, 1294handle_p2p_demand (void *cls,
1200 const struct GNUNET_MessageHeader *mh) 1295 const struct GNUNET_MessageHeader *mh)
1201{ 1296{
1202 struct Operation *op = cls; 1297 struct Operation *op = cls;
1203 struct ElementEntry *ee; 1298 struct ElementEntry *ee;
@@ -1303,12 +1398,16 @@ handle_p2p_offer (void *cls,
1303 struct ElementEntry *ee; 1398 struct ElementEntry *ee;
1304 struct GNUNET_MessageHeader *demands; 1399 struct GNUNET_MessageHeader *demands;
1305 struct GNUNET_MQ_Envelope *ev; 1400 struct GNUNET_MQ_Envelope *ev;
1306 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); 1401
1402 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1403 hash);
1307 if (NULL != ee) 1404 if (NULL != ee)
1308 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) 1405 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1309 continue; 1406 continue;
1310 1407
1311 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, hash)) 1408 if (GNUNET_YES ==
1409 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1410 hash))
1312 { 1411 {
1313 LOG (GNUNET_ERROR_TYPE_DEBUG, 1412 LOG (GNUNET_ERROR_TYPE_DEBUG,
1314 "Skipped sending duplicate demand\n"); 1413 "Skipped sending duplicate demand\n");
@@ -1324,7 +1423,9 @@ handle_p2p_offer (void *cls,
1324 LOG (GNUNET_ERROR_TYPE_DEBUG, 1423 LOG (GNUNET_ERROR_TYPE_DEBUG,
1325 "[OP %x] Requesting element (hash %s)\n", 1424 "[OP %x] Requesting element (hash %s)\n",
1326 (void *) op, GNUNET_h2s (hash)); 1425 (void *) op, GNUNET_h2s (hash));
1327 ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); 1426 ev = GNUNET_MQ_msg_header_extra (demands,
1427 sizeof (struct GNUNET_HashCode),
1428 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1328 *(struct GNUNET_HashCode *) &demands[1] = *hash; 1429 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1329 GNUNET_MQ_send (op->mq, ev); 1430 GNUNET_MQ_send (op->mq, ev);
1330 } 1431 }
@@ -1466,7 +1567,7 @@ union_accept (struct Operation *op)
1466 * We maintain one strata estimator per set and then manipulate it over the 1567 * We maintain one strata estimator per set and then manipulate it over the
1467 * lifetime of the set, as recreating a strata estimator would be expensive. 1568 * lifetime of the set, as recreating a strata estimator would be expensive.
1468 * 1569 *
1469 * @return the newly created set 1570 * @return the newly created set, NULL on error
1470 */ 1571 */
1471static struct SetState * 1572static struct SetState *
1472union_set_create (void) 1573union_set_create (void)
@@ -1478,6 +1579,13 @@ union_set_create (void)
1478 set_state = GNUNET_new (struct SetState); 1579 set_state = GNUNET_new (struct SetState);
1479 set_state->se = strata_estimator_create (SE_STRATA_COUNT, 1580 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1480 SE_IBF_SIZE, SE_IBF_HASH_NUM); 1581 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1582 if (NULL == set_state->se)
1583 {
1584 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1585 "Failed to allocate strata estimator\n");
1586 GNUNET_free (set_state);
1587 return NULL;
1588 }
1481 return set_state; 1589 return set_state;
1482} 1590}
1483 1591
@@ -1549,7 +1657,9 @@ union_handle_p2p_message (struct Operation *op,
1549 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: 1657 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1550 return handle_p2p_ibf (op, mh); 1658 return handle_p2p_ibf (op, mh);
1551 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: 1659 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1552 return handle_p2p_strata_estimator (op, mh); 1660 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1661 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1662 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1553 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 1663 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1554 handle_p2p_elements (op, mh); 1664 handle_p2p_elements (op, mh);
1555 break; 1665 break;