From e87e273ce5f864e20fcca02c34bef72de4fc00bd Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 10 Mar 2011 17:34:42 +0000 Subject: load --- src/fs/gnunet-service-fs.c | 1 + src/fs/gnunet-service-fs.h | 23 ++++++++- src/fs/gnunet-service-fs_cp.c | 31 +----------- src/fs/gnunet-service-fs_lc.c | 4 ++ src/fs/gnunet-service-fs_new.c | 68 +++++++++++++++++++++++++- src/fs/gnunet-service-fs_pr.c | 106 ++++++++++++++++++----------------------- src/fs/gnunet-service-fs_pr.h | 6 +-- 7 files changed, 146 insertions(+), 93 deletions(-) (limited to 'src/fs') diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index a9a3b7c5f..cbfac9322 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -955,6 +955,7 @@ static unsigned int cover_content_count; */ static GNUNET_SCHEDULER_TaskIdentifier cover_age_task; + static void age_cover_counters (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index db9de3cb8..a82206e9c 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h @@ -126,13 +126,34 @@ extern unsigned int GSF_cover_query_count; */ extern unsigned int GSF_cover_content_count; - /** * Our block context. */ extern struct GNUNET_BLOCK_Context *GSF_block_ctx; +/** + * Test if the DATABASE (GET) load on this peer is too high + * to even consider processing the query at + * all. + * + * @return GNUNET_YES if the load is too high to do anything (load high) + * GNUNET_NO to process normally (load normal) + * GNUNET_SYSERR to process for free (load low) + */ +int +GSF_test_get_load_too_high_ (uint32_t priority); + + +/** + * We've just now completed a datastore request. Update our + * datastore load calculations. + * + * @param start time when the datastore request was issued + */ +void +GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start); + #endif diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 407c9f7fe..6e39d7bd2 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -617,33 +617,6 @@ handle_p2p_reply (void *cls, } -/** - * Test if the DATABASE (GET) load on this peer is too high - * to even consider processing the query at - * all. - * - * @return GNUNET_YES if the load is too high to do anything (load high) - * GNUNET_NO to process normally (load normal) - * GNUNET_SYSERR to process for free (load low) - */ -static int -test_get_load_too_high (uint32_t priority) -{ -#if FIXME_later - double ld; - - ld = GNUNET_LOAD_get_load (datastore_get_load); - if (ld < 1) - return GNUNET_SYSERR; - if (ld <= priority) - return GNUNET_NO; - return GNUNET_YES; -#else - return GNUNET_SYSERR; -#endif -} - - /** * Increase the host credit by a value. * @@ -699,7 +672,7 @@ bound_priority (uint32_t prio_in, double rret; int ld; - ld = test_get_load_too_high (0); + ld = GSF_test_get_load_too_high_ (0); if (ld == GNUNET_SYSERR) { GNUNET_STATISTICS_update (GSF_stats, @@ -723,7 +696,7 @@ bound_priority (uint32_t prio_in, if ( (ld == GNUNET_YES) && (ret > 0) ) { /* try with charging */ - ld = test_get_load_too_high (ret); + ld = GSF_test_get_load_too_high_ (ret); } if (ld == GNUNET_YES) { diff --git a/src/fs/gnunet-service-fs_lc.c b/src/fs/gnunet-service-fs_lc.c index f284ba0f8..d5b6f4297 100644 --- a/src/fs/gnunet-service-fs_lc.c +++ b/src/fs/gnunet-service-fs_lc.c @@ -292,6 +292,10 @@ GSF_local_client_start_search_handler_ (struct GNUNET_SERVER_Client *client, (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) || (type == GNUNET_BLOCK_TYPE_ANY) ) { + /* FIXME: this does currently not work to filter duplicate + results from *local* datastore since the local store is + queried before we continue to process additional + messages from the client! -- fix protocol? */ cr = lc->cr_head; while (cr != NULL) { diff --git a/src/fs/gnunet-service-fs_new.c b/src/fs/gnunet-service-fs_new.c index 4b22a0c52..7ad1874f7 100644 --- a/src/fs/gnunet-service-fs_new.c +++ b/src/fs/gnunet-service-fs_new.c @@ -70,6 +70,13 @@ */ #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +/** + * At what frequency should our datastore load decrease + * automatically (since if we don't use it, clearly the + * load must be going down). + */ +#define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250) + /* ****************************** globals ****************************** */ @@ -152,6 +159,11 @@ static GNUNET_SCHEDULER_TaskIdentifier cover_age_task; */ static struct GNUNET_CORE_Handle *core; +/** + * Datastore 'GET' load tracking. + */ +static struct GNUNET_LOAD_Value *datastore_get_load; + /** * Identity of this peer. */ @@ -175,6 +187,50 @@ age_cover_counters (void *cls, } + +/** + * We've just now completed a datastore request. Update our + * datastore load calculations. + * + * @param start time when the datastore request was issued + */ +void +GSF_update_datastore_delay_ (struct GNUNET_TIME_Absolute start) +{ + struct GNUNET_TIME_Relative delay; + + delay = GNUNET_TIME_absolute_get_duration (start); + GNUNET_LOAD_update (datastore_get_load, + delay.rel_value); +} + + +/** + * Test if the DATABASE (GET) load on this peer is too high + * to even consider processing the query at + * all. + * + * @return GNUNET_YES if the load is too high to do anything (load high) + * GNUNET_NO to process normally (load normal) + * GNUNET_SYSERR to process for free (load low) + */ +int +GSF_test_get_load_too_high_ (uint32_t priority) +{ + double ld; + + ld = GNUNET_LOAD_get_load (datastore_get_load); + if (ld < 1) + return GNUNET_SYSERR; + if (ld <= priority) + return GNUNET_NO; + return GNUNET_YES; +} + + + + + /** * Handle P2P "PUT" message. * @@ -311,11 +367,18 @@ start_p2p_processing (void *cls, enum GNUNET_BLOCK_EvaluationResult result) { struct GNUNET_SERVER_Client *client = cls; + struct GSF_PendingRequestData *prd; GNUNET_SERVER_receive_done (client, GNUNET_OK); if (GNUNET_BLOCK_EVALUATION_OK_LAST == result) - return; /* we're done... */ + return; /* we're done, 'pr' was already destroyed... */ + prd = GSF_pending_request_get_data_ (pr); + if (0 != (GSF_PRO_LOCAL_ONLY & prd->options) ) + { + GSF_pending_request_cancel_ (pr); + return; + } GSF_dht_lookup_ (pr); consider_forwarding (NULL, pr, result); } @@ -381,6 +444,8 @@ shutdown_task (void *cls, GNUNET_SCHEDULER_cancel (cover_age_task); cover_age_task = GNUNET_SCHEDULER_NO_TASK; } + GNUNET_LOAD_value_free (datastore_get_load); + datastore_get_load = NULL; } @@ -512,6 +577,7 @@ main_init (struct GNUNET_SERVER_Handle *server, cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, NULL); + datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 58af8be65..0fdcd0cf1 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c @@ -26,6 +26,7 @@ #include "platform.h" #include "gnunet_load_lib.h" #include "gnunet-service-fs_cp.h" +#include "gnunet-service-fs_indexing.h" #include "gnunet-service-fs_pr.h" @@ -74,6 +75,22 @@ struct GSF_PendingRequest */ struct GNUNET_DHT_GetHandle *gh; + /** + * Function to call upon completion of the local get + * request, or NULL for none. + */ + GSF_LocalLookupContinuation llc_cont; + + /** + * Closure for llc_cont. + */ + void *llc_cont_cls; + + /** + * Last result from the local datastore lookup evaluation. + */ + enum GNUNET_BLOCK_EvaluationResult local_result; + /** * Identity of the peer that we should use for the 'sender' * (recipient of the response) when forwarding (0 for none). @@ -683,6 +700,7 @@ process_reply (void *cls, prq->data, prq->size, GNUNET_NO); /* destroy request, we're done */ + prq->finished = GNUNET_YES; GSF_pending_request_cancel_ (pr); return GNUNET_YES; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: @@ -909,14 +927,13 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr) pr); } - /** * We're processing (local) results for a search request * from another peer. Pass applicable results to the * peer and if we are done either clean up (operation * complete) or forward to other peers (more results possible). * - * @param cls our closure (struct LocalGetContext) + * @param cls our closure (struct PendingRequest) * @param key key for the content * @param size number of bytes in data * @param data content stored @@ -938,53 +955,23 @@ process_local_reply (void *cls, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { -#if FIXME - struct PendingRequest *pr = cls; + struct GSF_PendingRequest *pr = cls; + GSF_LocalLookupContinuation cont; + struct ProcessReplyClosure prq; - struct CheckDuplicateRequestClosure cdrc; GNUNET_HashCode query; unsigned int old_rf; if (NULL == key) { -#if DEBUG_FS > 1 - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Done processing local replies, forwarding request to other peers.\n"); -#endif pr->qe = NULL; - if (pr->client_request_list != NULL) - { - GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, - GNUNET_YES); - /* Figure out if this is a duplicate request and possibly - merge 'struct PendingRequest' entries */ - cdrc.have = NULL; - cdrc.pr = pr; - GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, - &pr->query, - &check_duplicate_request_client, - &cdrc); - if (cdrc.have != NULL) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received request for block `%s' twice from client, will only request once.\n", - GNUNET_h2s (&pr->query)); -#endif - - destroy_pending_request (pr); - return; - } - } - if (pr->local_only == GNUNET_YES) + if (NULL != (cont = pr->llc_cont)) { - destroy_pending_request (pr); - return; + pr->llc_cont = NULL; + cont (pr->llc_cont_cls, + pr, + pr->local_result); } - /* no more results */ - if (pr->task == GNUNET_SCHEDULER_NO_TASK) - pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task, - pr); return; } #if DEBUG_FS @@ -999,7 +986,7 @@ process_local_reply (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found ONDEMAND block, performing on-demand encoding\n"); #endif - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# on-demand blocks matched requests"), 1, GNUNET_NO); @@ -1008,32 +995,32 @@ process_local_reply (void *cls, anonymity, expiration, uid, &process_local_reply, pr)) - if (pr->qe != NULL) { - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + if (pr->qe != NULL) + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); } return; } - old_rf = pr->results_found; + old_rf = pr->public_data.results_found; memset (&prq, 0, sizeof (prq)); prq.data = data; prq.expiration = expiration; prq.size = size; if (GNUNET_OK != - GNUNET_BLOCK_get_key (block_ctx, + GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query)) { GNUNET_break (0); - GNUNET_DATASTORE_remove (dsh, + GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); return; } prq.type = type; @@ -1042,36 +1029,35 @@ process_local_reply (void *cls, prq.request_found = GNUNET_NO; prq.anonymity_level = anonymity; if ( (old_rf == 0) && - (pr->results_found == 0) ) - update_datastore_delays (pr->start_time); + (pr->public_data.results_found == 0) ) + GSF_update_datastore_delay_ (pr->public_data.start_time); process_reply (&prq, key, pr); if (prq.finished == GNUNET_YES) return; + pr->local_result = prq.eval; if (pr->qe == NULL) return; /* done here */ if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) { - pr->local_only = GNUNET_YES; /* do not forward */ - GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_NO); return; } - if ( (pr->client_request_list == NULL) && - ( (GNUNET_YES == test_get_load_too_high (0)) || - (pr->results_found > 5 + 2 * pr->priority) ) ) + if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && + ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || + (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) ) { #if DEBUG_FS > 2 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n"); #endif - GNUNET_STATISTICS_update (stats, + GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# processing result set cut short due to load"), 1, GNUNET_NO); - GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_NO); return; } - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); -#endif + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); } @@ -1087,8 +1073,10 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, GSF_LocalLookupContinuation cont, void *cont_cls) { - // FIXME: fix process_local_reply / cont! GNUNET_assert (NULL == pr->gh); + GNUNET_assert (NULL == pr->llc_cont); + pr->llc_cont = cont; + pr->llc_cont_cls = cont_cls; pr->qe = GNUNET_DATASTORE_get (GSF_dsh, &pr->public_data.query, pr->public_data.type, diff --git a/src/fs/gnunet-service-fs_pr.h b/src/fs/gnunet-service-fs_pr.h index 39a5fc77f..885947295 100644 --- a/src/fs/gnunet-service-fs_pr.h +++ b/src/fs/gnunet-service-fs_pr.h @@ -315,9 +315,9 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr); * @param pr the pending request we were processing * @param result final datastore lookup result */ -typedef void (GSF_LocalLookupContinuation)(void *cls, - struct GSF_PendingRequest *pr, - enum GNUNET_BLOCK_EvaluationResult result); +typedef void (*GSF_LocalLookupContinuation)(void *cls, + struct GSF_PendingRequest *pr, + enum GNUNET_BLOCK_EvaluationResult result); /** -- cgit v1.2.3