/* This file is part of GNUnet. Copyright (C) 2011 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ /** * @file fs/gnunet-service-fs_push.c * @brief API to push content from our datastore to other peers * ('anonymous'-content P2P migration) * @author Christian Grothoff */ #include "platform.h" #include "gnunet-service-fs.h" #include "gnunet-service-fs_cp.h" #include "gnunet-service-fs_indexing.h" #include "gnunet-service-fs_push.h" /** * Maximum number of blocks we keep in memory for migration. */ #define MAX_MIGRATION_QUEUE 8 /** * Blocks are at most migrated to this number of peers * plus one, each time they are fetched from the database. */ #define MIGRATION_LIST_SIZE 2 /** * How long must content remain valid for us to consider it for migration? * If content will expire too soon, there is clearly no point in pushing * it to other peers. This value gives the threshold for migration. Note * that if this value is increased, the migration testcase may need to be * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c). */ #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 30) /** * Block that is ready for migration to other peers. Actual data is at the end of the block. */ struct MigrationReadyBlock { /** * This is a doubly-linked list. */ struct MigrationReadyBlock *next; /** * This is a doubly-linked list. */ struct MigrationReadyBlock *prev; /** * Query for the block. */ struct GNUNET_HashCode query; /** * When does this block expire? */ struct GNUNET_TIME_Absolute expiration; /** * Peers we already forwarded this * block to. Zero for empty entries. */ GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE]; /** * Size of the block. */ size_t size; /** * Number of targets already used. */ unsigned int used_targets; /** * Type of the block. */ enum GNUNET_BLOCK_Type type; }; /** * Information about a peer waiting for * migratable data. */ struct MigrationReadyPeer { /** * This is a doubly-linked list. */ struct MigrationReadyPeer *next; /** * This is a doubly-linked list. */ struct MigrationReadyPeer *prev; /** * Handle to peer. */ struct GSF_ConnectedPeer *peer; /** * Handle for current transmission request, * or NULL for none. */ struct GSF_PeerTransmitHandle *th; /** * Message we are trying to push right now (or NULL) */ struct PutMessage *msg; }; /** * Head of linked list of blocks that can be migrated. */ static struct MigrationReadyBlock *mig_head; /** * Tail of linked list of blocks that can be migrated. */ static struct MigrationReadyBlock *mig_tail; /** * Head of linked list of peers. */ static struct MigrationReadyPeer *peer_head; /** * Tail of linked list of peers. */ static struct MigrationReadyPeer *peer_tail; /** * Request to datastore for migration (or NULL). */ static struct GNUNET_DATASTORE_QueueEntry *mig_qe; /** * ID of task that collects blocks for migration. */ static struct GNUNET_SCHEDULER_Task * mig_task; /** * What is the maximum frequency at which we are allowed to * poll the datastore for migration content? */ static struct GNUNET_TIME_Relative min_migration_delay; /** * Size of the doubly-linked list of migration blocks. */ static unsigned int mig_size; /** * Is this module enabled? */ static int enabled; /** * Did we find anything in the datastore? */ static int value_found; /** * Delete the given migration block. * * @param mb block to delete */ static void delete_migration_block (struct MigrationReadyBlock *mb) { GNUNET_CONTAINER_DLL_remove (mig_head, mig_tail, mb); GNUNET_PEER_decrement_rcs (mb->target_list, MIGRATION_LIST_SIZE); mig_size--; GNUNET_free (mb); } /** * Find content for migration to this peer. */ static void find_content (struct MigrationReadyPeer *mrp); /** * Transmit the message currently scheduled for transmission. * * @param cls the `struct MigrationReadyPeer` * @param buf_size number of bytes available in @a buf * @param buf where to copy the message, NULL on error (peer disconnect) * @return number of bytes copied to @a buf, can be 0 (without indicating an error) */ static size_t transmit_message (void *cls, size_t buf_size, void *buf) { struct MigrationReadyPeer *peer = cls; struct PutMessage *msg; uint16_t msize; peer->th = NULL; msg = peer->msg; peer->msg = NULL; if (NULL == buf) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed to migrate content to another peer (disconnect)\n"); GNUNET_free (msg); return 0; } msize = ntohs (msg->header.size); GNUNET_assert (msize <= buf_size); memcpy (buf, msg, msize); GNUNET_free (msg); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pushing %u bytes to %s\n", msize, GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); find_content (peer); return msize; } /** * Send the given block to the given peer. * * @param peer target peer * @param block the block * @return #GNUNET_YES if the block was deleted (!) */ static int transmit_content (struct MigrationReadyPeer *peer, struct MigrationReadyBlock *block) { size_t msize; struct PutMessage *msg; unsigned int i; struct GSF_PeerPerformanceData *ppd; int ret; ppd = GSF_get_peer_performance_data_ (peer->peer); GNUNET_assert (NULL == peer->th); msize = sizeof (struct PutMessage) + block->size; msg = GNUNET_malloc (msize); msg->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); msg->header.size = htons (msize); msg->type = htonl (block->type); msg->expiration = GNUNET_TIME_absolute_hton (block->expiration); memcpy (&msg[1], &block[1], block->size); peer->msg = msg; for (i = 0; i < MIGRATION_LIST_SIZE; i++) { if (block->target_list[i] == 0) { block->target_list[i] = ppd->pid; GNUNET_PEER_change_rc (block->target_list[i], 1); break; } } if (MIGRATION_LIST_SIZE == i) { delete_migration_block (block); ret = GNUNET_YES; } else { ret = GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking for transmission of %u bytes to %s for migration\n", msize, GNUNET_i2s (GSF_connected_peer_get_identity2_(peer->peer))); peer->th = GSF_peer_transmit_ (peer->peer, GNUNET_NO, 0 /* priority */ , GNUNET_TIME_UNIT_FOREVER_REL, msize, &transmit_message, peer); return ret; } /** * Count the number of peers this block has * already been forwarded to. * * @param block the block * @return number of times block was forwarded */ static unsigned int count_targets (struct MigrationReadyBlock *block) { unsigned int i; for (i = 0; i < MIGRATION_LIST_SIZE; i++) if (block->target_list[i] == 0) return i; return i; } /** * Check if sending this block to this peer would * be a good idea. * * @param peer target peer * @param block the block * @return score (>= 0: feasible, negative: infeasible) */ static long score_content (struct MigrationReadyPeer *peer, struct MigrationReadyBlock *block) { unsigned int i; struct GSF_PeerPerformanceData *ppd; struct GNUNET_PeerIdentity id; struct GNUNET_HashCode hc; uint32_t dist; ppd = GSF_get_peer_performance_data_ (peer->peer); for (i = 0; i < MIGRATION_LIST_SIZE; i++) if (block->target_list[i] == ppd->pid) return -1; GNUNET_assert (0 != ppd->pid); GNUNET_PEER_resolve (ppd->pid, &id); GNUNET_CRYPTO_hash (&id, sizeof (struct GNUNET_PeerIdentity), &hc); dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query, &hc); /* closer distance, higher score: */ return UINT32_MAX - dist; } /** * If the migration task is not currently running, consider * (re)scheduling it with the appropriate delay. */ static void consider_gathering (void); /** * Find content for migration to this peer. * * @param mrp peer to find content for */ static void find_content (struct MigrationReadyPeer *mrp) { struct MigrationReadyBlock *pos; long score; long best_score; struct MigrationReadyBlock *best; GNUNET_assert (NULL == mrp->th); best = NULL; best_score = -1; pos = mig_head; while (NULL != pos) { score = score_content (mrp, pos); if (score > best_score) { best_score = score; best = pos; } pos = pos->next; } if (NULL == best) { if (mig_size < MAX_MIGRATION_QUEUE) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for pushing, waiting for queue to fill\n"); return; /* will fill up eventually... */ } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No suitable content found, purging content from full queue\n"); /* failed to find migration target AND * queue is full, purge most-forwarded * block from queue to make room for more */ pos = mig_head; while (NULL != pos) { score = count_targets (pos); if (score >= best_score) { best_score = score; best = pos; } pos = pos->next; } GNUNET_assert (NULL != best); delete_migration_block (best); consider_gathering (); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Preparing to push best content to peer\n"); transmit_content (mrp, best); } /** * Task that is run periodically to obtain blocks for content * migration * * @param cls unused * @param tc scheduler context (also unused) */ static void gather_migration_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); /** * If the migration task is not currently running, consider * (re)scheduling it with the appropriate delay. */ static void consider_gathering () { struct GNUNET_TIME_Relative delay; if (NULL == GSF_dsh) return; if (NULL != mig_qe) return; if (NULL != mig_task) return; if (mig_size >= MAX_MIGRATION_QUEUE) return; delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, mig_size); delay = GNUNET_TIME_relative_divide (delay, MAX_MIGRATION_QUEUE); delay = GNUNET_TIME_relative_max (delay, min_migration_delay); if (GNUNET_NO == value_found) { /* wait at least 5s if the datastore is empty */ delay = GNUNET_TIME_relative_max (delay, GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling gathering task (queue size: %u)\n", mig_size); mig_task = GNUNET_SCHEDULER_add_delayed (delay, &gather_migration_blocks, NULL); } /** * Process content offered for migration. * * @param cls closure * @param key key for the content * @param size number of bytes in data * @param data content stored * @param type type of the content * @param priority priority of the content * @param anonymity anonymity-level for the content * @param expiration expiration time for the content * @param uid unique identifier for the datum; * maybe 0 if no unique identifier is available */ static void process_migration_content (void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, struct GNUNET_TIME_Absolute expiration, uint64_t uid) { struct MigrationReadyBlock *mb; struct MigrationReadyPeer *pos; mig_qe = NULL; if (NULL == key) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No content found for migration...\n"); consider_gathering (); return; } value_found = GNUNET_YES; if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us < MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us) { /* content will expire soon, don't bother */ consider_gathering (); return; } if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) { if (GNUNET_OK != GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, anonymity, expiration, uid, &process_migration_content, NULL)) consider_gathering (); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n", GNUNET_h2s (key), type, mig_size + 1, MAX_MIGRATION_QUEUE); mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size); mb->query = *key; mb->expiration = expiration; mb->size = size; mb->type = type; memcpy (&mb[1], data, size); GNUNET_CONTAINER_DLL_insert_after (mig_head, mig_tail, mig_tail, mb); mig_size++; for (pos = peer_head; NULL != pos; pos = pos->next) { if (NULL == pos->th) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Preparing to push best content to peer %s\n", GNUNET_i2s (GSF_connected_peer_get_identity2_(pos->peer))); if (GNUNET_YES == transmit_content (pos, mb)) break; /* 'mb' was freed! */ } } consider_gathering (); } /** * Task that is run periodically to obtain blocks for content * migration * * @param cls unused * @param tc scheduler context (also unused) */ static void gather_migration_blocks (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { mig_task = NULL; if (mig_size >= MAX_MIGRATION_QUEUE) return; if (NULL == GSF_dsh) return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking datastore for content for replication (queue size: %u)\n", mig_size); value_found = GNUNET_NO; mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh, 0, UINT_MAX, GNUNET_TIME_UNIT_FOREVER_REL, &process_migration_content, NULL); if (NULL == mig_qe) consider_gathering (); } /** * A peer connected to us. Start pushing content * to this peer. * * @param peer handle for the peer that connected */ void GSF_push_start_ (struct GSF_ConnectedPeer *peer) { struct MigrationReadyPeer *mrp; if (GNUNET_YES != enabled) return; for (mrp = peer_head; NULL != mrp; mrp = mrp->next) if (mrp->peer == peer) break; if (NULL != mrp) { /* same peer added twice, must not happen */ GNUNET_break (0); return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding peer %s to list for pushing\n", GNUNET_i2s (GSF_connected_peer_get_identity2_(peer))); mrp = GNUNET_new (struct MigrationReadyPeer); mrp->peer = peer; find_content (mrp); GNUNET_CONTAINER_DLL_insert (peer_head, peer_tail, mrp); } /** * A peer disconnected from us. Stop pushing content * to this peer. * * @param peer handle for the peer that disconnected */ void GSF_push_stop_ (struct GSF_ConnectedPeer *peer) { struct MigrationReadyPeer *pos; for (pos = peer_head; NULL != pos; pos = pos->next) if (pos->peer == peer) break; if (NULL == pos) return; GNUNET_CONTAINER_DLL_remove (peer_head, peer_tail, pos); if (NULL != pos->th) { GSF_peer_transmit_cancel_ (pos->th); pos->th = NULL; } if (NULL != pos->msg) { GNUNET_free (pos->msg); pos->msg = NULL; } GNUNET_free (pos); } /** * Setup the module. */ void GSF_push_init_ () { enabled = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_PUSHING"); if (GNUNET_YES != enabled) return; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (GSF_cfg, "fs", "MIN_MIGRATION_DELAY", &min_migration_delay)) { GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING, "fs", "MIN_MIGRATION_DELAY", _("time required, content pushing disabled")); return; } consider_gathering (); } /** * Shutdown the module. */ void GSF_push_done_ () { if (NULL != mig_task) { GNUNET_SCHEDULER_cancel (mig_task); mig_task = NULL; } if (NULL != mig_qe) { GNUNET_DATASTORE_cancel (mig_qe); mig_qe = NULL; } while (NULL != mig_head) delete_migration_block (mig_head); GNUNET_assert (0 == mig_size); } /* end of gnunet-service-fs_push.c */