From 5f0ced15a7ac2363341fc5657f93f5932bda2719 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 7 Oct 2010 12:52:19 +0000 Subject: entry lifetime tracking --- src/fs/gnunet-service-fs.c | 220 ++++++++++++++++++++++++++++++++------------- 1 file changed, 159 insertions(+), 61 deletions(-) (limited to 'src/fs') diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 7c78dd21c..c2baf1ada 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -24,9 +24,6 @@ * @author Christian Grothoff * * TODO: - * - track per-peer request latency (using new load API) - * - consider more precise latency estimation (per-peer & request) -- again load API? - * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. * - introduce random latency in processing * - more statistics */ @@ -594,12 +591,17 @@ struct PendingRequest /** * Remove this request after transmission of the current response. */ - int16_t do_remove; + int8_t do_remove; /** * GNUNET_YES if we should not forward this request to other peers. */ - int16_t local_only; + int8_t local_only; + + /** + * GNUNET_YES if we should not forward this request to other peers. + */ + int8_t forward_only; }; @@ -808,6 +810,10 @@ static struct GNUNET_LOAD_Value *datastore_get_load; */ static struct GNUNET_LOAD_Value *datastore_put_load; +/** + * How long do requests typically stay in the routing table? + */ +static struct GNUNET_LOAD_Value *rt_entry_lifetime; /** * We've just now completed a datastore request. Update our @@ -1378,12 +1384,14 @@ destroy_pending_request (struct PendingRequest *pr) -1, GNUNET_NO); } - /* might have already been removed from map in 'process_reply' (if - there was a unique reply) or never inserted if it was a - duplicate; hence ignore the return value here */ - (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map, - &pr->query, - pr); + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (query_request_map, + &pr->query, + pr)) + { + GNUNET_LOAD_update (rt_entry_lifetime, + GNUNET_TIME_absolute_get_duration (pr->start_time).value); + } if (pr->qe != NULL) { GNUNET_DATASTORE_cancel (pr->qe); @@ -1843,6 +1851,8 @@ shutdown_task (void *cls, GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map)); GNUNET_CONTAINER_multihashmap_destroy (query_request_map); query_request_map = NULL; + GNUNET_LOAD_value_free (rt_entry_lifetime); + rt_entry_lifetime = NULL; GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map)); GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); peer_request_map = NULL; @@ -2078,16 +2088,69 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, /** - * Test if the load on this peer is too high + * Test if the DATABASE (GET) load on this peer is too high * to even consider processing the query at - * all. + * all. * - * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low) + * @return GNUNET_YES if the load is too high to do anything (load high) + * GNUNET_NO to process normally (load normal) + * GNUNET_SYSERR to process for free (load low) */ static int -test_load_too_high () +test_get_load_too_high (uint32_t priority) { - return GNUNET_SYSERR; // FIXME + double ld; + + ld = GNUNET_LOAD_get_load (datastore_get_load); + if (ld < 1) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests done for free (low load)"), + 1, + GNUNET_NO); + return GNUNET_SYSERR; + } + if (ld <= priority) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests done for a price (normal load)"), + 1, + GNUNET_NO); + return GNUNET_NO; + } + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped due to high load"), + 1, + GNUNET_NO); + return GNUNET_YES; +} + + + + +/** + * Test if the DATABASE (PUT) load on this peer is too high + * to even consider processing the query at + * all. + * + * @return GNUNET_YES if the load is too high to do anything (load high) + * GNUNET_NO to process normally (load normal or low) + */ +static int +test_put_load_too_high (uint32_t priority) +{ + double ld; + + if (GNUNET_LOAD_get_average (datastore_put_load) < 50) + return GNUNET_NO; /* very fast */ + ld = GNUNET_LOAD_get_load (datastore_put_load); + if ( (ld < 1) || (ld < priority) ) + return GNUNET_NO; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# storage requests dropped due to high load"), + 1, + GNUNET_NO); + return GNUNET_YES; } @@ -2329,8 +2392,6 @@ target_reservation_cb (void *cls, pr); return; /* this target round failed */ } - /* FIXME: if we are "quite" busy, we may still want to skip - this round; need more load detection code! */ no_route = GNUNET_YES; } @@ -2486,7 +2547,7 @@ target_peer_select_cb (void *cls, P2P_SUCCESS_LIST_SIZE); for (i=0;ilast_p2p_replies[i] == pr->cp->pid) - score += 1; /* likely successful based on hot path */ + score += 1.0; /* likely successful based on hot path */ } else { @@ -2494,14 +2555,14 @@ target_peer_select_cb (void *cls, CS2P_SUCCESS_LIST_SIZE); for (i=0;ilast_client_replies[i] == pr->client_request_list->client_list->client) - score += 1; /* likely successful based on hot path */ + score += 1.0; /* likely successful based on hot path */ } /* 3b) include latency */ if (cp->avg_delay.value < 4 * TTL_DECREMENT) - score += 1; /* likely fast based on latency */ + score += 1.0; /* likely fast based on latency */ /* 3c) include priorities */ if (cp->avg_priority <= pr->remaining_priority / 2.0) - score += 1; /* likely successful based on priorities */ + score += 1.0; /* likely successful based on priorities */ /* 3d) penalize for queue size */ score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); /* 3e) include peer proximity */ @@ -2616,6 +2677,7 @@ forward_request_task (void *cls, return; /* configured to not do P2P search */ /* (0) try DHT */ if ( (0 == pr->anonymity_level) && + (GNUNET_YES != pr->forward_only) && (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) && (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) ) { @@ -2677,18 +2739,28 @@ forward_request_task (void *cls, } /* (3) reserve reply bandwidth */ - cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, - &psc.target.hashPubKey); - GNUNET_assert (NULL != cp); - pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, - &psc.target, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_BANDWIDTH_value_init (UINT32_MAX), - DBLOCK_SIZE * 2, - cp->inc_preference, - &target_reservation_cb, - pr); - cp->inc_preference = 0; + if (GNUNET_NO == pr->forward_only) + { + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &psc.target.hashPubKey); + GNUNET_assert (NULL != cp); + pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, + &psc.target, + GNUNET_CONSTANTS_SERVICE_TIMEOUT, + GNUNET_BANDWIDTH_value_init (UINT32_MAX), + DBLOCK_SIZE * 2, + cp->inc_preference, + &target_reservation_cb, + pr); + cp->inc_preference = 0; + } + else + { + /* force forwarding */ + static struct GNUNET_BANDWIDTH_Value32NBO zerobw; + target_reservation_cb (pr, &psc.target, + zerobw, zerobw, 0, 0.0); + } } @@ -2871,8 +2943,6 @@ process_reply (void *cls, GNUNET_NO); if (prq->sender != NULL) { - /* FIXME: should we be more precise here and not use - "start_time" but a peer-specific time stamp? */ cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time); prq->sender->avg_delay.value = (prq->sender->avg_delay.value * @@ -2936,6 +3006,8 @@ process_reply (void *cls, GNUNET_CONTAINER_multihashmap_remove (query_request_map, key, pr)); + GNUNET_LOAD_update (rt_entry_lifetime, + GNUNET_TIME_absolute_get_duration (pr->start_time).value); break; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: GNUNET_STATISTICS_update (stats, @@ -3216,8 +3288,9 @@ handle_p2p_put (void *cls, prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority; prq.sender->trust += prq.priority; } - if (GNUNET_YES == active_migration) - { + if ( (GNUNET_YES == active_migration) && + (GNUNET_NO == test_put_load_too_high (prq.priority)) ) + { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Replicating result for query `%s' with priority %u\n", @@ -3498,7 +3571,7 @@ process_local_reply (void *cls, return; } if ( (pr->client_request_list == NULL) && - ( (GNUNET_YES == test_load_too_high()) || + ( (GNUNET_YES == test_get_load_too_high (0)) || (pr->results_found > 5 + 2 * pr->priority) ) ) { #if DEBUG_FS > 2 @@ -3524,7 +3597,7 @@ process_local_reply (void *cls, * @param cp the peer making the request * @return effective priority */ -static uint32_t +static int32_t bound_priority (uint32_t prio_in, struct ConnectedPeer *cp) { @@ -3533,7 +3606,7 @@ bound_priority (uint32_t prio_in, double rret; int ld; - ld = test_load_too_high (); + ld = test_get_load_too_high (0); if (ld == GNUNET_SYSERR) return 0; /* excess resources */ ret = change_host_trust (cp, prio_in); @@ -3546,6 +3619,18 @@ bound_priority (uint32_t prio_in, current_priorities = (current_priorities * (N-1) + rret)/N; } + if ( (ld == GNUNET_YES) && (ret > 0) ) + { + /* try with charging */ + ld = test_get_load_too_high (ret); + } + if (ld == GNUNET_YES) + { + /* undo charge */ + if (ret != 0) + change_host_trust (cp, -ret); + return -1; /* not enough resources */ + } #undef N return ret; } @@ -3612,9 +3697,9 @@ handle_p2p_get (void *cls, uint32_t bm; size_t bfsize; uint32_t ttl_decrement; + int32_t priority; enum GNUNET_BLOCK_Type type; int have_ns; - int ld; msize = ntohs(message->size); if (msize < sizeof (struct GetMessage)) @@ -3680,11 +3765,8 @@ handle_p2p_get (void *cls, /* note that we can really only check load here since otherwise peers could find out that we are overloaded by not being disconnected after sending us a malformed query... */ - - /* FIXME: query priority should play - a major role here! */ - ld = test_load_too_high (); - if (GNUNET_YES == ld) + priority = bound_priority (ntohl (gm->priority), cps); + if (priority < 0) { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -3697,9 +3779,6 @@ handle_p2p_get (void *cls, GNUNET_NO); return GNUNET_OK; } - /* FIXME: if ld == GNUNET_NO, forward - instead of indirecting! */ - #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received request for `%s' of type %u from peer `%4s' with flags %u\n", @@ -3716,12 +3795,21 @@ handle_p2p_get (void *cls, pr->namespace = (GNUNET_HashCode*) &pr[1]; memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode)); } + if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) || + (GNUNET_LOAD_get_average (cp->transmission_delay) > + GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) + { + /* don't have BW to send to peer, or would likely take longer than we have for it, + so at best indirect the query */ + priority = 0; + pr->forward_only = GNUNET_YES; + } pr->type = type; pr->mingle = ntohl (gm->filter_mutator); if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); pr->anonymity_level = 1; - pr->priority = bound_priority (ntohl (gm->priority), cps); + pr->priority = (uint32_t) priority; pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); pr->query = gm->query; /* decrement ttl (always) */ @@ -3828,22 +3916,29 @@ handle_p2p_get (void *cls, type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, (pr->priority + 1)); - pr->qe = GNUNET_DATASTORE_get (dsh, - &gm->query, - type, - pr->priority + 1, - MAX_DATASTORE_QUEUE, - timeout, - &process_local_reply, - pr); + if (GNUNET_YES != pr->forward_only) + pr->qe = GNUNET_DATASTORE_get (dsh, + &gm->query, + type, + pr->priority + 1, + MAX_DATASTORE_QUEUE, + timeout, + &process_local_reply, + pr); + else + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests forwarded due to high load"), + 1, + GNUNET_NO); - /* Are multiple results possible? If so, start processing remotely now! */ + /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */ switch (pr->type) { case GNUNET_BLOCK_TYPE_FS_DBLOCK: case GNUNET_BLOCK_TYPE_FS_IBLOCK: /* only one result, wait for datastore */ - break; + if (GNUNET_YES != pr->forward_only) + break; default: if (pr->task == GNUNET_SCHEDULER_NO_TASK) pr->task = GNUNET_SCHEDULER_add_now (sched, @@ -4085,6 +4180,7 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, } connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests); + rt_entry_lifetime = GNUNET_LOAD_value_init (); peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc); requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); core = GNUNET_CORE_connect (sched, @@ -4107,6 +4203,8 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, connected_peers = NULL; GNUNET_CONTAINER_multihashmap_destroy (query_request_map); query_request_map = NULL; + GNUNET_LOAD_value_free (rt_entry_lifetime); + rt_entry_lifetime = NULL; GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); requests_by_expiration_heap = NULL; GNUNET_CONTAINER_multihashmap_destroy (peer_request_map); -- cgit v1.2.3