From 46ef10befdb1ab8e1fd1d7f5304648a578e7f833 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 1 Mar 2011 15:09:28 +0000 Subject: fs hacking --- contrib/defaults.conf | 2 +- src/fs/fs.h | 6 +- src/fs/gnunet-service-fs_cp.c | 77 ++++-- src/fs/gnunet-service-fs_cp.h | 5 + src/fs/gnunet-service-fs_push.c | 530 ++++++++++++++++++++++++++-------------- src/fs/gnunet-service-fs_push.h | 68 ++++++ src/fs/gnunet-service-fs_put.c | 41 +++- src/fs/gnunet-service-fs_put.h | 48 ++++ 8 files changed, 565 insertions(+), 212 deletions(-) create mode 100644 src/fs/gnunet-service-fs_push.h create mode 100644 src/fs/gnunet-service-fs_put.h diff --git a/contrib/defaults.conf b/contrib/defaults.conf index 3848d4af6..f0ee4e3e0 100644 --- a/contrib/defaults.conf +++ b/contrib/defaults.conf @@ -311,7 +311,7 @@ MAX_PENDING_REQUESTS = 65536 # Maximum frequency we're allowed to poll the datastore # for content for migration (can be used to reduce # GNUnet's disk-IO rate) -MIN_MIGRATION_DELAY = 1000 +MIN_MIGRATION_DELAY = 100 EXPECTED_NEIGHBOUR_COUNT = 128 [dht] diff --git a/src/fs/fs.h b/src/fs/fs.h index a5b15dcef..ff769cfca 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -65,10 +65,10 @@ #define MAX_MIGRATION_QUEUE 32 /** - * How many peers do we select as possible - * targets per block obtained for migration? + * Blocks are at most migrated to this number of peers + * plus one, each time they are fetched from the database. */ -#define MIGRATION_LIST_SIZE 4 +#define MIGRATION_LIST_SIZE 2 /** * To how many peers do we forward each migration block ultimately? diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index 3ce03be2e..be041c861 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c @@ -26,6 +26,7 @@ #include "platform.h" #include "gnunet-service-fs.h" #include "gnunet-service-fs_cp.h" +#include "gnunet-service-fs_push.h" /** * How often do we flush trust values to disk? @@ -115,6 +116,11 @@ struct GSF_ConnectedPeer */ struct GNUNET_TIME_Absolute last_migration_block; + /** + * Task scheduled to revive migration to this peer. + */ + struct GNUNET_SCHEDULER_TaskIdentifier mig_revive_task; + /** * Messages (replies, queries, content migration) we would like to * send to this peer in the near future. Sorted by priority, head. @@ -143,11 +149,6 @@ struct GSF_ConnectedPeer */ struct GNUNET_CONTAINER_MulitHashMap *request_map; - /** - * ID of delay task for scheduling transmission. - */ - GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!) - /** * Increase in traffic preference still to be submitted * to the core service for this peer. @@ -159,11 +160,6 @@ struct GSF_ConnectedPeer */ uint32_t disk_trust; - /** - * The peer's identity. - */ - GNUNET_PEER_Id pid; - /** * Which offset in "last_p2p_replies" will be updated next? * (we go round-robin). @@ -388,7 +384,7 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer)); cp->transmission_delay = GNUNET_LOAD_value_init (latency); - cp->pid = GNUNET_PEER_intern (peer); + cp->ppd.pid = GNUNET_PEER_intern (peer); cp->transmission_delay = GNUNET_LOAD_value_init (0); cp->irc = GNUNET_CORE_peer_change_preference (core, peer, @@ -411,10 +407,40 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); update_atsi (cp, atsi); GSF_plan_notify_new_peer_ (cp); + GSF_push_start_ (cp); return cp; } +/** + * It may be time to re-start migrating content to this + * peer. Check, and if so, restart migration. + * + * @param cls the 'struct GSF_ConnectedPeer' + * @param tc scheduler context + */ +static void +revive_migration (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GSF_ConnectedPeer *cp = cls; + struct GNUNET_TIME_Relative bt; + + cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK; + bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until); + if (0 != bt.rel_value) + { + /* still time left... */ + cp->mig_revive_task + = GNUNET_SCHEDULER_add_delayed (bt, + &revive_migration, + cp); + return; + } + GSF_push_start_ (cp); +} + + /** * Handle P2P "MIGRATION_STOP" message. * @@ -434,6 +460,7 @@ GSF_handle_p2p_migration_stop_ (void *cls, { struct GSF_ConnectedPeer *cp; const struct MigrationStopMessage *msm; + struct GNUNET_TIME_Relative bt; msm = (const struct MigrationStopMessage*) message; cp = GNUNET_CONTAINER_multihashmap_get (cp_map, @@ -443,7 +470,16 @@ GSF_handle_p2p_migration_stop_ (void *cls, GNUNET_break (0); return GNUNET_OK; } - cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration)); + bt = GNUNET_TIME_relative_ntoh (msm->duration); + cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); + if (cp->mig_revive_task == GNUNET_SCHEDULER_NO_TASK) + { + GSF_push_stop_ (cp); + cp->mig_revive_task + = GNUNET_SCHEDULER_add_delayed (bt, + &revive_migration, + cp); + } update_atsi (cp, atsi); return GNUNET_OK; } @@ -880,7 +916,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, cp->pth_tail, prev, pth); - GNUNET_PEER_resolve (cp->pid, + GNUNET_PEER_resolve (cp->ppd.pid, &target); if (GNUNET_YES == is_query) { @@ -1022,8 +1058,8 @@ GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp, const struct GSF_ConnectedPeer *initiator_peer) { GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1); - cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->pid; - GNUNET_PEER_change_rc (initiator_peer->pid, 1); + cp->ppd.last_p2p_replies[cp->last_p2p_replies_woff++ % P2P_SUCCESS_LIST_SIZE] = initiator_peer->ppd.pid; + GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1); } @@ -1125,7 +1161,12 @@ GSF_peer_disconnect_handler_ (void *cls, pth); GNUNET_free (pth); } - GNUNET_PEER_change_rc (cp->pid, -1); + GNUNET_PEER_change_rc (cp->ppd.pid, -1); + if (GNUNET_SCHEDULER_NO_TASK != cp->mig_revive_task) + { + GNUNET_SCHEDULER_cancel (cp->mig_revive_task); + cp->mig_revive_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_free (cp); } @@ -1201,7 +1242,7 @@ void GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp, struct GNUNET_PeerIdentity *id) { - GNUNET_PEER_resolve (cp->pid, + GNUNET_PEER_resolve (cp->ppd.pid, &id); } @@ -1281,7 +1322,7 @@ flush_trust (void *cls, if (cp->trust == cp->disk_trust) return GNUNET_OK; /* unchanged */ - GNUNET_PEER_resolve (cp->pid, + GNUNET_PEER_resolve (cp->ppd.pid, &pid); fn = get_trust_filename (&pid); if (cp->trust == 0) diff --git a/src/fs/gnunet-service-fs_cp.h b/src/fs/gnunet-service-fs_cp.h index 081a1d5ba..48a019712 100644 --- a/src/fs/gnunet-service-fs_cp.h +++ b/src/fs/gnunet-service-fs_cp.h @@ -89,6 +89,11 @@ struct GSF_PeerPerformanceData */ double avg_priority; + /** + * The peer's identity. + */ + GNUNET_PEER_Id pid; + /** * Trust rating for this peer */ diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c index 9f515e2ee..2180a520d 100644 --- a/src/fs/gnunet-service-fs_push.c +++ b/src/fs/gnunet-service-fs_push.c @@ -27,9 +27,6 @@ #include "platform.h" #include "gnunet-service-fs_push.h" - -/* FIXME: below are only old code fragments to use... */ - /** * Block that is ready for migration to other peers. Actual data is at the end of the block. */ @@ -57,7 +54,7 @@ struct MigrationReadyBlock struct GNUNET_TIME_Absolute expiration; /** - * Peers we would consider forwarding this + * Peers we already forwarded this * block to. Zero for empty entries. */ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; @@ -79,6 +76,40 @@ struct MigrationReadyBlock }; +/** + * Information about a peer waiting for + * migratable data. + */ +struct MigrationReadyPeer +{ + /** + * This is a doubly-linked list. + */ + struct MigrationReadyPeer *next; + + /** + * This is a doubly-linked list. + */ + struct MigrationReadyPeer *prev; + + /** + * Handle to peer. + */ + struct GSF_ConnectedPeer *peer; + + /** + * Handle for current transmission request, + * or NULL for none. + */ + struct GSF_PeerTransmitHandle *th; + + /** + * Message we are trying to push right now (or NULL) + */ + struct PutMessage *msg; +}; + + /** * Head of linked list of blocks that can be migrated. */ @@ -89,6 +120,16 @@ static struct MigrationReadyBlock *mig_head; */ static struct MigrationReadyBlock *mig_tail; +/** + * Head of linked list of peers. + */ +static struct MigrationReadyPeer *peer_head; + +/** + * Tail of linked list of peers. + */ +static struct MigrationReadyPeer *peer_tail; + /** * Request to datastore for migration (or NULL). */ @@ -106,14 +147,14 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task; static struct GNUNET_TIME_Relative min_migration_delay; /** - * Are we allowed to push out content from this peer. + * Size of the doubly-linked list of migration blocks. */ -static int active_from_migration; +static unsigned int mig_size; /** - * Size of the doubly-linked list of migration blocks. + * Is this module enabled? */ -static unsigned int mig_size; +static int enabled; /** @@ -135,134 +176,212 @@ delete_migration_block (struct MigrationReadyBlock *mb) /** - * Compare the distance of two peers to a key. + * Find content for migration to this peer. + */ +static void +find_content (struct MigrationReadyPeer *mrp); + + +/** + * Transmit the message currently scheduled for + * transmission. * - * @param key key - * @param p1 first peer - * @param p2 second peer - * @return GNUNET_YES if P1 is closer to key than P2 + * @param cls the 'struct MigrationReadyPeer' + * @param buf_size number of bytes available in buf + * @param buf where to copy the message, NULL on error (peer disconnect) + * @return number of bytes copied to 'buf', can be 0 (without indicating an error) */ -static int -is_closer (const GNUNET_HashCode *key, - const struct GNUNET_PeerIdentity *p1, - const struct GNUNET_PeerIdentity *p2) +static size_t +transmit_message (void *cls, + size_t buf_size, + void *buf) { - return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey, - &p2->hashPubKey, - key); + struct MigrationReadyPeer *peer = cls; + struct PutMessage *msg; + uint16_t msize; + + peer->th = NULL; + msg = peer->msg; + peer->msg = NULL; + if (buf == NULL) + { + GNUNET_free (msg); + return 0; + } + msize = ntohs (msg->header.size); + GNUNET_assert (msize <= buf_size); + memcpy (buf, msg, msize); + GNUNET_free (msg); + find_content (peer); + return msize; } /** - * Consider migrating content to a given peer. + * Send the given block to the given peer. * - * @param cls 'struct MigrationReadyBlock*' to select - * targets for (or NULL for none) - * @param key ID of the peer - * @param value 'struct ConnectedPeer' of the peer - * @return GNUNET_YES (always continue iteration) + * @param peer target peer + * @param block the block + * @return GNUNET_YES if the block was deleted (!) */ static int -consider_migration (void *cls, - const GNUNET_HashCode *key, - void *value) +transmit_content (struct MigrationReadyPeer *peer, + struct MigrationReadyBlock *block) { - struct MigrationReadyBlock *mb = cls; - struct ConnectedPeer *cp = value; - struct MigrationReadyBlock *pos; - struct GNUNET_PeerIdentity cppid; - struct GNUNET_PeerIdentity otherpid; - struct GNUNET_PeerIdentity worstpid; size_t msize; - unsigned int i; - unsigned int repl; + struct PutMessage *msg; + unsigned int i; + struct GSF_PeerPerformanceData *ppd; + int ret; + + ppd = GSF_get_peer_performance_data (peer->peer); + GNUNET_assert (NULL == peer->th); + msize = sizeof (struct PutMessage) + block->size; + msg = GNUNET_malloc (msize); + msg->header.type = htons (42); + msg->header.size = htons (msize); - /* consider 'cp' as a migration target for mb */ - if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0) - return GNUNET_YES; /* peer has requested no migration! */ - if (mb != NULL) + memcpy (&msg[1], + &block[1], + block->size); + peer->msg = msg; + for (i=0;ipid, - &cppid); - repl = MIGRATION_LIST_SIZE; - for (i=0;itarget_list[i] == 0) { - if (mb->target_list[i] == 0) - { - mb->target_list[i] = cp->pid; - GNUNET_PEER_change_rc (mb->target_list[i], 1); - repl = MIGRATION_LIST_SIZE; - break; - } - GNUNET_PEER_resolve (mb->target_list[i], - &otherpid); - if ( (repl == MIGRATION_LIST_SIZE) && - is_closer (&mb->query, - &cppid, - &otherpid)) - { - repl = i; - worstpid = otherpid; - } - else if ( (repl != MIGRATION_LIST_SIZE) && - (is_closer (&mb->query, - &worstpid, - &otherpid) ) ) - { - repl = i; - worstpid = otherpid; - } - } - if (repl != MIGRATION_LIST_SIZE) - { - GNUNET_PEER_change_rc (mb->target_list[repl], -1); - mb->target_list[repl] = cp->pid; - GNUNET_PEER_change_rc (mb->target_list[repl], 1); + block->target_list[i] = ppd->pid; + GNUNET_PEER_change_rc (block->target_list[i], 1); + break; } } + if (MIGRATION_LIST_SIZE == i) + { + delete_migration_block (block); + ret = GNUNET_YES; + } + else + { + ret = GNUNET_NO; + } + peer->th = GSF_peer_transmit_ (peer->peer, + GNUNET_NO, + 0 /* priority */, + GNUNET_TIME_UNIT_FOREVER_REL, + msize, + &transmit_message, + peer); + return ret; +} - /* consider scheduling transmission to cp for content migration */ - if (cp->cth != NULL) - return GNUNET_YES; - msize = 0; - pos = mig_head; - while (pos != NULL) + +/** + * Count the number of peers this block has + * already been forwarded to. + * + * @param block the block + * @return number of times block was forwarded + */ +static unsigned int +count_targets (struct MigrationReadyBlock *block) +{ + unsigned int i; + + for (i=0;itarget_list[i] == 0) + return i; + return i; +} + + +/** + * Check if sending this block to this peer would + * be a good idea. + * + * @param peer target peer + * @param block the block + * @return score (>= 0: feasible, negative: infeasible) + */ +static long +score_content (struct MigrationReadyPeer *peer, + struct MigrationReadyBlock *block) +{ + unsigned int i; + struct GSF_PeerPerformanceData *ppd; + struct GNUNET_PeerIdentity id; + uint32_t dist; + + ppd = GSF_get_peer_performance_data (peer->peer); + for (i=0;itarget_list[i] == ppd->pid) + return -1; + GSF_connected_peer_get_identity (peer->peer, + &id); + dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, + &id.hashPubKey); + /* closer distance, higher score: */ + return UINT32_MAX - dist; +} + + +/** + * If the migration task is not currently running, consider + * (re)scheduling it with the appropriate delay. + */ +static void +consider_gathering (void); + + +/** + * Find content for migration to this peer. + * + * @param mig peer to find content for + */ +static void +find_content (struct MigrationReadyPeer *mrp) +{ + struct MigrationReadyBlock *pos; + long score; + long best_score; + struct MigrationReadyBlock *best; + + GNUNET_assert (NULL == mrp->th); + best = NULL; + best_score = -1; + pos = mig_qe; + while (NULL != pos) { - for (i=0;i best_score) { - if (cp->pid == pos->target_list[i]) - { - if (msize == 0) - msize = pos->size; - else - msize = GNUNET_MIN (msize, - pos->size); - break; - } + best_score = score; + best = mrp; } pos = pos->next; } - if (msize == 0) - return GNUNET_YES; /* no content available */ -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Trying to migrate at least %u bytes to peer `%s'\n", - msize, - GNUNET_h2s (key)); -#endif - if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK) + if ( (NULL == best) && + (mig_size >= MAX_MIGRATION_QUEUE) ) { - GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task); - cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK; + /* failed to find migration target AND + queue is full, purge most-forwarded + block from queue to make room for more */ + score = 0; + pos = mig_qe; + while (NULL != pos) + { + score = count_targets (pos); + if (score >= best_score) + { + best_score = score; + best = mrp; + } + pos = pos->next; + } + GNUNET_assert (NULL != best); + delete_migration_block (best); + consider_gathering (); + return; } - cp->cth - = GNUNET_CORE_notify_transmit_ready (core, - 0, GNUNET_TIME_UNIT_FOREVER_REL, - (const struct GNUNET_PeerIdentity*) key, - msize + sizeof (struct PutMessage), - &transmit_to_peer, - cp); - return GNUNET_YES; + transmit_content (peer, best); } @@ -278,23 +397,23 @@ gather_migration_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - /** * If the migration task is not currently running, consider * (re)scheduling it with the appropriate delay. */ static void -consider_migration_gathering () +consider_gathering () { struct GNUNET_TIME_Relative delay; - if (dsh == NULL) + if (GSF_dsh == NULL) return; if (mig_qe != NULL) return; if (mig_task != GNUNET_SCHEDULER_NO_TASK) return; + if (mig_size >= MAX_MIGRATION_QUEUE) + return; delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); delay = GNUNET_TIME_relative_divide (delay, @@ -307,8 +426,6 @@ consider_migration_gathering () } - - /** * Process content offered for migration. * @@ -335,19 +452,19 @@ process_migration_content (void *cls, expiration, uint64_t uid) { struct MigrationReadyBlock *mb; + struct MigrationReadyPeer *pos; if (key == NULL) { mig_qe = NULL; - if (mig_size < MAX_MIGRATION_QUEUE) - consider_migration_gathering (); + consider_gathering (); return; } if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < MIN_MIGRATION_CONTENT_LIFETIME.rel_value) { /* content will expire soon, don't bother */ - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); return; } if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) @@ -359,7 +476,7 @@ process_migration_content (void *cls, &process_migration_content, NULL)) { - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); } return; } @@ -380,14 +497,20 @@ process_migration_content (void *cls, mig_tail, mb); mig_size++; - GNUNET_CONTAINER_multihashmap_iterate (connected_peers, - &consider_migration, - mb); - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + pos = peer_head; + while (pos != NULL) + { + if (NULL == pos->th) + { + if (GNUNET_YES == transmit_content (pos, mb)) + break; /* 'mb' was freed! */ + } + pos = pos->next; + } + GNUNET_DATASTORE_get_next (GSF_dsh, GNUNET_YES); } - /** * Task that is run periodically to obtain blocks for content * migration @@ -400,9 +523,10 @@ gather_migration_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { mig_task = GNUNET_SCHEDULER_NO_TASK; - if (dsh != NULL) + if (GSF_dsh != NULL) { - mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX, + mig_qe = GNUNET_DATASTORE_get_random (GSF_dsh, + 0, UINT_MAX, GNUNET_TIME_UNIT_FOREVER_REL, &process_migration_content, NULL); GNUNET_assert (mig_qe != NULL); @@ -410,66 +534,108 @@ gather_migration_blocks (void *cls, } +/** + * A peer connected to us. Start pushing content + * to this peer. + * + * @param peer handle for the peer that connected + */ +void +GSF_push_start_ (struct GSF_ConnectedPeer *peer) +{ + struct MigrationReadyPeer *mrp; -size_t -API_ (void *cls, - size_t size, void *buf) + if (GNUNET_YES != enabled) + return; + mrp = GNUNET_malloc (sizeof (struct MigrationReadyPeer)); + mrp->peer = peer; + find_content (mrp); + GNUNET_CONTAINER_DLL_insert (peer_head, + peer_tail, + mrp); +} + + +/** + * A peer disconnected from us. Stop pushing content + * to this peer. + * + * @param peer handle for the peer that disconnected + */ +void +GSF_push_stop_ (struct GSF_ConnectedPeer *peer) { - next = mig_head; - while (NULL != (mb = next)) + struct MigrationReadyPeer *pos; + + pos = peer_head; + while (pos != NULL) + { + if (pos->peer == peer) { - next = mb->next; - for (i=0;ipid == mb->target_list[i]) && - (mb->size + sizeof (migm) <= size) ) - { - GNUNET_PEER_change_rc (mb->target_list[i], -1); - mb->target_list[i] = 0; - mb->used_targets++; - memset (&migm, 0, sizeof (migm)); - migm.header.size = htons (sizeof (migm) + mb->size); - migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); - migm.type = htonl (mb->type); - migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration); - memcpy (&cbuf[msize], &migm, sizeof (migm)); - msize += sizeof (migm); - size -= sizeof (migm); - memcpy (&cbuf[msize], &mb[1], mb->size); - msize += mb->size; - size -= mb->size; -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pushing migration block `%s' (%u bytes) to `%s'\n", - GNUNET_h2s (&mb->query), - (unsigned int) mb->size, - GNUNET_i2s (&pid)); -#endif - break; - } - else - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n", - GNUNET_h2s (&mb->query), - (unsigned int) mb->size, - GNUNET_i2s (&pid)); -#endif - } - } - if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) || - (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) ) - { - delete_migration_block (mb); - consider_migration_gathering (); - } + GNUNET_CONTAINER_DLL_remove (peer_head, + peer_tail, + pos); + if (NULL != pos->th) + GSF_peer_transmit_cancel_ (pos->th); + GNUNET_free (pos); + return; } - consider_migration (NULL, - &pid.hashPubKey, - cp); + pos = pos->next; + } +} + +/** + * Setup the module. + * + * @param cfg configuration to use + */ +void +GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg) +{ + int enabled; + + enabled = GNUNET_CONFIGURATION_get_value_yesno (cfg, + "FS", + "CONTENT_PUSHING"); + if (GNUNET_YES != enabled) + return; + + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (cfg, + "fs", + "MIN_MIGRATION_DELAY", + &min_migration_delay)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Invalid value specified for option `%s' in section `%s', content pushing disabled\n"), + "MIN_MIGRATION_DELAY", + "fs"); + return; + } + consider_gathering (); } +/** + * Shutdown the module. + */ +void +GSF_push_done_ () +{ + if (GNUNET_SCHEDULER_NO_TASK != mig_task) + { + GNUNET_SCHEDULER_cancel (mig_task); + mig_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != mig_qe) + { + GNUNET_DATASTORE_cancel (mig_qe); + mig_qe = NULL; + } + while (NULL != mig_head) + delete_migration_block (mig_head); + GNUNET_assert (0 == mig_size); +} +/* end of gnunet-service-fs_push.c */ diff --git a/src/fs/gnunet-service-fs_push.h b/src/fs/gnunet-service-fs_push.h new file mode 100644 index 000000000..5f7a0030c --- /dev/null +++ b/src/fs/gnunet-service-fs_push.h @@ -0,0 +1,68 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_push.h + * @brief support for pushing out content + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_FS_PUSH_H +#define GNUNET_SERVICE_FS_PUSH_H + +#include "gnunet-service-fs.h" + + +/** + * Setup the module. + * + * @param cfg configuration to use + */ +void +GSF_push_init_ (struct GNUNET_CONFIGURATION_Handle *cfg); + + +/** + * Shutdown the module. + */ +void +GSF_push_done_ (void); + + +/** + * A peer connected to us or we are now again allowed to push content. + * Start pushing content to this peer. + * + * @param peer handle for the peer that connected + */ +void +GSF_push_start_ (struct GSF_ConnectedPeer *peer); + + +/** + * A peer disconnected from us or asked us to stop pushing content for + * a while. Stop pushing content to this peer. + * + * @param peer handle for the peer that disconnected + */ +void +GSF_push_stop_ (struct GSF_ConnectedPeer *peer); + + +#endif diff --git a/src/fs/gnunet-service-fs_put.c b/src/fs/gnunet-service-fs_put.c index eb7289f1e..862a795ba 100644 --- a/src/fs/gnunet-service-fs_put.c +++ b/src/fs/gnunet-service-fs_put.c @@ -26,15 +26,12 @@ #include "platform.h" #include "gnunet-service-fs_put.h" -/* FIXME: below are only old code fragments to use... */ - /** * Request to datastore for DHT PUTs (or NULL). */ static struct GNUNET_DATASTORE_QueueEntry *dht_qe; - /** * Type we will request for the next DHT PUT round from the datastore. */ @@ -52,9 +49,6 @@ static GNUNET_SCHEDULER_TaskIdentifier dht_task; static unsigned int zero_anonymity_count_estimate; - - - /** * Task that is run periodically to obtain blocks for DHT PUTs. * @@ -101,7 +95,6 @@ consider_dht_put_gathering (void *cls) } - /** * Store content in DHT. * @@ -172,7 +165,6 @@ process_dht_put_content (void *cls, } - /** * Task that is run periodically to obtain blocks for DHT PUTs. * @@ -195,3 +187,36 @@ gather_dht_put_blocks (void *cls, GNUNET_assert (dht_qe != NULL); } } + + +/** + * Setup the module. + * + * @param cfg configuration to use + */ +void +GSF_put_init_ (struct GNUNET_CONFIGURATION_Handle *cfg) +{ + dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL); +} + + +/** + * Shutdown the module. + */ +void +GSF_put_done_ () +{ + if (GNUNET_SCHEDULER_NO_TASK != dht_task) + { + GNUNET_SCHEDULER_cancel (dht_task); + dht_task = GNUNET_SCHEDULER_NO_TASK; + } + if (NULL != dht_qe) + { + GNUNET_DATASTORE_cancel (dht_qe); + dht_qe = NULL; + } +} + +/* end of gnunet-service-fs_put.c */ diff --git a/src/fs/gnunet-service-fs_put.h b/src/fs/gnunet-service-fs_put.h new file mode 100644 index 000000000..ebec818b2 --- /dev/null +++ b/src/fs/gnunet-service-fs_put.h @@ -0,0 +1,48 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_put.h + * @brief support for putting content into the DHT + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_FS_PUT_H +#define GNUNET_SERVICE_FS_PUT_H + +#include "gnunet-service-fs.h" + + +/** + * Setup the module. + * + * @param cfg configuration to use + */ +void +GSF_put_init_ (struct GNUNET_CONFIGURATION_Handle *cfg); + + +/** + * Shutdown the module. + */ +void +GSF_put_done_ (void); + + +#endif -- cgit v1.2.3