diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-10-21 16:45:00 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-10-21 16:45:00 +0000 |
commit | d64a78aa23dc94f97b7bbc474682b1ca8a2c8a06 (patch) | |
tree | fda6fa570388e67dd333f386766766b67975b2a4 /src/set | |
parent | 095fbe3033d707b4136d33c3c44e02241ab1534a (diff) | |
download | gnunet-d64a78aa23dc94f97b7bbc474682b1ca8a2c8a06.tar.gz gnunet-d64a78aa23dc94f97b7bbc474682b1ca8a2c8a06.zip |
more work on intersection
Diffstat (limited to 'src/set')
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 563 |
1 files changed, 35 insertions, 528 deletions
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index a1bc550f6..221fb74a3 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c | |||
@@ -75,7 +75,7 @@ enum IntersectionOperationPhase | |||
75 | /** | 75 | /** |
76 | * We sent the request message, and expect a BF | 76 | * We sent the request message, and expect a BF |
77 | */ | 77 | */ |
78 | PHASE_EXPECT_BF, | 78 | PHASE_BF_EXCHANGE, |
79 | /** | 79 | /** |
80 | * The protocol is over. | 80 | * The protocol is over. |
81 | * Results may still have to be sent to the client. | 81 | * Results may still have to be sent to the client. |
@@ -107,32 +107,14 @@ struct OperationState | |||
107 | struct GNUNET_MQ_Handle *mq; | 107 | struct GNUNET_MQ_Handle *mq; |
108 | 108 | ||
109 | /** | 109 | /** |
110 | * Number of ibf buckets received | 110 | * The bf we currently receive |
111 | */ | 111 | */ |
112 | unsigned int ibf_buckets_received; | 112 | struct BloomFilter *remote_bf; |
113 | 113 | ||
114 | /** | 114 | /** |
115 | * Copy of the set's strata estimator at the time of | 115 | * BF of the set's element. |
116 | * creation of this operation | ||
117 | */ | 116 | */ |
118 | struct StrataEstimator *se; | 117 | struct BloomFilter *local_bf; |
119 | |||
120 | /** | ||
121 | * The ibf we currently receive | ||
122 | */ | ||
123 | struct InvertibleBloomFilter *remote_ibf; | ||
124 | |||
125 | /** | ||
126 | * IBF of the set's element. | ||
127 | */ | ||
128 | struct InvertibleBloomFilter *local_ibf; | ||
129 | |||
130 | /** | ||
131 | * Maps IBF-Keys (specific to the current salt) to elements. | ||
132 | * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key. | ||
133 | * Colliding IBF-Keys are linked. | ||
134 | */ | ||
135 | struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element; | ||
136 | 118 | ||
137 | /** | 119 | /** |
138 | * Current state of the operation. | 120 | * Current state of the operation. |
@@ -297,27 +279,7 @@ intersection_operation_destroy (struct OperationState *eo) | |||
297 | eo->tunnel = NULL; | 279 | eo->tunnel = NULL; |
298 | GNUNET_MESH_tunnel_destroy (t); | 280 | GNUNET_MESH_tunnel_destroy (t); |
299 | } | 281 | } |
300 | if (NULL != eo->remote_ibf) | 282 | // TODO: destroy set elements? |
301 | { | ||
302 | ibf_destroy (eo->remote_ibf); | ||
303 | eo->remote_ibf = NULL; | ||
304 | } | ||
305 | if (NULL != eo->local_ibf) | ||
306 | { | ||
307 | ibf_destroy (eo->local_ibf); | ||
308 | eo->local_ibf = NULL; | ||
309 | } | ||
310 | if (NULL != eo->se) | ||
311 | { | ||
312 | strata_estimator_destroy (eo->se); | ||
313 | eo->se = NULL; | ||
314 | } | ||
315 | if (NULL != eo->key_to_element) | ||
316 | { | ||
317 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, destroy_key_to_element_iter, NULL); | ||
318 | GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element); | ||
319 | eo->key_to_element = NULL; | ||
320 | } | ||
321 | if (NULL != eo->spec) | 283 | if (NULL != eo->spec) |
322 | { | 284 | { |
323 | if (NULL != eo->spec->context_msg) | 285 | if (NULL != eo->spec->context_msg) |
@@ -488,27 +450,6 @@ op_register_element (struct OperationState *eo, struct ElementEntry *ee) | |||
488 | } | 450 | } |
489 | 451 | ||
490 | 452 | ||
491 | /** | ||
492 | * Insert a key into an ibf. | ||
493 | * | ||
494 | * @param cls the ibf | ||
495 | * @param key unused | ||
496 | * @param value the key entry to get the key from | ||
497 | */ | ||
498 | static int | ||
499 | prepare_ibf_iterator (void *cls, | ||
500 | uint32_t key, | ||
501 | void *value) | ||
502 | { | ||
503 | struct InvertibleBloomFilter *ibf = cls; | ||
504 | struct KeyEntry *ke = value; | ||
505 | |||
506 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting %x into ibf\n", ke->ibf_key.key_val); | ||
507 | |||
508 | ibf_insert (ibf, ke->ibf_key); | ||
509 | return GNUNET_YES; | ||
510 | } | ||
511 | |||
512 | 453 | ||
513 | /** | 454 | /** |
514 | * Iterator for initializing the | 455 | * Iterator for initializing the |
@@ -542,324 +483,6 @@ init_key_to_element_iterator (void *cls, | |||
542 | return GNUNET_YES; | 483 | return GNUNET_YES; |
543 | } | 484 | } |
544 | 485 | ||
545 | |||
546 | /** | ||
547 | * Create an ibf with the operation's elements | ||
548 | * of the specified size | ||
549 | * | ||
550 | * @param eo the intersection operation | ||
551 | * @param size size of the ibf to create | ||
552 | */ | ||
553 | static void | ||
554 | prepare_ibf (struct OperationState *eo, uint16_t size) | ||
555 | { | ||
556 | if (NULL == eo->key_to_element) | ||
557 | { | ||
558 | unsigned int len; | ||
559 | len = GNUNET_CONTAINER_multihashmap_size (eo->set->elements); | ||
560 | eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); | ||
561 | GNUNET_CONTAINER_multihashmap_iterate (eo->set->elements, | ||
562 | init_key_to_element_iterator, eo); | ||
563 | } | ||
564 | if (NULL != eo->local_ibf) | ||
565 | ibf_destroy (eo->local_ibf); | ||
566 | eo->local_ibf = ibf_create (size, SE_IBF_HASH_NUM); | ||
567 | GNUNET_CONTAINER_multihashmap32_iterate (eo->key_to_element, | ||
568 | prepare_ibf_iterator, eo->local_ibf); | ||
569 | } | ||
570 | |||
571 | |||
572 | /** | ||
573 | * Send an ibf of appropriate size. | ||
574 | * | ||
575 | * @param eo the intersection operation | ||
576 | * @param ibf_order order of the ibf to send, size=2^order | ||
577 | */ | ||
578 | static void | ||
579 | send_ibf (struct OperationState *eo, uint16_t ibf_order) | ||
580 | { | ||
581 | unsigned int buckets_sent = 0; | ||
582 | struct InvertibleBloomFilter *ibf; | ||
583 | |||
584 | prepare_ibf (eo, 1<<ibf_order); | ||
585 | |||
586 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 1<<ibf_order); | ||
587 | |||
588 | ibf = eo->local_ibf; | ||
589 | |||
590 | while (buckets_sent < (1 << ibf_order)) | ||
591 | { | ||
592 | unsigned int buckets_in_message; | ||
593 | struct GNUNET_MQ_Envelope *ev; | ||
594 | struct IBFMessage *msg; | ||
595 | |||
596 | buckets_in_message = (1 << ibf_order) - buckets_sent; | ||
597 | /* limit to maximum */ | ||
598 | if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) | ||
599 | buckets_in_message = MAX_BUCKETS_PER_MESSAGE; | ||
600 | |||
601 | ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, | ||
602 | GNUNET_MESSAGE_TYPE_SET_P2P_IBF); | ||
603 | msg->reserved = 0; | ||
604 | msg->order = ibf_order; | ||
605 | msg->offset = htons (buckets_sent); | ||
606 | ibf_write_slice (ibf, buckets_sent, | ||
607 | buckets_in_message, &msg[1]); | ||
608 | buckets_sent += buckets_in_message; | ||
609 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n", | ||
610 | buckets_in_message, buckets_sent, 1<<ibf_order); | ||
611 | GNUNET_MQ_send (eo->mq, ev); | ||
612 | } | ||
613 | |||
614 | eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; | ||
615 | } | ||
616 | |||
617 | |||
618 | /** | ||
619 | * Send a strata estimator to the remote peer. | ||
620 | * | ||
621 | * @param eo the intersection operation with the remote peer | ||
622 | */ | ||
623 | static void | ||
624 | send_strata_estimator (struct OperationState *eo) | ||
625 | { | ||
626 | struct GNUNET_MQ_Envelope *ev; | ||
627 | struct GNUNET_MessageHeader *strata_msg; | ||
628 | |||
629 | ev = GNUNET_MQ_msg_header_extra (strata_msg, | ||
630 | SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, | ||
631 | GNUNET_MESSAGE_TYPE_SET_P2P_SE); | ||
632 | strata_estimator_write (eo->set->state->se, &strata_msg[1]); | ||
633 | GNUNET_MQ_send (eo->mq, ev); | ||
634 | eo->phase = PHASE_EXPECT_BF; | ||
635 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n"); | ||
636 | } | ||
637 | |||
638 | |||
639 | /** | ||
640 | * Compute the necessary order of an ibf | ||
641 | * from the size of the symmetric set difference. | ||
642 | * | ||
643 | * @param diff the difference | ||
644 | * @return the required size of the ibf | ||
645 | */ | ||
646 | static unsigned int | ||
647 | get_order_from_difference (unsigned int diff) | ||
648 | { | ||
649 | unsigned int ibf_order; | ||
650 | |||
651 | ibf_order = 2; | ||
652 | while ((1<<ibf_order) < (IBF_ALPHA * diff) || (1<<ibf_order) < SE_IBF_HASH_NUM) | ||
653 | ibf_order++; | ||
654 | if (ibf_order > MAX_IBF_ORDER) | ||
655 | ibf_order = MAX_IBF_ORDER; | ||
656 | return ibf_order; | ||
657 | } | ||
658 | |||
659 | |||
660 | /** | ||
661 | * Handle a strata estimator from a remote peer | ||
662 | * | ||
663 | * @param cls the intersection operation | ||
664 | * @param mh the message | ||
665 | */ | ||
666 | static void | ||
667 | handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) | ||
668 | { | ||
669 | struct OperationState *eo = cls; | ||
670 | struct StrataEstimator *remote_se; | ||
671 | int diff; | ||
672 | |||
673 | if (eo->phase != PHASE_EXPECT_SE) | ||
674 | { | ||
675 | fail_intersection_operation (eo); | ||
676 | GNUNET_break (0); | ||
677 | return; | ||
678 | } | ||
679 | remote_se = strata_estimator_create (SE_STRATA_COUNT, SE_IBF_SIZE, | ||
680 | SE_IBF_HASH_NUM); | ||
681 | strata_estimator_read (&mh[1], remote_se); | ||
682 | GNUNET_assert (NULL != eo->se); | ||
683 | diff = strata_estimator_difference (remote_se, eo->se); | ||
684 | strata_estimator_destroy (remote_se); | ||
685 | strata_estimator_destroy (eo->se); | ||
686 | eo->se = NULL; | ||
687 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se diff=%d, using ibf size %d\n", | ||
688 | diff, 1<<get_order_from_difference (diff)); | ||
689 | send_ibf (eo, get_order_from_difference (diff)); | ||
690 | } | ||
691 | |||
692 | |||
693 | |||
694 | /** | ||
695 | * Iterator to send elements to a remote peer | ||
696 | * | ||
697 | * @param cls closure with the element key and the intersection operation | ||
698 | * @param key ignored | ||
699 | * @param value the key entry | ||
700 | */ | ||
701 | static int | ||
702 | send_element_iterator (void *cls, | ||
703 | uint32_t key, | ||
704 | void *value) | ||
705 | { | ||
706 | struct SendElementClosure *sec = cls; | ||
707 | struct IBF_Key ibf_key = sec->ibf_key; | ||
708 | struct OperationState *eo = sec->eo; | ||
709 | struct KeyEntry *ke = value; | ||
710 | |||
711 | if (ke->ibf_key.key_val != ibf_key.key_val) | ||
712 | return GNUNET_YES; | ||
713 | while (NULL != ke) | ||
714 | { | ||
715 | const struct GNUNET_SET_Element *const element = &ke->element->element; | ||
716 | struct GNUNET_MQ_Envelope *ev; | ||
717 | struct GNUNET_MessageHeader *mh; | ||
718 | |||
719 | GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); | ||
720 | ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); | ||
721 | if (NULL == ev) | ||
722 | { | ||
723 | /* element too large */ | ||
724 | GNUNET_break (0); | ||
725 | continue; | ||
726 | } | ||
727 | memcpy (&mh[1], element->data, element->size); | ||
728 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element (%s) to peer\n", | ||
729 | GNUNET_h2s (&ke->element->element_hash)); | ||
730 | GNUNET_MQ_send (eo->mq, ev); | ||
731 | ke = ke->next_colliding; | ||
732 | } | ||
733 | return GNUNET_NO; | ||
734 | } | ||
735 | |||
736 | /** | ||
737 | * Send all elements that have the specified IBF key | ||
738 | * to the remote peer of the intersection operation | ||
739 | * | ||
740 | * @param eo intersection operation | ||
741 | * @param ibf_key IBF key of interest | ||
742 | */ | ||
743 | static void | ||
744 | send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key) | ||
745 | { | ||
746 | struct SendElementClosure send_cls; | ||
747 | |||
748 | send_cls.ibf_key = ibf_key; | ||
749 | send_cls.eo = eo; | ||
750 | GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, | ||
751 | &send_element_iterator, &send_cls); | ||
752 | } | ||
753 | |||
754 | |||
755 | /** | ||
756 | * Decode which elements are missing on each side, and | ||
757 | * send the appropriate elemens and requests | ||
758 | * | ||
759 | * @param eo intersection operation | ||
760 | */ | ||
761 | static void | ||
762 | decode_and_send (struct OperationState *eo) | ||
763 | { | ||
764 | struct IBF_Key key; | ||
765 | struct IBF_Key last_key; | ||
766 | int side; | ||
767 | unsigned int num_decoded; | ||
768 | struct InvertibleBloomFilter *diff_ibf; | ||
769 | |||
770 | GNUNET_assert (PHASE_EXPECT_ELEMENTS == eo->phase); | ||
771 | |||
772 | prepare_ibf (eo, eo->remote_ibf->size); | ||
773 | diff_ibf = ibf_dup (eo->local_ibf); | ||
774 | ibf_subtract (diff_ibf, eo->remote_ibf); | ||
775 | |||
776 | ibf_destroy (eo->remote_ibf); | ||
777 | eo->remote_ibf = NULL; | ||
778 | |||
779 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", diff_ibf->size); | ||
780 | |||
781 | num_decoded = 0; | ||
782 | last_key.key_val = 0; | ||
783 | |||
784 | while (1) | ||
785 | { | ||
786 | int res; | ||
787 | int cycle_detected = GNUNET_NO; | ||
788 | |||
789 | last_key = key; | ||
790 | |||
791 | res = ibf_decode (diff_ibf, &side, &key); | ||
792 | if (res == GNUNET_OK) | ||
793 | { | ||
794 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n", | ||
795 | key.key_val); | ||
796 | num_decoded += 1; | ||
797 | if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val == key.key_val)) | ||
798 | { | ||
799 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded %u/%u)\n", | ||
800 | num_decoded, diff_ibf->size); | ||
801 | cycle_detected = GNUNET_YES; | ||
802 | } | ||
803 | } | ||
804 | if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected)) | ||
805 | { | ||
806 | int next_order; | ||
807 | next_order = 0; | ||
808 | while (1<<next_order < diff_ibf->size) | ||
809 | next_order++; | ||
810 | next_order++; | ||
811 | if (next_order <= MAX_IBF_ORDER) | ||
812 | { | ||
813 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
814 | "decoding failed, sending larger ibf (size %u)\n", | ||
815 | 1<<next_order); | ||
816 | send_ibf (eo, next_order); | ||
817 | } | ||
818 | else | ||
819 | { | ||
820 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||
821 | "set intersection failed: reached ibf limit\n"); | ||
822 | } | ||
823 | break; | ||
824 | } | ||
825 | if (GNUNET_NO == res) | ||
826 | { | ||
827 | struct GNUNET_MQ_Envelope *ev; | ||
828 | |||
829 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending DONE\n"); | ||
830 | ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); | ||
831 | GNUNET_MQ_send (eo->mq, ev); | ||
832 | break; | ||
833 | } | ||
834 | if (1 == side) | ||
835 | { | ||
836 | send_elements_for_key (eo, key); | ||
837 | } | ||
838 | else if (-1 == side) | ||
839 | { | ||
840 | struct GNUNET_MQ_Envelope *ev; | ||
841 | struct GNUNET_MessageHeader *msg; | ||
842 | |||
843 | /* FIXME: before sending the request, check if we may just have the element */ | ||
844 | /* FIXME: merge multiple requests */ | ||
845 | /* FIXME: remember somewhere that we already requested the element, | ||
846 | * so that we don't request it again with the next ibf if decoding fails */ | ||
847 | ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), | ||
848 | GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); | ||
849 | |||
850 | *(struct IBF_Key *) &msg[1] = key; | ||
851 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element request\n"); | ||
852 | GNUNET_MQ_send (eo->mq, ev); | ||
853 | } | ||
854 | else | ||
855 | { | ||
856 | GNUNET_assert (0); | ||
857 | } | ||
858 | } | ||
859 | ibf_destroy (diff_ibf); | ||
860 | } | ||
861 | |||
862 | |||
863 | /** | 486 | /** |
864 | * Handle an IBF message from a remote peer. | 487 | * Handle an IBF message from a remote peer. |
865 | * | 488 | * |
@@ -867,63 +490,34 @@ decode_and_send (struct OperationState *eo) | |||
867 | * @param mh the header of the message | 490 | * @param mh the header of the message |
868 | */ | 491 | */ |
869 | static void | 492 | static void |
870 | handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh) | 493 | handle_p2p_bf (void *cls, const struct GNUNET_MessageHeader *mh) |
871 | { | 494 | { |
872 | struct OperationState *eo = cls; | 495 | struct OperationState *eo = cls; |
873 | struct IBFMessage *msg = (struct IBFMessage *) mh; | 496 | struct BFMessage *msg = (struct BFMessage *) mh; |
874 | unsigned int buckets_in_message; | 497 | unsigned int buckets_in_message; |
875 | 498 | ||
876 | if ( (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) || | 499 | if (eo->phase == PHASE_EXPECT_INITIAL ) |
877 | (eo->phase == PHASE_EXPECT_BF) ) | 500 | { |
878 | { | 501 | eo->phase = PHASE_BF_EXCHANGE; |
879 | eo->phase = PHASE_EXPECT_BF_CONT; | 502 | |
880 | GNUNET_assert (NULL == eo->remote_ibf); | 503 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new bf of size %u\n", 1<<msg->order); |
881 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 1<<msg->order); | 504 | |
882 | eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM); | 505 | // if (the remote peer has less elements than us) |
883 | eo->ibf_buckets_received = 0; | 506 | // run our elements through his bloomfilter |
884 | if (0 != ntohs (msg->offset)) | 507 | // else if (we have the same elements) |
885 | { | 508 | // done; |
886 | GNUNET_break (0); | 509 | // |
887 | fail_intersection_operation (eo); | 510 | // evict elements we can exclude through the bloomfilter |
888 | return; | 511 | // |
889 | } | 512 | // create a new bloomfilter over our remaining elements |
513 | // | ||
514 | // send our new count and the bloomfilter back | ||
890 | } | 515 | } |
891 | else if (eo->phase == PHASE_EXPECT_BF_CONT) | 516 | else if (eo->phase == PHASE_BF_EXCHANGE) |
892 | { | 517 | { |
893 | if ( (ntohs (msg->offset) != eo->ibf_buckets_received) || | ||
894 | (1<<msg->order != eo->remote_ibf->size) ) | ||
895 | { | ||
896 | GNUNET_break (0); | ||
897 | fail_intersection_operation (eo); | ||
898 | return; | ||
899 | } | ||
900 | } | ||
901 | 518 | ||
902 | buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE; | ||
903 | |||
904 | if (0 == buckets_in_message) | ||
905 | { | ||
906 | GNUNET_break_op (0); | ||
907 | fail_intersection_operation (eo); | ||
908 | return; | ||
909 | } | 519 | } |
910 | 520 | ||
911 | if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE) | ||
912 | { | ||
913 | GNUNET_break (0); | ||
914 | fail_intersection_operation (eo); | ||
915 | return; | ||
916 | } | ||
917 | |||
918 | ibf_read_slice (&msg[1], eo->ibf_buckets_received, buckets_in_message, eo->remote_ibf); | ||
919 | eo->ibf_buckets_received += buckets_in_message; | ||
920 | |||
921 | if (eo->ibf_buckets_received == eo->remote_ibf->size) | ||
922 | { | ||
923 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n"); | ||
924 | eo->phase = PHASE_EXPECT_ELEMENTS; | ||
925 | decode_and_send (eo); | ||
926 | } | ||
927 | } | 521 | } |
928 | 522 | ||
929 | 523 | ||
@@ -987,82 +581,6 @@ send_client_done_and_destroy (struct OperationState *eo) | |||
987 | 581 | ||
988 | 582 | ||
989 | /** | 583 | /** |
990 | * Handle an element message from a remote peer. | ||
991 | * | ||
992 | * @param cls the intersection operation | ||
993 | * @param mh the message | ||
994 | */ | ||
995 | static void | ||
996 | handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh) | ||
997 | { | ||
998 | struct OperationState *eo = cls; | ||
999 | struct ElementEntry *ee; | ||
1000 | uint16_t element_size; | ||
1001 | |||
1002 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n"); | ||
1003 | |||
1004 | if ( (eo->phase != PHASE_EXPECT_ELEMENTS) && | ||
1005 | (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ) | ||
1006 | { | ||
1007 | fail_intersection_operation (eo); | ||
1008 | GNUNET_break (0); | ||
1009 | return; | ||
1010 | } | ||
1011 | element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader); | ||
1012 | ee = GNUNET_malloc (sizeof *ee + element_size); | ||
1013 | memcpy (&ee[1], &mh[1], element_size); | ||
1014 | ee->element.size = element_size; | ||
1015 | ee->element.data = &ee[1]; | ||
1016 | ee->remote = GNUNET_YES; | ||
1017 | GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash); | ||
1018 | |||
1019 | /* FIXME: see if the element has already been inserted! */ | ||
1020 | |||
1021 | op_register_element (eo, ee); | ||
1022 | send_client_element (eo, &ee->element); | ||
1023 | } | ||
1024 | |||
1025 | |||
1026 | /** | ||
1027 | * Handle an element request from a remote peer. | ||
1028 | * | ||
1029 | * @param cls the intersection operation | ||
1030 | * @param mh the message | ||
1031 | */ | ||
1032 | static void | ||
1033 | handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh) | ||
1034 | { | ||
1035 | struct OperationState *eo = cls; | ||
1036 | struct IBF_Key *ibf_key; | ||
1037 | unsigned int num_keys; | ||
1038 | |||
1039 | /* look up elements and send them */ | ||
1040 | if (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) | ||
1041 | { | ||
1042 | GNUNET_break (0); | ||
1043 | fail_intersection_operation (eo); | ||
1044 | return; | ||
1045 | } | ||
1046 | |||
1047 | num_keys = (ntohs (mh->size) - sizeof *mh) / sizeof (struct IBF_Key); | ||
1048 | |||
1049 | if ((ntohs (mh->size) - sizeof *mh) != num_keys * sizeof (struct IBF_Key)) | ||
1050 | { | ||
1051 | GNUNET_break (0); | ||
1052 | fail_intersection_operation (eo); | ||
1053 | return; | ||
1054 | } | ||
1055 | |||
1056 | ibf_key = (struct IBF_Key *) &mh[1]; | ||
1057 | while (0 != num_keys--) | ||
1058 | { | ||
1059 | send_elements_for_key (eo, *ibf_key); | ||
1060 | ibf_key++; | ||
1061 | } | ||
1062 | } | ||
1063 | |||
1064 | |||
1065 | /** | ||
1066 | * Handle a done message from a remote peer | 584 | * Handle a done message from a remote peer |
1067 | * | 585 | * |
1068 | * @param cls the intersection operation | 586 | * @param cls the intersection operation |
@@ -1116,7 +634,6 @@ intersection_evaluate (struct OperationSpecification *spec, | |||
1116 | eo = GNUNET_new (struct OperationState); | 634 | eo = GNUNET_new (struct OperationState); |
1117 | tc->vt = _GSS_intersection_vt (); | 635 | tc->vt = _GSS_intersection_vt (); |
1118 | tc->op = eo; | 636 | tc->op = eo; |
1119 | eo->se = strata_estimator_dup (spec->set->state->se); | ||
1120 | eo->generation_created = spec->set->current_generation++; | 637 | eo->generation_created = spec->set->current_generation++; |
1121 | eo->set = spec->set; | 638 | eo->set = spec->set; |
1122 | eo->spec = spec; | 639 | eo->spec = spec; |
@@ -1134,7 +651,7 @@ intersection_evaluate (struct OperationSpecification *spec, | |||
1134 | eo->set->state->ops_tail, | 651 | eo->set->state->ops_tail, |
1135 | eo); | 652 | eo); |
1136 | 653 | ||
1137 | send_operation_request (eo); | 654 | send_initial_bloomfilter (eo); |
1138 | } | 655 | } |
1139 | 656 | ||
1140 | 657 | ||
@@ -1164,13 +681,12 @@ intersection_accept (struct OperationSpecification *spec, | |||
1164 | eo->spec = spec; | 681 | eo->spec = spec; |
1165 | eo->tunnel = tunnel; | 682 | eo->tunnel = tunnel; |
1166 | eo->mq = GNUNET_MESH_mq_create (tunnel); | 683 | eo->mq = GNUNET_MESH_mq_create (tunnel); |
1167 | eo->se = strata_estimator_dup (eo->set->state->se); | ||
1168 | /* transfer ownership of mq and socket from incoming to eo */ | 684 | /* transfer ownership of mq and socket from incoming to eo */ |
1169 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, | 685 | GNUNET_CONTAINER_DLL_insert (eo->set->state->ops_head, |
1170 | eo->set->state->ops_tail, | 686 | eo->set->state->ops_tail, |
1171 | eo); | 687 | eo); |
1172 | /* kick off the operation */ | 688 | /* kick off the operation */ |
1173 | send_strata_estimator (eo); | 689 | send_bloomfilter (eo); |
1174 | } | 690 | } |
1175 | 691 | ||
1176 | 692 | ||
@@ -1187,8 +703,9 @@ intersection_set_create (void) | |||
1187 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n"); | 703 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "intersection set created\n"); |
1188 | 704 | ||
1189 | set_state = GNUNET_new (struct SetState); | 705 | set_state = GNUNET_new (struct SetState); |
1190 | set_state->se = strata_estimator_create (SE_STRATA_COUNT, | 706 | |
1191 | SE_IBF_SIZE, SE_IBF_HASH_NUM); | 707 | //TODO: actually create that thing |
708 | |||
1192 | return set_state; | 709 | return set_state; |
1193 | } | 710 | } |
1194 | 711 | ||
@@ -1202,7 +719,7 @@ intersection_set_create (void) | |||
1202 | static void | 719 | static void |
1203 | intersection_add (struct SetState *set_state, struct ElementEntry *ee) | 720 | intersection_add (struct SetState *set_state, struct ElementEntry *ee) |
1204 | { | 721 | { |
1205 | strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0)); | 722 | //TODO |
1206 | } | 723 | } |
1207 | 724 | ||
1208 | 725 | ||
@@ -1220,7 +737,7 @@ intersection_set_destroy (struct SetState *set_state) | |||
1220 | intersection_operation_destroy (set_state->ops_head); | 737 | intersection_operation_destroy (set_state->ops_head); |
1221 | if (NULL != set_state->se) | 738 | if (NULL != set_state->se) |
1222 | { | 739 | { |
1223 | strata_estimator_destroy (set_state->se); | 740 | //TODO: actually destroy that thing |
1224 | set_state->se = NULL; | 741 | set_state->se = NULL; |
1225 | } | 742 | } |
1226 | GNUNET_free (set_state); | 743 | GNUNET_free (set_state); |
@@ -1229,7 +746,6 @@ intersection_set_destroy (struct SetState *set_state) | |||
1229 | 746 | ||
1230 | /** | 747 | /** |
1231 | * Remove the element given in the element message from the set. | 748 | * Remove the element given in the element message from the set. |
1232 | * Only marks the element as removed, so that older set operations can still exchange it. | ||
1233 | * | 749 | * |
1234 | * @param set_state state of the set to remove from | 750 | * @param set_state state of the set to remove from |
1235 | * @param element set element to remove | 751 | * @param element set element to remove |
@@ -1237,7 +753,7 @@ intersection_set_destroy (struct SetState *set_state) | |||
1237 | static void | 753 | static void |
1238 | intersection_remove (struct SetState *set_state, struct ElementEntry *element) | 754 | intersection_remove (struct SetState *set_state, struct ElementEntry *element) |
1239 | { | 755 | { |
1240 | /* FIXME: remove from strata estimator */ | 756 | //TODO |
1241 | } | 757 | } |
1242 | 758 | ||
1243 | 759 | ||
@@ -1257,17 +773,8 @@ intersection_handle_p2p_message (struct OperationState *eo, | |||
1257 | ntohs (mh->type), ntohs (mh->size)); | 773 | ntohs (mh->type), ntohs (mh->size)); |
1258 | switch (ntohs (mh->type)) | 774 | switch (ntohs (mh->type)) |
1259 | { | 775 | { |
1260 | case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: | 776 | case GNUNET_MESSAGE_TYPE_SET_P2P_BF: |
1261 | handle_p2p_ibf (eo, mh); | 777 | handle_p2p_bf (eo, mh); |
1262 | break; | ||
1263 | case GNUNET_MESSAGE_TYPE_SET_P2P_SE: | ||
1264 | handle_p2p_strata_estimator (eo, mh); | ||
1265 | break; | ||
1266 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: | ||
1267 | handle_p2p_elements (eo, mh); | ||
1268 | break; | ||
1269 | case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: | ||
1270 | handle_p2p_element_requests (eo, mh); | ||
1271 | break; | 778 | break; |
1272 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: | 779 | case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: |
1273 | handle_p2p_done (eo, mh); | 780 | handle_p2p_done (eo, mh); |