aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/fs/gnunet-service-fs_cp.c372
-rw-r--r--src/fs/gnunet-service-fs_cp.h2
-rw-r--r--src/fs/gnunet-service-fs_lc.c2
-rw-r--r--src/fs/gnunet-service-fs_pr.c51
-rw-r--r--src/fs/gnunet-service-fs_pr.h28
5 files changed, 254 insertions, 201 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 903549cb7..f9a642199 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -33,7 +33,6 @@
33#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) 33#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34 34
35 35
36
37/** 36/**
38 * Handle to cancel a transmission request. 37 * Handle to cancel a transmission request.
39 */ 38 */
@@ -124,15 +123,25 @@ struct GSF_ConnectedPeer
124 struct GSF_PeerTransmitHandle *pth_tail; 123 struct GSF_PeerTransmitHandle *pth_tail;
125 124
126 /** 125 /**
126 * Migration stop message in our queue, or NULL if we have none pending.
127 */
128 struct GSF_PeerTransmitHandle *migration_pth;
129
130 /**
127 * Context of our GNUNET_CORE_peer_change_preference call (or NULL). 131 * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
128 * NULL if we have successfully reserved 32k, otherwise non-NULL. 132 * NULL if we have successfully reserved 32k, otherwise non-NULL.
129 */ 133 */
130 struct GNUNET_CORE_InformationRequestContext *irc; 134 struct GNUNET_CORE_InformationRequestContext *irc;
131 135
132 /** 136 /**
137 * Active requests from this neighbour.
138 */
139 struct GNUNET_CONTAINER_MulitHashMap *request_map;
140
141 /**
133 * ID of delay task for scheduling transmission. 142 * ID of delay task for scheduling transmission.
134 */ 143 */
135 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused! 144 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
136 145
137 /** 146 /**
138 * Increase in traffic preference still to be submitted 147 * Increase in traffic preference still to be submitted
@@ -282,12 +291,12 @@ peer_transmit_ready_cb (void *cls,
282 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 291 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
283 cp->pth_tail, 292 cp->pth_tail,
284 pth); 293 pth);
285 if (pth->is_query) 294 if (GNUNET_YES == pth->is_query)
286 { 295 {
287 cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); 296 cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
288 GNUNET_assert (0 < cp->ppd.pending_queries--); 297 GNUNET_assert (0 < cp->ppd.pending_queries--);
289 } 298 }
290 else 299 else if (GNUNET_NO == pth->is_query)
291 { 300 {
292 GNUNET_assert (0 < cp->ppd.pending_replies--); 301 GNUNET_assert (0 < cp->ppd.pending_replies--);
293 } 302 }
@@ -389,6 +398,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
389 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) 398 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
390 cp->disk_trust = cp->trust = ntohl (trust); 399 cp->disk_trust = cp->trust = ntohl (trust);
391 GNUNET_free (fn); 400 GNUNET_free (fn);
401 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
392 GNUNET_break (GNUNET_OK == 402 GNUNET_break (GNUNET_OK ==
393 GNUNET_CONTAINER_multihashmap_put (cp_map, 403 GNUNET_CONTAINER_multihashmap_put (cp_map,
394 &peer->hashPubKey, 404 &peer->hashPubKey,
@@ -442,7 +452,8 @@ GSF_handle_p2p_migration_stop_ (void *cls,
442 * and will also not be called anymore after a call signalling 452 * and will also not be called anymore after a call signalling
443 * expiration. 453 * expiration.
444 * 454 *
445 * @param cls user-specified closure 455 * @param cls 'struct GSF_ConnectedPeer' of the peer that would
456 * have liked an answer to the request
446 * @param pr handle to the original pending request 457 * @param pr handle to the original pending request
447 * @param data response data, NULL on request expiration 458 * @param data response data, NULL on request expiration
448 * @param data_len number of bytes in data 459 * @param data_len number of bytes in data
@@ -453,12 +464,22 @@ handle_p2p_reply (void *cls,
453 const void *data, 464 const void *data,
454 size_t data_len) 465 size_t data_len)
455{ 466{
467 struct GSF_ConnectedPeer *cp = cls;
468
456#if SUPPORT_DELAYS 469#if SUPPORT_DELAYS
457 struct GNUNET_TIME_Relative art_delay; 470 struct GNUNET_TIME_Relative art_delay;
458#endif 471#endif
459 472
460 /* FIXME: adapt code fragments below to new API! */ 473 /* FIXME: adapt code fragments below to new API! */
461 474 if (NULL == data)
475 {
476 /* FIXME: request expired! clean up! */
477 GNUNET_STATISTICS_update (stats,
478 gettext_noop ("# P2P searches active"),
479 -1,
480 GNUNET_NO);
481 return;
482 }
462 483
463 /* reply will go over the network, check for cover traffic */ 484 /* reply will go over the network, check for cover traffic */
464 if ( (prq->anonymity_level > 1) && 485 if ( (prq->anonymity_level > 1) &&
@@ -515,9 +536,11 @@ handle_p2p_reply (void *cls,
515} 536}
516 537
517 538
518
519/** 539/**
520 * Handle P2P "QUERY" message. 540 * Handle P2P "QUERY" message. Creates the pending request entry
541 * and sets up all of the data structures to that we will
542 * process replies properly. Does not initiate forwarding or
543 * local database lookups.
521 * 544 *
522 * @param other the other peer involved (sender or receiver, NULL 545 * @param other the other peer involved (sender or receiver, NULL
523 * for loopback messages where we are both sender and receiver) 546 * for loopback messages where we are both sender and receiver)
@@ -528,11 +551,13 @@ struct GSF_PendingRequest *
528GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 551GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
529 const struct GNUNET_MessageHeader *message) 552 const struct GNUNET_MessageHeader *message)
530{ 553{
531 /* FIXME: adapt old code to new API! */ 554 struct GSF_PendingRequest *pr;
532 struct PendingRequest *pr; 555 struct GSF_PendingRequestData *prd;
533 struct ConnectedPeer *cp; 556 struct GSF_ConnectedPeer *cp;
534 struct ConnectedPeer *cps; 557 struct GSF_ConnectedPeer *cps;
535 struct CheckDuplicateRequestClosure cdc; 558 GNUNET_HashCode *namespace;
559 struct GNUNET_PeerIdentity *target;
560 enum GSF_PendingRequestOptions options;
536 struct GNUNET_TIME_Relative timeout; 561 struct GNUNET_TIME_Relative timeout;
537 uint16_t msize; 562 uint16_t msize;
538 const struct GetMessage *gm; 563 const struct GetMessage *gm;
@@ -542,8 +567,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
542 size_t bfsize; 567 size_t bfsize;
543 uint32_t ttl_decrement; 568 uint32_t ttl_decrement;
544 int32_t priority; 569 int32_t priority;
570 int32_t ttl;
545 enum GNUNET_BLOCK_Type type; 571 enum GNUNET_BLOCK_Type type;
546 int have_ns; 572
547 573
548 msize = ntohs(message->size); 574 msize = ntohs(message->size);
549 if (msize < sizeof (struct GetMessage)) 575 if (msize < sizeof (struct GetMessage))
@@ -615,7 +641,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
615 gettext_noop ("# requests dropped due to missing reverse route"), 641 gettext_noop ("# requests dropped due to missing reverse route"),
616 1, 642 1,
617 GNUNET_NO); 643 GNUNET_NO);
618 /* FIXME: try connect? */
619 return GNUNET_OK; 644 return GNUNET_OK;
620 } 645 }
621 /* note that we can really only check load here since otherwise 646 /* note that we can really only check load here since otherwise
@@ -639,14 +664,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
639 GNUNET_i2s (other), 664 GNUNET_i2s (other),
640 (unsigned int) bm); 665 (unsigned int) bm);
641#endif 666#endif
642 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); 667 namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
643 pr = GNUNET_malloc (sizeof (struct PendingRequest) + 668 target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
644 (have_ns ? sizeof(GNUNET_HashCode) : 0)); 669 options = 0;
645 if (have_ns)
646 {
647 pr->namespace = (GNUNET_HashCode*) &pr[1];
648 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
649 }
650 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || 670 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
651 (GNUNET_LOAD_get_average (cp->transmission_delay) > 671 (GNUNET_LOAD_get_average (cp->transmission_delay) >
652 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) 672 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
@@ -654,28 +674,21 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
654 /* don't have BW to send to peer, or would likely take longer than we have for it, 674 /* don't have BW to send to peer, or would likely take longer than we have for it,
655 so at best indirect the query */ 675 so at best indirect the query */
656 priority = 0; 676 priority = 0;
657 pr->forward_only = GNUNET_YES; 677 options |= GSF_PRO_FORWARD_ONLY;
658 } 678 }
659 pr->type = type; 679 ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
660 pr->mingle = ntohl (gm->filter_mutator);
661 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
662 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
663 pr->anonymity_level = 1;
664 pr->priority = (uint32_t) priority;
665 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
666 pr->query = gm->query;
667 /* decrement ttl (always) */ 680 /* decrement ttl (always) */
668 ttl_decrement = 2 * TTL_DECREMENT + 681 ttl_decrement = 2 * TTL_DECREMENT +
669 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 682 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
670 TTL_DECREMENT); 683 TTL_DECREMENT);
671 if ( (pr->ttl < 0) && 684 if ( (ttl < 0) &&
672 (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) 685 (((int32_t)(ttl - ttl_decrement)) > 0) )
673 { 686 {
674#if DEBUG_FS 687#if DEBUG_FS
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "Dropping query from `%s' due to TTL underflow (%d - %u).\n", 689 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
677 GNUNET_i2s (other), 690 GNUNET_i2s (other),
678 pr->ttl, 691 ttl,
679 ttl_decrement); 692 ttl_decrement);
680#endif 693#endif
681 GNUNET_STATISTICS_update (stats, 694 GNUNET_STATISTICS_update (stats,
@@ -683,74 +696,66 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
683 1, 696 1,
684 GNUNET_NO); 697 GNUNET_NO);
685 /* integer underflow => drop (should be very rare)! */ 698 /* integer underflow => drop (should be very rare)! */
686 GNUNET_free (pr);
687 return GNUNET_OK; 699 return GNUNET_OK;
688 } 700 }
689 pr->ttl -= ttl_decrement; 701 ttl -= ttl_decrement;
690 pr->start_time = GNUNET_TIME_absolute_get (); 702
691 703 /* test if the request already exists */
692 /* get bloom filter */ 704 pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
693 if (bfsize > 0) 705 &gm->query);
694 { 706 if (pr != NULL)
695 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], 707 {
696 bfsize, 708 prd = GSF_pending_request_get_data_ (pr);
697 BLOOMFILTER_K); 709 if ( (prd->type == type) &&
698 pr->bf_size = bfsize; 710 ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
699 } 711 (0 == memcmp (prd->namespace,
700 cdc.have = NULL; 712 namespace,
701 cdc.pr = pr; 713 sizeof (GNUNET_HashCode))) ) )
702 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
703 &gm->query,
704 &check_duplicate_request_peer,
705 &cdc);
706 if (cdc.have != NULL)
707 {
708 if (cdc.have->start_time.abs_value + cdc.have->ttl >=
709 pr->start_time.abs_value + pr->ttl)
710 { 714 {
711 /* existing request has higher TTL, drop new one! */ 715 if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
712 cdc.have->priority += pr->priority; 716 {
713 destroy_pending_request (pr); 717 /* existing request has higher TTL, drop new one! */
718 prd->priority += priority;
714#if DEBUG_FS 719#if DEBUG_FS
715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 720 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716 "Have existing request with higher TTL, dropping new request.\n", 721 "Have existing request with higher TTL, dropping new request.\n",
717 GNUNET_i2s (other)); 722 GNUNET_i2s (other));
718#endif 723#endif
719 GNUNET_STATISTICS_update (stats, 724 GNUNET_STATISTICS_update (stats,
720 gettext_noop ("# requests dropped due to higher-TTL request"), 725 gettext_noop ("# requests dropped due to higher-TTL request"),
721 1, 726 1,
722 GNUNET_NO); 727 GNUNET_NO);
723 return GNUNET_OK; 728 return GNUNET_OK;
724 } 729 }
725 else
726 {
727 /* existing request has lower TTL, drop old one! */ 730 /* existing request has lower TTL, drop old one! */
728 pr->priority += cdc.have->priority; 731 pr->priority += prd->priority;
729 /* Possible optimization: if we have applicable pending 732 GSF_pending_request_cancel_ (pr);
730 replies in 'cdc.have', we might want to move those over 733 GNUNET_assert (GNUNET_YES ==
731 (this is a really rare special-case, so it is not clear 734 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
732 that this would be worth it) */ 735 &gm->query,
733 destroy_pending_request (cdc.have); 736 pr));
734 /* keep processing 'pr'! */
735 } 737 }
736 } 738 }
737 739
738 pr->cp = cp; 740 pr = GSF_pending_request_create (options,
741 type,
742 &gm->query,
743 namespace,
744 target,
745 (bf_size > 0) ? (const char*)&opt[bits] : NULL,
746 bf_size,
747 ntohl (gm->filter_mutator),
748 1 /* anonymity */
749 (uint32_t) priority,
750 ttl,
751 NULL, 0, /* replies_seen */
752 &handle_p2p_reply,
753 cp);
739 GNUNET_break (GNUNET_OK == 754 GNUNET_break (GNUNET_OK ==
740 GNUNET_CONTAINER_multihashmap_put (query_request_map, 755 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
741 &gm->query, 756 &gm->query,
742 pr, 757 pr,
743 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 758 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
744 GNUNET_break (GNUNET_OK ==
745 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
746 &other->hashPubKey,
747 pr,
748 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
749
750 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
751 pr,
752 pr->start_time.abs_value + pr->ttl);
753
754 GNUNET_STATISTICS_update (stats, 759 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# P2P searches received"), 760 gettext_noop ("# P2P searches received"),
756 1, 761 1,
@@ -759,83 +764,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
759 gettext_noop ("# P2P searches active"), 764 gettext_noop ("# P2P searches active"),
760 1, 765 1,
761 GNUNET_NO); 766 GNUNET_NO);
762 767 return pr;
763 /* calculate change in traffic preference */
764 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
765 /* process locally */
766 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
767 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
768 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
769 (pr->priority + 1));
770 if (GNUNET_YES != pr->forward_only)
771 {
772#if DEBUG_FS
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "Handing request for `%s' to datastore\n",
775 GNUNET_h2s (&gm->query));
776#endif
777 pr->qe = GNUNET_DATASTORE_get (dsh,
778 &gm->query,
779 type,
780 pr->priority + 1,
781 MAX_DATASTORE_QUEUE,
782 timeout,
783 &process_local_reply,
784 pr);
785 if (NULL == pr->qe)
786 {
787 GNUNET_STATISTICS_update (stats,
788 gettext_noop ("# requests dropped by datastore (queue length limit)"),
789 1,
790 GNUNET_NO);
791 }
792 }
793 else
794 {
795 GNUNET_STATISTICS_update (stats,
796 gettext_noop ("# requests forwarded due to high load"),
797 1,
798 GNUNET_NO);
799 }
800
801 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
802 switch (pr->type)
803 {
804 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
805 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
806 /* only one result, wait for datastore */
807 if (GNUNET_YES != pr->forward_only)
808 {
809 GNUNET_STATISTICS_update (stats,
810 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
811 1,
812 GNUNET_NO);
813 break;
814 }
815 default:
816 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
817 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
818 pr);
819 }
820
821 /* make sure we don't track too many requests */
822 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
823 {
824 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
825 GNUNET_assert (pr != NULL);
826 destroy_pending_request (pr);
827 }
828 return GNUNET_OK;
829
830
831
832 // FIXME!
833 // parse request
834 // setup pending request (use 'handle_p2p_reply')
835 // track pending request to cancel it on peer disconnect (!)
836 // return it!
837 // (actual planning & execution up to caller!)
838 return NULL;
839} 768}
840 769
841 770
@@ -858,9 +787,9 @@ peer_transmit_timeout (void *cls,
858 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 787 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
859 cp->pth_tail, 788 cp->pth_tail,
860 pth); 789 pth);
861 if (pth->is_query) 790 if (GNUNET_YES == pth->is_query)
862 GNUNET_assert (0 < cp->ppd.pending_queries--); 791 GNUNET_assert (0 < cp->ppd.pending_queries--);
863 else 792 else if (GNUNET_NO == pth->is_query)
864 GNUNET_assert (0 < cp->ppd.pending_replies--); 793 GNUNET_assert (0 < cp->ppd.pending_replies--);
865 GNUNET_LOAD_update (cp->ppd.transmission_delay, 794 GNUNET_LOAD_update (cp->ppd.transmission_delay,
866 UINT64_MAX); 795 UINT64_MAX);
@@ -876,7 +805,7 @@ peer_transmit_timeout (void *cls,
876 * the callback is invoked with a 'NULL' buffer. 805 * the callback is invoked with a 'NULL' buffer.
877 * 806 *
878 * @param peer target peer 807 * @param peer target peer
879 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) 808 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
880 * @param priority how important is this request? 809 * @param priority how important is this request?
881 * @param timeout when does this request timeout (call gmc with error) 810 * @param timeout when does this request timeout (call gmc with error)
882 * @param size number of bytes we would like to send to the peer 811 * @param size number of bytes we would like to send to the peer
@@ -933,9 +862,10 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
933 pth); 862 pth);
934 GNUNET_PEER_resolve (cp->pid, 863 GNUNET_PEER_resolve (cp->pid,
935 &target); 864 &target);
936 if (is_query) 865 if (GNUNET_YES == is_query)
937 { 866 {
938 /* query, need reservation */ 867 /* query, need reservation */
868 cp->ppd.pending_queries++;
939 if (NULL == cp->irc) 869 if (NULL == cp->irc)
940 { 870 {
941 /* reservation already done! */ 871 /* reservation already done! */
@@ -957,9 +887,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
957 is_ready = GNUNET_NO; 887 is_ready = GNUNET_NO;
958 } 888 }
959 } 889 }
960 else 890 else if (GNUNET_NO == is_query)
961 { 891 {
962 /* no reservation needed for content */ 892 /* no reservation needed for content */
893 cp->ppd.pending_replies++;
894 is_ready = GNUNET_YES;
895 }
896 else
897 {
898 /* not a query or content, no reservation needed */
963 is_ready = GNUNET_YES; 899 is_ready = GNUNET_YES;
964 } 900 }
965 if (is_ready) 901 if (is_ready)
@@ -1011,9 +947,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1011 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 947 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1012 cp->pth_tail, 948 cp->pth_tail,
1013 pth); 949 pth);
1014 if (pth->is_query) 950 if (GNUNET_YES == pth->is_query)
1015 GNUNET_assert (0 < cp->ppd.pending_queries--); 951 GNUNET_assert (0 < cp->ppd.pending_queries--);
1016 else 952 else if (GNUNET_NO == pth->is_query)
1017 GNUNET_assert (0 < cp->ppd.pending_replies--); 953 GNUNET_assert (0 < cp->ppd.pending_replies--);
1018 GNUNET_free (pth); 954 GNUNET_free (pth);
1019} 955}
@@ -1085,6 +1021,26 @@ GSF_peer_status_handler_ (void *cls,
1085 1021
1086 1022
1087/** 1023/**
1024 * Cancel all requests associated with the peer.
1025 *
1026 * @param cls unused
1027 * @param query hash code of the request
1028 * @param value the 'struct GSF_PendingRequest'
1029 * @return GNUNET_YES (continue to iterate)
1030 */
1031static int
1032cancel_pending_request (void *cls,
1033 const GNUNET_HashCode *query,
1034 void *value)
1035{
1036 struct GSF_PendingRequest *pr = value;
1037
1038 GSF_pending_request_cancel_ (pr);
1039 return GNUNET_OK;
1040}
1041
1042
1043/**
1088 * A peer disconnected from us. Tear down the connected peer 1044 * A peer disconnected from us. Tear down the connected peer
1089 * record. 1045 * record.
1090 * 1046 *
@@ -1104,11 +1060,21 @@ GSF_peer_disconnect_handler_ (void *cls,
1104 GNUNET_CONTAINER_multihashmap_remove (cp_map, 1060 GNUNET_CONTAINER_multihashmap_remove (cp_map,
1105 &peer->hashPubKey, 1061 &peer->hashPubKey,
1106 cp); 1062 cp);
1063 if (NULL != cp->migration_pth)
1064 {
1065 GSF_peer_transmit_cancel_ (cp->migration_pth);
1066 cp->migration_pth = NULL;
1067 }
1107 if (NULL != cp->irc) 1068 if (NULL != cp->irc)
1108 { 1069 {
1109 GNUNET_CORE_peer_change_preference_cancel (cp->irc); 1070 GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1110 cp->irc = NULL; 1071 cp->irc = NULL;
1111 } 1072 }
1073 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1074 &cancel_pending_request,
1075 cp);
1076 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1077 cp->request_map = NULL;
1112 GSF_plan_notify_peer_disconnect_ (cp); 1078 GSF_plan_notify_peer_disconnect_ (cp);
1113 GNUNET_LOAD_value_free (cp->ppd.transmission_delay); 1079 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1114 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); 1080 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
@@ -1206,6 +1172,34 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1206 1172
1207 1173
1208/** 1174/**
1175 * Assemble a migration stop message for transmission.
1176 *
1177 * @param cls the 'struct GSF_ConnectedPeer' to use
1178 * @param size number of bytes we're allowed to write to buf
1179 * @param buf where to copy the message
1180 * @return number of bytes copied to buf
1181 */
1182static size_t
1183create_migration_stop_message (void *cls,
1184 size_t size,
1185 void *buf)
1186{
1187 struct GSF_ConnectedPeer *cp = cls;
1188 struct MigrationStopMessage msm;
1189
1190 cp->migration_pth = NULL;
1191 if (NULL == buf)
1192 return 0;
1193 GNUNET_assert (size > sizeof (struct MigrationStopMessage));
1194 msm.header.size = htons (sizeof (struct MigrationStopMessage));
1195 msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1196 msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1197 memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1198 return sizeof (struct MigrationStopMessage);
1199}
1200
1201
1202/**
1209 * Ask a peer to stop migrating data to us until the given point 1203 * Ask a peer to stop migrating data to us until the given point
1210 * in time. 1204 * in time.
1211 * 1205 *
@@ -1216,30 +1210,22 @@ void
1216GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, 1210GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1217 struct GNUNET_TIME_Relative block_time) 1211 struct GNUNET_TIME_Relative block_time)
1218{ 1212{
1219 struct PendingMessage *pm;
1220 struct MigrationStopMessage *msm;
1221
1222 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value) 1213 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1223 return; /* already blocked */ 1214 return; /* already blocked */
1224 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); 1215 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1225 1216 if (cp->migration_pth != NULL)
1226 /* FIXME: adapt old code below to new API! */ 1217 GSF_peer_transmit_cancel_ (cp->migration_pth);
1227 pm = GNUNET_malloc (sizeof (struct PendingMessage) + 1218 cp->migration_pth
1228 sizeof (struct MigrationStopMessage)); 1219 = GSF_peer_transmit_ (cp,
1229 pm->msize = sizeof (struct MigrationStopMessage); 1220 GNUNET_SYSERR,
1230 pm->priority = UINT32_MAX; 1221 UINT32_MAX,
1231 msm = (struct MigrationStopMessage*) &pm[1]; 1222 GNUNET_TIME_UNIT_FOREVER_REL,
1232 msm->header.size = htons (sizeof (struct MigrationStopMessage)); 1223 sizeof (struct MigrationStopMessage),
1233 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); 1224 &create_migration_stop_message,
1234 msm->duration = GNUNET_TIME_relative_hton (block_time); 1225 cp);
1235 add_to_pending_messages_for_peer (cp,
1236 pm,
1237 NULL);
1238} 1226}
1239 1227
1240 1228
1241
1242
1243/** 1229/**
1244 * Write host-trust information to a file - flush the buffer entry! 1230 * Write host-trust information to a file - flush the buffer entry!
1245 * 1231 *
diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h
index f08e31a72..bc561f792 100644
--- a/src/fs/gnunet-service-fs_cp.h
+++ b/src/fs/gnunet-service-fs_cp.h
@@ -257,7 +257,7 @@ GSF_handle_p2p_migration_stop_ (void *cls,
257 * Handle P2P "QUERY" message. Only responsible for creating the 257 * Handle P2P "QUERY" message. Only responsible for creating the
258 * request entry itself and setting up reply callback and cancellation 258 * request entry itself and setting up reply callback and cancellation
259 * on peer disconnect. Does NOT execute the actual request strategy 259 * on peer disconnect. Does NOT execute the actual request strategy
260 * (planning). 260 * (planning) or local database operations.
261 * 261 *
262 * @param other the other peer involved (sender or receiver, NULL 262 * @param other the other peer involved (sender or receiver, NULL
263 * for loopback messages where we are both sender and receiver) 263 * for loopback messages where we are both sender and receiver)
diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c
index ea33580f9..469475fe0 100644
--- a/src/fs/gnunet-service-fs_lc.c
+++ b/src/fs/gnunet-service-fs_lc.c
@@ -341,7 +341,7 @@ GSF_local_client_start_search_handler_ (struct GNUNET_SERVER_Client *client,
341 sizeof (GNUNET_HashCode))) 341 sizeof (GNUNET_HashCode)))
342 ? &sm->target, 342 ? &sm->target,
343 : NULL, 343 : NULL,
344 NULL /* bf */, 0 /* mingle */, 344 NULL, 0, 0 /* bf */,
345 ntohl (sm->anonymity_level), 345 ntohl (sm->anonymity_level),
346 0 /* priority */, 346 0 /* priority */,
347 &sm[1], sc, 347 &sm[1], sc,
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 047c07587..d2248989f 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -182,10 +182,12 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
182 * @param query key for the lookup 182 * @param query key for the lookup
183 * @param namespace namespace to lookup, NULL for no namespace 183 * @param namespace namespace to lookup, NULL for no namespace
184 * @param target preferred target for the request, NULL for none 184 * @param target preferred target for the request, NULL for none
185 * @param bf bloom filter for known replies, can be NULL 185 * @param bf_data raw data for bloom filter for known replies, can be NULL
186 * @param bf_size number of bytes in bf_data
186 * @param mingle mingle value for bf 187 * @param mingle mingle value for bf
187 * @param anonymity_level desired anonymity level 188 * @param anonymity_level desired anonymity level
188 * @param priority maximum outgoing cummulative request priority to use 189 * @param priority maximum outgoing cummulative request priority to use
190 * @param ttl current time-to-live for the request
189 * @param replies_seen hash codes of known local replies 191 * @param replies_seen hash codes of known local replies
190 * @param replies_seen_count size of the 'replies_seen' array 192 * @param replies_seen_count size of the 'replies_seen' array
191 * @param rh handle to call when we get a reply 193 * @param rh handle to call when we get a reply
@@ -198,10 +200,12 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
198 const GNUNET_HashCode *query, 200 const GNUNET_HashCode *query,
199 const GNUNET_HashCode *namespace, 201 const GNUNET_HashCode *namespace,
200 const struct GNUNET_PeerIdentity *target, 202 const struct GNUNET_PeerIdentity *target,
201 const struct GNUNET_CONTAINER_BloomFilter *bf, 203 const char *bf_data,
204 size_t bf_size,
202 int32_t mingle, 205 int32_t mingle,
203 uint32_t anonymity_level, 206 uint32_t anonymity_level,
204 uint32_t priority, 207 uint32_t priority,
208 int32_t ttl,
205 const GNUNET_HashCode *replies_seen, 209 const GNUNET_HashCode *replies_seen,
206 unsigned int replies_seen_count, 210 unsigned int replies_seen_count,
207 GSF_PendingRequestReplyHandler rh, 211 GSF_PendingRequestReplyHandler rh,
@@ -226,8 +230,16 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
226 pr->public_data.priority = priority; 230 pr->public_data.priority = priority;
227 pr->public_data.options = options; 231 pr->public_data.options = options;
228 pr->public_data.type = type; 232 pr->public_data.type = type;
233 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
229 pr->rh = rh; 234 pr->rh = rh;
230 pr->rh_cls = rh_cls; 235 pr->rh_cls = rh_cls;
236 if (ttl >= 0)
237 pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
238 (uint32_t) ttl));
239 else
240 pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
241 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
242 (uint32_t) (- ttl)));
231 if (replies_seen_count > 0) 243 if (replies_seen_count > 0)
232 { 244 {
233 pr->replies_seen_size = replies_seen_count; 245 pr->replies_seen_size = replies_seen_count;
@@ -237,9 +249,11 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
237 replies_seen_count * sizeof (struct GNUNET_HashCode)); 249 replies_seen_count * sizeof (struct GNUNET_HashCode));
238 pr->replies_seen_count = replies_seen_count; 250 pr->replies_seen_count = replies_seen_count;
239 } 251 }
240 if (NULL != bf) 252 if (NULL != bf_data)
241 { 253 {
242 pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf); 254 pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
255 bf_size,
256 BLOOMFILTER_K);
243 pr->mingle = mingle; 257 pr->mingle = mingle;
244 } 258 }
245 else if ( (replies_seen_count > 0) && 259 else if ( (replies_seen_count > 0) &&
@@ -254,11 +268,40 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
254 // FIXME: if not a local query, we also need to track the 268 // FIXME: if not a local query, we also need to track the
255 // total number of external queries we currently have and 269 // total number of external queries we currently have and
256 // bound it => need an additional heap! 270 // bound it => need an additional heap!
271
272 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
273 pr,
274 pr->start_time.abs_value + pr->ttl);
275
276
277
278 /* make sure we don't track too many requests */
279 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
280 {
281 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
282 GNUNET_assert (pr != NULL);
283 destroy_pending_request (pr);
284 }
285
286
257 return pr; 287 return pr;
258} 288}
259 289
260 290
261/** 291/**
292 * Obtain the public data associated with a pending request
293 *
294 * @param pr pending request
295 * @return associated public data
296 */
297struct GSF_PendingRequestData *
298GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
299{
300 return &pr->public_data;
301}
302
303
304/**
262 * Update a given pending request with additional replies 305 * Update a given pending request with additional replies
263 * that have been seen. 306 * that have been seen.
264 * 307 *
diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h
index 88c650042..bb6920ab1 100644
--- a/src/fs/gnunet-service-fs_pr.h
+++ b/src/fs/gnunet-service-fs_pr.h
@@ -92,6 +92,16 @@ struct GSF_PendingRequestData
92 struct GNUNET_PeerIdentity target; 92 struct GNUNET_PeerIdentity target;
93 93
94 /** 94 /**
95 * Current TTL for the request.
96 */
97 struct GNUNET_TIME_Absolute ttl;
98
99 /**
100 * When did we start with the request.
101 */
102 struct GNUNET_TIME_Absolute start_time;
103
104 /**
95 * Desired anonymity level. 105 * Desired anonymity level.
96 */ 106 */
97 uint32_t anonymity_level; 107 uint32_t anonymity_level;
@@ -146,10 +156,12 @@ typedef void (*GSF_PendingRequestReplyHandler)(void *cls,
146 * @param query key for the lookup 156 * @param query key for the lookup
147 * @param namespace namespace to lookup, NULL for no namespace 157 * @param namespace namespace to lookup, NULL for no namespace
148 * @param target preferred target for the request, NULL for none 158 * @param target preferred target for the request, NULL for none
149 * @param bf bloom filter for known replies, can be NULL 159 * @param bf_data raw data for bloom filter for known replies, can be NULL
160 * @param bf_size number of bytes in bf_data
150 * @param mingle mingle value for bf 161 * @param mingle mingle value for bf
151 * @param anonymity_level desired anonymity level 162 * @param anonymity_level desired anonymity level
152 * @param priority maximum outgoing cummulative request priority to use 163 * @param priority maximum outgoing cummulative request priority to use
164 * @param ttl current time-to-live for the request
153 * @param replies_seen hash codes of known local replies 165 * @param replies_seen hash codes of known local replies
154 * @param replies_seen_count size of the 'replies_seen' array 166 * @param replies_seen_count size of the 'replies_seen' array
155 * @param rh handle to call when we get a reply 167 * @param rh handle to call when we get a reply
@@ -162,10 +174,12 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
162 const GNUNET_HashCode *query, 174 const GNUNET_HashCode *query,
163 const GNUNET_HashCode *namespace, 175 const GNUNET_HashCode *namespace,
164 const struct GNUNET_PeerIdentity *target, 176 const struct GNUNET_PeerIdentity *target,
165 const struct GNUNET_CONTAINER_BloomFilter *bf, 177 const char *bf_data,
178 size_t bf_size,
166 int32_t mingle, 179 int32_t mingle,
167 uint32_t anonymity_level, 180 uint32_t anonymity_level,
168 uint32_t priority, 181 uint32_t priority,
182 int32_t ttl,
169 const GNUNET_HashCode *replies_seen, 183 const GNUNET_HashCode *replies_seen,
170 unsigned int replies_seen_count, 184 unsigned int replies_seen_count,
171 GSF_PendingRequestReplyHandler rh, 185 GSF_PendingRequestReplyHandler rh,
@@ -187,6 +201,16 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
187 201
188 202
189/** 203/**
204 * Obtain the public data associated with a pending request
205 *
206 * @param pr pending request
207 * @return associated public data
208 */
209struct GSF_PendingRequestData *
210GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr);
211
212
213/**
190 * Generate the message corresponding to the given pending request for 214 * Generate the message corresponding to the given pending request for
191 * transmission to other peers (or at least determine its size). 215 * transmission to other peers (or at least determine its size).
192 * 216 *