summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/fs/fs.h5
-rw-r--r--src/fs/gnunet-service-fs.c230
2 files changed, 234 insertions, 1 deletions
diff --git a/src/fs/fs.h b/src/fs/fs.h
index 9901cd173..3277ea340 100644
--- a/src/fs/fs.h
+++ b/src/fs/fs.h
@@ -37,6 +37,11 @@
#define MAX_DATASTORE_QUEUE 16
/**
+ * Maximum number of blocks we keep in memory for migration.
+ */
+#define MAX_MIGRATION_QUEUE 32
+
+/**
* Size of the individual blocks used for file-sharing.
*/
#define DBLOCK_SIZE (32*1024)
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 8bec62b08..a08a041da 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -560,6 +560,44 @@ struct PendingRequest
/**
+ * Block that is ready for migration to other peers. Actual data is at the end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *next;
+
+ /**
+ * This is a doubly-linked list.
+ */
+ struct MigrationReadyBlock *prev;
+
+ /**
+ * Query for the block.
+ */
+ GNUNET_HashCode query;
+
+ /**
+ * When does this block expire?
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Size of the block.
+ */
+ size_t size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+};
+
+
+/**
* Our scheduler.
*/
static struct GNUNET_SCHEDULER_Handle *sched;
@@ -611,6 +649,37 @@ static struct ClientList *client_list;
static struct GNUNET_CORE_Handle *core;
/**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
+
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+
+/**
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
+ */
+static struct GNUNET_TIME_Relative min_migration_delay;
+
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+
+/**
* Are we allowed to migrate content to this peer.
*/
static int active_migration;
@@ -619,6 +688,141 @@ static int active_migration;
/**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+ GNUNET_CONTAINER_DLL_remove (mig_head,
+ mig_tail,
+ mb);
+ mig_size--;
+ GNUNET_free (mb);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls not used
+ * @param key ID of the peer (not used)
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)2
+ */
+static int
+consider_migration (void *cls,
+ const GNUNET_HashCode *key,
+ void *value)
+{
+ struct ConnectedPeer *cp = value;
+
+ if (cp->cth != NULL)
+ return GNUNET_YES; /* or what? */
+ /* FIXME: not implemented! */
+ return GNUNET_YES;
+}
+
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+ const GNUNET_HashCode * key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct MigrationReadyBlock *mb;
+ struct GNUNET_TIME_Relative delay;
+
+ if (key == NULL)
+ {
+ mig_qe = NULL;
+ if (mig_size < MAX_MIGRATION_QUEUE)
+ {
+ delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ mig_size);
+ delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
+ MAX_MIGRATION_QUEUE);
+ delay = GNUNET_TIME_relative_max (delay,
+ min_migration_delay);
+ mig_task = GNUNET_SCHEDULER_add_delayed (sched,
+ delay,
+ &gather_migration_blocks,
+ NULL);
+ }
+ return;
+ }
+ mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+ mb->query = *key;
+ mb->expiration = expiration;
+ mb->size = size;
+ mb->type = type;
+ memcpy (&mb[1], data, size);
+ GNUNET_CONTAINER_DLL_insert_after (mig_head,
+ mig_tail,
+ mig_tail,
+ mb);
+ mig_size++;
+ if (mig_size == 1)
+ GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+ &consider_migration,
+ NULL);
+ GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ *
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_migration_content, NULL);
+}
+
+
+/**
* We're done with a particular message list entry.
* Free all associated resources.
*
@@ -782,9 +986,12 @@ peer_connect_handler (void *cls,
&peer->hashPubKey,
cp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ if (mig_size > 0)
+ (void) consider_migration (NULL, &peer->hashPubKey, cp);
}
+
/**
* Free (each) request made by the peer.
*
@@ -974,6 +1181,16 @@ static void
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ if (mig_qe != NULL)
+ {
+ GNUNET_DATASTORE_cancel (mig_qe);
+ mig_qe = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != mig_task)
+ {
+ GNUNET_SCHEDULER_cancel (sched, mig_task);
+ mig_task = GNUNET_SCHEDULER_NO_TASK;
+ }
while (client_list != NULL)
handle_client_disconnect (NULL,
client_list->client);
@@ -1001,6 +1218,9 @@ shutdown_task (void *cls,
}
GNUNET_DATASTORE_disconnect (dsh,
GNUNET_NO);
+ while (mig_head != NULL)
+ delete_migration_block (mig_head);
+ GNUNET_assert (0 == mig_size);
dsh = NULL;
sched = NULL;
cfg = NULL;
@@ -1065,7 +1285,7 @@ transmit_to_peer (void *cls,
}
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to peer %u.\n",
+ "Transmitting %u bytes to peer %u\n",
msize,
cp->pid);
#endif
@@ -2930,6 +3150,7 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
sched = s;
cfg = c;
stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
+ min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
@@ -2964,6 +3185,13 @@ main_init (struct GNUNET_SCHEDULER_Handle *s,
}
return GNUNET_SYSERR;
}
+ /* FIXME: distinguish between sending and storing in options? */
+ if (active_migration)
+ {
+ mig_task = GNUNET_SCHEDULER_add_now (sched,
+ &gather_migration_blocks,
+ NULL);
+ }
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,
NULL);