aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-10-08 11:30:19 +0000
committerChristian Grothoff <christian@grothoff.org>2015-10-08 11:30:19 +0000
commitae53a8d1d043cbd5967fb01316a32685c7a8af9b (patch)
treea8d791aed993ac8691f68ee6152a6ffb14a88ee3
parente105a542a8ddb22184712bfe565aa734f1d0349d (diff)
downloadgnunet-ae53a8d1d043cbd5967fb01316a32685c7a8af9b.tar.gz
gnunet-ae53a8d1d043cbd5967fb01316a32685c7a8af9b.zip
add logic for strata compression
-rw-r--r--src/include/gnunet_protocols.h5
-rw-r--r--src/set/gnunet-service-set.c11
-rw-r--r--src/set/gnunet-service-set.h2
-rw-r--r--src/set/gnunet-service-set_intersection.c1
-rw-r--r--src/set/gnunet-service-set_union.c170
-rw-r--r--src/set/gnunet-service-set_union_strata_estimator.c90
-rw-r--r--src/set/gnunet-service-set_union_strata_estimator.h17
-rw-r--r--src/set/gnunet-set-ibf-profiler.c15
-rw-r--r--src/set/ibf.c32
-rw-r--r--src/set/ibf.h28
10 files changed, 305 insertions, 66 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index a06c8ad67..a47af7f59 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1925,6 +1925,11 @@ extern "C"
1925#define GNUNET_MESSAGE_TYPE_SET_ITER_DONE 589 1925#define GNUNET_MESSAGE_TYPE_SET_ITER_DONE 589
1926 1926
1927/** 1927/**
1928 * Compressed strata estimator.
1929 */
1930#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC 590
1931
1932/**
1928 * Information about the element count for intersection 1933 * Information about the element count for intersection
1929 */ 1934 */
1930#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO 591 1935#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO 591
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index cfddef6fb..441637a93 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -975,7 +975,7 @@ again:
975 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter); 975 GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
976 set->iter = NULL; 976 set->iter = NULL;
977 set->iteration_id++; 977 set->iteration_id++;
978 978
979 GNUNET_assert (set->content->iterator_count > 0); 979 GNUNET_assert (set->content->iterator_count > 0);
980 set->content->iterator_count -= 1; 980 set->content->iterator_count -= 1;
981 981
@@ -1113,6 +1113,13 @@ handle_client_create_set (void *cls,
1113 } 1113 }
1114 set->operation = ntohl (msg->operation); 1114 set->operation = ntohl (msg->operation);
1115 set->state = set->vt->create (); 1115 set->state = set->vt->create ();
1116 if (NULL == set->state)
1117 {
1118 /* initialization failed (i.e. out of memory) */
1119 GNUNET_free (set);
1120 GNUNET_SERVER_client_disconnect (client);
1121 return;
1122 }
1116 set->content = GNUNET_new (struct SetContent); 1123 set->content = GNUNET_new (struct SetContent);
1117 set->content->refcount = 1; 1124 set->content->refcount = 1;
1118 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); 1125 set->content->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -1497,7 +1504,7 @@ handle_client_copy_lazy_connect (void *cls,
1497 { 1504 {
1498 found = GNUNET_YES; 1505 found = GNUNET_YES;
1499 break; 1506 break;
1500 } 1507 }
1501 } 1508 }
1502 1509
1503 if (GNUNET_NO == found) 1510 if (GNUNET_NO == found)
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 982967744..f8a2d88ba 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -126,7 +126,7 @@ struct OperationSpecification
126 * Signature of functions that create the implementation-specific 126 * Signature of functions that create the implementation-specific
127 * state for a set supporting a specific operation. 127 * state for a set supporting a specific operation.
128 * 128 *
129 * @return a set state specific to the supported operation 129 * @return a set state specific to the supported operation, NULL on error
130 */ 130 */
131typedef struct SetState * 131typedef struct SetState *
132(*CreateImpl) (void); 132(*CreateImpl) (void);
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index af0ce1988..7d172df29 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -59,6 +59,7 @@ enum IntersectionOperationPhase
59 * client. 59 * client.
60 */ 60 */
61 PHASE_FINISHED 61 PHASE_FINISHED
62
62}; 63};
63 64
64 65
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index 1c5dfe3fb..bdcd4352e 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -39,10 +39,12 @@
39 * Number of IBFs in a strata estimator. 39 * Number of IBFs in a strata estimator.
40 */ 40 */
41#define SE_STRATA_COUNT 32 41#define SE_STRATA_COUNT 32
42
42/** 43/**
43 * Size of the IBFs in the strata estimator. 44 * Size of the IBFs in the strata estimator.
44 */ 45 */
45#define SE_IBF_SIZE 80 46#define SE_IBF_SIZE 80
47
46/** 48/**
47 * The hash num parameter for the difference digests and strata estimators. 49 * The hash num parameter for the difference digests and strata estimators.
48 */ 50 */
@@ -119,7 +121,7 @@ enum UnionOperationPhase
119 * In the ultimate phase, we wait until 121 * In the ultimate phase, we wait until
120 * our demands are satisfied and then 122 * our demands are satisfied and then
121 * quit (sending another DONE message). */ 123 * quit (sending another DONE message). */
122 PHASE_DONE, 124 PHASE_DONE
123}; 125};
124 126
125 127
@@ -216,7 +218,7 @@ struct SendElementClosure
216/** 218/**
217 * Extra state required for efficient set union. 219 * Extra state required for efficient set union.
218 */ 220 */
219 struct SetState 221struct SetState
220{ 222{
221 /** 223 /**
222 * The strata estimator is only generated once for 224 * The strata estimator is only generated once for
@@ -500,8 +502,9 @@ init_key_to_element_iterator (void *cls,
500 * 502 *
501 * @param op the union operation 503 * @param op the union operation
502 * @param size size of the ibf to create 504 * @param size size of the ibf to create
505 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
503 */ 506 */
504static void 507static int
505prepare_ibf (struct Operation *op, 508prepare_ibf (struct Operation *op,
506 uint32_t size) 509 uint32_t size)
507{ 510{
@@ -517,9 +520,16 @@ prepare_ibf (struct Operation *op,
517 if (NULL != op->state->local_ibf) 520 if (NULL != op->state->local_ibf)
518 ibf_destroy (op->state->local_ibf); 521 ibf_destroy (op->state->local_ibf);
519 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 522 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
523 if (NULL == op->state->local_ibf)
524 {
525 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
526 "Failed to allocate local IBF\n");
527 return GNUNET_SYSERR;
528 }
520 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element, 529 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
521 &prepare_ibf_iterator, 530 &prepare_ibf_iterator,
522 op); 531 op);
532 return GNUNET_OK;
523} 533}
524 534
525 535
@@ -530,15 +540,21 @@ prepare_ibf (struct Operation *op,
530 * 540 *
531 * @param op the union operation 541 * @param op the union operation
532 * @param ibf_order order of the ibf to send, size=2^order 542 * @param ibf_order order of the ibf to send, size=2^order
543 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
533 */ 544 */
534static void 545static int
535send_ibf (struct Operation *op, 546send_ibf (struct Operation *op,
536 uint16_t ibf_order) 547 uint16_t ibf_order)
537{ 548{
538 unsigned int buckets_sent = 0; 549 unsigned int buckets_sent = 0;
539 struct InvertibleBloomFilter *ibf; 550 struct InvertibleBloomFilter *ibf;
540 551
541 prepare_ibf (op, 1<<ibf_order); 552 if (GNUNET_OK !=
553 prepare_ibf (op, 1<<ibf_order))
554 {
555 /* allocation failed */
556 return GNUNET_SYSERR;
557 }
542 558
543 LOG (GNUNET_ERROR_TYPE_DEBUG, 559 LOG (GNUNET_ERROR_TYPE_DEBUG,
544 "sending ibf of size %u\n", 560 "sending ibf of size %u\n",
@@ -583,6 +599,7 @@ send_ibf (struct Operation *op,
583 /* The other peer must decode the IBF, so 599 /* The other peer must decode the IBF, so
584 * we're passive. */ 600 * we're passive. */
585 op->state->phase = PHASE_INVENTORY_PASSIVE; 601 op->state->phase = PHASE_INVENTORY_PASSIVE;
602 return GNUNET_OK;
586} 603}
587 604
588 605
@@ -594,13 +611,27 @@ send_ibf (struct Operation *op,
594static void 611static void
595send_strata_estimator (struct Operation *op) 612send_strata_estimator (struct Operation *op)
596{ 613{
614 const struct StrataEstimator *se = op->state->se;
597 struct GNUNET_MQ_Envelope *ev; 615 struct GNUNET_MQ_Envelope *ev;
598 struct GNUNET_MessageHeader *strata_msg; 616 struct GNUNET_MessageHeader *strata_msg;
599 617 char *buf;
618 size_t len;
619 uint16_t type;
620
621 buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
622 len = strata_estimator_write (op->state->se,
623 buf);
624 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
625 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
626 else
627 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
600 ev = GNUNET_MQ_msg_header_extra (strata_msg, 628 ev = GNUNET_MQ_msg_header_extra (strata_msg,
601 SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, 629 len,
602 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE); 630 type);
603 strata_estimator_write (op->state->se, &strata_msg[1]); 631 memcpy (&strata_msg[1],
632 buf,
633 len);
634 GNUNET_free (buf);
604 GNUNET_MQ_send (op->mq, 635 GNUNET_MQ_send (op->mq,
605 ev); 636 ev);
606 op->state->phase = PHASE_EXPECT_IBF; 637 op->state->phase = PHASE_EXPECT_IBF;
@@ -636,16 +667,19 @@ get_order_from_difference (unsigned int diff)
636 * 667 *
637 * @param cls the union operation 668 * @param cls the union operation
638 * @param mh the message 669 * @param mh the message
670 * @param is_compressed #GNUNET_YES if the estimator is compressed
639 * @return #GNUNET_SYSERR if the tunnel should be disconnected, 671 * @return #GNUNET_SYSERR if the tunnel should be disconnected,
640 * #GNUNET_OK otherwise 672 * #GNUNET_OK otherwise
641 */ 673 */
642static int 674static int
643handle_p2p_strata_estimator (void *cls, 675handle_p2p_strata_estimator (void *cls,
644 const struct GNUNET_MessageHeader *mh) 676 const struct GNUNET_MessageHeader *mh,
677 int is_compressed)
645{ 678{
646 struct Operation *op = cls; 679 struct Operation *op = cls;
647 struct StrataEstimator *remote_se; 680 struct StrataEstimator *remote_se;
648 int diff; 681 int diff;
682 size_t len;
649 683
650 if (op->state->phase != PHASE_EXPECT_SE) 684 if (op->state->phase != PHASE_EXPECT_SE)
651 { 685 {
@@ -653,9 +687,10 @@ handle_p2p_strata_estimator (void *cls,
653 GNUNET_break (0); 687 GNUNET_break (0);
654 return GNUNET_SYSERR; 688 return GNUNET_SYSERR;
655 } 689 }
656 if (ntohs (mh->size) != 690 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
657 SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE + 691 if ( (GNUNET_NO == is_compressed) &&
658 sizeof (struct GNUNET_MessageHeader)) 692 (len !=
693 SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
659 { 694 {
660 fail_union_operation (op); 695 fail_union_operation (op);
661 GNUNET_break (0); 696 GNUNET_break (0);
@@ -664,7 +699,22 @@ handle_p2p_strata_estimator (void *cls,
664 remote_se = strata_estimator_create (SE_STRATA_COUNT, 699 remote_se = strata_estimator_create (SE_STRATA_COUNT,
665 SE_IBF_SIZE, 700 SE_IBF_SIZE,
666 SE_IBF_HASH_NUM); 701 SE_IBF_HASH_NUM);
667 strata_estimator_read (&mh[1], remote_se); 702 if (NULL == remote_se)
703 {
704 /* insufficient resources, fail */
705 fail_union_operation (op);
706 return GNUNET_SYSERR;
707 }
708 if (GNUNET_OK !=
709 strata_estimator_read (&mh[1],
710 len,
711 is_compressed,
712 remote_se))
713 {
714 /* decompression failed */
715 fail_union_operation (op);
716 return GNUNET_SYSERR;
717 }
668 GNUNET_assert (NULL != op->state->se); 718 GNUNET_assert (NULL != op->state->se);
669 diff = strata_estimator_difference (remote_se, 719 diff = strata_estimator_difference (remote_se,
670 op->state->se); 720 op->state->se);
@@ -675,8 +725,16 @@ handle_p2p_strata_estimator (void *cls,
675 "got se diff=%d, using ibf size %d\n", 725 "got se diff=%d, using ibf size %d\n",
676 diff, 726 diff,
677 1<<get_order_from_difference (diff)); 727 1<<get_order_from_difference (diff));
678 send_ibf (op, 728 if (GNUNET_OK !=
679 get_order_from_difference (diff)); 729 send_ibf (op,
730 get_order_from_difference (diff)))
731 {
732 /* Internal error, best we can do is shut the connection */
733 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
734 "Failed to send IBF, closing connection\n");
735 fail_union_operation (op);
736 return GNUNET_SYSERR;
737 }
680 return GNUNET_OK; 738 return GNUNET_OK;
681} 739}
682 740
@@ -744,8 +802,9 @@ send_offers_for_key (struct Operation *op,
744 * send the appropriate offers and inquiries. 802 * send the appropriate offers and inquiries.
745 * 803 *
746 * @param op union operation 804 * @param op union operation
805 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
747 */ 806 */
748static void 807static int
749decode_and_send (struct Operation *op) 808decode_and_send (struct Operation *op)
750{ 809{
751 struct IBF_Key key; 810 struct IBF_Key key;
@@ -756,7 +815,12 @@ decode_and_send (struct Operation *op)
756 815
757 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase); 816 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
758 817
759 prepare_ibf (op, op->state->remote_ibf->size); 818 if (GNUNET_OK !=
819 prepare_ibf (op, op->state->remote_ibf->size))
820 {
821 /* allocation failed */
822 return GNUNET_SYSERR;
823 }
760 diff_ibf = ibf_dup (op->state->local_ibf); 824 diff_ibf = ibf_dup (op->state->local_ibf);
761 ibf_subtract (diff_ibf, op->state->remote_ibf); 825 ibf_subtract (diff_ibf, op->state->remote_ibf);
762 826
@@ -811,7 +875,16 @@ decode_and_send (struct Operation *op)
811 "# of IBF retries", 875 "# of IBF retries",
812 1, 876 1,
813 GNUNET_NO); 877 GNUNET_NO);
814 send_ibf (op, next_order); 878 if (GNUNET_OK !=
879 send_ibf (op, next_order))
880 {
881 /* Internal error, best we can do is shut the connection */
882 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
883 "Failed to send IBF, closing connection\n");
884 fail_union_operation (op);
885 ibf_destroy (diff_ibf);
886 return GNUNET_SYSERR;
887 }
815 } 888 }
816 else 889 else
817 { 890 {
@@ -822,6 +895,9 @@ decode_and_send (struct Operation *op)
822 // XXX: Send the whole set, element-by-element 895 // XXX: Send the whole set, element-by-element
823 LOG (GNUNET_ERROR_TYPE_ERROR, 896 LOG (GNUNET_ERROR_TYPE_ERROR,
824 "set union failed: reached ibf limit\n"); 897 "set union failed: reached ibf limit\n");
898 fail_union_operation (op);
899 ibf_destroy (diff_ibf);
900 return GNUNET_SYSERR;
825 } 901 }
826 break; 902 break;
827 } 903 }
@@ -867,6 +943,7 @@ decode_and_send (struct Operation *op)
867 } 943 }
868 } 944 }
869 ibf_destroy (diff_ibf); 945 ibf_destroy (diff_ibf);
946 return GNUNET_OK;
870} 947}
871 948
872 949
@@ -905,6 +982,13 @@ handle_p2p_ibf (void *cls,
905 "Creating new ibf of size %u\n", 982 "Creating new ibf of size %u\n",
906 1 << msg->order); 983 1 << msg->order);
907 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); 984 op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
985 if (NULL == op->state->remote_ibf)
986 {
987 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
988 "Failed to parse remote IBF, closing connection\n");
989 fail_union_operation (op);
990 return GNUNET_SYSERR;
991 }
908 op->state->ibf_buckets_received = 0; 992 op->state->ibf_buckets_received = 0;
909 if (0 != ntohs (msg->offset)) 993 if (0 != ntohs (msg->offset))
910 { 994 {
@@ -957,7 +1041,14 @@ handle_p2p_ibf (void *cls,
957 LOG (GNUNET_ERROR_TYPE_DEBUG, 1041 LOG (GNUNET_ERROR_TYPE_DEBUG,
958 "received full ibf\n"); 1042 "received full ibf\n");
959 op->state->phase = PHASE_INVENTORY_ACTIVE; 1043 op->state->phase = PHASE_INVENTORY_ACTIVE;
960 decode_and_send (op); 1044 if (GNUNET_OK !=
1045 decode_and_send (op))
1046 {
1047 /* Internal error, best we can do is shut down */
1048 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1049 "Failed to decode IBF, closing connection\n");
1050 return GNUNET_SYSERR;
1051 }
961 } 1052 }
962 return GNUNET_OK; 1053 return GNUNET_OK;
963} 1054}
@@ -1080,7 +1171,6 @@ handle_p2p_elements (void *cls,
1080 fail_union_operation (op); 1171 fail_union_operation (op);
1081 return; 1172 return;
1082 } 1173 }
1083
1084 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage)) 1174 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1085 { 1175 {
1086 GNUNET_break_op (0); 1176 GNUNET_break_op (0);
@@ -1088,7 +1178,7 @@ handle_p2p_elements (void *cls,
1088 return; 1178 return;
1089 } 1179 }
1090 1180
1091 emsg = (struct GNUNET_SET_ElementMessage *) mh; 1181 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1092 1182
1093 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage); 1183 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1094 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size); 1184 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
@@ -1099,7 +1189,10 @@ handle_p2p_elements (void *cls,
1099 ee->remote = GNUNET_YES; 1189 ee->remote = GNUNET_YES;
1100 GNUNET_SET_element_hash (&ee->element, &ee->element_hash); 1190 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1101 1191
1102 if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes, &ee->element_hash, NULL)) 1192 if (GNUNET_NO ==
1193 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1194 &ee->element_hash,
1195 NULL))
1103 { 1196 {
1104 /* We got something we didn't demand, since it's not in our map. */ 1197 /* We got something we didn't demand, since it's not in our map. */
1105 GNUNET_break_op (0); 1198 GNUNET_break_op (0);
@@ -1194,10 +1287,12 @@ handle_p2p_inquiry (void *cls,
1194} 1287}
1195 1288
1196 1289
1197 1290/**
1291 * FIXME
1292 */
1198static void 1293static void
1199handle_p2p_demand (void *cls, 1294handle_p2p_demand (void *cls,
1200 const struct GNUNET_MessageHeader *mh) 1295 const struct GNUNET_MessageHeader *mh)
1201{ 1296{
1202 struct Operation *op = cls; 1297 struct Operation *op = cls;
1203 struct ElementEntry *ee; 1298 struct ElementEntry *ee;
@@ -1303,12 +1398,16 @@ handle_p2p_offer (void *cls,
1303 struct ElementEntry *ee; 1398 struct ElementEntry *ee;
1304 struct GNUNET_MessageHeader *demands; 1399 struct GNUNET_MessageHeader *demands;
1305 struct GNUNET_MQ_Envelope *ev; 1400 struct GNUNET_MQ_Envelope *ev;
1306 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash); 1401
1402 ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
1403 hash);
1307 if (NULL != ee) 1404 if (NULL != ee)
1308 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op)) 1405 if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
1309 continue; 1406 continue;
1310 1407
1311 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes, hash)) 1408 if (GNUNET_YES ==
1409 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
1410 hash))
1312 { 1411 {
1313 LOG (GNUNET_ERROR_TYPE_DEBUG, 1412 LOG (GNUNET_ERROR_TYPE_DEBUG,
1314 "Skipped sending duplicate demand\n"); 1413 "Skipped sending duplicate demand\n");
@@ -1324,7 +1423,9 @@ handle_p2p_offer (void *cls,
1324 LOG (GNUNET_ERROR_TYPE_DEBUG, 1423 LOG (GNUNET_ERROR_TYPE_DEBUG,
1325 "[OP %x] Requesting element (hash %s)\n", 1424 "[OP %x] Requesting element (hash %s)\n",
1326 (void *) op, GNUNET_h2s (hash)); 1425 (void *) op, GNUNET_h2s (hash));
1327 ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND); 1426 ev = GNUNET_MQ_msg_header_extra (demands,
1427 sizeof (struct GNUNET_HashCode),
1428 GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
1328 *(struct GNUNET_HashCode *) &demands[1] = *hash; 1429 *(struct GNUNET_HashCode *) &demands[1] = *hash;
1329 GNUNET_MQ_send (op->mq, ev); 1430 GNUNET_MQ_send (op->mq, ev);
1330 } 1431 }
@@ -1466,7 +1567,7 @@ union_accept (struct Operation *op)
1466 * We maintain one strata estimator per set and then manipulate it over the 1567 * We maintain one strata estimator per set and then manipulate it over the
1467 * lifetime of the set, as recreating a strata estimator would be expensive. 1568 * lifetime of the set, as recreating a strata estimator would be expensive.
1468 * 1569 *
1469 * @return the newly created set 1570 * @return the newly created set, NULL on error
1470 */ 1571 */
1471static struct SetState * 1572static struct SetState *
1472union_set_create (void) 1573union_set_create (void)
@@ -1478,6 +1579,13 @@ union_set_create (void)
1478 set_state = GNUNET_new (struct SetState); 1579 set_state = GNUNET_new (struct SetState);
1479 set_state->se = strata_estimator_create (SE_STRATA_COUNT, 1580 set_state->se = strata_estimator_create (SE_STRATA_COUNT,
1480 SE_IBF_SIZE, SE_IBF_HASH_NUM); 1581 SE_IBF_SIZE, SE_IBF_HASH_NUM);
1582 if (NULL == set_state->se)
1583 {
1584 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1585 "Failed to allocate strata estimator\n");
1586 GNUNET_free (set_state);
1587 return NULL;
1588 }
1481 return set_state; 1589 return set_state;
1482} 1590}
1483 1591
@@ -1549,7 +1657,9 @@ union_handle_p2p_message (struct Operation *op,
1549 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF: 1657 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
1550 return handle_p2p_ibf (op, mh); 1658 return handle_p2p_ibf (op, mh);
1551 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE: 1659 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
1552 return handle_p2p_strata_estimator (op, mh); 1660 return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
1661 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
1662 return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
1553 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 1663 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1554 handle_p2p_elements (op, mh); 1664 handle_p2p_elements (op, mh);
1555 break; 1665 break;
diff --git a/src/set/gnunet-service-set_union_strata_estimator.c b/src/set/gnunet-service-set_union_strata_estimator.c
index 38b32961c..e94d52650 100644
--- a/src/set/gnunet-service-set_union_strata_estimator.c
+++ b/src/set/gnunet-service-set_union_strata_estimator.c
@@ -21,6 +21,7 @@
21 * @file set/gnunet-service-set_union_strata_estimator.c 21 * @file set/gnunet-service-set_union_strata_estimator.c
22 * @brief invertible bloom filter 22 * @brief invertible bloom filter
23 * @author Florian Dold 23 * @author Florian Dold
24 * @author Christian Grothoff
24 */ 25 */
25#include "platform.h" 26#include "platform.h"
26#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
@@ -29,12 +30,20 @@
29 30
30 31
31/** 32/**
33 * Should we try compressing the strata estimator? This will
34 * break compatibility with the 0.10.1-network.
35 */
36#define FAIL_10_1_COMPATIBILTIY 0
37
38
39/**
32 * Write the given strata estimator to the buffer. 40 * Write the given strata estimator to the buffer.
33 * 41 *
34 * @param se strata estimator to serialize 42 * @param se strata estimator to serialize
35 * @param buf buffer to write to, must be of appropriate size 43 * @param[out] buf buffer to write to, must be of appropriate size
44 * @return number of bytes written to @a buf
36 */ 45 */
37void 46size_t
38strata_estimator_write (const struct StrataEstimator *se, 47strata_estimator_write (const struct StrataEstimator *se,
39 void *buf) 48 void *buf)
40{ 49{
@@ -43,9 +52,32 @@ strata_estimator_write (const struct StrataEstimator *se,
43 GNUNET_assert (NULL != se); 52 GNUNET_assert (NULL != se);
44 for (i = 0; i < se->strata_count; i++) 53 for (i = 0; i < se->strata_count; i++)
45 { 54 {
46 ibf_write_slice (se->strata[i], 0, se->ibf_size, buf); 55 ibf_write_slice (se->strata[i],
56 0,
57 se->ibf_size,
58 buf);
47 buf += se->ibf_size * IBF_BUCKET_SIZE; 59 buf += se->ibf_size * IBF_BUCKET_SIZE;
48 } 60 }
61 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
62#if FAIL_10_1_COMPATIBILTIY
63 {
64 size_t osize;
65 char *cbuf;
66 size_t nsize;
67
68 if (GNUNET_YES ==
69 GNUNET_try_compression (buf,
70 osize,
71 &cbuf,
72 &nsize))
73 {
74 memcpy (buf, cbuf, nsize);
75 osize = nsize;
76 GNUNET_free (cbuf);
77 }
78 }
79#endif
80 return osize;
49} 81}
50 82
51 83
@@ -54,19 +86,50 @@ strata_estimator_write (const struct StrataEstimator *se,
54 * estimator. The strata estimator must already be allocated. 86 * estimator. The strata estimator must already be allocated.
55 * 87 *
56 * @param buf buffer to read from 88 * @param buf buffer to read from
57 * @param se strata estimator to write to 89 * @param buf_len number of bytes in @a buf
90 * @param is_compressed is the data compressed?
91 * @param[out] se strata estimator to write to
92 * @return #GNUNET_OK on success
58 */ 93 */
59void 94int
60strata_estimator_read (const void *buf, 95strata_estimator_read (const void *buf,
96 size_t buf_len,
97 int is_compressed,
61 struct StrataEstimator *se) 98 struct StrataEstimator *se)
62{ 99{
63 unsigned int i; 100 unsigned int i;
101 size_t osize;
102 char *dbuf;
64 103
104 dbuf = NULL;
105 if (GNUNET_YES == is_compressed)
106 {
107 osize = se->ibf_size * IBF_BUCKET_SIZE * se->strata_count;
108 dbuf = GNUNET_decompress (buf,
109 buf_len,
110 osize);
111 if (NULL == dbuf)
112 {
113 GNUNET_break_op (0); /* bad compressed input data */
114 return GNUNET_SYSERR;
115 }
116 buf = dbuf;
117 buf_len = osize;
118 }
119
120 if (buf_len != se->strata_count * se->ibf_size * IBF_BUCKET_SIZE)
121 {
122 GNUNET_break (0); /* very odd error */
123 GNUNET_free_non_null (dbuf);
124 return GNUNET_SYSERR;
125 }
65 for (i = 0; i < se->strata_count; i++) 126 for (i = 0; i < se->strata_count; i++)
66 { 127 {
67 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]); 128 ibf_read_slice (buf, 0, se->ibf_size, se->strata[i]);
68 buf += se->ibf_size * IBF_BUCKET_SIZE; 129 buf += se->ibf_size * IBF_BUCKET_SIZE;
69 } 130 }
131 GNUNET_free_non_null (dbuf);
132 return GNUNET_OK;
70} 133}
71 134
72 135
@@ -118,7 +181,7 @@ strata_estimator_remove (struct StrataEstimator *se,
118 * @param strata_count number of stratas, that is, number of ibfs in the estimator 181 * @param strata_count number of stratas, that is, number of ibfs in the estimator
119 * @param ibf_size size of each ibf stratum 182 * @param ibf_size size of each ibf stratum
120 * @param ibf_hashnum hashnum parameter of each ibf 183 * @param ibf_hashnum hashnum parameter of each ibf
121 * @return a freshly allocated, empty strata estimator 184 * @return a freshly allocated, empty strata estimator, NULL on error
122 */ 185 */
123struct StrataEstimator * 186struct StrataEstimator *
124strata_estimator_create (unsigned int strata_count, 187strata_estimator_create (unsigned int strata_count,
@@ -127,14 +190,24 @@ strata_estimator_create (unsigned int strata_count,
127{ 190{
128 struct StrataEstimator *se; 191 struct StrataEstimator *se;
129 unsigned int i; 192 unsigned int i;
193 unsigned int j;
130 194
131 /* fixme: allocate everything in one chunk */
132 se = GNUNET_new (struct StrataEstimator); 195 se = GNUNET_new (struct StrataEstimator);
133 se->strata_count = strata_count; 196 se->strata_count = strata_count;
134 se->ibf_size = ibf_size; 197 se->ibf_size = ibf_size;
135 se->strata = GNUNET_malloc (sizeof (struct InvertibleBloomFilter *) * strata_count); 198 se->strata = GNUNET_new_array (strata_count,
199 struct InvertibleBloomFilter *);
136 for (i = 0; i < strata_count; i++) 200 for (i = 0; i < strata_count; i++)
201 {
137 se->strata[i] = ibf_create (ibf_size, ibf_hashnum); 202 se->strata[i] = ibf_create (ibf_size, ibf_hashnum);
203 if (NULL == se->strata[i])
204 {
205 for (j = 0; j < i; j++)
206 ibf_destroy (se->strata[i]);
207 GNUNET_free (se);
208 return NULL;
209 }
210 }
138 return se; 211 return se;
139} 212}
140 213
@@ -226,4 +299,3 @@ strata_estimator_destroy (struct StrataEstimator *se)
226 GNUNET_free (se->strata); 299 GNUNET_free (se->strata);
227 GNUNET_free (se); 300 GNUNET_free (se);
228} 301}
229
diff --git a/src/set/gnunet-service-set_union_strata_estimator.h b/src/set/gnunet-service-set_union_strata_estimator.h
index 2b59e8ed3..8c50e2c9f 100644
--- a/src/set/gnunet-service-set_union_strata_estimator.h
+++ b/src/set/gnunet-service-set_union_strata_estimator.h
@@ -66,9 +66,10 @@ struct StrataEstimator
66 * Write the given strata estimator to the buffer. 66 * Write the given strata estimator to the buffer.
67 * 67 *
68 * @param se strata estimator to serialize 68 * @param se strata estimator to serialize
69 * @param buf buffer to write to, must be of appropriate size 69 * @param[out] buf buffer to write to, must be of appropriate size
70 * @return number of bytes written to @a buf
70 */ 71 */
71void 72size_t
72strata_estimator_write (const struct StrataEstimator *se, 73strata_estimator_write (const struct StrataEstimator *se,
73 void *buf); 74 void *buf);
74 75
@@ -78,10 +79,15 @@ strata_estimator_write (const struct StrataEstimator *se,
78 * estimator. The strata estimator must already be allocated. 79 * estimator. The strata estimator must already be allocated.
79 * 80 *
80 * @param buf buffer to read from 81 * @param buf buffer to read from
81 * @param se strata estimator to write to 82 * @param buf_len number of bytes in @a buf
83 * @param is_compressed is the data compressed?
84 * @param[out] se strata estimator to write to
85 * @return #GNUNET_OK on success
82 */ 86 */
83void 87int
84strata_estimator_read (const void *buf, 88strata_estimator_read (const void *buf,
89 size_t buf_len,
90 int is_compressed,
85 struct StrataEstimator *se); 91 struct StrataEstimator *se);
86 92
87 93
@@ -91,7 +97,7 @@ strata_estimator_read (const void *buf,
91 * @param strata_count number of stratas, that is, number of ibfs in the estimator 97 * @param strata_count number of stratas, that is, number of ibfs in the estimator
92 * @param ibf_size size of each ibf stratum 98 * @param ibf_size size of each ibf stratum
93 * @param ibf_hashnum hashnum parameter of each ibf 99 * @param ibf_hashnum hashnum parameter of each ibf
94 * @return a freshly allocated, empty strata estimator 100 * @return a freshly allocated, empty strata estimator, NULL on error
95 */ 101 */
96struct StrataEstimator * 102struct StrataEstimator *
97strata_estimator_create (unsigned int strata_count, 103strata_estimator_create (unsigned int strata_count,
@@ -161,4 +167,3 @@ strata_estimator_dup (struct StrataEstimator *se);
161#endif 167#endif
162 168
163#endif 169#endif
164
diff --git a/src/set/gnunet-set-ibf-profiler.c b/src/set/gnunet-set-ibf-profiler.c
index 86004d96e..412a9120e 100644
--- a/src/set/gnunet-set-ibf-profiler.c
+++ b/src/set/gnunet-set-ibf-profiler.c
@@ -102,7 +102,9 @@ remove_iterator (void *cls,
102 102
103 103
104static void 104static void
105run (void *cls, char *const *args, const char *cfgfile, 105run (void *cls,
106 char *const *args,
107 const char *cfgfile,
106 const struct GNUNET_CONFIGURATION_Handle *cfg) 108 const struct GNUNET_CONFIGURATION_Handle *cfg)
107{ 109{
108 struct GNUNET_HashCode id; 110 struct GNUNET_HashCode id;
@@ -171,6 +173,15 @@ run (void *cls, char *const *args, const char *cfgfile,
171 173
172 ibf_a = ibf_create (ibf_size, hash_num); 174 ibf_a = ibf_create (ibf_size, hash_num);
173 ibf_b = ibf_create (ibf_size, hash_num); 175 ibf_b = ibf_create (ibf_size, hash_num);
176 if ( (NULL == ibf_a) ||
177 (NULL == ibf_b) )
178 {
179 /* insufficient memory */
180 GNUNET_break (0);
181 GNUNET_SCHEDULER_shutdown ();
182 return;
183 }
184
174 185
175 printf ("generated sets\n"); 186 printf ("generated sets\n");
176 187
@@ -229,6 +240,7 @@ run (void *cls, char *const *args, const char *cfgfile,
229 asize + bsize); 240 asize + bsize);
230} 241}
231 242
243
232int 244int
233main (int argc, char **argv) 245main (int argc, char **argv)
234{ 246{
@@ -255,4 +267,3 @@ main (int argc, char **argv)
255 options, &run, NULL, GNUNET_YES); 267 options, &run, NULL, GNUNET_YES);
256 return 0; 268 return 0;
257} 269}
258
diff --git a/src/set/ibf.c b/src/set/ibf.c
index 19d15a5b2..0e8e1839e 100644
--- a/src/set/ibf.c
+++ b/src/set/ibf.c
@@ -41,7 +41,6 @@
41struct IBF_Key 41struct IBF_Key
42ibf_key_from_hashcode (const struct GNUNET_HashCode *hash) 42ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
43{ 43{
44 /* FIXME: endianess */
45 return *(struct IBF_Key *) hash; 44 return *(struct IBF_Key *) hash;
46} 45}
47 46
@@ -53,11 +52,13 @@ ibf_key_from_hashcode (const struct GNUNET_HashCode *hash)
53 * @param dst hashcode to store the result in 52 * @param dst hashcode to store the result in
54 */ 53 */
55void 54void
56ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst) 55ibf_hashcode_from_key (struct IBF_Key key,
56 struct GNUNET_HashCode *dst)
57{ 57{
58 struct IBF_Key *p; 58 struct IBF_Key *p;
59 unsigned int i; 59 unsigned int i;
60 const unsigned int keys_per_hashcode = sizeof (struct GNUNET_HashCode) / sizeof (struct IBF_Key); 60 const unsigned int keys_per_hashcode = sizeof (struct GNUNET_HashCode) / sizeof (struct IBF_Key);
61
61 p = (struct IBF_Key *) dst; 62 p = (struct IBF_Key *) dst;
62 for (i = 0; i < keys_per_hashcode; i++) 63 for (i = 0; i < keys_per_hashcode; i++)
63 *p++ = key; 64 *p++ = key;
@@ -69,7 +70,7 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst)
69 * 70 *
70 * @param size number of IBF buckets 71 * @param size number of IBF buckets
71 * @param hash_num number of buckets one element is hashed in 72 * @param hash_num number of buckets one element is hashed in
72 * @return the newly created invertible bloom filter 73 * @return the newly created invertible bloom filter, NULL on error
73 */ 74 */
74struct InvertibleBloomFilter * 75struct InvertibleBloomFilter *
75ibf_create (uint32_t size, uint8_t hash_num) 76ibf_create (uint32_t size, uint8_t hash_num)
@@ -80,20 +81,40 @@ ibf_create (uint32_t size, uint8_t hash_num)
80 81
81 ibf = GNUNET_new (struct InvertibleBloomFilter); 82 ibf = GNUNET_new (struct InvertibleBloomFilter);
82 ibf->count = GNUNET_malloc_large (size * sizeof (uint8_t)); 83 ibf->count = GNUNET_malloc_large (size * sizeof (uint8_t));
84 if (NULL == ibf->count)
85 {
86 GNUNET_free (ibf);
87 return NULL;
88 }
83 ibf->key_sum = GNUNET_malloc_large (size * sizeof (struct IBF_Key)); 89 ibf->key_sum = GNUNET_malloc_large (size * sizeof (struct IBF_Key));
90 if (NULL == ibf->key_sum)
91 {
92 GNUNET_free (ibf->count);
93 GNUNET_free (ibf);
94 return NULL;
95 }
84 ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof (struct IBF_KeyHash)); 96 ibf->key_hash_sum = GNUNET_malloc_large (size * sizeof (struct IBF_KeyHash));
97 if (NULL == ibf->key_hash_sum)
98 {
99 GNUNET_free (ibf->key_sum);
100 GNUNET_free (ibf->count);
101 GNUNET_free (ibf);
102 return NULL;
103 }
85 ibf->size = size; 104 ibf->size = size;
86 ibf->hash_num = hash_num; 105 ibf->hash_num = hash_num;
87 106
88 return ibf; 107 return ibf;
89} 108}
90 109
110
91/** 111/**
92 * Store unique bucket indices for the specified key in dst. 112 * Store unique bucket indices for the specified key in dst.
93 */ 113 */
94static inline void 114static void
95ibf_get_indices (const struct InvertibleBloomFilter *ibf, 115ibf_get_indices (const struct InvertibleBloomFilter *ibf,
96 struct IBF_Key key, int *dst) 116 struct IBF_Key key,
117 int *dst)
97{ 118{
98 uint32_t filled; 119 uint32_t filled;
99 uint32_t i; 120 uint32_t i;
@@ -371,4 +392,3 @@ ibf_destroy (struct InvertibleBloomFilter *ibf)
371 GNUNET_free (ibf->count); 392 GNUNET_free (ibf->count);
372 GNUNET_free (ibf); 393 GNUNET_free (ibf);
373} 394}
374
diff --git a/src/set/ibf.h b/src/set/ibf.h
index 831aa6b82..616cca5f6 100644
--- a/src/set/ibf.h
+++ b/src/set/ibf.h
@@ -123,7 +123,10 @@ struct InvertibleBloomFilter
123 * @param buf buffer to write the data to 123 * @param buf buffer to write the data to
124 */ 124 */
125void 125void
126ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf); 126ibf_write_slice (const struct InvertibleBloomFilter *ibf,
127 uint32_t start,
128 uint32_t count,
129 void *buf);
127 130
128 131
129/** 132/**
@@ -132,10 +135,13 @@ ibf_write_slice (const struct InvertibleBloomFilter *ibf, uint32_t start, uint32
132 * @param buf pointer to the buffer to read from 135 * @param buf pointer to the buffer to read from
133 * @param start which bucket to start at 136 * @param start which bucket to start at
134 * @param count how many buckets to read 137 * @param count how many buckets to read
135 * @param ibf the ibf to read from 138 * @param ibf the ibf to write to
136 */ 139 */
137void 140void
138ibf_read_slice (const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf); 141ibf_read_slice (const void *buf,
142 uint32_t start,
143 uint32_t count,
144 struct InvertibleBloomFilter *ibf);
139 145
140 146
141/** 147/**
@@ -164,7 +170,7 @@ ibf_hashcode_from_key (struct IBF_Key key, struct GNUNET_HashCode *dst);
164 * 170 *
165 * @param size number of IBF buckets 171 * @param size number of IBF buckets
166 * @param hash_num number of buckets one element is hashed in, usually 3 or 4 172 * @param hash_num number of buckets one element is hashed in, usually 3 or 4
167 * @return the newly created invertible bloom filter 173 * @return the newly created invertible bloom filter, NULL on error
168 */ 174 */
169struct InvertibleBloomFilter * 175struct InvertibleBloomFilter *
170ibf_create (uint32_t size, uint8_t hash_num); 176ibf_create (uint32_t size, uint8_t hash_num);
@@ -198,7 +204,8 @@ ibf_remove (struct InvertibleBloomFilter *ibf, struct IBF_Key key);
198 * @param ibf2 IBF that will be subtracted from ibf1 204 * @param ibf2 IBF that will be subtracted from ibf1
199 */ 205 */
200void 206void
201ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2); 207ibf_subtract (struct InvertibleBloomFilter *ibf1,
208 const struct InvertibleBloomFilter *ibf2);
202 209
203 210
204/** 211/**
@@ -209,12 +216,14 @@ ibf_subtract (struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFi
209 * A negative sign indicates that the element was recovered 216 * A negative sign indicates that the element was recovered
210 * resides in an IBF that was previously subtracted from. 217 * resides in an IBF that was previously subtracted from.
211 * @param ret_id receives the hash code of the decoded element, if successful 218 * @param ret_id receives the hash code of the decoded element, if successful
212 * @return GNUNET_YES if decoding an element was successful, 219 * @return #GNUNET_YES if decoding an element was successful,
213 * GNUNET_NO if the IBF is empty, 220 * #GNUNET_NO if the IBF is empty,
214 * GNUNET_SYSERR if the decoding has failed 221 * #GNUNET_SYSERR if the decoding has failed
215 */ 222 */
216int 223int
217ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id); 224ibf_decode (struct InvertibleBloomFilter *ibf,
225 int *ret_side,
226 struct IBF_Key *ret_id);
218 227
219 228
220/** 229/**
@@ -244,4 +253,3 @@ ibf_destroy (struct InvertibleBloomFilter *ibf);
244#endif 253#endif
245 254
246#endif 255#endif
247