summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-10-19 18:50:03 +0000
committerChristian Grothoff <christian@grothoff.org>2010-10-19 18:50:03 +0000
commit984627dce529221b7e814218af4c761172bee670 (patch)
tree064d2b2444fd238b19d90ca10baf93e9c77f565a
parent50ffac4098e1f092dad04942586547b3318e3daf (diff)
bugfixes, more todo
-rw-r--r--TODO5
-rw-r--r--src/fs/fs.h11
-rw-r--r--src/fs/fs_test_lib_data.conf7
-rw-r--r--src/fs/gnunet-service-fs.c228
-rw-r--r--src/fs/perf_gnunet_service_fs_p2p.c19
5 files changed, 208 insertions, 62 deletions
diff --git a/TODO b/TODO
index 5d50342a3..0618570fd 100644
--- a/TODO
+++ b/TODO
@@ -33,6 +33,11 @@
- also do UPnP-based (external) IP detection
(Note: build library always, build UPnP service when dependencies like libxml2 are available)
* FS: [CG]
+ - service:
+ + 2-peer perf test does NOT terminate for large (500 MB) files because
+ somehow blocks are not found (suspect: load-based no DB lookup + forward first, no clean up of routing table?)
+ + 2-peer perf test goes WAY over bandwidth limit (i.e. 300 kbps/set, 2 MB/s transfer rate); clearly core does
+ not properly enforce the limit
- library:
+ reconstruct IBLOCKS from DBLOCKS if possible (during download; see FIXME in fs_download)
+ add support for pushing "already seen" search results to FS service for bloomfilter
diff --git a/src/fs/fs.h b/src/fs/fs.h
index d48af35b4..d2c2d3c7f 100644
--- a/src/fs/fs.h
+++ b/src/fs/fs.h
@@ -34,10 +34,19 @@
#include "gnunet_block_lib.h"
#include "block_fs.h"
+
+/**
+ * Maximum number of outgoing messages we queue per peer.
+ */
+#define MAX_QUEUE_PER_PEER 16
+
/**
* Maximum size of the datastore queue for P2P operations.
+ * Needs to be large enough to queue MAX_QUEUE_PER_PEER
+ * operations for roughly the number of active (connected)
+ * peers.
*/
-#define MAX_DATASTORE_QUEUE 16
+#define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
/**
* Maximum number of blocks we keep in memory for migration.
diff --git a/src/fs/fs_test_lib_data.conf b/src/fs/fs_test_lib_data.conf
index c9dafa748..7228629ee 100644
--- a/src/fs/fs_test_lib_data.conf
+++ b/src/fs/fs_test_lib_data.conf
@@ -22,6 +22,7 @@ DEFAULTSERVICES = fs
[datastore]
#DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
+QUOTA = 2000000000
[statistics]
PORT = 43467
@@ -40,8 +41,8 @@ PORT = 43470
HOSTNAME = localhost
#TOTAL_QUOTA_IN = 9321
#TOTAL_QUOTA_OUT = 9321
-TOTAL_QUOTA_IN = 3932160
-TOTAL_QUOTA_OUT = 3932160
+TOTAL_QUOTA_IN = 393216
+TOTAL_QUOTA_OUT = 393216
#DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/bin/gnunet-service-core
@@ -51,7 +52,7 @@ PORT = 43471
HOSTNAME = localhost
#OPTIONS = -L DEBUG
ACTIVEMIGRATION = NO
-#DEBUG = YES
+DEBUG = YES
#PREFIX = valgrind --tool=memcheck --leak-check=yes
#BINARY = /home/grothoff/gn9/bin/gnunet-service-fs
#PREFIX = xterm -e gdb -x cmd --args
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 23ddf60c6..95b000778 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -51,11 +51,6 @@
#define SUPPORT_DELAYS GNUNET_NO
/**
- * Maximum number of outgoing messages we queue per peer.
- */
-#define MAX_QUEUE_PER_PEER 16
-
-/**
* Size for the hash map for DHT requests from the FS
* service. Should be about the number of concurrent
* DHT requests we plan to make.
@@ -191,6 +186,10 @@ struct ConnectedPeer
* getting a reply (only calculated over the requests for
* which we actually got a reply). Calculated
* as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
+ *
+ * FIXME: actually, this is currently the delay between us originally
+ * receiving (not forwarding!) a request and us receiving a reply from
+ * this peer (regardless of when we transmitted this request to this peer!)
*/
struct GNUNET_TIME_Relative avg_delay;
@@ -207,6 +206,15 @@ struct ConnectedPeer
struct GNUNET_TIME_Absolute last_migration_block;
/**
+ * Transmission times for the last MAX_QUEUE_PER_PEER
+ * requests for this peer. Used as a ring buffer, current
+ * offset is stored in 'last_request_times_off'. If the
+ * oldest entry is more recent than the 'avg_delay', we should
+ * not send any more requests right now.
+ */
+ struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
+
+ /**
* Handle for an active request for transmission to this
* peer, or NULL.
*/
@@ -285,6 +293,11 @@ struct ConnectedPeer
*/
unsigned int last_client_replies_woff;
+ /**
+ * Current offset into 'last_request_times' ring buffer.
+ */
+ unsigned int last_request_times_off;
+
};
@@ -402,6 +415,34 @@ struct ClientList
/**
+ * Information about a peer that we have forwarded this
+ * request to already.
+ */
+struct UsedTargetEntry
+{
+ /**
+ * What was the last time we have transmitted this request to this
+ * peer?
+ */
+ struct GNUNET_TIME_Absolute last_request_time;
+
+ /**
+ * How often have we transmitted this request to this peer?
+ */
+ unsigned int num_requests;
+
+ /**
+ * PID of the target peer.
+ */
+ GNUNET_PEER_Id pid;
+
+};
+
+
+
+
+
+/**
* Doubly-linked list of messages we are performing
* due to a pending request.
*/
@@ -531,7 +572,7 @@ struct PendingRequest
* (Interned) Peer identifiers of peers that have already
* received our query for this content.
*/
- GNUNET_PEER_Id *used_pids;
+ struct UsedTargetEntry *used_targets;
/**
* Our entry in the queue (non-NULL while we wait for our
@@ -550,14 +591,14 @@ struct PendingRequest
uint32_t anonymity_level;
/**
- * How many entries in "used_pids" are actually valid?
+ * How many entries in "used_targets" are actually valid?
*/
- unsigned int used_pids_off;
+ unsigned int used_targets_off;
/**
- * How long is the "used_pids" array?
+ * How long is the "used_targets" array?
*/
- unsigned int used_pids_size;
+ unsigned int used_targets_size;
/**
* Number of results found for this request.
@@ -1384,6 +1425,7 @@ static void
destroy_pending_request (struct PendingRequest *pr)
{
struct GNUNET_PeerIdentity pid;
+ unsigned int i;
if (pr->hnode != NULL)
{
@@ -1464,13 +1506,14 @@ destroy_pending_request (struct PendingRequest *pr)
while (NULL != pr->pending_head)
destroy_pending_message_list_entry (pr->pending_head);
GNUNET_PEER_change_rc (pr->target_pid, -1);
- if (pr->used_pids != NULL)
+ if (pr->used_targets != NULL)
{
- GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
- GNUNET_free (pr->used_pids);
- pr->used_pids_off = 0;
- pr->used_pids_size = 0;
- pr->used_pids = NULL;
+ for (i=0;i<pr->used_targets_off;i++)
+ GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
+ GNUNET_free (pr->used_targets);
+ pr->used_targets_off = 0;
+ pr->used_targets_size = 0;
+ pr->used_targets = NULL;
}
GNUNET_free (pr);
}
@@ -2142,7 +2185,13 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
pm);
cp->pending_requests++;
if (cp->pending_requests > MAX_QUEUE_PER_PEER)
- destroy_pending_message (cp->pending_messages_tail, 0);
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches discarded (queue length bound)"),
+ 1,
+ GNUNET_NO);
+ destroy_pending_message (cp->pending_messages_tail, 0);
+ }
GNUNET_PEER_resolve (cp->pid, &pid);
if (NULL != cp->cth)
{
@@ -2298,6 +2347,7 @@ transmit_query_continuation (void *cls,
GNUNET_PEER_Id tpid)
{
struct PendingRequest *pr = cls;
+ unsigned int i;
GNUNET_STATISTICS_update (stats,
gettext_noop ("# queries scheduled for forwarding"),
@@ -2316,16 +2366,32 @@ transmit_query_continuation (void *cls,
pr);
return;
}
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitted query `%s'\n",
+ GNUNET_h2s (&pr->query));
+#endif
GNUNET_STATISTICS_update (stats,
gettext_noop ("# queries forwarded"),
1,
GNUNET_NO);
- GNUNET_PEER_change_rc (tpid, 1);
- if (pr->used_pids_off == pr->used_pids_size)
- GNUNET_array_grow (pr->used_pids,
- pr->used_pids_size,
- pr->used_pids_size * 2 + 2);
- pr->used_pids[pr->used_pids_off++] = tpid;
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == tpid)
+ break; /* found match! */
+ if (i == pr->used_targets_off)
+ {
+ /* need to create new entry */
+ if (pr->used_targets_off == pr->used_targets_size)
+ GNUNET_array_grow (pr->used_targets,
+ pr->used_targets_size,
+ pr->used_targets_size * 2 + 2);
+ GNUNET_PEER_change_rc (tpid, 1);
+ pr->used_targets[pr->used_targets_off].pid = tpid;
+ pr->used_targets[pr->used_targets_off].num_requests = 0;
+ i = pr->used_targets_off++;
+ }
+ pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
+ pr->used_targets[i].num_requests++;
if (pr->task == GNUNET_SCHEDULER_NO_TASK)
pr->task = GNUNET_SCHEDULER_add_delayed (sched,
get_processing_delay (),
@@ -2431,6 +2497,7 @@ target_reservation_cb (void *cls,
unsigned int k;
int no_route;
uint32_t bm;
+ unsigned int i;
pr->irc = NULL;
if (peer == NULL)
@@ -2443,7 +2510,7 @@ target_reservation_cb (void *cls,
pr);
return;
}
- // (3) transmit, update ttl/priority
+ /* (3) transmit, update ttl/priority */
cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
&peer->hashPubKey);
if (cp == NULL)
@@ -2489,6 +2556,16 @@ target_reservation_cb (void *cls,
gettext_noop ("# queries scheduled for forwarding"),
1,
GNUNET_NO);
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == cp->pid)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# queries retransmitted to same target"),
+ 1,
+ GNUNET_NO);
+ break;
+ }
+
/* build message and insert message into priority queue */
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2542,6 +2619,7 @@ target_reservation_cb (void *cls,
pr->bf_size);
pm->cont = &transmit_query_continuation;
pm->cont_cls = pr;
+ cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
add_to_pending_messages_for_peer (cp, pm, pr);
}
@@ -2589,6 +2667,7 @@ target_peer_select_cb (void *cls,
struct PeerSelectionContext *psc = cls;
struct ConnectedPeer *cp = value;
struct PendingRequest *pr = psc->pr;
+ struct GNUNET_TIME_Relative delay;
double score;
unsigned int i;
unsigned int pc;
@@ -2604,28 +2683,46 @@ target_peer_select_cb (void *cls,
}
/* 2) check if we have already (recently) forwarded to this peer */
+ /* 2a) this particular request */
pc = 0;
- for (i=0;i<pr->used_pids_off;i++)
- if (pr->used_pids[i] == cp->pid)
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == cp->pid)
{
- pc++;
+ pc = pr->used_targets[i].num_requests;
+ GNUNET_assert (pc > 0);
if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
- RETRY_PROBABILITY_INV))
+ RETRY_PROBABILITY_INV * pc))
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"NOT re-trying query that was previously transmitted %u times\n",
- (unsigned int) pr->used_pids_off);
+ (unsigned int) pc);
#endif
return GNUNET_YES; /* skip */
}
+ break;
}
#if DEBUG_FS
if (0 < pc)
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Re-trying query that was previously transmitted %u times to this peer\n",
- (unsigned int) pc);
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Re-trying query that was previously transmitted %u times to this peer\n",
+ (unsigned int) pc);
+ }
#endif
+ /* 2b) many other requests to this peer */
+ delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
+ if (delay.value <= cp->avg_delay.value)
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "NOT sending query since we send %u others to this peer in the last %llums\n",
+ MAX_QUEUE_PER_PEER,
+ cp->avg_delay.value);
+#endif
+ return GNUNET_YES; /* skip */
+ }
+
/* 3) calculate how much we'd like to forward to this peer,
starting with a random value that is strong enough
to at least give any peer a chance sometimes
@@ -3023,6 +3120,7 @@ process_reply (void *cls,
struct GNUNET_TIME_Relative art_delay;
#endif
size_t msize;
+ unsigned int i;
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3036,13 +3134,19 @@ struct GNUNET_TIME_Relative art_delay;
GNUNET_NO);
if (prq->sender != NULL)
{
- cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
- prq->sender->avg_delay.value
- = (prq->sender->avg_delay.value *
- (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
- prq->sender->avg_priority
- = (prq->sender->avg_priority *
- (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ for (i=0;i<pr->used_targets_off;i++)
+ if (pr->used_targets[i].pid == prq->sender->pid)
+ break;
+ if (i < pr->used_targets_off)
+ {
+ cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
+ prq->sender->avg_delay.value
+ = (prq->sender->avg_delay.value *
+ (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N;
+ prq->sender->avg_priority
+ = (prq->sender->avg_priority *
+ (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+ }
if (pr->cp != NULL)
{
GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
@@ -3812,6 +3916,11 @@ handle_p2p_get (void *cls,
return GNUNET_SYSERR;
}
gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received request for `%s'\n",
+ GNUNET_h2s (&gm->query));
+#endif
type = ntohl (gm->type);
bm = ntohl (gm->hash_bitmap);
bits = 0;
@@ -3949,7 +4058,6 @@ handle_p2p_get (void *cls,
BLOOMFILTER_K);
pr->bf_size = bfsize;
}
-
cdc.have = NULL;
cdc.pr = pr;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
@@ -4021,19 +4129,35 @@ handle_p2p_get (void *cls,
timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
(pr->priority + 1));
if (GNUNET_YES != pr->forward_only)
- pr->qe = GNUNET_DATASTORE_get (dsh,
- &gm->query,
- type,
- pr->priority + 1,
- MAX_DATASTORE_QUEUE,
- timeout,
- &process_local_reply,
- pr);
+ {
+#if DEBUG_FS
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Handing request for `%s' to datastore\n",
+ GNUNET_h2s (&gm->query));
+#endif
+ pr->qe = GNUNET_DATASTORE_get (dsh,
+ &gm->query,
+ type,
+ pr->priority + 1,
+ MAX_DATASTORE_QUEUE,
+ timeout,
+ &process_local_reply,
+ pr);
+ if (NULL == pr->qe)
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped by datastore (queue length limit)"),
+ 1,
+ GNUNET_NO);
+ }
+ }
else
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests forwarded due to high load"),
- 1,
- GNUNET_NO);
+ {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests forwarded due to high load"),
+ 1,
+ GNUNET_NO);
+ }
/* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
switch (pr->type)
diff --git a/src/fs/perf_gnunet_service_fs_p2p.c b/src/fs/perf_gnunet_service_fs_p2p.c
index 0b66150db..5d8b726dc 100644
--- a/src/fs/perf_gnunet_service_fs_p2p.c
+++ b/src/fs/perf_gnunet_service_fs_p2p.c
@@ -27,17 +27,17 @@
#include "fs_test_lib.h"
#include "gnunet_testing_lib.h"
-#define VERBOSE GNUNET_NO
+#define VERBOSE GNUNET_YES
/**
* File-size we use for testing.
*/
-#define FILESIZE (1024 * 1024 * 10)
+#define FILESIZE (1024 * 1024 * 1)
/**
* How long until we give up on transmitting the message?
*/
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 300)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 3)
#define NUM_DAEMONS 2
@@ -89,6 +89,10 @@ static struct StatValues stats[] =
{ "fs", "# requests done for free (low load)"},
{ "fs", "# P2P searches received"},
{ "fs", "# replies received for local clients"},
+ { "fs", "# P2P searches discarded (queue length bound)"},
+ { "fs", "# requests dropped due to high load"},
+ { "fs", "# requests dropped by datastore (queue length limit)"},
+ { "fs", "# queries retransmitted to same target"},
{ "fs", "cummulative artificial delay introduced (ms)"},
{ "core", "# bytes decrypted"},
{ "core", "# bytes encrypted"},
@@ -129,6 +133,7 @@ print_stat (void *cls,
return GNUNET_OK;
}
+
/**
* Function that gathers stats from all daemons.
*/
@@ -136,6 +141,7 @@ static void
stat_run (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc);
+
/**
* Function called when GET operation on stats is done.
*/
@@ -149,6 +155,7 @@ get_done (void *cls,
GNUNET_SCHEDULER_add_now (sched, &stat_run, sm);
}
+
/**
* Function that gathers stats from all daemons.
*/
@@ -217,7 +224,7 @@ do_report (void *cls,
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Timeout during download, shutting down with error\n");
ok = 1;
GNUNET_SCHEDULER_add_now (sched, &do_stop, NULL);
@@ -234,7 +241,7 @@ do_download (void *cls,
GNUNET_FS_TEST_daemons_stop (sched,
NUM_DAEMONS,
daemons);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Timeout during upload attempt, shutting down with error\n");
ok = 1;
return;
@@ -261,7 +268,7 @@ do_publish (void *cls,
GNUNET_FS_TEST_daemons_stop (sched,
NUM_DAEMONS,
daemons);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Error trying to connect: %s\n",
emsg);
ok = 1;