From 9c32f6a7fab6f0fbf26d9ded338f6733068a45e0 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 23 Sep 2010 05:16:24 +0000 Subject: train hacking --- TODO | 1 - src/fs/gnunet-service-fs.c | 147 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 120 insertions(+), 28 deletions(-) diff --git a/TODO b/TODO index 04947882b..b4e4d2614 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,6 @@ 0.9.0pre2: FS: - measure latencies (core, datastore) => trust economy - - refuse content migration message (or solicit?) - FS performance benchmarking - integrate with DHT (need DHT API to fit block API better first; also, get rid of the continuation!) * DHT: [Nate] diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 9e53d2fd5..2bfdeb674 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -28,8 +28,6 @@ * - consider more precise latency estimation (per-peer & request) -- again load API? * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. * - introduce random latency in processing - * - tell other peers to stop migration if our PUTs fail (or if - * we don't support migration per configuration?) * - more statistics */ #include "platform.h" @@ -189,6 +187,12 @@ struct ConnectedPeer */ struct GNUNET_TIME_Absolute migration_blocked; + /** + * Time until when we blocked this peer from migrating + * data to us. + */ + struct GNUNET_TIME_Absolute last_migration_block; + /** * Handle for an active request for transmission to this * peer, or NULL. @@ -752,9 +756,14 @@ static int active_migration; static double current_priorities; /** - * Datastore load tracking. + * Datastore 'GET' load tracking. */ -static struct GNUNET_LOAD_Value *datastore_load; +static struct GNUNET_LOAD_Value *datastore_get_load; + +/** + * Datastore 'PUT' load tracking. + */ +static struct GNUNET_LOAD_Value *datastore_put_load; /** @@ -769,7 +778,7 @@ update_datastore_delays (struct GNUNET_TIME_Absolute start) struct GNUNET_TIME_Relative delay; delay = GNUNET_TIME_absolute_get_duration (start); - GNUNET_LOAD_update (datastore_load, + GNUNET_LOAD_update (datastore_get_load, delay.value); } @@ -1126,12 +1135,20 @@ destroy_pending_message (struct PendingMessage *pm, TransmissionContinuation cont; void *cont_cls; - GNUNET_assert (pml->pm == pm); - GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); - cont = pm->cont; - cont_cls = pm->cont_cls; - destroy_pending_message_list_entry (pml); - cont (cont_cls, tpid); + if (pml != NULL) + { + GNUNET_assert (pml->pm == pm); + GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) ); + cont = pm->cont; + cont_cls = pm->cont_cls; + destroy_pending_message_list_entry (pml); + } + else + { + GNUNET_free (pm); + } + if (cont != NULL) + cont (cont_cls, tpid); } @@ -1636,8 +1653,10 @@ shutdown_task (void *cls, GNUNET_assert (0 == mig_size); GNUNET_DHT_disconnect (dht_handle); dht_handle = NULL; - GNUNET_LOAD_value_free (datastore_load); - datastore_load = NULL; + GNUNET_LOAD_value_free (datastore_get_load); + datastore_get_load = NULL; + GNUNET_LOAD_value_free (datastore_put_load); + datastore_put_load = NULL; GNUNET_BLOCK_context_destroy (block_ctx); block_ctx = NULL; GNUNET_CONFIGURATION_destroy (block_cfg); @@ -1793,14 +1812,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp, GNUNET_assert (pm->next == NULL); GNUNET_assert (pm->pml == NULL); - pml = GNUNET_malloc (sizeof (struct PendingMessageList)); - pml->req = pr; - pml->target = cp; - pml->pm = pm; - pm->pml = pml; - GNUNET_CONTAINER_DLL_insert (pr->pending_head, - pr->pending_tail, - pml); + if (pr != NULL) + { + pml = GNUNET_malloc (sizeof (struct PendingMessageList)); + pml->req = pr; + pml->target = cp; + pml->pm = pm; + pm->pml = pml; + GNUNET_CONTAINER_DLL_insert (pr->pending_head, + pr->pending_tail, + pml); + } pos = cp->pending_messages_head; while ( (pos != NULL) && (pm->priority < pos->priority) ) @@ -2560,6 +2582,11 @@ struct ProcessReplyClosure * Did we finish processing the associated request? */ int finished; + + /** + * Did we find a matching request? + */ + int request_found; }; @@ -2715,6 +2742,7 @@ process_reply (void *cls, prq->priority += pr->remaining_priority; pr->remaining_priority = 0; pr->results_found++; + prq->request_found = GNUNET_YES; if (NULL != pr->client_request_list) { GNUNET_STATISTICS_update (stats, @@ -2800,7 +2828,19 @@ put_migration_continuation (void *cls, int success, const char *msg) { - /* FIXME */ + struct GNUNET_TIME_Absolute *start = cls; + struct GNUNET_TIME_Relative delay; + + delay = GNUNET_TIME_absolute_get_duration (*start); + GNUNET_free (start); + GNUNET_LOAD_update (datastore_put_load, + delay.value); + if (GNUNET_OK == success) + return; + GNUNET_STATISTICS_update (stats, + gettext_noop ("# datastore 'put' failures"), + 1, + GNUNET_NO); } @@ -2830,6 +2870,12 @@ handle_p2p_put (void *cls, struct GNUNET_TIME_Absolute expiration; GNUNET_HashCode query; struct ProcessReplyClosure prq; + struct GNUNET_TIME_Absolute *start; + struct GNUNET_TIME_Relative block_time; + double putl; + struct ConnectedPeer *cp; + struct PendingMessage *pm; + struct MigrationStopMessage *msm; msize = ntohs (message->size); if (msize < sizeof (struct PutMessage)) @@ -2876,6 +2922,7 @@ handle_p2p_put (void *cls, prq.expiration = expiration; prq.priority = 0; prq.finished = GNUNET_NO; + prq.request_found = GNUNET_NO; GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, &query, &process_reply, @@ -2893,6 +2940,8 @@ handle_p2p_put (void *cls, GNUNET_h2s (&query), prq.priority); #endif + start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); + *start = GNUNET_TIME_absolute_get (); GNUNET_DATASTORE_put (dsh, 0, &query, dsize, &put[1], type, prq.priority, 1 /* anonymity */, @@ -2900,7 +2949,36 @@ handle_p2p_put (void *cls, 1 + prq.priority, MAX_DATASTORE_QUEUE, GNUNET_CONSTANTS_SERVICE_TIMEOUT, &put_migration_continuation, - NULL); + start); + } + putl = GNUNET_LOAD_get_load (datastore_put_load); + if ( (GNUNET_NO == prq.request_found) && + ( (GNUNET_YES != active_migration) || + (putl > 2.0) ) ) + { + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &other->hashPubKey); + if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000) + return GNUNET_OK; /* already blocked */ + /* We're too busy; send MigrationStop message! */ + if (GNUNET_YES != active_migration) + putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); + block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + (unsigned int) (60000 * putl * putl))); + + cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time); + pm = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof (struct MigrationStopMessage)); + pm->msize = sizeof (struct MigrationStopMessage); + pm->priority = UINT32_MAX; + msm = (struct MigrationStopMessage*) &pm[1]; + msm->header.size = htons (sizeof (struct MigrationStopMessage)); + msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); + msm->duration = GNUNET_TIME_relative_hton (block_time); + add_to_pending_messages_for_peer (cp, + pm, + NULL); } return GNUNET_OK; } @@ -2925,7 +3003,18 @@ handle_p2p_migration_stop (void *cls, struct GNUNET_TIME_Relative latency, uint32_t distance) { - // FIXME! + struct ConnectedPeer *cp; + const struct MigrationStopMessage *msm; + + msm = (const struct MigrationStopMessage*) message; + cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, + &other->hashPubKey); + if (cp == NULL) + { + GNUNET_break (0); + return GNUNET_OK; + } + cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); return GNUNET_OK; } @@ -3110,6 +3199,7 @@ process_local_reply (void *cls, prq.type = type; prq.priority = priority; prq.finished = GNUNET_NO; + prq.request_found = GNUNET_NO; process_reply (&prq, key, pr); if ( (old_rf == 0) && (pr->results_found == 1) ) @@ -3798,7 +3888,8 @@ run (void *cls, GNUNET_SCHEDULER_shutdown (sched); return; } - datastore_load = GNUNET_LOAD_value_init (); + datastore_get_load = GNUNET_LOAD_value_init (); + datastore_put_load = GNUNET_LOAD_value_init (); block_cfg = GNUNET_CONFIGURATION_create (); GNUNET_CONFIGURATION_set_value_string (block_cfg, "block", @@ -3821,8 +3912,10 @@ run (void *cls, block_ctx = NULL; GNUNET_CONFIGURATION_destroy (block_cfg); block_cfg = NULL; - GNUNET_LOAD_value_free (datastore_load); - datastore_load = NULL; + GNUNET_LOAD_value_free (datastore_get_load); + datastore_get_load = NULL; + GNUNET_LOAD_value_free (datastore_put_load); + datastore_put_load = NULL; return; } } -- cgit v1.2.3