From 848b0e9e3a4d586050d05aa4f3f796e1f978480e Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Wed, 7 May 2014 12:07:44 +0000 Subject: - update fs --- src/fs/gnunet-service-fs_cadet_client.c | 766 ++++++++++++++++++++++++++++++++ 1 file changed, 766 insertions(+) create mode 100644 src/fs/gnunet-service-fs_cadet_client.c (limited to 'src/fs/gnunet-service-fs_cadet_client.c') diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c new file mode 100644 index 000000000..dde7aba48 --- /dev/null +++ b/src/fs/gnunet-service-fs_cadet_client.c @@ -0,0 +1,766 @@ +/* + This file is part of GNUnet. + (C) 2012, 2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_cadet_client.c + * @brief non-anonymous file-transfer + * @author Christian Grothoff + * + * TODO: + * - PORT is set to old application type, unsure if we should keep + * it that way (fine for now) + */ +#include "platform.h" +#include "gnunet_constants.h" +#include "gnunet_util_lib.h" +#include "gnunet_cadet_service.h" +#include "gnunet_protocols.h" +#include "gnunet_applications.h" +#include "gnunet-service-fs.h" +#include "gnunet-service-fs_indexing.h" +#include "gnunet-service-fs_cadet.h" + + +/** + * After how long do we reset connections without replies? + */ +#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) + + +/** + * Handle for a cadet to another peer. + */ +struct CadetHandle; + + +/** + * Handle for a request that is going out via cadet API. + */ +struct GSF_CadetRequest +{ + + /** + * DLL. + */ + struct GSF_CadetRequest *next; + + /** + * DLL. + */ + struct GSF_CadetRequest *prev; + + /** + * Which cadet is this request associated with? + */ + struct CadetHandle *mh; + + /** + * Function to call with the result. + */ + GSF_CadetReplyProcessor proc; + + /** + * Closure for 'proc' + */ + void *proc_cls; + + /** + * Query to transmit to the other peer. + */ + struct GNUNET_HashCode query; + + /** + * Desired type for the reply. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we transmit this request already? #GNUNET_YES if we are + * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL. + */ + int was_transmitted; +}; + + +/** + * Handle for a cadet to another peer. + */ +struct CadetHandle +{ + /** + * Head of DLL of pending requests on this cadet. + */ + struct GSF_CadetRequest *pending_head; + + /** + * Tail of DLL of pending requests on this cadet. + */ + struct GSF_CadetRequest *pending_tail; + + /** + * Map from query to `struct GSF_CadetRequest`s waiting for + * a reply. + */ + struct GNUNET_CONTAINER_MultiHashMap *waiting_map; + + /** + * Channel to the other peer. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_CADET_TransmitHandle *wh; + + /** + * Which peer does this cadet go to? + */ + struct GNUNET_PeerIdentity target; + + /** + * Task to kill inactive cadets (we keep them around for + * a few seconds to give the application a chance to give + * us another query). + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Task to reset cadets that had errors (asynchronously, + * as we may not be able to do it immediately during a + * callback from the cadet API). + */ + GNUNET_SCHEDULER_TaskIdentifier reset_task; + +}; + + +/** + * Cadet channel for creating outbound channels. + */ +static struct GNUNET_CADET_Handle *cadet_handle; + +/** + * Map from peer identities to 'struct CadetHandles' with cadet + * channels to those peers. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; + + +/* ********************* client-side code ************************* */ + + +/** + * Transmit pending requests via the cadet. + * + * @param mh cadet to process + */ +static void +transmit_pending (struct CadetHandle *mh); + + +/** + * Iterator called on each entry in a waiting map to + * move it back to the pending list. + * + * @param cls the `struct CadetHandle` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to move to pending + * @return #GNUNET_YES (continue to iterate) + */ +static int +move_to_pending (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct CadetHandle *mh = cls; + struct GSF_CadetRequest *sr = value; + + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, + key, + value)); + GNUNET_CONTAINER_DLL_insert (mh->pending_head, + mh->pending_tail, + sr); + sr->was_transmitted = GNUNET_NO; + return GNUNET_YES; +} + + +/** + * We had a serious error, tear down and re-create cadet from scratch. + * + * @param mh cadet to reset + */ +static void +reset_cadet (struct CadetHandle *mh) +{ + struct GNUNET_CADET_Channel *channel = mh->channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Resetting cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + mh->channel = NULL; + if (NULL != channel) + GNUNET_CADET_channel_destroy (channel); + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, + &move_to_pending, + mh); + mh->channel = GNUNET_CADET_channel_create (cadet_handle, + mh, + &mh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + GNUNET_CADET_OPTION_RELIABLE); + transmit_pending (mh); +} + + +/** + * Task called when it is time to destroy an inactive cadet channel. + * + * @param cls the `struct CadetHandle` to tear down + * @param tc scheduler context, unused + */ +static void +cadet_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct CadetHandle *mh = cls; + struct GNUNET_CADET_Channel *tun; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + mh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + tun = mh->channel; + mh->channel = NULL; + GNUNET_CADET_channel_destroy (tun); +} + + +/** + * Task called when it is time to reset an cadet. + * + * @param cls the `struct CadetHandle` to tear down + * @param tc scheduler context, unused + */ +static void +reset_cadet_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct CadetHandle *mh = cls; + + mh->reset_task = GNUNET_SCHEDULER_NO_TASK; + reset_cadet (mh); +} + + +/** + * We had a serious error, tear down and re-create cadet from scratch, + * but do so asynchronously. + * + * @param mh cadet to reset + */ +static void +reset_cadet_async (struct CadetHandle *mh) +{ + if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task) + GNUNET_SCHEDULER_cancel (mh->reset_task); + mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, + mh); +} + + +/** + * Functions of this signature are called whenever we are ready to transmit + * query via a cadet. + * + * @param cls the struct CadetHandle for which we did the write call + * @param size the number of bytes that can be written to @a buf + * @param buf where to write the message + * @return number of bytes written to @a buf + */ +static size_t +transmit_sqm (void *cls, + size_t size, + void *buf) +{ + struct CadetHandle *mh = cls; + struct CadetQueryMessage sqm; + struct GSF_CadetRequest *sr; + + mh->wh = NULL; + if (NULL == buf) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cadet channel to %s failed during transmission attempt, rebuilding\n", + GNUNET_i2s (&mh->target)); + reset_cadet_async (mh); + return 0; + } + sr = mh->pending_head; + if (NULL == sr) + return 0; + GNUNET_assert (size >= sizeof (struct CadetQueryMessage)); + GNUNET_CONTAINER_DLL_remove (mh->pending_head, + mh->pending_tail, + sr); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (mh->waiting_map, + &sr->query, + sr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + sr->was_transmitted = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&mh->target)); + sqm.header.size = htons (sizeof (sqm)); + sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); + sqm.type = htonl (sr->type); + sqm.query = sr->query; + memcpy (buf, &sqm, sizeof (sqm)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Successfully transmitted %u bytes via cadet to %s\n", + (unsigned int) size, + GNUNET_i2s (&mh->target)); + transmit_pending (mh); + return sizeof (sqm); +} + + +/** + * Transmit pending requests via the cadet. + * + * @param mh cadet to process + */ +static void +transmit_pending (struct CadetHandle *mh) +{ + if (NULL == mh->channel) + return; + if (NULL != mh->wh) + return; + mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */, + GNUNET_TIME_UNIT_FOREVER_REL, + sizeof (struct CadetQueryMessage), + &transmit_sqm, mh); +} + + +/** + * Closure for handle_reply(). + */ +struct HandleReplyClosure +{ + + /** + * Reply payload. + */ + const void *data; + + /** + * Expiration time for the block. + */ + struct GNUNET_TIME_Absolute expiration; + + /** + * Number of bytes in 'data'. + */ + size_t data_size; + + /** + * Type of the block. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we have a matching query? + */ + int found; +}; + + +/** + * Iterator called on each entry in a waiting map to + * process a result. + * + * @param cls the `struct HandleReplyClosure` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to handle result for + * @return #GNUNET_YES (continue to iterate) + */ +static int +handle_reply (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct HandleReplyClosure *hrc = cls; + struct GSF_CadetRequest *sr = value; + + sr->proc (sr->proc_cls, + hrc->type, + hrc->expiration, + hrc->data_size, + hrc->data); + sr->proc = NULL; + GSF_cadet_query_cancel (sr); + hrc->found = GNUNET_YES; + return GNUNET_YES; +} + + +/** + * Functions with this signature are called whenever a complete reply + * is received. + * + * @param cls closure with the `struct CadetHandle` + * @param channel channel handle + * @param channel_ctx channel context + * @param message the actual message + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + */ +static int +reply_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + void **channel_ctx, + const struct GNUNET_MessageHeader *message) +{ + struct CadetHandle *mh = *channel_ctx; + const struct CadetReplyMessage *srm; + struct HandleReplyClosure hrc; + uint16_t msize; + enum GNUNET_BLOCK_Type type; + struct GNUNET_HashCode query; + + msize = ntohs (message->size); + if (sizeof (struct CadetReplyMessage) > msize) + { + GNUNET_break_op (0); + reset_cadet_async (mh); + return GNUNET_SYSERR; + } + srm = (const struct CadetReplyMessage *) message; + msize -= sizeof (struct CadetReplyMessage); + type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); + if (GNUNET_YES != + GNUNET_BLOCK_get_key (GSF_block_ctx, + type, + &srm[1], msize, &query)) + { + GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", + type, + msize, + GNUNET_i2s (&mh->target)); + reset_cadet_async (mh); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received reply `%s' via cadet from peer %s\n", + GNUNET_h2s (&query), + GNUNET_i2s (&mh->target)); + GNUNET_CADET_receive_done (channel); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via cadet"), 1, + GNUNET_NO); + hrc.data = &srm[1]; + hrc.data_size = msize; + hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); + hrc.type = type; + hrc.found = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, + &query, + &handle_reply, + &hrc); + if (GNUNET_NO == hrc.found) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via cadet dropped"), 1, + GNUNET_NO); + return GNUNET_OK; + } + return GNUNET_OK; +} + + +/** + * Get (or create) a cadet to talk to the given peer. + * + * @param target peer we want to communicate with + */ +static struct CadetHandle * +get_cadet (const struct GNUNET_PeerIdentity *target) +{ + struct CadetHandle *mh; + + mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, + target); + if (NULL != mh) + { + if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task) + { + GNUNET_SCHEDULER_cancel (mh->timeout_task); + mh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + return mh; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating cadet channel to %s\n", + GNUNET_i2s (target)); + mh = GNUNET_new (struct CadetHandle); + mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, + &reset_cadet_task, + mh); + mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); + mh->target = *target; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (cadet_map, + &mh->target, + mh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + mh->channel = GNUNET_CADET_channel_create (cadet_handle, + mh, + &mh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + GNUNET_CADET_OPTION_RELIABLE); + GNUNET_assert (mh == + GNUNET_CONTAINER_multipeermap_get (cadet_map, + target)); + return mh; +} + + +/** + * Look for a block by directly contacting a particular peer. + * + * @param target peer that should have the block + * @param query hash to query for the block + * @param type desired type for the block + * @param proc function to call with result + * @param proc_cls closure for @a proc + * @return handle to cancel the operation + */ +struct GSF_CadetRequest * +GSF_cadet_query (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_CadetReplyProcessor proc, void *proc_cls) +{ + struct CadetHandle *mh; + struct GSF_CadetRequest *sr; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to send query for %s via cadet to %s\n", + GNUNET_h2s (query), + GNUNET_i2s (target)); + mh = get_cadet (target); + sr = GNUNET_new (struct GSF_CadetRequest); + sr->mh = mh; + sr->proc = proc; + sr->proc_cls = proc_cls; + sr->type = type; + sr->query = *query; + GNUNET_CONTAINER_DLL_insert (mh->pending_head, + mh->pending_tail, + sr); + transmit_pending (mh); + return sr; +} + + +/** + * Cancel an active request; must not be called after 'proc' + * was calld. + * + * @param sr request to cancel + */ +void +GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) +{ + struct CadetHandle *mh = sr->mh; + GSF_CadetReplyProcessor p; + + p = sr->proc; + sr->proc = NULL; + if (NULL != p) + { + /* signal failure / cancellation to callback */ + p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, + GNUNET_TIME_UNIT_ZERO_ABS, + 0, NULL); + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelled query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&sr->mh->target)); + if (GNUNET_YES == sr->was_transmitted) + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, + &sr->query, + sr)); + else + GNUNET_CONTAINER_DLL_remove (mh->pending_head, + mh->pending_tail, + sr); + GNUNET_free (sr); + if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && + (NULL == mh->pending_head) ) + mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &cadet_timeout, + mh); +} + + +/** + * Iterator called on each entry in a waiting map to + * call the 'proc' continuation and release associated + * resources. + * + * @param cls the `struct CadetHandle` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to clean up + * @return #GNUNET_YES (continue to iterate) + */ +static int +free_waiting_entry (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GSF_CadetRequest *sr = value; + + GSF_cadet_query_cancel (sr); + return GNUNET_YES; +} + + +/** + * Function called by cadet when a client disconnects. + * Cleans up our `struct CadetClient` of that channel. + * + * @param cls NULL + * @param channel channel of the disconnecting client + * @param channel_ctx our `struct CadetClient` + */ +static void +cleaner_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + void *channel_ctx) +{ + struct CadetHandle *mh = channel_ctx; + struct GSF_CadetRequest *sr; + + if (NULL == mh->channel) + return; /* being destroyed elsewhere */ + GNUNET_assert (channel == mh->channel); + mh->channel = NULL; + while (NULL != (sr = mh->pending_head)) + GSF_cadet_query_cancel (sr); + /* first remove `mh` from the `cadet_map`, so that if the + callback from `free_waiting_entry()` happens to re-issue + the request, we don't immediately have it back in the + `waiting_map`. */ + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (cadet_map, + &mh->target, + mh)); + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, + &free_waiting_entry, + mh); + if (NULL != mh->wh) + GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); + if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task) + GNUNET_SCHEDULER_cancel (mh->timeout_task); + if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task) + GNUNET_SCHEDULER_cancel (mh->reset_task); + GNUNET_assert (0 == + GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); + GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); + GNUNET_free (mh); +} + + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_start_client () +{ + static const struct GNUNET_CADET_MessageHandler handlers[] = { + { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 }, + { NULL, 0, 0 } + }; + + cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); + cadet_handle = GNUNET_CADET_connect (GSF_cfg, + NULL, + NULL, + &cleaner_cb, + handlers, + NULL); +} + + +/** + * Function called on each active cadets to shut them down. + * + * @param cls NULL + * @param key target peer, unused + * @param value the `struct CadetHandle` to destroy + * @return #GNUNET_YES (continue to iterate) + */ +static int +release_cadets (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + struct CadetHandle *mh = value; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + if (NULL != mh->channel) + GNUNET_CADET_channel_destroy (mh->channel); + return GNUNET_YES; +} + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_stop_client () +{ + GNUNET_CONTAINER_multipeermap_iterate (cadet_map, + &release_cadets, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (cadet_map); + cadet_map = NULL; + if (NULL != cadet_handle) + { + GNUNET_CADET_disconnect (cadet_handle); + cadet_handle = NULL; + } +} + + +/* end of gnunet-service-fs_cadet_client.c */ -- cgit v1.2.3