aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-02-15 13:07:14 +0000
committerChristian Grothoff <christian@grothoff.org>2011-02-15 13:07:14 +0000
commite3d12cb6fa5ddfb181dcade2e06888619f384457 (patch)
treee06e3bee1b98f3d8b917a328085e27b1081f1398 /src/fs/gnunet-service-fs_cp.c
parent3294e6c66210cdcca65524593ce09bbf4db14c7f (diff)
downloadgnunet-e3d12cb6fa5ddfb181dcade2e06888619f384457.tar.gz
gnunet-e3d12cb6fa5ddfb181dcade2e06888619f384457.zip
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c372
1 files changed, 179 insertions, 193 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 903549cb7..f9a642199 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -33,7 +33,6 @@
33#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) 33#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
34 34
35 35
36
37/** 36/**
38 * Handle to cancel a transmission request. 37 * Handle to cancel a transmission request.
39 */ 38 */
@@ -124,15 +123,25 @@ struct GSF_ConnectedPeer
124 struct GSF_PeerTransmitHandle *pth_tail; 123 struct GSF_PeerTransmitHandle *pth_tail;
125 124
126 /** 125 /**
126 * Migration stop message in our queue, or NULL if we have none pending.
127 */
128 struct GSF_PeerTransmitHandle *migration_pth;
129
130 /**
127 * Context of our GNUNET_CORE_peer_change_preference call (or NULL). 131 * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
128 * NULL if we have successfully reserved 32k, otherwise non-NULL. 132 * NULL if we have successfully reserved 32k, otherwise non-NULL.
129 */ 133 */
130 struct GNUNET_CORE_InformationRequestContext *irc; 134 struct GNUNET_CORE_InformationRequestContext *irc;
131 135
132 /** 136 /**
137 * Active requests from this neighbour.
138 */
139 struct GNUNET_CONTAINER_MulitHashMap *request_map;
140
141 /**
133 * ID of delay task for scheduling transmission. 142 * ID of delay task for scheduling transmission.
134 */ 143 */
135 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused! 144 GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
136 145
137 /** 146 /**
138 * Increase in traffic preference still to be submitted 147 * Increase in traffic preference still to be submitted
@@ -282,12 +291,12 @@ peer_transmit_ready_cb (void *cls,
282 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 291 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
283 cp->pth_tail, 292 cp->pth_tail,
284 pth); 293 pth);
285 if (pth->is_query) 294 if (GNUNET_YES == pth->is_query)
286 { 295 {
287 cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); 296 cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
288 GNUNET_assert (0 < cp->ppd.pending_queries--); 297 GNUNET_assert (0 < cp->ppd.pending_queries--);
289 } 298 }
290 else 299 else if (GNUNET_NO == pth->is_query)
291 { 300 {
292 GNUNET_assert (0 < cp->ppd.pending_replies--); 301 GNUNET_assert (0 < cp->ppd.pending_replies--);
293 } 302 }
@@ -389,6 +398,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
389 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) 398 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
390 cp->disk_trust = cp->trust = ntohl (trust); 399 cp->disk_trust = cp->trust = ntohl (trust);
391 GNUNET_free (fn); 400 GNUNET_free (fn);
401 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
392 GNUNET_break (GNUNET_OK == 402 GNUNET_break (GNUNET_OK ==
393 GNUNET_CONTAINER_multihashmap_put (cp_map, 403 GNUNET_CONTAINER_multihashmap_put (cp_map,
394 &peer->hashPubKey, 404 &peer->hashPubKey,
@@ -442,7 +452,8 @@ GSF_handle_p2p_migration_stop_ (void *cls,
442 * and will also not be called anymore after a call signalling 452 * and will also not be called anymore after a call signalling
443 * expiration. 453 * expiration.
444 * 454 *
445 * @param cls user-specified closure 455 * @param cls 'struct GSF_ConnectedPeer' of the peer that would
456 * have liked an answer to the request
446 * @param pr handle to the original pending request 457 * @param pr handle to the original pending request
447 * @param data response data, NULL on request expiration 458 * @param data response data, NULL on request expiration
448 * @param data_len number of bytes in data 459 * @param data_len number of bytes in data
@@ -453,12 +464,22 @@ handle_p2p_reply (void *cls,
453 const void *data, 464 const void *data,
454 size_t data_len) 465 size_t data_len)
455{ 466{
467 struct GSF_ConnectedPeer *cp = cls;
468
456#if SUPPORT_DELAYS 469#if SUPPORT_DELAYS
457 struct GNUNET_TIME_Relative art_delay; 470 struct GNUNET_TIME_Relative art_delay;
458#endif 471#endif
459 472
460 /* FIXME: adapt code fragments below to new API! */ 473 /* FIXME: adapt code fragments below to new API! */
461 474 if (NULL == data)
475 {
476 /* FIXME: request expired! clean up! */
477 GNUNET_STATISTICS_update (stats,
478 gettext_noop ("# P2P searches active"),
479 -1,
480 GNUNET_NO);
481 return;
482 }
462 483
463 /* reply will go over the network, check for cover traffic */ 484 /* reply will go over the network, check for cover traffic */
464 if ( (prq->anonymity_level > 1) && 485 if ( (prq->anonymity_level > 1) &&
@@ -515,9 +536,11 @@ handle_p2p_reply (void *cls,
515} 536}
516 537
517 538
518
519/** 539/**
520 * Handle P2P "QUERY" message. 540 * Handle P2P "QUERY" message. Creates the pending request entry
541 * and sets up all of the data structures to that we will
542 * process replies properly. Does not initiate forwarding or
543 * local database lookups.
521 * 544 *
522 * @param other the other peer involved (sender or receiver, NULL 545 * @param other the other peer involved (sender or receiver, NULL
523 * for loopback messages where we are both sender and receiver) 546 * for loopback messages where we are both sender and receiver)
@@ -528,11 +551,13 @@ struct GSF_PendingRequest *
528GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 551GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
529 const struct GNUNET_MessageHeader *message) 552 const struct GNUNET_MessageHeader *message)
530{ 553{
531 /* FIXME: adapt old code to new API! */ 554 struct GSF_PendingRequest *pr;
532 struct PendingRequest *pr; 555 struct GSF_PendingRequestData *prd;
533 struct ConnectedPeer *cp; 556 struct GSF_ConnectedPeer *cp;
534 struct ConnectedPeer *cps; 557 struct GSF_ConnectedPeer *cps;
535 struct CheckDuplicateRequestClosure cdc; 558 GNUNET_HashCode *namespace;
559 struct GNUNET_PeerIdentity *target;
560 enum GSF_PendingRequestOptions options;
536 struct GNUNET_TIME_Relative timeout; 561 struct GNUNET_TIME_Relative timeout;
537 uint16_t msize; 562 uint16_t msize;
538 const struct GetMessage *gm; 563 const struct GetMessage *gm;
@@ -542,8 +567,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
542 size_t bfsize; 567 size_t bfsize;
543 uint32_t ttl_decrement; 568 uint32_t ttl_decrement;
544 int32_t priority; 569 int32_t priority;
570 int32_t ttl;
545 enum GNUNET_BLOCK_Type type; 571 enum GNUNET_BLOCK_Type type;
546 int have_ns; 572
547 573
548 msize = ntohs(message->size); 574 msize = ntohs(message->size);
549 if (msize < sizeof (struct GetMessage)) 575 if (msize < sizeof (struct GetMessage))
@@ -615,7 +641,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
615 gettext_noop ("# requests dropped due to missing reverse route"), 641 gettext_noop ("# requests dropped due to missing reverse route"),
616 1, 642 1,
617 GNUNET_NO); 643 GNUNET_NO);
618 /* FIXME: try connect? */
619 return GNUNET_OK; 644 return GNUNET_OK;
620 } 645 }
621 /* note that we can really only check load here since otherwise 646 /* note that we can really only check load here since otherwise
@@ -639,14 +664,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
639 GNUNET_i2s (other), 664 GNUNET_i2s (other),
640 (unsigned int) bm); 665 (unsigned int) bm);
641#endif 666#endif
642 have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); 667 namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
643 pr = GNUNET_malloc (sizeof (struct PendingRequest) + 668 target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
644 (have_ns ? sizeof(GNUNET_HashCode) : 0)); 669 options = 0;
645 if (have_ns)
646 {
647 pr->namespace = (GNUNET_HashCode*) &pr[1];
648 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
649 }
650 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || 670 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
651 (GNUNET_LOAD_get_average (cp->transmission_delay) > 671 (GNUNET_LOAD_get_average (cp->transmission_delay) >
652 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) 672 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
@@ -654,28 +674,21 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
654 /* don't have BW to send to peer, or would likely take longer than we have for it, 674 /* don't have BW to send to peer, or would likely take longer than we have for it,
655 so at best indirect the query */ 675 so at best indirect the query */
656 priority = 0; 676 priority = 0;
657 pr->forward_only = GNUNET_YES; 677 options |= GSF_PRO_FORWARD_ONLY;
658 } 678 }
659 pr->type = type; 679 ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
660 pr->mingle = ntohl (gm->filter_mutator);
661 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
662 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
663 pr->anonymity_level = 1;
664 pr->priority = (uint32_t) priority;
665 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
666 pr->query = gm->query;
667 /* decrement ttl (always) */ 680 /* decrement ttl (always) */
668 ttl_decrement = 2 * TTL_DECREMENT + 681 ttl_decrement = 2 * TTL_DECREMENT +
669 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 682 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
670 TTL_DECREMENT); 683 TTL_DECREMENT);
671 if ( (pr->ttl < 0) && 684 if ( (ttl < 0) &&
672 (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) 685 (((int32_t)(ttl - ttl_decrement)) > 0) )
673 { 686 {
674#if DEBUG_FS 687#if DEBUG_FS
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
676 "Dropping query from `%s' due to TTL underflow (%d - %u).\n", 689 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
677 GNUNET_i2s (other), 690 GNUNET_i2s (other),
678 pr->ttl, 691 ttl,
679 ttl_decrement); 692 ttl_decrement);
680#endif 693#endif
681 GNUNET_STATISTICS_update (stats, 694 GNUNET_STATISTICS_update (stats,
@@ -683,74 +696,66 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
683 1, 696 1,
684 GNUNET_NO); 697 GNUNET_NO);
685 /* integer underflow => drop (should be very rare)! */ 698 /* integer underflow => drop (should be very rare)! */
686 GNUNET_free (pr);
687 return GNUNET_OK; 699 return GNUNET_OK;
688 } 700 }
689 pr->ttl -= ttl_decrement; 701 ttl -= ttl_decrement;
690 pr->start_time = GNUNET_TIME_absolute_get (); 702
691 703 /* test if the request already exists */
692 /* get bloom filter */ 704 pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
693 if (bfsize > 0) 705 &gm->query);
694 { 706 if (pr != NULL)
695 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], 707 {
696 bfsize, 708 prd = GSF_pending_request_get_data_ (pr);
697 BLOOMFILTER_K); 709 if ( (prd->type == type) &&
698 pr->bf_size = bfsize; 710 ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
699 } 711 (0 == memcmp (prd->namespace,
700 cdc.have = NULL; 712 namespace,
701 cdc.pr = pr; 713 sizeof (GNUNET_HashCode))) ) )
702 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
703 &gm->query,
704 &check_duplicate_request_peer,
705 &cdc);
706 if (cdc.have != NULL)
707 {
708 if (cdc.have->start_time.abs_value + cdc.have->ttl >=
709 pr->start_time.abs_value + pr->ttl)
710 { 714 {
711 /* existing request has higher TTL, drop new one! */ 715 if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
712 cdc.have->priority += pr->priority; 716 {
713 destroy_pending_request (pr); 717 /* existing request has higher TTL, drop new one! */
718 prd->priority += priority;
714#if DEBUG_FS 719#if DEBUG_FS
715 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 720 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
716 "Have existing request with higher TTL, dropping new request.\n", 721 "Have existing request with higher TTL, dropping new request.\n",
717 GNUNET_i2s (other)); 722 GNUNET_i2s (other));
718#endif 723#endif
719 GNUNET_STATISTICS_update (stats, 724 GNUNET_STATISTICS_update (stats,
720 gettext_noop ("# requests dropped due to higher-TTL request"), 725 gettext_noop ("# requests dropped due to higher-TTL request"),
721 1, 726 1,
722 GNUNET_NO); 727 GNUNET_NO);
723 return GNUNET_OK; 728 return GNUNET_OK;
724 } 729 }
725 else
726 {
727 /* existing request has lower TTL, drop old one! */ 730 /* existing request has lower TTL, drop old one! */
728 pr->priority += cdc.have->priority; 731 pr->priority += prd->priority;
729 /* Possible optimization: if we have applicable pending 732 GSF_pending_request_cancel_ (pr);
730 replies in 'cdc.have', we might want to move those over 733 GNUNET_assert (GNUNET_YES ==
731 (this is a really rare special-case, so it is not clear 734 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
732 that this would be worth it) */ 735 &gm->query,
733 destroy_pending_request (cdc.have); 736 pr));
734 /* keep processing 'pr'! */
735 } 737 }
736 } 738 }
737 739
738 pr->cp = cp; 740 pr = GSF_pending_request_create (options,
741 type,
742 &gm->query,
743 namespace,
744 target,
745 (bf_size > 0) ? (const char*)&opt[bits] : NULL,
746 bf_size,
747 ntohl (gm->filter_mutator),
748 1 /* anonymity */
749 (uint32_t) priority,
750 ttl,
751 NULL, 0, /* replies_seen */
752 &handle_p2p_reply,
753 cp);
739 GNUNET_break (GNUNET_OK == 754 GNUNET_break (GNUNET_OK ==
740 GNUNET_CONTAINER_multihashmap_put (query_request_map, 755 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
741 &gm->query, 756 &gm->query,
742 pr, 757 pr,
743 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 758 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
744 GNUNET_break (GNUNET_OK ==
745 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
746 &other->hashPubKey,
747 pr,
748 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
749
750 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
751 pr,
752 pr->start_time.abs_value + pr->ttl);
753
754 GNUNET_STATISTICS_update (stats, 759 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# P2P searches received"), 760 gettext_noop ("# P2P searches received"),
756 1, 761 1,
@@ -759,83 +764,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
759 gettext_noop ("# P2P searches active"), 764 gettext_noop ("# P2P searches active"),
760 1, 765 1,
761 GNUNET_NO); 766 GNUNET_NO);
762 767 return pr;
763 /* calculate change in traffic preference */
764 cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
765 /* process locally */
766 if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
767 type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
768 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
769 (pr->priority + 1));
770 if (GNUNET_YES != pr->forward_only)
771 {
772#if DEBUG_FS
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "Handing request for `%s' to datastore\n",
775 GNUNET_h2s (&gm->query));
776#endif
777 pr->qe = GNUNET_DATASTORE_get (dsh,
778 &gm->query,
779 type,
780 pr->priority + 1,
781 MAX_DATASTORE_QUEUE,
782 timeout,
783 &process_local_reply,
784 pr);
785 if (NULL == pr->qe)
786 {
787 GNUNET_STATISTICS_update (stats,
788 gettext_noop ("# requests dropped by datastore (queue length limit)"),
789 1,
790 GNUNET_NO);
791 }
792 }
793 else
794 {
795 GNUNET_STATISTICS_update (stats,
796 gettext_noop ("# requests forwarded due to high load"),
797 1,
798 GNUNET_NO);
799 }
800
801 /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
802 switch (pr->type)
803 {
804 case GNUNET_BLOCK_TYPE_FS_DBLOCK:
805 case GNUNET_BLOCK_TYPE_FS_IBLOCK:
806 /* only one result, wait for datastore */
807 if (GNUNET_YES != pr->forward_only)
808 {
809 GNUNET_STATISTICS_update (stats,
810 gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
811 1,
812 GNUNET_NO);
813 break;
814 }
815 default:
816 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
817 pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
818 pr);
819 }
820
821 /* make sure we don't track too many requests */
822 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
823 {
824 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
825 GNUNET_assert (pr != NULL);
826 destroy_pending_request (pr);
827 }
828 return GNUNET_OK;
829
830
831
832 // FIXME!
833 // parse request
834 // setup pending request (use 'handle_p2p_reply')
835 // track pending request to cancel it on peer disconnect (!)
836 // return it!
837 // (actual planning & execution up to caller!)
838 return NULL;
839} 768}
840 769
841 770
@@ -858,9 +787,9 @@ peer_transmit_timeout (void *cls,
858 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 787 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
859 cp->pth_tail, 788 cp->pth_tail,
860 pth); 789 pth);
861 if (pth->is_query) 790 if (GNUNET_YES == pth->is_query)
862 GNUNET_assert (0 < cp->ppd.pending_queries--); 791 GNUNET_assert (0 < cp->ppd.pending_queries--);
863 else 792 else if (GNUNET_NO == pth->is_query)
864 GNUNET_assert (0 < cp->ppd.pending_replies--); 793 GNUNET_assert (0 < cp->ppd.pending_replies--);
865 GNUNET_LOAD_update (cp->ppd.transmission_delay, 794 GNUNET_LOAD_update (cp->ppd.transmission_delay,
866 UINT64_MAX); 795 UINT64_MAX);
@@ -876,7 +805,7 @@ peer_transmit_timeout (void *cls,
876 * the callback is invoked with a 'NULL' buffer. 805 * the callback is invoked with a 'NULL' buffer.
877 * 806 *
878 * @param peer target peer 807 * @param peer target peer
879 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) 808 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
880 * @param priority how important is this request? 809 * @param priority how important is this request?
881 * @param timeout when does this request timeout (call gmc with error) 810 * @param timeout when does this request timeout (call gmc with error)
882 * @param size number of bytes we would like to send to the peer 811 * @param size number of bytes we would like to send to the peer
@@ -933,9 +862,10 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
933 pth); 862 pth);
934 GNUNET_PEER_resolve (cp->pid, 863 GNUNET_PEER_resolve (cp->pid,
935 &target); 864 &target);
936 if (is_query) 865 if (GNUNET_YES == is_query)
937 { 866 {
938 /* query, need reservation */ 867 /* query, need reservation */
868 cp->ppd.pending_queries++;
939 if (NULL == cp->irc) 869 if (NULL == cp->irc)
940 { 870 {
941 /* reservation already done! */ 871 /* reservation already done! */
@@ -957,9 +887,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
957 is_ready = GNUNET_NO; 887 is_ready = GNUNET_NO;
958 } 888 }
959 } 889 }
960 else 890 else if (GNUNET_NO == is_query)
961 { 891 {
962 /* no reservation needed for content */ 892 /* no reservation needed for content */
893 cp->ppd.pending_replies++;
894 is_ready = GNUNET_YES;
895 }
896 else
897 {
898 /* not a query or content, no reservation needed */
963 is_ready = GNUNET_YES; 899 is_ready = GNUNET_YES;
964 } 900 }
965 if (is_ready) 901 if (is_ready)
@@ -1011,9 +947,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1011 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 947 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1012 cp->pth_tail, 948 cp->pth_tail,
1013 pth); 949 pth);
1014 if (pth->is_query) 950 if (GNUNET_YES == pth->is_query)
1015 GNUNET_assert (0 < cp->ppd.pending_queries--); 951 GNUNET_assert (0 < cp->ppd.pending_queries--);
1016 else 952 else if (GNUNET_NO == pth->is_query)
1017 GNUNET_assert (0 < cp->ppd.pending_replies--); 953 GNUNET_assert (0 < cp->ppd.pending_replies--);
1018 GNUNET_free (pth); 954 GNUNET_free (pth);
1019} 955}
@@ -1085,6 +1021,26 @@ GSF_peer_status_handler_ (void *cls,
1085 1021
1086 1022
1087/** 1023/**
1024 * Cancel all requests associated with the peer.
1025 *
1026 * @param cls unused
1027 * @param query hash code of the request
1028 * @param value the 'struct GSF_PendingRequest'
1029 * @return GNUNET_YES (continue to iterate)
1030 */
1031static int
1032cancel_pending_request (void *cls,
1033 const GNUNET_HashCode *query,
1034 void *value)
1035{
1036 struct GSF_PendingRequest *pr = value;
1037
1038 GSF_pending_request_cancel_ (pr);
1039 return GNUNET_OK;
1040}
1041
1042
1043/**
1088 * A peer disconnected from us. Tear down the connected peer 1044 * A peer disconnected from us. Tear down the connected peer
1089 * record. 1045 * record.
1090 * 1046 *
@@ -1104,11 +1060,21 @@ GSF_peer_disconnect_handler_ (void *cls,
1104 GNUNET_CONTAINER_multihashmap_remove (cp_map, 1060 GNUNET_CONTAINER_multihashmap_remove (cp_map,
1105 &peer->hashPubKey, 1061 &peer->hashPubKey,
1106 cp); 1062 cp);
1063 if (NULL != cp->migration_pth)
1064 {
1065 GSF_peer_transmit_cancel_ (cp->migration_pth);
1066 cp->migration_pth = NULL;
1067 }
1107 if (NULL != cp->irc) 1068 if (NULL != cp->irc)
1108 { 1069 {
1109 GNUNET_CORE_peer_change_preference_cancel (cp->irc); 1070 GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1110 cp->irc = NULL; 1071 cp->irc = NULL;
1111 } 1072 }
1073 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1074 &cancel_pending_request,
1075 cp);
1076 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1077 cp->request_map = NULL;
1112 GSF_plan_notify_peer_disconnect_ (cp); 1078 GSF_plan_notify_peer_disconnect_ (cp);
1113 GNUNET_LOAD_value_free (cp->ppd.transmission_delay); 1079 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1114 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); 1080 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
@@ -1206,6 +1172,34 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1206 1172
1207 1173
1208/** 1174/**
1175 * Assemble a migration stop message for transmission.
1176 *
1177 * @param cls the 'struct GSF_ConnectedPeer' to use
1178 * @param size number of bytes we're allowed to write to buf
1179 * @param buf where to copy the message
1180 * @return number of bytes copied to buf
1181 */
1182static size_t
1183create_migration_stop_message (void *cls,
1184 size_t size,
1185 void *buf)
1186{
1187 struct GSF_ConnectedPeer *cp = cls;
1188 struct MigrationStopMessage msm;
1189
1190 cp->migration_pth = NULL;
1191 if (NULL == buf)
1192 return 0;
1193 GNUNET_assert (size > sizeof (struct MigrationStopMessage));
1194 msm.header.size = htons (sizeof (struct MigrationStopMessage));
1195 msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1196 msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
1197 memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1198 return sizeof (struct MigrationStopMessage);
1199}
1200
1201
1202/**
1209 * Ask a peer to stop migrating data to us until the given point 1203 * Ask a peer to stop migrating data to us until the given point
1210 * in time. 1204 * in time.
1211 * 1205 *
@@ -1216,30 +1210,22 @@ void
1216GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, 1210GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1217 struct GNUNET_TIME_Relative block_time) 1211 struct GNUNET_TIME_Relative block_time)
1218{ 1212{
1219 struct PendingMessage *pm;
1220 struct MigrationStopMessage *msm;
1221
1222 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value) 1213 if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value)
1223 return; /* already blocked */ 1214 return; /* already blocked */
1224 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); 1215 cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
1225 1216 if (cp->migration_pth != NULL)
1226 /* FIXME: adapt old code below to new API! */ 1217 GSF_peer_transmit_cancel_ (cp->migration_pth);
1227 pm = GNUNET_malloc (sizeof (struct PendingMessage) + 1218 cp->migration_pth
1228 sizeof (struct MigrationStopMessage)); 1219 = GSF_peer_transmit_ (cp,
1229 pm->msize = sizeof (struct MigrationStopMessage); 1220 GNUNET_SYSERR,
1230 pm->priority = UINT32_MAX; 1221 UINT32_MAX,
1231 msm = (struct MigrationStopMessage*) &pm[1]; 1222 GNUNET_TIME_UNIT_FOREVER_REL,
1232 msm->header.size = htons (sizeof (struct MigrationStopMessage)); 1223 sizeof (struct MigrationStopMessage),
1233 msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); 1224 &create_migration_stop_message,
1234 msm->duration = GNUNET_TIME_relative_hton (block_time); 1225 cp);
1235 add_to_pending_messages_for_peer (cp,
1236 pm,
1237 NULL);
1238} 1226}
1239 1227
1240 1228
1241
1242
1243/** 1229/**
1244 * Write host-trust information to a file - flush the buffer entry! 1230 * Write host-trust information to a file - flush the buffer entry!
1245 * 1231 *