diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-02-15 13:07:14 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-02-15 13:07:14 +0000 |
commit | e3d12cb6fa5ddfb181dcade2e06888619f384457 (patch) | |
tree | e06e3bee1b98f3d8b917a328085e27b1081f1398 /src/fs/gnunet-service-fs_cp.c | |
parent | 3294e6c66210cdcca65524593ce09bbf4db14c7f (diff) | |
download | gnunet-e3d12cb6fa5ddfb181dcade2e06888619f384457.tar.gz gnunet-e3d12cb6fa5ddfb181dcade2e06888619f384457.zip |
stuff
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r-- | src/fs/gnunet-service-fs_cp.c | 372 |
1 files changed, 179 insertions, 193 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 903549cb7..f9a642199 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c | |||
@@ -33,7 +33,6 @@ | |||
33 | #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) | 33 | #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) |
34 | 34 | ||
35 | 35 | ||
36 | |||
37 | /** | 36 | /** |
38 | * Handle to cancel a transmission request. | 37 | * Handle to cancel a transmission request. |
39 | */ | 38 | */ |
@@ -124,15 +123,25 @@ struct GSF_ConnectedPeer | |||
124 | struct GSF_PeerTransmitHandle *pth_tail; | 123 | struct GSF_PeerTransmitHandle *pth_tail; |
125 | 124 | ||
126 | /** | 125 | /** |
126 | * Migration stop message in our queue, or NULL if we have none pending. | ||
127 | */ | ||
128 | struct GSF_PeerTransmitHandle *migration_pth; | ||
129 | |||
130 | /** | ||
127 | * Context of our GNUNET_CORE_peer_change_preference call (or NULL). | 131 | * Context of our GNUNET_CORE_peer_change_preference call (or NULL). |
128 | * NULL if we have successfully reserved 32k, otherwise non-NULL. | 132 | * NULL if we have successfully reserved 32k, otherwise non-NULL. |
129 | */ | 133 | */ |
130 | struct GNUNET_CORE_InformationRequestContext *irc; | 134 | struct GNUNET_CORE_InformationRequestContext *irc; |
131 | 135 | ||
132 | /** | 136 | /** |
137 | * Active requests from this neighbour. | ||
138 | */ | ||
139 | struct GNUNET_CONTAINER_MulitHashMap *request_map; | ||
140 | |||
141 | /** | ||
133 | * ID of delay task for scheduling transmission. | 142 | * ID of delay task for scheduling transmission. |
134 | */ | 143 | */ |
135 | GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused! | 144 | GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!) |
136 | 145 | ||
137 | /** | 146 | /** |
138 | * Increase in traffic preference still to be submitted | 147 | * Increase in traffic preference still to be submitted |
@@ -282,12 +291,12 @@ peer_transmit_ready_cb (void *cls, | |||
282 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 291 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
283 | cp->pth_tail, | 292 | cp->pth_tail, |
284 | pth); | 293 | pth); |
285 | if (pth->is_query) | 294 | if (GNUNET_YES == pth->is_query) |
286 | { | 295 | { |
287 | cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); | 296 | cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); |
288 | GNUNET_assert (0 < cp->ppd.pending_queries--); | 297 | GNUNET_assert (0 < cp->ppd.pending_queries--); |
289 | } | 298 | } |
290 | else | 299 | else if (GNUNET_NO == pth->is_query) |
291 | { | 300 | { |
292 | GNUNET_assert (0 < cp->ppd.pending_replies--); | 301 | GNUNET_assert (0 < cp->ppd.pending_replies--); |
293 | } | 302 | } |
@@ -389,6 +398,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, | |||
389 | (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) | 398 | (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) |
390 | cp->disk_trust = cp->trust = ntohl (trust); | 399 | cp->disk_trust = cp->trust = ntohl (trust); |
391 | GNUNET_free (fn); | 400 | GNUNET_free (fn); |
401 | cp->request_map = GNUNET_CONTAINER_multihashmap_create (128); | ||
392 | GNUNET_break (GNUNET_OK == | 402 | GNUNET_break (GNUNET_OK == |
393 | GNUNET_CONTAINER_multihashmap_put (cp_map, | 403 | GNUNET_CONTAINER_multihashmap_put (cp_map, |
394 | &peer->hashPubKey, | 404 | &peer->hashPubKey, |
@@ -442,7 +452,8 @@ GSF_handle_p2p_migration_stop_ (void *cls, | |||
442 | * and will also not be called anymore after a call signalling | 452 | * and will also not be called anymore after a call signalling |
443 | * expiration. | 453 | * expiration. |
444 | * | 454 | * |
445 | * @param cls user-specified closure | 455 | * @param cls 'struct GSF_ConnectedPeer' of the peer that would |
456 | * have liked an answer to the request | ||
446 | * @param pr handle to the original pending request | 457 | * @param pr handle to the original pending request |
447 | * @param data response data, NULL on request expiration | 458 | * @param data response data, NULL on request expiration |
448 | * @param data_len number of bytes in data | 459 | * @param data_len number of bytes in data |
@@ -453,12 +464,22 @@ handle_p2p_reply (void *cls, | |||
453 | const void *data, | 464 | const void *data, |
454 | size_t data_len) | 465 | size_t data_len) |
455 | { | 466 | { |
467 | struct GSF_ConnectedPeer *cp = cls; | ||
468 | |||
456 | #if SUPPORT_DELAYS | 469 | #if SUPPORT_DELAYS |
457 | struct GNUNET_TIME_Relative art_delay; | 470 | struct GNUNET_TIME_Relative art_delay; |
458 | #endif | 471 | #endif |
459 | 472 | ||
460 | /* FIXME: adapt code fragments below to new API! */ | 473 | /* FIXME: adapt code fragments below to new API! */ |
461 | 474 | if (NULL == data) | |
475 | { | ||
476 | /* FIXME: request expired! clean up! */ | ||
477 | GNUNET_STATISTICS_update (stats, | ||
478 | gettext_noop ("# P2P searches active"), | ||
479 | -1, | ||
480 | GNUNET_NO); | ||
481 | return; | ||
482 | } | ||
462 | 483 | ||
463 | /* reply will go over the network, check for cover traffic */ | 484 | /* reply will go over the network, check for cover traffic */ |
464 | if ( (prq->anonymity_level > 1) && | 485 | if ( (prq->anonymity_level > 1) && |
@@ -515,9 +536,11 @@ handle_p2p_reply (void *cls, | |||
515 | } | 536 | } |
516 | 537 | ||
517 | 538 | ||
518 | |||
519 | /** | 539 | /** |
520 | * Handle P2P "QUERY" message. | 540 | * Handle P2P "QUERY" message. Creates the pending request entry |
541 | * and sets up all of the data structures to that we will | ||
542 | * process replies properly. Does not initiate forwarding or | ||
543 | * local database lookups. | ||
521 | * | 544 | * |
522 | * @param other the other peer involved (sender or receiver, NULL | 545 | * @param other the other peer involved (sender or receiver, NULL |
523 | * for loopback messages where we are both sender and receiver) | 546 | * for loopback messages where we are both sender and receiver) |
@@ -528,11 +551,13 @@ struct GSF_PendingRequest * | |||
528 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | 551 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, |
529 | const struct GNUNET_MessageHeader *message) | 552 | const struct GNUNET_MessageHeader *message) |
530 | { | 553 | { |
531 | /* FIXME: adapt old code to new API! */ | 554 | struct GSF_PendingRequest *pr; |
532 | struct PendingRequest *pr; | 555 | struct GSF_PendingRequestData *prd; |
533 | struct ConnectedPeer *cp; | 556 | struct GSF_ConnectedPeer *cp; |
534 | struct ConnectedPeer *cps; | 557 | struct GSF_ConnectedPeer *cps; |
535 | struct CheckDuplicateRequestClosure cdc; | 558 | GNUNET_HashCode *namespace; |
559 | struct GNUNET_PeerIdentity *target; | ||
560 | enum GSF_PendingRequestOptions options; | ||
536 | struct GNUNET_TIME_Relative timeout; | 561 | struct GNUNET_TIME_Relative timeout; |
537 | uint16_t msize; | 562 | uint16_t msize; |
538 | const struct GetMessage *gm; | 563 | const struct GetMessage *gm; |
@@ -542,8 +567,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
542 | size_t bfsize; | 567 | size_t bfsize; |
543 | uint32_t ttl_decrement; | 568 | uint32_t ttl_decrement; |
544 | int32_t priority; | 569 | int32_t priority; |
570 | int32_t ttl; | ||
545 | enum GNUNET_BLOCK_Type type; | 571 | enum GNUNET_BLOCK_Type type; |
546 | int have_ns; | 572 | |
547 | 573 | ||
548 | msize = ntohs(message->size); | 574 | msize = ntohs(message->size); |
549 | if (msize < sizeof (struct GetMessage)) | 575 | if (msize < sizeof (struct GetMessage)) |
@@ -615,7 +641,6 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
615 | gettext_noop ("# requests dropped due to missing reverse route"), | 641 | gettext_noop ("# requests dropped due to missing reverse route"), |
616 | 1, | 642 | 1, |
617 | GNUNET_NO); | 643 | GNUNET_NO); |
618 | /* FIXME: try connect? */ | ||
619 | return GNUNET_OK; | 644 | return GNUNET_OK; |
620 | } | 645 | } |
621 | /* note that we can really only check load here since otherwise | 646 | /* note that we can really only check load here since otherwise |
@@ -639,14 +664,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
639 | GNUNET_i2s (other), | 664 | GNUNET_i2s (other), |
640 | (unsigned int) bm); | 665 | (unsigned int) bm); |
641 | #endif | 666 | #endif |
642 | have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)); | 667 | namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL; |
643 | pr = GNUNET_malloc (sizeof (struct PendingRequest) + | 668 | target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL; |
644 | (have_ns ? sizeof(GNUNET_HashCode) : 0)); | 669 | options = 0; |
645 | if (have_ns) | ||
646 | { | ||
647 | pr->namespace = (GNUNET_HashCode*) &pr[1]; | ||
648 | memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); | ||
649 | } | ||
650 | if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || | 670 | if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || |
651 | (GNUNET_LOAD_get_average (cp->transmission_delay) > | 671 | (GNUNET_LOAD_get_average (cp->transmission_delay) > |
652 | GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) | 672 | GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) |
@@ -654,28 +674,21 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
654 | /* don't have BW to send to peer, or would likely take longer than we have for it, | 674 | /* don't have BW to send to peer, or would likely take longer than we have for it, |
655 | so at best indirect the query */ | 675 | so at best indirect the query */ |
656 | priority = 0; | 676 | priority = 0; |
657 | pr->forward_only = GNUNET_YES; | 677 | options |= GSF_PRO_FORWARD_ONLY; |
658 | } | 678 | } |
659 | pr->type = type; | 679 | ttl = bound_ttl (ntohl (gm->ttl), pr->priority); |
660 | pr->mingle = ntohl (gm->filter_mutator); | ||
661 | if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) | ||
662 | pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); | ||
663 | pr->anonymity_level = 1; | ||
664 | pr->priority = (uint32_t) priority; | ||
665 | pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); | ||
666 | pr->query = gm->query; | ||
667 | /* decrement ttl (always) */ | 680 | /* decrement ttl (always) */ |
668 | ttl_decrement = 2 * TTL_DECREMENT + | 681 | ttl_decrement = 2 * TTL_DECREMENT + |
669 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 682 | GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
670 | TTL_DECREMENT); | 683 | TTL_DECREMENT); |
671 | if ( (pr->ttl < 0) && | 684 | if ( (ttl < 0) && |
672 | (((int32_t)(pr->ttl - ttl_decrement)) > 0) ) | 685 | (((int32_t)(ttl - ttl_decrement)) > 0) ) |
673 | { | 686 | { |
674 | #if DEBUG_FS | 687 | #if DEBUG_FS |
675 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 688 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
676 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", | 689 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", |
677 | GNUNET_i2s (other), | 690 | GNUNET_i2s (other), |
678 | pr->ttl, | 691 | ttl, |
679 | ttl_decrement); | 692 | ttl_decrement); |
680 | #endif | 693 | #endif |
681 | GNUNET_STATISTICS_update (stats, | 694 | GNUNET_STATISTICS_update (stats, |
@@ -683,74 +696,66 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
683 | 1, | 696 | 1, |
684 | GNUNET_NO); | 697 | GNUNET_NO); |
685 | /* integer underflow => drop (should be very rare)! */ | 698 | /* integer underflow => drop (should be very rare)! */ |
686 | GNUNET_free (pr); | ||
687 | return GNUNET_OK; | 699 | return GNUNET_OK; |
688 | } | 700 | } |
689 | pr->ttl -= ttl_decrement; | 701 | ttl -= ttl_decrement; |
690 | pr->start_time = GNUNET_TIME_absolute_get (); | 702 | |
691 | 703 | /* test if the request already exists */ | |
692 | /* get bloom filter */ | 704 | pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map, |
693 | if (bfsize > 0) | 705 | &gm->query); |
694 | { | 706 | if (pr != NULL) |
695 | pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits], | 707 | { |
696 | bfsize, | 708 | prd = GSF_pending_request_get_data_ (pr); |
697 | BLOOMFILTER_K); | 709 | if ( (prd->type == type) && |
698 | pr->bf_size = bfsize; | 710 | ( (type != GNUNET_BLOCK_TYPE_SBLOCK) || |
699 | } | 711 | (0 == memcmp (prd->namespace, |
700 | cdc.have = NULL; | 712 | namespace, |
701 | cdc.pr = pr; | 713 | sizeof (GNUNET_HashCode))) ) ) |
702 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | ||
703 | &gm->query, | ||
704 | &check_duplicate_request_peer, | ||
705 | &cdc); | ||
706 | if (cdc.have != NULL) | ||
707 | { | ||
708 | if (cdc.have->start_time.abs_value + cdc.have->ttl >= | ||
709 | pr->start_time.abs_value + pr->ttl) | ||
710 | { | 714 | { |
711 | /* existing request has higher TTL, drop new one! */ | 715 | if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl) |
712 | cdc.have->priority += pr->priority; | 716 | { |
713 | destroy_pending_request (pr); | 717 | /* existing request has higher TTL, drop new one! */ |
718 | prd->priority += priority; | ||
714 | #if DEBUG_FS | 719 | #if DEBUG_FS |
715 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 720 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
716 | "Have existing request with higher TTL, dropping new request.\n", | 721 | "Have existing request with higher TTL, dropping new request.\n", |
717 | GNUNET_i2s (other)); | 722 | GNUNET_i2s (other)); |
718 | #endif | 723 | #endif |
719 | GNUNET_STATISTICS_update (stats, | 724 | GNUNET_STATISTICS_update (stats, |
720 | gettext_noop ("# requests dropped due to higher-TTL request"), | 725 | gettext_noop ("# requests dropped due to higher-TTL request"), |
721 | 1, | 726 | 1, |
722 | GNUNET_NO); | 727 | GNUNET_NO); |
723 | return GNUNET_OK; | 728 | return GNUNET_OK; |
724 | } | 729 | } |
725 | else | ||
726 | { | ||
727 | /* existing request has lower TTL, drop old one! */ | 730 | /* existing request has lower TTL, drop old one! */ |
728 | pr->priority += cdc.have->priority; | 731 | pr->priority += prd->priority; |
729 | /* Possible optimization: if we have applicable pending | 732 | GSF_pending_request_cancel_ (pr); |
730 | replies in 'cdc.have', we might want to move those over | 733 | GNUNET_assert (GNUNET_YES == |
731 | (this is a really rare special-case, so it is not clear | 734 | GNUNET_CONTAINER_multihashmap_remove (cp->request_map, |
732 | that this would be worth it) */ | 735 | &gm->query, |
733 | destroy_pending_request (cdc.have); | 736 | pr)); |
734 | /* keep processing 'pr'! */ | ||
735 | } | 737 | } |
736 | } | 738 | } |
737 | 739 | ||
738 | pr->cp = cp; | 740 | pr = GSF_pending_request_create (options, |
741 | type, | ||
742 | &gm->query, | ||
743 | namespace, | ||
744 | target, | ||
745 | (bf_size > 0) ? (const char*)&opt[bits] : NULL, | ||
746 | bf_size, | ||
747 | ntohl (gm->filter_mutator), | ||
748 | 1 /* anonymity */ | ||
749 | (uint32_t) priority, | ||
750 | ttl, | ||
751 | NULL, 0, /* replies_seen */ | ||
752 | &handle_p2p_reply, | ||
753 | cp); | ||
739 | GNUNET_break (GNUNET_OK == | 754 | GNUNET_break (GNUNET_OK == |
740 | GNUNET_CONTAINER_multihashmap_put (query_request_map, | 755 | GNUNET_CONTAINER_multihashmap_put (cp->request_map, |
741 | &gm->query, | 756 | &gm->query, |
742 | pr, | 757 | pr, |
743 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 758 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
744 | GNUNET_break (GNUNET_OK == | ||
745 | GNUNET_CONTAINER_multihashmap_put (peer_request_map, | ||
746 | &other->hashPubKey, | ||
747 | pr, | ||
748 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
749 | |||
750 | pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, | ||
751 | pr, | ||
752 | pr->start_time.abs_value + pr->ttl); | ||
753 | |||
754 | GNUNET_STATISTICS_update (stats, | 759 | GNUNET_STATISTICS_update (stats, |
755 | gettext_noop ("# P2P searches received"), | 760 | gettext_noop ("# P2P searches received"), |
756 | 1, | 761 | 1, |
@@ -759,83 +764,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
759 | gettext_noop ("# P2P searches active"), | 764 | gettext_noop ("# P2P searches active"), |
760 | 1, | 765 | 1, |
761 | GNUNET_NO); | 766 | GNUNET_NO); |
762 | 767 | return pr; | |
763 | /* calculate change in traffic preference */ | ||
764 | cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE; | ||
765 | /* process locally */ | ||
766 | if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) | ||
767 | type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ | ||
768 | timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, | ||
769 | (pr->priority + 1)); | ||
770 | if (GNUNET_YES != pr->forward_only) | ||
771 | { | ||
772 | #if DEBUG_FS | ||
773 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
774 | "Handing request for `%s' to datastore\n", | ||
775 | GNUNET_h2s (&gm->query)); | ||
776 | #endif | ||
777 | pr->qe = GNUNET_DATASTORE_get (dsh, | ||
778 | &gm->query, | ||
779 | type, | ||
780 | pr->priority + 1, | ||
781 | MAX_DATASTORE_QUEUE, | ||
782 | timeout, | ||
783 | &process_local_reply, | ||
784 | pr); | ||
785 | if (NULL == pr->qe) | ||
786 | { | ||
787 | GNUNET_STATISTICS_update (stats, | ||
788 | gettext_noop ("# requests dropped by datastore (queue length limit)"), | ||
789 | 1, | ||
790 | GNUNET_NO); | ||
791 | } | ||
792 | } | ||
793 | else | ||
794 | { | ||
795 | GNUNET_STATISTICS_update (stats, | ||
796 | gettext_noop ("# requests forwarded due to high load"), | ||
797 | 1, | ||
798 | GNUNET_NO); | ||
799 | } | ||
800 | |||
801 | /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */ | ||
802 | switch (pr->type) | ||
803 | { | ||
804 | case GNUNET_BLOCK_TYPE_FS_DBLOCK: | ||
805 | case GNUNET_BLOCK_TYPE_FS_IBLOCK: | ||
806 | /* only one result, wait for datastore */ | ||
807 | if (GNUNET_YES != pr->forward_only) | ||
808 | { | ||
809 | GNUNET_STATISTICS_update (stats, | ||
810 | gettext_noop ("# requests not instantly forwarded (waiting for datastore)"), | ||
811 | 1, | ||
812 | GNUNET_NO); | ||
813 | break; | ||
814 | } | ||
815 | default: | ||
816 | if (pr->task == GNUNET_SCHEDULER_NO_TASK) | ||
817 | pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, | ||
818 | pr); | ||
819 | } | ||
820 | |||
821 | /* make sure we don't track too many requests */ | ||
822 | if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) | ||
823 | { | ||
824 | pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); | ||
825 | GNUNET_assert (pr != NULL); | ||
826 | destroy_pending_request (pr); | ||
827 | } | ||
828 | return GNUNET_OK; | ||
829 | |||
830 | |||
831 | |||
832 | // FIXME! | ||
833 | // parse request | ||
834 | // setup pending request (use 'handle_p2p_reply') | ||
835 | // track pending request to cancel it on peer disconnect (!) | ||
836 | // return it! | ||
837 | // (actual planning & execution up to caller!) | ||
838 | return NULL; | ||
839 | } | 768 | } |
840 | 769 | ||
841 | 770 | ||
@@ -858,9 +787,9 @@ peer_transmit_timeout (void *cls, | |||
858 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 787 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
859 | cp->pth_tail, | 788 | cp->pth_tail, |
860 | pth); | 789 | pth); |
861 | if (pth->is_query) | 790 | if (GNUNET_YES == pth->is_query) |
862 | GNUNET_assert (0 < cp->ppd.pending_queries--); | 791 | GNUNET_assert (0 < cp->ppd.pending_queries--); |
863 | else | 792 | else if (GNUNET_NO == pth->is_query) |
864 | GNUNET_assert (0 < cp->ppd.pending_replies--); | 793 | GNUNET_assert (0 < cp->ppd.pending_replies--); |
865 | GNUNET_LOAD_update (cp->ppd.transmission_delay, | 794 | GNUNET_LOAD_update (cp->ppd.transmission_delay, |
866 | UINT64_MAX); | 795 | UINT64_MAX); |
@@ -876,7 +805,7 @@ peer_transmit_timeout (void *cls, | |||
876 | * the callback is invoked with a 'NULL' buffer. | 805 | * the callback is invoked with a 'NULL' buffer. |
877 | * | 806 | * |
878 | * @param peer target peer | 807 | * @param peer target peer |
879 | * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) | 808 | * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR) |
880 | * @param priority how important is this request? | 809 | * @param priority how important is this request? |
881 | * @param timeout when does this request timeout (call gmc with error) | 810 | * @param timeout when does this request timeout (call gmc with error) |
882 | * @param size number of bytes we would like to send to the peer | 811 | * @param size number of bytes we would like to send to the peer |
@@ -933,9 +862,10 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, | |||
933 | pth); | 862 | pth); |
934 | GNUNET_PEER_resolve (cp->pid, | 863 | GNUNET_PEER_resolve (cp->pid, |
935 | &target); | 864 | &target); |
936 | if (is_query) | 865 | if (GNUNET_YES == is_query) |
937 | { | 866 | { |
938 | /* query, need reservation */ | 867 | /* query, need reservation */ |
868 | cp->ppd.pending_queries++; | ||
939 | if (NULL == cp->irc) | 869 | if (NULL == cp->irc) |
940 | { | 870 | { |
941 | /* reservation already done! */ | 871 | /* reservation already done! */ |
@@ -957,9 +887,15 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, | |||
957 | is_ready = GNUNET_NO; | 887 | is_ready = GNUNET_NO; |
958 | } | 888 | } |
959 | } | 889 | } |
960 | else | 890 | else if (GNUNET_NO == is_query) |
961 | { | 891 | { |
962 | /* no reservation needed for content */ | 892 | /* no reservation needed for content */ |
893 | cp->ppd.pending_replies++; | ||
894 | is_ready = GNUNET_YES; | ||
895 | } | ||
896 | else | ||
897 | { | ||
898 | /* not a query or content, no reservation needed */ | ||
963 | is_ready = GNUNET_YES; | 899 | is_ready = GNUNET_YES; |
964 | } | 900 | } |
965 | if (is_ready) | 901 | if (is_ready) |
@@ -1011,9 +947,9 @@ GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) | |||
1011 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 947 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
1012 | cp->pth_tail, | 948 | cp->pth_tail, |
1013 | pth); | 949 | pth); |
1014 | if (pth->is_query) | 950 | if (GNUNET_YES == pth->is_query) |
1015 | GNUNET_assert (0 < cp->ppd.pending_queries--); | 951 | GNUNET_assert (0 < cp->ppd.pending_queries--); |
1016 | else | 952 | else if (GNUNET_NO == pth->is_query) |
1017 | GNUNET_assert (0 < cp->ppd.pending_replies--); | 953 | GNUNET_assert (0 < cp->ppd.pending_replies--); |
1018 | GNUNET_free (pth); | 954 | GNUNET_free (pth); |
1019 | } | 955 | } |
@@ -1085,6 +1021,26 @@ GSF_peer_status_handler_ (void *cls, | |||
1085 | 1021 | ||
1086 | 1022 | ||
1087 | /** | 1023 | /** |
1024 | * Cancel all requests associated with the peer. | ||
1025 | * | ||
1026 | * @param cls unused | ||
1027 | * @param query hash code of the request | ||
1028 | * @param value the 'struct GSF_PendingRequest' | ||
1029 | * @return GNUNET_YES (continue to iterate) | ||
1030 | */ | ||
1031 | static int | ||
1032 | cancel_pending_request (void *cls, | ||
1033 | const GNUNET_HashCode *query, | ||
1034 | void *value) | ||
1035 | { | ||
1036 | struct GSF_PendingRequest *pr = value; | ||
1037 | |||
1038 | GSF_pending_request_cancel_ (pr); | ||
1039 | return GNUNET_OK; | ||
1040 | } | ||
1041 | |||
1042 | |||
1043 | /** | ||
1088 | * A peer disconnected from us. Tear down the connected peer | 1044 | * A peer disconnected from us. Tear down the connected peer |
1089 | * record. | 1045 | * record. |
1090 | * | 1046 | * |
@@ -1104,11 +1060,21 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1104 | GNUNET_CONTAINER_multihashmap_remove (cp_map, | 1060 | GNUNET_CONTAINER_multihashmap_remove (cp_map, |
1105 | &peer->hashPubKey, | 1061 | &peer->hashPubKey, |
1106 | cp); | 1062 | cp); |
1063 | if (NULL != cp->migration_pth) | ||
1064 | { | ||
1065 | GSF_peer_transmit_cancel_ (cp->migration_pth); | ||
1066 | cp->migration_pth = NULL; | ||
1067 | } | ||
1107 | if (NULL != cp->irc) | 1068 | if (NULL != cp->irc) |
1108 | { | 1069 | { |
1109 | GNUNET_CORE_peer_change_preference_cancel (cp->irc); | 1070 | GNUNET_CORE_peer_change_preference_cancel (cp->irc); |
1110 | cp->irc = NULL; | 1071 | cp->irc = NULL; |
1111 | } | 1072 | } |
1073 | GNUNET_CONTAINER_multihashmap_iterate (cp->request_map, | ||
1074 | &cancel_pending_request, | ||
1075 | cp); | ||
1076 | GNUNET_CONTAINER_multihashmap_destroy (cp->request_map); | ||
1077 | cp->request_map = NULL; | ||
1112 | GSF_plan_notify_peer_disconnect_ (cp); | 1078 | GSF_plan_notify_peer_disconnect_ (cp); |
1113 | GNUNET_LOAD_value_free (cp->ppd.transmission_delay); | 1079 | GNUNET_LOAD_value_free (cp->ppd.transmission_delay); |
1114 | GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); | 1080 | GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); |
@@ -1206,6 +1172,34 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp, | |||
1206 | 1172 | ||
1207 | 1173 | ||
1208 | /** | 1174 | /** |
1175 | * Assemble a migration stop message for transmission. | ||
1176 | * | ||
1177 | * @param cls the 'struct GSF_ConnectedPeer' to use | ||
1178 | * @param size number of bytes we're allowed to write to buf | ||
1179 | * @param buf where to copy the message | ||
1180 | * @return number of bytes copied to buf | ||
1181 | */ | ||
1182 | static size_t | ||
1183 | create_migration_stop_message (void *cls, | ||
1184 | size_t size, | ||
1185 | void *buf) | ||
1186 | { | ||
1187 | struct GSF_ConnectedPeer *cp = cls; | ||
1188 | struct MigrationStopMessage msm; | ||
1189 | |||
1190 | cp->migration_pth = NULL; | ||
1191 | if (NULL == buf) | ||
1192 | return 0; | ||
1193 | GNUNET_assert (size > sizeof (struct MigrationStopMessage)); | ||
1194 | msm.header.size = htons (sizeof (struct MigrationStopMessage)); | ||
1195 | msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); | ||
1196 | msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block)); | ||
1197 | memcpy (buf, &msm, sizeof (struct MigrationStopMessage)); | ||
1198 | return sizeof (struct MigrationStopMessage); | ||
1199 | } | ||
1200 | |||
1201 | |||
1202 | /** | ||
1209 | * Ask a peer to stop migrating data to us until the given point | 1203 | * Ask a peer to stop migrating data to us until the given point |
1210 | * in time. | 1204 | * in time. |
1211 | * | 1205 | * |
@@ -1216,30 +1210,22 @@ void | |||
1216 | GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, | 1210 | GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, |
1217 | struct GNUNET_TIME_Relative block_time) | 1211 | struct GNUNET_TIME_Relative block_time) |
1218 | { | 1212 | { |
1219 | struct PendingMessage *pm; | ||
1220 | struct MigrationStopMessage *msm; | ||
1221 | |||
1222 | if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value) | 1213 | if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > block_time.rel_value) |
1223 | return; /* already blocked */ | 1214 | return; /* already blocked */ |
1224 | cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); | 1215 | cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); |
1225 | 1216 | if (cp->migration_pth != NULL) | |
1226 | /* FIXME: adapt old code below to new API! */ | 1217 | GSF_peer_transmit_cancel_ (cp->migration_pth); |
1227 | pm = GNUNET_malloc (sizeof (struct PendingMessage) + | 1218 | cp->migration_pth |
1228 | sizeof (struct MigrationStopMessage)); | 1219 | = GSF_peer_transmit_ (cp, |
1229 | pm->msize = sizeof (struct MigrationStopMessage); | 1220 | GNUNET_SYSERR, |
1230 | pm->priority = UINT32_MAX; | 1221 | UINT32_MAX, |
1231 | msm = (struct MigrationStopMessage*) &pm[1]; | 1222 | GNUNET_TIME_UNIT_FOREVER_REL, |
1232 | msm->header.size = htons (sizeof (struct MigrationStopMessage)); | 1223 | sizeof (struct MigrationStopMessage), |
1233 | msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); | 1224 | &create_migration_stop_message, |
1234 | msm->duration = GNUNET_TIME_relative_hton (block_time); | 1225 | cp); |
1235 | add_to_pending_messages_for_peer (cp, | ||
1236 | pm, | ||
1237 | NULL); | ||
1238 | } | 1226 | } |
1239 | 1227 | ||
1240 | 1228 | ||
1241 | |||
1242 | |||
1243 | /** | 1229 | /** |
1244 | * Write host-trust information to a file - flush the buffer entry! | 1230 | * Write host-trust information to a file - flush the buffer entry! |
1245 | * | 1231 | * |