From 984627dce529221b7e814218af4c761172bee670 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 19 Oct 2010 18:50:03 +0000 Subject: bugfixes, more todo --- TODO | 5 + src/fs/fs.h | 11 +- src/fs/fs_test_lib_data.conf | 7 +- src/fs/gnunet-service-fs.c | 228 ++++++++++++++++++++++++++++-------- src/fs/perf_gnunet_service_fs_p2p.c | 19 ++- 5 files changed, 208 insertions(+), 62 deletions(-) diff --git a/TODO b/TODO index 5d50342a3..0618570fd 100644 --- a/TODO +++ b/TODO @@ -33,6 +33,11 @@ - also do UPnP-based (external) IP detection (Note: build library always, build UPnP service when dependencies like libxml2 are available) * FS: [CG] + - service: + + 2-peer perf test does NOT terminate for large (500 MB) files because + somehow blocks are not found (suspect: load-based no DB lookup + forward first, no clean up of routing table?) + + 2-peer perf test goes WAY over bandwidth limit (i.e. 300 kbps/set, 2 MB/s transfer rate); clearly core does + not properly enforce the limit - library: + reconstruct IBLOCKS from DBLOCKS if possible (during download; see FIXME in fs_download) + add support for pushing "already seen" search results to FS service for bloomfilter diff --git a/src/fs/fs.h b/src/fs/fs.h index d48af35b4..d2c2d3c7f 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -34,10 +34,19 @@ #include "gnunet_block_lib.h" #include "block_fs.h" + +/** + * Maximum number of outgoing messages we queue per peer. + */ +#define MAX_QUEUE_PER_PEER 16 + /** * Maximum size of the datastore queue for P2P operations. + * Needs to be large enough to queue MAX_QUEUE_PER_PEER + * operations for roughly the number of active (connected) + * peers. */ -#define MAX_DATASTORE_QUEUE 16 +#define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER) /** * Maximum number of blocks we keep in memory for migration. 
diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf index c9dafa748..7228629ee 100644 --- a/src/fs/fs_test_lib_data.conf +++ b/src/fs/fs_test_lib_data.conf @@ -22,6 +22,7 @@ DEFAULTSERVICES = fs [datastore] #DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes +QUOTA = 2000000000 [statistics] PORT = 43467 @@ -40,8 +41,8 @@ PORT = 43470 HOSTNAME = localhost #TOTAL_QUOTA_IN = 9321 #TOTAL_QUOTA_OUT = 9321 -TOTAL_QUOTA_IN = 3932160 -TOTAL_QUOTA_OUT = 3932160 +TOTAL_QUOTA_IN = 393216 +TOTAL_QUOTA_OUT = 393216 #DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/bin/gnunet-service-core @@ -51,7 +52,7 @@ PORT = 43471 HOSTNAME = localhost #OPTIONS = -L DEBUG ACTIVEMIGRATION = NO -#DEBUG = YES +DEBUG = YES #PREFIX = valgrind --tool=memcheck --leak-check=yes #BINARY = /home/grothoff/gn9/bin/gnunet-service-fs #PREFIX = xterm -e gdb -x cmd --args diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 23ddf60c6..95b000778 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -50,11 +50,6 @@ */ #define SUPPORT_DELAYS GNUNET_NO -/** - * Maximum number of outgoing messages we queue per peer. - */ -#define MAX_QUEUE_PER_PEER 16 - /** * Size for the hash map for DHT requests from the FS * service. Should be about the number of concurrent @@ -191,6 +186,10 @@ struct ConnectedPeer * getting a reply (only calculated over the requests for * which we actually got a reply). Calculated * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n + * + * FIXME: actually, this is currently the delay between us originally + * receiving (not forwarding!) a request and us receiving a reply from + * this peer (regardless of when we transmitted this request to this peer!) 
*/ struct GNUNET_TIME_Relative avg_delay; @@ -206,6 +205,15 @@ struct ConnectedPeer */ struct GNUNET_TIME_Absolute last_migration_block; + /** + * Transmission times for the last MAX_QUEUE_PER_PEER + * requests for this peer. Used as a ring buffer, current + * offset is stored in 'last_request_times_off'. If the + * oldest entry is more recent than the 'avg_delay', we should + * not send any more requests right now. + */ + struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER]; + /** * Handle for an active request for transmission to this * peer, or NULL. @@ -285,6 +293,11 @@ struct ConnectedPeer */ unsigned int last_client_replies_woff; + /** + * Current offset into 'last_request_times' ring buffer. + */ + unsigned int last_request_times_off; + }; @@ -401,6 +414,34 @@ struct ClientList }; +/** + * Information about a peer that we have forwarded this + * request to already. + */ +struct UsedTargetEntry +{ + /** + * What was the last time we have transmitted this request to this + * peer? + */ + struct GNUNET_TIME_Absolute last_request_time; + + /** + * How often have we transmitted this request to this peer? + */ + unsigned int num_requests; + + /** + * PID of the target peer. + */ + GNUNET_PEER_Id pid; + +}; + + + + + /** * Doubly-linked list of messages we are performing * due to a pending request. @@ -531,7 +572,7 @@ struct PendingRequest * (Interned) Peer identifiers of peers that have already * received our query for this content. */ - GNUNET_PEER_Id *used_pids; + struct UsedTargetEntry *used_targets; /** * Our entry in the queue (non-NULL while we wait for our @@ -550,14 +591,14 @@ struct PendingRequest uint32_t anonymity_level; /** - * How many entries in "used_pids" are actually valid? + * How many entries in "used_targets" are actually valid? */ - unsigned int used_pids_off; + unsigned int used_targets_off; /** - * How long is the "used_pids" array? + * How long is the "used_targets" array? 
*/ - unsigned int used_pids_size; + unsigned int used_targets_size; /** * Number of results found for this request. @@ -1384,6 +1425,7 @@ static void destroy_pending_request (struct PendingRequest *pr) { struct GNUNET_PeerIdentity pid; + unsigned int i; if (pr->hnode != NULL) { @@ -1464,13 +1506,14 @@ destroy_pending_request (struct PendingRequest *pr) while (NULL != pr->pending_head) destroy_pending_message_list_entry (pr->pending_head); GNUNET_PEER_change_rc (pr->target_pid, -1); - if (pr->used_pids != NULL) + if (pr->used_targets != NULL) { - GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); - GNUNET_free (pr->used_pids); - pr->used_pids_off = 0; - pr->used_pids_size = 0; - pr->used_pids = NULL; + for (i=0;i<pr->used_targets_off;i++) + GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1); + GNUNET_free (pr->used_targets); + pr->used_targets_off = 0; + pr->used_targets_size = 0; + pr->used_targets = NULL; } GNUNET_free (pr); } @@ -2142,7 +2185,13 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, pm); cp->pending_requests++; if (cp->pending_requests > MAX_QUEUE_PER_PEER) - destroy_pending_message (cp->pending_messages_tail, 0); + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# P2P searches discarded (queue length bound)"), + 1, + GNUNET_NO); + destroy_pending_message (cp->pending_messages_tail, 0); + } GNUNET_PEER_resolve (cp->pid, &pid); if (NULL != cp->cth) { @@ -2298,6 +2347,7 @@ transmit_query_continuation (void *cls, GNUNET_PEER_Id tpid) { struct PendingRequest *pr = cls; + unsigned int i; GNUNET_STATISTICS_update (stats, gettext_noop ("# queries scheduled for forwarding"), @@ -2316,16 +2366,32 @@ transmit_query_continuation (void *cls, pr); return; } +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitted query `%s'\n", + GNUNET_h2s (&pr->query)); +#endif GNUNET_STATISTICS_update (stats, gettext_noop ("# queries forwarded"), 1, GNUNET_NO); - GNUNET_PEER_change_rc (tpid, 1); - if (pr->used_pids_off == pr->used_pids_size) 
- GNUNET_array_grow (pr->used_pids, - pr->used_pids_size, - pr->used_pids_size * 2 + 2); - pr->used_pids[pr->used_pids_off++] = tpid; + for (i=0;i<pr->used_targets_off;i++) + if (pr->used_targets[i].pid == tpid) + break; /* found match! */ + if (i == pr->used_targets_off) + { + /* need to create new entry */ + if (pr->used_targets_off == pr->used_targets_size) + GNUNET_array_grow (pr->used_targets, + pr->used_targets_size, + pr->used_targets_size * 2 + 2); + GNUNET_PEER_change_rc (tpid, 1); + pr->used_targets[pr->used_targets_off].pid = tpid; + pr->used_targets[pr->used_targets_off].num_requests = 0; + i = pr->used_targets_off++; + } + pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get (); + pr->used_targets[i].num_requests++; if (pr->task == GNUNET_SCHEDULER_NO_TASK) pr->task = GNUNET_SCHEDULER_add_delayed (sched, get_processing_delay (), @@ -2431,6 +2497,7 @@ target_reservation_cb (void *cls, unsigned int k; int no_route; uint32_t bm; + unsigned int i; pr->irc = NULL; if (peer == NULL) @@ -2443,7 +2510,7 @@ target_reservation_cb (void *cls, pr); return; } - // (3) transmit, update ttl/priority + /* (3) transmit, update ttl/priority */ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, &peer->hashPubKey); if (cp == NULL) @@ -2489,6 +2556,16 @@ target_reservation_cb (void *cls, gettext_noop ("# queries scheduled for forwarding"), 1, GNUNET_NO); + for (i=0;i<pr->used_targets_off;i++) + if (pr->used_targets[i].pid == cp->pid) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# queries retransmitted to same target"), + 1, + GNUNET_NO); + break; + } + /* build message and insert message into priority queue */ #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2542,6 +2619,7 @@ target_reservation_cb (void *cls, pr->bf_size); pm->cont = &transmit_query_continuation; pm->cont_cls = pr; + cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get (); add_to_pending_messages_for_peer (cp, pm, pr); } @@ -2589,6 
+2667,7 @@ target_peer_select_cb (void *cls, struct PeerSelectionContext *psc = cls; struct ConnectedPeer *cp = value; struct PendingRequest *pr = psc->pr; + struct GNUNET_TIME_Relative delay; double score; unsigned int i; unsigned int pc; @@ -2604,28 +2683,46 @@ target_peer_select_cb (void *cls, } /* 2) check if we have already (recently) forwarded to this peer */ + /* 2a) this particular request */ pc = 0; - for (i=0;i<pr->used_pids_off;i++) - if (pr->used_pids[i] == cp->pid) + for (i=0;i<pr->used_targets_off;i++) + if (pr->used_targets[i].pid == cp->pid) { - pc++; + pc = pr->used_targets[i].num_requests; + GNUNET_assert (pc > 0); if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - RETRY_PROBABILITY_INV)) + RETRY_PROBABILITY_INV * pc)) { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "NOT re-trying query that was previously transmitted %u times\n", - (unsigned int) pr->used_pids_off); + (unsigned int) pc); #endif return GNUNET_YES; /* skip */ } + break; } #if DEBUG_FS if (0 < pc) - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Re-trying query that was previously transmitted %u times to this peer\n", - (unsigned int) pc); + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Re-trying query that was previously transmitted %u times to this peer\n", + (unsigned int) pc); + } #endif + /* 2b) many other requests to this peer */ + delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]); + if (delay.value <= cp->avg_delay.value) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "NOT sending query since we send %u others to this peer in the last %llums\n", + MAX_QUEUE_PER_PEER, + cp->avg_delay.value); +#endif + return GNUNET_YES; /* skip */ + } + /* 3) calculate how much we'd like to forward to this peer, starting with a random value that is strong enough to at least give any peer a chance sometimes @@ -3023,6 +3120,7 @@ process_reply (void *cls, struct GNUNET_TIME_Relative art_delay; #endif size_t msize; + 
unsigned int i; #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -3036,13 +3134,19 @@ struct GNUNET_TIME_Relative art_delay; GNUNET_NO); if (prq->sender != NULL) { - cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time); - prq->sender->avg_delay.value - = (prq->sender->avg_delay.value * - (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; - prq->sender->avg_priority - = (prq->sender->avg_priority * - (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N; + for (i=0;i<pr->used_targets_off;i++) + if (pr->used_targets[i].pid == prq->sender->pid) + break; + if (i < pr->used_targets_off) + { + cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time); + prq->sender->avg_delay.value + = (prq->sender->avg_delay.value * + (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; + prq->sender->avg_priority + = (prq->sender->avg_priority * + (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N; + } if (pr->cp != NULL) { GNUNET_PEER_change_rc (prq->sender->last_p2p_replies @@ -3812,6 +3916,11 @@ handle_p2p_get (void *cls, return GNUNET_SYSERR; } gm = (const struct GetMessage*) message; +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received request for `%s'\n", + GNUNET_h2s (&gm->query)); +#endif type = ntohl (gm->type); bm = ntohl (gm->hash_bitmap); bits = 0; @@ -3949,7 +4058,6 @@ handle_p2p_get (void *cls, BLOOMFILTER_K); pr->bf_size = bfsize; } - cdc.have = NULL; cdc.pr = pr; GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, @@ -4021,19 +4129,35 @@ handle_p2p_get (void *cls, timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, (pr->priority + 1)); if (GNUNET_YES != pr->forward_only) - pr->qe = GNUNET_DATASTORE_get (dsh, - &gm->query, - type, - pr->priority + 1, - MAX_DATASTORE_QUEUE, - timeout, - &process_local_reply, - pr); + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Handing request for `%s' to datastore\n", + GNUNET_h2s (&gm->query)); +#endif + pr->qe = 
GNUNET_DATASTORE_get (dsh, + &gm->query, + type, + pr->priority + 1, + MAX_DATASTORE_QUEUE, + timeout, + &process_local_reply, + pr); + if (NULL == pr->qe) + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests dropped by datastore (queue length limit)"), + 1, + GNUNET_NO); + } + } else - GNUNET_STATISTICS_update (stats, - gettext_noop ("# requests forwarded due to high load"), - 1, - GNUNET_NO); + { + GNUNET_STATISTICS_update (stats, + gettext_noop ("# requests forwarded due to high load"), + 1, + GNUNET_NO); + } /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */ switch (pr->type) diff --git a/src/fs/perf_gnunet_service_fs_p2p.c b/src/fs/perf_gnunet_service_fs_p2p.c index 0b66150db..5d8b726dc 100644 --- a/src/fs/perf_gnunet_service_fs_p2p.c +++ b/src/fs/perf_gnunet_service_fs_p2p.c @@ -27,17 +27,17 @@ #include "fs_test_lib.h" #include "gnunet_testing_lib.h" -#define VERBOSE GNUNET_NO +#define VERBOSE GNUNET_YES /** * File-size we use for testing. */ -#define FILESIZE (1024 * 1024 * 10) +#define FILESIZE (1024 * 1024 * 1) /** * How long until we give up on transmitting the message? */ -#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 300) +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 3) #define NUM_DAEMONS 2 @@ -89,6 +89,10 @@ static struct StatValues stats[] = { "fs", "# requests done for free (low load)"}, { "fs", "# P2P searches received"}, { "fs", "# replies received for local clients"}, + { "fs", "# P2P searches discarded (queue length bound)"}, + { "fs", "# requests dropped due to high load"}, + { "fs", "# requests dropped by datastore (queue length limit)"}, + { "fs", "# queries retransmitted to same target"}, { "fs", "cummulative artificial delay introduced (ms)"}, { "core", "# bytes decrypted"}, { "core", "# bytes encrypted"}, @@ -129,6 +133,7 @@ print_stat (void *cls, return GNUNET_OK; } + /** * Function that gathers stats from all daemons. 
*/ @@ -136,6 +141,7 @@ static void stat_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + /** * Function called when GET operation on stats is done. */ @@ -149,6 +155,7 @@ get_done (void *cls, GNUNET_SCHEDULER_add_now (sched, &stat_run, sm); } + /** * Function that gathers stats from all daemons. */ @@ -217,7 +224,7 @@ do_report (void *cls, } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Timeout during download, shutting down with error\n"); ok = 1; GNUNET_SCHEDULER_add_now (sched, &do_stop, NULL); @@ -234,7 +241,7 @@ do_download (void *cls, GNUNET_FS_TEST_daemons_stop (sched, NUM_DAEMONS, daemons); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Timeout during upload attempt, shutting down with error\n"); ok = 1; return; @@ -261,7 +268,7 @@ do_publish (void *cls, GNUNET_FS_TEST_daemons_stop (sched, NUM_DAEMONS, daemons); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Error trying to connect: %s\n", emsg); ok = 1; -- cgit v1.2.3