aboutsummaryrefslogtreecommitdiff
path: root/src/set/gnunet-service-set_union.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/set/gnunet-service-set_union.c')
-rw-r--r--src/set/gnunet-service-set_union.c482
1 files changed, 440 insertions, 42 deletions
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index acaabd94a..f46713c31 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -85,6 +85,7 @@ enum UnionOperationPhase
85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS. 85 * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
86 * 86 *
87 * XXX: could use better wording. 87 * XXX: could use better wording.
88 * XXX: repurposed to also expect a "request full set" message, should be renamed
88 * 89 *
89 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS 90 * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
90 */ 91 */
@@ -115,14 +116,22 @@ enum UnionOperationPhase
115 * In the penultimate phase, 116 * In the penultimate phase,
116 * we wait until all our demands 117 * we wait until all our demands
117 * are satisfied. Then we send a done 118 * are satisfied. Then we send a done
118 * message, and wait for another done message.*/ 119 * message, and wait for another done message.
120 */
119 PHASE_FINISH_WAITING, 121 PHASE_FINISH_WAITING,
120 122
121 /** 123 /**
122 * In the ultimate phase, we wait until 124 * In the ultimate phase, we wait until
123 * our demands are satisfied and then 125 * our demands are satisfied and then
124 * quit (sending another DONE message). */ 126 * quit (sending another DONE message).
125 PHASE_DONE 127 */
128 PHASE_DONE,
129
130 /**
131 * After sending the full set, wait for responses with the elements
132 * that the local peer is missing.
133 */
134 PHASE_FULL_SENDING,
126}; 135};
127 136
128 137
@@ -148,7 +157,7 @@ struct OperationState
148 struct InvertibleBloomFilter *local_ibf; 157 struct InvertibleBloomFilter *local_ibf;
149 158
150 /** 159 /**
151 * Maps IBF-Keys (specific to the current salt) to elements. 160 * Maps unsalted IBF-Keys to elements.
152 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. 161 * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
153 * Colliding IBF-Keys are linked. 162 * Colliding IBF-Keys are linked.
154 */ 163 */
@@ -183,6 +192,23 @@ struct OperationState
183 * Salt for the IBF we've received and that we're currently decoding. 192 * Salt for the IBF we've received and that we're currently decoding.
184 */ 193 */
185 uint32_t salt_receive; 194 uint32_t salt_receive;
195
196 /**
197 * Number of elements we received from the other peer
198 * that were not in the local set yet.
199 */
200 uint32_t received_fresh;
201
202 /**
203 * Total number of elements received from the other peer.
204 */
205 uint32_t received_total;
206
207 /**
208 * Initial size of our set, just before
209 * the operation started.
210 */
211 uint64_t initial_size;
186}; 212};
187 213
188 214
@@ -203,6 +229,14 @@ struct KeyEntry
203 * is #GNUNET_YES. 229 * is #GNUNET_YES.
204 */ 230 */
205 struct ElementEntry *element; 231 struct ElementEntry *element;
232
233 /**
234 * Did we receive this element?
235 * Even if element->is_foreign is false, we might
236 * have received the element, so this indicates that
237 * the other peer has it.
238 */
239 int received;
206}; 240};
207 241
208 242
@@ -362,6 +396,16 @@ get_ibf_key (const struct GNUNET_HashCode *src)
362 396
363 397
364/** 398/**
399 * Context for #op_get_element_iterator
400 */
401struct GetElementContext
402{
403 struct GNUNET_HashCode hash;
404 struct KeyEntry *k;
405};
406
407
408/**
365 * Iterator over the mapping from IBF keys to element entries. Checks if we 409 * Iterator over the mapping from IBF keys to element entries. Checks if we
366 * have an element with a given GNUNET_HashCode. 410 * have an element with a given GNUNET_HashCode.
367 * 411 *
@@ -372,17 +416,20 @@ get_ibf_key (const struct GNUNET_HashCode *src)
372 * #GNUNET_NO if we've found the element. 416 * #GNUNET_NO if we've found the element.
373 */ 417 */
374static int 418static int
375op_has_element_iterator (void *cls, 419op_get_element_iterator (void *cls,
376 uint32_t key, 420 uint32_t key,
377 void *value) 421 void *value)
378{ 422{
379 struct GNUNET_HashCode *element_hash = cls; 423 struct GetElementContext *ctx = cls;
380 struct KeyEntry *k = value; 424 struct KeyEntry *k = value;
381 425
382 GNUNET_assert (NULL != k); 426 GNUNET_assert (NULL != k);
383 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash, 427 if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
384 element_hash)) 428 &ctx->hash))
429 {
430 ctx->k = k;
385 return GNUNET_NO; 431 return GNUNET_NO;
432 }
386 return GNUNET_YES; 433 return GNUNET_YES;
387} 434}
388 435
@@ -395,23 +442,29 @@ op_has_element_iterator (void *cls,
395 * @param element_hash hash of the element to look for 442 * @param element_hash hash of the element to look for
396 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise 443 * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
397 */ 444 */
398static int 445static struct KeyEntry *
399op_has_element (struct Operation *op, 446op_get_element (struct Operation *op,
400 const struct GNUNET_HashCode *element_hash) 447 const struct GNUNET_HashCode *element_hash)
401{ 448{
402 int ret; 449 int ret;
403 struct IBF_Key ibf_key; 450 struct IBF_Key ibf_key;
451 struct GetElementContext ctx = {{{ 0 }} , 0};
452
453 ctx.hash = *element_hash;
404 454
405 ibf_key = get_ibf_key (element_hash); 455 ibf_key = get_ibf_key (element_hash);
406 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element, 456 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
407 (uint32_t) ibf_key.key_val, 457 (uint32_t) ibf_key.key_val,
408 op_has_element_iterator, 458 op_get_element_iterator,
409 (void *) element_hash); 459 &ctx);
410 460
411 /* was the iteration aborted because we found the element? */ 461 /* was the iteration aborted because we found the element? */
412 if (GNUNET_SYSERR == ret) 462 if (GNUNET_SYSERR == ret)
413 return GNUNET_YES; 463 {
414 return GNUNET_NO; 464 GNUNET_assert (NULL != ctx.k);
465 return ctx.k;
466 }
467 return NULL;
415} 468}
416 469
417 470
@@ -427,10 +480,12 @@ op_has_element (struct Operation *op,
427 * 480 *
428 * @param op the union operation 481 * @param op the union operation
429 * @param ee the element entry 482 * @param ee the element entry
483 * @parem received was this element received from the remote peer?
430 */ 484 */
431static void 485static void
432op_register_element (struct Operation *op, 486op_register_element (struct Operation *op,
433 struct ElementEntry *ee) 487 struct ElementEntry *ee,
488 int received)
434{ 489{
435 struct IBF_Key ibf_key; 490 struct IBF_Key ibf_key;
436 struct KeyEntry *k; 491 struct KeyEntry *k;
@@ -439,6 +494,7 @@ op_register_element (struct Operation *op,
439 k = GNUNET_new (struct KeyEntry); 494 k = GNUNET_new (struct KeyEntry);
440 k->element = ee; 495 k->element = ee;
441 k->ibf_key = ibf_key; 496 k->ibf_key = ibf_key;
497 k->received = received;
442 GNUNET_assert (GNUNET_OK == 498 GNUNET_assert (GNUNET_OK ==
443 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element, 499 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
444 (uint32_t) ibf_key.key_val, 500 (uint32_t) ibf_key.key_val,
@@ -524,12 +580,30 @@ init_key_to_element_iterator (void *cls,
524 580
525 GNUNET_assert (GNUNET_NO == ee->remote); 581 GNUNET_assert (GNUNET_NO == ee->remote);
526 582
527 op_register_element (op, ee); 583 op_register_element (op, ee, GNUNET_NO);
528 return GNUNET_YES; 584 return GNUNET_YES;
529} 585}
530 586
531 587
532/** 588/**
589 * Initialize the IBF key to element mapping local to this set
590 * operation.
591 *
592 * @param op the set union operation
593 */
594static void
595initialize_key_to_element (struct Operation *op)
596{
597 unsigned int len;
598
599 GNUNET_assert (NULL == op->state->key_to_element);
600 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
601 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
602 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, init_key_to_element_iterator, op);
603}
604
605
606/**
533 * Create an ibf with the operation's elements 607 * Create an ibf with the operation's elements
534 * of the specified size 608 * of the specified size
535 * 609 *
@@ -541,15 +615,8 @@ static int
541prepare_ibf (struct Operation *op, 615prepare_ibf (struct Operation *op,
542 uint32_t size) 616 uint32_t size)
543{ 617{
544 if (NULL == op->state->key_to_element) 618 GNUNET_assert (NULL != op->state->key_to_element);
545 {
546 unsigned int len;
547 619
548 len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
549 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
550 GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
551 init_key_to_element_iterator, op);
552 }
553 if (NULL != op->state->local_ibf) 620 if (NULL != op->state->local_ibf)
554 ibf_destroy (op->state->local_ibf); 621 ibf_destroy (op->state->local_ibf);
555 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); 622 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
@@ -648,7 +715,7 @@ send_strata_estimator (struct Operation *op)
648{ 715{
649 const struct StrataEstimator *se = op->state->se; 716 const struct StrataEstimator *se = op->state->se;
650 struct GNUNET_MQ_Envelope *ev; 717 struct GNUNET_MQ_Envelope *ev;
651 struct GNUNET_MessageHeader *strata_msg; 718 struct StrataEstimatorMessage *strata_msg;
652 char *buf; 719 char *buf;
653 size_t len; 720 size_t len;
654 uint16_t type; 721 uint16_t type;
@@ -660,13 +727,14 @@ send_strata_estimator (struct Operation *op)
660 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC; 727 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC;
661 else 728 else
662 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE; 729 type = GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE;
663 ev = GNUNET_MQ_msg_header_extra (strata_msg, 730 ev = GNUNET_MQ_msg_extra (strata_msg,
664 len, 731 len,
665 type); 732 type);
666 GNUNET_memcpy (&strata_msg[1], 733 GNUNET_memcpy (&strata_msg[1],
667 buf, 734 buf,
668 len); 735 len);
669 GNUNET_free (buf); 736 GNUNET_free (buf);
737 strata_msg->set_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements));
670 GNUNET_MQ_send (op->mq, 738 GNUNET_MQ_send (op->mq,
671 ev); 739 ev);
672 op->state->phase = PHASE_EXPECT_IBF; 740 op->state->phase = PHASE_EXPECT_IBF;
@@ -693,7 +761,51 @@ get_order_from_difference (unsigned int diff)
693 ibf_order++; 761 ibf_order++;
694 if (ibf_order > MAX_IBF_ORDER) 762 if (ibf_order > MAX_IBF_ORDER)
695 ibf_order = MAX_IBF_ORDER; 763 ibf_order = MAX_IBF_ORDER;
696 return ibf_order; 764 // add one for correction
765 return ibf_order + 1;
766}
767
768
769/**
770 * Send a set element.
771 *
772 * @param cls the union operation `struct Operation *`
773 * @param key unused
774 * @param value the `struct ElementEntry *` to insert
775 * into the key-to-element mapping
776 * @return #GNUNET_YES (to continue iterating)
777 */
778static int
779send_element_iterator (void *cls,
780 const struct GNUNET_HashCode *key,
781 void *value)
782{
783 struct Operation *op = cls;
784 struct GNUNET_SET_ElementMessage *emsg;
785 struct ElementEntry *ee = value;
786 struct GNUNET_SET_Element *el = &ee->element;
787 struct GNUNET_MQ_Envelope *ev;
788
789
790 ev = GNUNET_MQ_msg_extra (emsg, el->size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
791 emsg->element_type = htons (el->element_type);
792 GNUNET_memcpy (&emsg[1], el->data, el->size);
793 GNUNET_MQ_send (op->mq, ev);
794 return GNUNET_YES;
795}
796
797
798static void
799send_full_set (struct Operation *op)
800{
801 struct GNUNET_MQ_Envelope *ev;
802
803 op->state->phase = PHASE_FULL_SENDING;
804
805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
806 &send_element_iterator, op);
807 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
808 GNUNET_MQ_send (op->mq, ev);
697} 809}
698 810
699 811
@@ -713,7 +825,9 @@ handle_p2p_strata_estimator (void *cls,
713{ 825{
714 struct Operation *op = cls; 826 struct Operation *op = cls;
715 struct StrataEstimator *remote_se; 827 struct StrataEstimator *remote_se;
716 int diff; 828 struct StrataEstimatorMessage *msg = (void *) mh;
829 unsigned int diff;
830 uint64_t other_size;
717 size_t len; 831 size_t len;
718 832
719 GNUNET_STATISTICS_update (_GSS_statistics, 833 GNUNET_STATISTICS_update (_GSS_statistics,
@@ -723,11 +837,11 @@ handle_p2p_strata_estimator (void *cls,
723 837
724 if (op->state->phase != PHASE_EXPECT_SE) 838 if (op->state->phase != PHASE_EXPECT_SE)
725 { 839 {
726 fail_union_operation (op);
727 GNUNET_break (0); 840 GNUNET_break (0);
841 fail_union_operation (op);
728 return GNUNET_SYSERR; 842 return GNUNET_SYSERR;
729 } 843 }
730 len = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); 844 len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
731 if ( (GNUNET_NO == is_compressed) && 845 if ( (GNUNET_NO == is_compressed) &&
732 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) ) 846 (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
733 { 847 {
@@ -735,6 +849,7 @@ handle_p2p_strata_estimator (void *cls,
735 GNUNET_break (0); 849 GNUNET_break (0);
736 return GNUNET_SYSERR; 850 return GNUNET_SYSERR;
737 } 851 }
852 other_size = GNUNET_ntohll (msg->set_size);
738 remote_se = strata_estimator_create (SE_STRATA_COUNT, 853 remote_se = strata_estimator_create (SE_STRATA_COUNT,
739 SE_IBF_SIZE, 854 SE_IBF_SIZE,
740 SE_IBF_HASH_NUM); 855 SE_IBF_HASH_NUM);
@@ -745,7 +860,7 @@ handle_p2p_strata_estimator (void *cls,
745 return GNUNET_SYSERR; 860 return GNUNET_SYSERR;
746 } 861 }
747 if (GNUNET_OK != 862 if (GNUNET_OK !=
748 strata_estimator_read (&mh[1], 863 strata_estimator_read (&msg[1],
749 len, 864 len,
750 is_compressed, 865 is_compressed,
751 remote_se)) 866 remote_se))
@@ -758,6 +873,10 @@ handle_p2p_strata_estimator (void *cls,
758 GNUNET_assert (NULL != op->state->se); 873 GNUNET_assert (NULL != op->state->se);
759 diff = strata_estimator_difference (remote_se, 874 diff = strata_estimator_difference (remote_se,
760 op->state->se); 875 op->state->se);
876
877 if (diff > 200)
878 diff = diff * 3 / 2;
879
761 strata_estimator_destroy (remote_se); 880 strata_estimator_destroy (remote_se);
762 strata_estimator_destroy (op->state->se); 881 strata_estimator_destroy (op->state->se);
763 op->state->se = NULL; 882 op->state->se = NULL;
@@ -765,16 +884,55 @@ handle_p2p_strata_estimator (void *cls,
765 "got se diff=%d, using ibf size %d\n", 884 "got se diff=%d, using ibf size %d\n",
766 diff, 885 diff,
767 1<<get_order_from_difference (diff)); 886 1<<get_order_from_difference (diff));
768 if (GNUNET_OK != 887
769 send_ibf (op, 888 if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
770 get_order_from_difference (diff)))
771 { 889 {
772 /* Internal error, best we can do is shut the connection */ 890 GNUNET_break (0);
773 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
774 "Failed to send IBF, closing connection\n");
775 fail_union_operation (op); 891 fail_union_operation (op);
776 return GNUNET_SYSERR; 892 return GNUNET_SYSERR;
777 } 893 }
894
895
896 if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
897 {
898 LOG (GNUNET_ERROR_TYPE_INFO,
899 "Sending full set (diff=%d, own set=%u)\n",
900 diff,
901 op->state->initial_size);
902 GNUNET_STATISTICS_update (_GSS_statistics,
903 "# of full sends",
904 1,
905 GNUNET_NO);
906 if (op->state->initial_size <= other_size)
907 {
908 send_full_set (op);
909 }
910 else
911 {
912 struct GNUNET_MQ_Envelope *ev;
913 op->state->phase = PHASE_EXPECT_IBF;
914 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
915 GNUNET_MQ_send (op->mq, ev);
916 }
917 }
918 else
919 {
920 GNUNET_STATISTICS_update (_GSS_statistics,
921 "# of ibf sends",
922 1,
923 GNUNET_NO);
924 if (GNUNET_OK !=
925 send_ibf (op,
926 get_order_from_difference (diff)))
927 {
928 /* Internal error, best we can do is shut the connection */
929 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
930 "Failed to send IBF, closing connection\n");
931 fail_union_operation (op);
932 return GNUNET_SYSERR;
933 }
934 }
935
778 return GNUNET_OK; 936 return GNUNET_OK;
779} 937}
780 938
@@ -1141,7 +1299,8 @@ send_client_element (struct Operation *op,
1141 } 1299 }
1142 rm->result_status = htons (status); 1300 rm->result_status = htons (status);
1143 rm->request_id = htonl (op->spec->client_request_id); 1301 rm->request_id = htonl (op->spec->client_request_id);
1144 rm->element_type = element->element_type; 1302 rm->element_type = htons (element->element_type);
1303 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1145 GNUNET_memcpy (&rm[1], element->data, element->size); 1304 GNUNET_memcpy (&rm[1], element->data, element->size);
1146 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1305 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1147} 1306}
@@ -1164,6 +1323,7 @@ send_done_and_destroy (void *cls)
1164 rm->request_id = htonl (op->spec->client_request_id); 1323 rm->request_id = htonl (op->spec->client_request_id);
1165 rm->result_status = htons (GNUNET_SET_STATUS_DONE); 1324 rm->result_status = htons (GNUNET_SET_STATUS_DONE);
1166 rm->element_type = htons (0); 1325 rm->element_type = htons (0);
1326 rm->current_size = GNUNET_htonll (GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element));
1167 GNUNET_MQ_send (op->spec->set->client_mq, ev); 1327 GNUNET_MQ_send (op->spec->set->client_mq, ev);
1168 /* Will also call the union-specific cancel function. */ 1328 /* Will also call the union-specific cancel function. */
1169 _GSS_operation_destroy (op, GNUNET_YES); 1329 _GSS_operation_destroy (op, GNUNET_YES);
@@ -1210,6 +1370,8 @@ maybe_finish (struct Operation *op)
1210 1370
1211/** 1371/**
1212 * Handle an element message from a remote peer. 1372 * Handle an element message from a remote peer.
1373 * Sent by the other peer either because we decoded an IBF and placed a demand,
1374 * or because the other peer switched to full set transmission.
1213 * 1375 *
1214 * @param cls the union operation 1376 * @param cls the union operation
1215 * @param mh the message 1377 * @param mh the message
@@ -1273,7 +1435,11 @@ handle_p2p_elements (void *cls,
1273 1, 1435 1,
1274 GNUNET_NO); 1436 GNUNET_NO);
1275 1437
1276 if (GNUNET_YES == op_has_element (op, &ee->element_hash)) 1438 op->state->received_total += 1;
1439
1440 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1441
1442 if (NULL != ke)
1277 { 1443 {
1278 /* Got repeated element. Should not happen since 1444 /* Got repeated element. Should not happen since
1279 * we track demands. */ 1445 * we track demands. */
@@ -1281,13 +1447,15 @@ handle_p2p_elements (void *cls,
1281 "# repeated elements", 1447 "# repeated elements",
1282 1, 1448 1,
1283 GNUNET_NO); 1449 GNUNET_NO);
1450 ke->received = GNUNET_YES;
1284 GNUNET_free (ee); 1451 GNUNET_free (ee);
1285 } 1452 }
1286 else 1453 else
1287 { 1454 {
1288 LOG (GNUNET_ERROR_TYPE_DEBUG, 1455 LOG (GNUNET_ERROR_TYPE_DEBUG,
1289 "Registering new element from remote peer\n"); 1456 "Registering new element from remote peer\n");
1290 op_register_element (op, ee); 1457 op->state->received_fresh += 1;
1458 op_register_element (op, ee, GNUNET_YES);
1291 /* only send results immediately if the client wants it */ 1459 /* only send results immediately if the client wants it */
1292 switch (op->spec->result_mode) 1460 switch (op->spec->result_mode)
1293 { 1461 {
@@ -1304,11 +1472,118 @@ handle_p2p_elements (void *cls,
1304 } 1472 }
1305 } 1473 }
1306 1474
1475 if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
1476 {
1477 /* The other peer gave us lots of old elements, there's something wrong. */
1478 GNUNET_break_op (0);
1479 fail_union_operation (op);
1480 return;
1481 }
1482
1307 maybe_finish (op); 1483 maybe_finish (op);
1308} 1484}
1309 1485
1310 1486
1311/** 1487/**
1488 * Handle an element message from a remote peer.
1489 *
1490 * @param cls the union operation
1491 * @param mh the message
1492 */
1493static void
1494handle_p2p_full_element (void *cls,
1495 const struct GNUNET_MessageHeader *mh)
1496{
1497 struct Operation *op = cls;
1498 struct ElementEntry *ee;
1499 const struct GNUNET_SET_ElementMessage *emsg;
1500 uint16_t element_size;
1501
1502 if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
1503 {
1504 GNUNET_break_op (0);
1505 fail_union_operation (op);
1506 return;
1507 }
1508
1509 emsg = (const struct GNUNET_SET_ElementMessage *) mh;
1510
1511 element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
1512 ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
1513 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1514 ee->element.size = element_size;
1515 ee->element.data = &ee[1];
1516 ee->element.element_type = ntohs (emsg->element_type);
1517 ee->remote = GNUNET_YES;
1518 GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
1519
1520 LOG (GNUNET_ERROR_TYPE_DEBUG,
1521 "Got element (full diff, size %u, hash %s) from peer\n",
1522 (unsigned int) element_size,
1523 GNUNET_h2s (&ee->element_hash));
1524
1525 GNUNET_STATISTICS_update (_GSS_statistics,
1526 "# received elements",
1527 1,
1528 GNUNET_NO);
1529 GNUNET_STATISTICS_update (_GSS_statistics,
1530 "# exchanged elements",
1531 1,
1532 GNUNET_NO);
1533
1534 op->state->received_total += 1;
1535
1536 struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
1537
1538 if (NULL != ke)
1539 {
1540 /* Got repeated element. Should not happen since
1541 * we track demands. */
1542 GNUNET_STATISTICS_update (_GSS_statistics,
1543 "# repeated elements",
1544 1,
1545 GNUNET_NO);
1546 ke->received = GNUNET_YES;
1547 GNUNET_free (ee);
1548 }
1549 else
1550 {
1551 LOG (GNUNET_ERROR_TYPE_DEBUG,
1552 "Registering new element from remote peer\n");
1553 op->state->received_fresh += 1;
1554 op_register_element (op, ee, GNUNET_YES);
1555 /* only send results immediately if the client wants it */
1556 switch (op->spec->result_mode)
1557 {
1558 case GNUNET_SET_RESULT_ADDED:
1559 send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
1560 break;
1561 case GNUNET_SET_RESULT_SYMMETRIC:
1562 send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
1563 break;
1564 default:
1565 /* Result mode not supported, should have been caught earlier. */
1566 GNUNET_break (0);
1567 break;
1568 }
1569 }
1570
1571 if ( (GNUNET_YES == op->spec->byzantine) &&
1572 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1573 (op->state->received_fresh < op->state->received_total / 6) )
1574 {
1575 /* The other peer gave us lots of old elements, there's something wrong. */
1576 LOG (GNUNET_ERROR_TYPE_ERROR,
1577 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1578 (unsigned long long) op->state->received_fresh,
1579 (unsigned long long) op->state->received_total);
1580 GNUNET_break_op (0);
1581 fail_union_operation (op);
1582 return;
1583 }
1584}
1585
1586/**
1312 * Send offers (for GNUNET_Hash-es) in response 1587 * Send offers (for GNUNET_Hash-es) in response
1313 * to inquiries (for IBF_Key-s). 1588 * to inquiries (for IBF_Key-s).
1314 * 1589 *
@@ -1355,7 +1630,116 @@ handle_p2p_inquiry (void *cls,
1355 1630
1356 1631
1357/** 1632/**
1358 * FIXME 1633 * Iterator over hash map entries, called to
1634 * destroy the linked list of colliding ibf key entries.
1635 *
1636 * @param cls closure
1637 * @param key current key code
1638 * @param value value in the hash map
1639 * @return #GNUNET_YES if we should continue to iterate,
1640 * #GNUNET_NO if not.
1641 */
1642static int
1643send_missing_elements_iter (void *cls,
1644 uint32_t key,
1645 void *value)
1646{
1647 struct Operation *op = cls;
1648 struct KeyEntry *ke = value;
1649 struct GNUNET_MQ_Envelope *ev;
1650 struct GNUNET_SET_ElementMessage *emsg;
1651 struct ElementEntry *ee = ke->element;
1652
1653 if (GNUNET_YES == ke->received)
1654 return GNUNET_YES;
1655
1656 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
1657 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
1658 emsg->reserved = htons (0);
1659 emsg->element_type = htons (ee->element.element_type);
1660 GNUNET_MQ_send (op->mq, ev);
1661
1662 return GNUNET_YES;
1663}
1664
1665
1666/**
1667 * Handle a
1668 *
1669 * @parem cls closure, a set union operation
1670 * @param mh the demand message
1671 */
1672static void
1673handle_p2p_request_full (void *cls,
1674 const struct GNUNET_MessageHeader *mh)
1675{
1676 struct Operation *op = cls;
1677
1678 if (PHASE_EXPECT_IBF != op->state->phase)
1679 {
1680 fail_union_operation (op);
1681 GNUNET_break_op (0);
1682 return;
1683 }
1684
1685 // FIXME: we need to check that our set is larger than the
1686 // byzantine_lower_bound by some threshold
1687 send_full_set (op);
1688}
1689
1690
1691/**
1692 * Handle a "full done" message.
1693 *
1694 * @parem cls closure, a set union operation
1695 * @param mh the demand message
1696 */
1697static void
1698handle_p2p_full_done (void *cls,
1699 const struct GNUNET_MessageHeader *mh)
1700{
1701 struct Operation *op = cls;
1702
1703 if (PHASE_EXPECT_IBF == op->state->phase)
1704 {
1705 struct GNUNET_MQ_Envelope *ev;
1706
1707 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
1708
1709 /* send all the elements that did not come from the remote peer */
1710 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1711 &send_missing_elements_iter,
1712 op);
1713
1714 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
1715 GNUNET_MQ_send (op->mq, ev);
1716 op->state->phase = PHASE_DONE;
1717
1718 /* we now wait until the other peer shuts the tunnel down*/
1719 }
1720 else if (PHASE_FULL_SENDING == op->state->phase)
1721 {
1722 LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
1723 /* We sent the full set, and got the response for that. We're done. */
1724 op->state->phase = PHASE_DONE;
1725 send_done_and_destroy (op);
1726 }
1727 else
1728 {
1729 GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
1730 GNUNET_break_op (0);
1731 fail_union_operation (op);
1732 return;
1733 }
1734}
1735
1736
1737/**
1738 * Handle a demand by the other peer for elements based on a list
1739 * of GNUNET_HashCode-s.
1740 *
1741 * @parem cls closure, a set union operation
1742 * @param mh the demand message
1359 */ 1743 */
1360static void 1744static void
1361handle_p2p_demand (void *cls, 1745handle_p2p_demand (void *cls,
@@ -1607,6 +1991,9 @@ union_evaluate (struct Operation *op,
1607 else 1991 else
1608 LOG (GNUNET_ERROR_TYPE_DEBUG, 1992 LOG (GNUNET_ERROR_TYPE_DEBUG,
1609 "sent op request without context message\n"); 1993 "sent op request without context message\n");
1994
1995 initialize_key_to_element (op);
1996 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1610} 1997}
1611 1998
1612 1999
@@ -1636,6 +2023,8 @@ union_accept (struct Operation *op)
1636 op->state->se = strata_estimator_dup (op->spec->set->state->se); 2023 op->state->se = strata_estimator_dup (op->spec->set->state->se);
1637 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO); 2024 op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_NO);
1638 op->state->salt_receive = op->state->salt_send = 42; 2025 op->state->salt_receive = op->state->salt_send = 42;
2026 initialize_key_to_element (op);
2027 op->state->initial_size = GNUNET_CONTAINER_multihashmap32_size (op->state->key_to_element);
1639 /* kick off the operation */ 2028 /* kick off the operation */
1640 send_strata_estimator (op); 2029 send_strata_estimator (op);
1641} 2030}
@@ -1743,6 +2132,9 @@ union_handle_p2p_message (struct Operation *op,
1743 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: 2132 case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
1744 handle_p2p_elements (op, mh); 2133 handle_p2p_elements (op, mh);
1745 break; 2134 break;
2135 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
2136 handle_p2p_full_element (op, mh);
2137 break;
1746 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY: 2138 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
1747 handle_p2p_inquiry (op, mh); 2139 handle_p2p_inquiry (op, mh);
1748 break; 2140 break;
@@ -1755,6 +2147,12 @@ union_handle_p2p_message (struct Operation *op,
1755 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND: 2147 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
1756 handle_p2p_demand (op, mh); 2148 handle_p2p_demand (op, mh);
1757 break; 2149 break;
2150 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
2151 handle_p2p_full_done (op, mh);
2152 break;
2153 case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
2154 handle_p2p_request_full (op, mh);
2155 break;
1758 default: 2156 default:
1759 /* Something wrong with cadet's message handlers? */ 2157 /* Something wrong with cadet's message handlers? */
1760 GNUNET_assert (0); 2158 GNUNET_assert (0);