summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-09-23 05:16:24 +0000
committerChristian Grothoff <christian@grothoff.org>2010-09-23 05:16:24 +0000
commit9c32f6a7fab6f0fbf26d9ded338f6733068a45e0 (patch)
tree1ae9f805553e842ff0a1a7257d9a2741bc7e03db
parentb5aad3d89699df1d0b81f87df307796ebabd9bcc (diff)
train hacking
-rw-r--r--TODO1
-rw-r--r--src/fs/gnunet-service-fs.c147
2 files changed, 120 insertions, 28 deletions
diff --git a/TODO b/TODO
index 04947882b..b4e4d2614 100644
--- a/TODO
+++ b/TODO
@@ -1,7 +1,6 @@
0.9.0pre2:
FS:
- measure latencies (core, datastore) => trust economy
- - refuse content migration message (or solicit?)
- FS performance benchmarking
- integrate with DHT (need DHT API to fit block API better first; also, get rid of the continuation!)
* DHT: [Nate]
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 9e53d2fd5..2bfdeb674 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -28,8 +28,6 @@
* - consider more precise latency estimation (per-peer & request) -- again load API?
* - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
* - introduce random latency in processing
- * - tell other peers to stop migration if our PUTs fail (or if
- * we don't support migration per configuration?)
* - more statistics
*/
#include "platform.h"
@@ -190,6 +188,12 @@ struct ConnectedPeer
struct GNUNET_TIME_Absolute migration_blocked;
/**
+ * Time until when we blocked this peer from migrating
+ * data to us.
+ */
+ struct GNUNET_TIME_Absolute last_migration_block;
+
+ /**
* Handle for an active request for transmission to this
* peer, or NULL.
*/
@@ -752,9 +756,14 @@ static int active_migration;
static double current_priorities;
/**
- * Datastore load tracking.
+ * Datastore 'GET' load tracking.
*/
-static struct GNUNET_LOAD_Value *datastore_load;
+static struct GNUNET_LOAD_Value *datastore_get_load;
+
+/**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
/**
@@ -769,7 +778,7 @@ update_datastore_delays (struct GNUNET_TIME_Absolute start)
struct GNUNET_TIME_Relative delay;
delay = GNUNET_TIME_absolute_get_duration (start);
- GNUNET_LOAD_update (datastore_load,
+ GNUNET_LOAD_update (datastore_get_load,
delay.value);
}
@@ -1126,12 +1135,20 @@ destroy_pending_message (struct PendingMessage *pm,
TransmissionContinuation cont;
void *cont_cls;
- GNUNET_assert (pml->pm == pm);
- GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
- cont = pm->cont;
- cont_cls = pm->cont_cls;
- destroy_pending_message_list_entry (pml);
- cont (cont_cls, tpid);
+ if (pml != NULL)
+ {
+ GNUNET_assert (pml->pm == pm);
+ GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
+ cont = pm->cont;
+ cont_cls = pm->cont_cls;
+ destroy_pending_message_list_entry (pml);
+ }
+ else
+ {
+ GNUNET_free (pm);
+ }
+ if (cont != NULL)
+ cont (cont_cls, tpid);
}
@@ -1636,8 +1653,10 @@ shutdown_task (void *cls,
GNUNET_assert (0 == mig_size);
GNUNET_DHT_disconnect (dht_handle);
dht_handle = NULL;
- GNUNET_LOAD_value_free (datastore_load);
- datastore_load = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
GNUNET_BLOCK_context_destroy (block_ctx);
block_ctx = NULL;
GNUNET_CONFIGURATION_destroy (block_cfg);
@@ -1793,14 +1812,17 @@ add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
GNUNET_assert (pm->next == NULL);
GNUNET_assert (pm->pml == NULL);
- pml = GNUNET_malloc (sizeof (struct PendingMessageList));
- pml->req = pr;
- pml->target = cp;
- pml->pm = pm;
- pm->pml = pml;
- GNUNET_CONTAINER_DLL_insert (pr->pending_head,
- pr->pending_tail,
- pml);
+ if (pr != NULL)
+ {
+ pml = GNUNET_malloc (sizeof (struct PendingMessageList));
+ pml->req = pr;
+ pml->target = cp;
+ pml->pm = pm;
+ pm->pml = pml;
+ GNUNET_CONTAINER_DLL_insert (pr->pending_head,
+ pr->pending_tail,
+ pml);
+ }
pos = cp->pending_messages_head;
while ( (pos != NULL) &&
(pm->priority < pos->priority) )
@@ -2560,6 +2582,11 @@ struct ProcessReplyClosure
* Did we finish processing the associated request?
*/
int finished;
+
+ /**
+ * Did we find a matching request?
+ */
+ int request_found;
};
@@ -2715,6 +2742,7 @@ process_reply (void *cls,
prq->priority += pr->remaining_priority;
pr->remaining_priority = 0;
pr->results_found++;
+ prq->request_found = GNUNET_YES;
if (NULL != pr->client_request_list)
{
GNUNET_STATISTICS_update (stats,
@@ -2800,7 +2828,19 @@ put_migration_continuation (void *cls,
int success,
const char *msg)
{
- /* FIXME */
+ struct GNUNET_TIME_Absolute *start = cls;
+ struct GNUNET_TIME_Relative delay;
+
+ delay = GNUNET_TIME_absolute_get_duration (*start);
+ GNUNET_free (start);
+ GNUNET_LOAD_update (datastore_put_load,
+ delay.value);
+ if (GNUNET_OK == success)
+ return;
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# datastore 'put' failures"),
+ 1,
+ GNUNET_NO);
}
@@ -2830,6 +2870,12 @@ handle_p2p_put (void *cls,
struct GNUNET_TIME_Absolute expiration;
GNUNET_HashCode query;
struct ProcessReplyClosure prq;
+ struct GNUNET_TIME_Absolute *start;
+ struct GNUNET_TIME_Relative block_time;
+ double putl;
+ struct ConnectedPeer *cp;
+ struct PendingMessage *pm;
+ struct MigrationStopMessage *msm;
msize = ntohs (message->size);
if (msize < sizeof (struct PutMessage))
@@ -2876,6 +2922,7 @@ handle_p2p_put (void *cls,
prq.expiration = expiration;
prq.priority = 0;
prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
&query,
&process_reply,
@@ -2893,6 +2940,8 @@ handle_p2p_put (void *cls,
GNUNET_h2s (&query),
prq.priority);
#endif
+ start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+ *start = GNUNET_TIME_absolute_get ();
GNUNET_DATASTORE_put (dsh,
0, &query, dsize, &put[1],
type, prq.priority, 1 /* anonymity */,
@@ -2900,7 +2949,36 @@ handle_p2p_put (void *cls,
1 + prq.priority, MAX_DATASTORE_QUEUE,
GNUNET_CONSTANTS_SERVICE_TIMEOUT,
&put_migration_continuation,
- NULL);
+ start);
+ }
+ putl = GNUNET_LOAD_get_load (datastore_put_load);
+ if ( (GNUNET_NO == prq.request_found) &&
+ ( (GNUNET_YES != active_migration) ||
+ (putl > 2.0) ) )
+ {
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
+ return GNUNET_OK; /* already blocked */
+ /* We're too busy; send MigrationStop message! */
+ if (GNUNET_YES != active_migration)
+ putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+ block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ (unsigned int) (60000 * putl * putl)));
+
+ cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct MigrationStopMessage));
+ pm->msize = sizeof (struct MigrationStopMessage);
+ pm->priority = UINT32_MAX;
+ msm = (struct MigrationStopMessage*) &pm[1];
+ msm->header.size = htons (sizeof (struct MigrationStopMessage));
+ msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm->duration = GNUNET_TIME_relative_hton (block_time);
+ add_to_pending_messages_for_peer (cp,
+ pm,
+ NULL);
}
return GNUNET_OK;
}
@@ -2925,7 +3003,18 @@ handle_p2p_migration_stop (void *cls,
struct GNUNET_TIME_Relative latency,
uint32_t distance)
{
- // FIXME!
+ struct ConnectedPeer *cp;
+ const struct MigrationStopMessage *msm;
+
+ msm = (const struct MigrationStopMessage*) message;
+ cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+ &other->hashPubKey);
+ if (cp == NULL)
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+ cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
return GNUNET_OK;
}
@@ -3110,6 +3199,7 @@ process_local_reply (void *cls,
prq.type = type;
prq.priority = priority;
prq.finished = GNUNET_NO;
+ prq.request_found = GNUNET_NO;
process_reply (&prq, key, pr);
if ( (old_rf == 0) &&
(pr->results_found == 1) )
@@ -3798,7 +3888,8 @@ run (void *cls,
GNUNET_SCHEDULER_shutdown (sched);
return;
}
- datastore_load = GNUNET_LOAD_value_init ();
+ datastore_get_load = GNUNET_LOAD_value_init ();
+ datastore_put_load = GNUNET_LOAD_value_init ();
block_cfg = GNUNET_CONFIGURATION_create ();
GNUNET_CONFIGURATION_set_value_string (block_cfg,
"block",
@@ -3821,8 +3912,10 @@ run (void *cls,
block_ctx = NULL;
GNUNET_CONFIGURATION_destroy (block_cfg);
block_cfg = NULL;
- GNUNET_LOAD_value_free (datastore_load);
- datastore_load = NULL;
+ GNUNET_LOAD_value_free (datastore_get_load);
+ datastore_get_load = NULL;
+ GNUNET_LOAD_value_free (datastore_put_load);
+ datastore_put_load = NULL;
return;
}
}