From 848b0e9e3a4d586050d05aa4f3f796e1f978480e Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Wed, 7 May 2014 12:07:44 +0000 Subject: - update fs --- src/fs/gnunet-service-fs_cadet.h | 159 +++++++ src/fs/gnunet-service-fs_cadet_client.c | 766 ++++++++++++++++++++++++++++++++ src/fs/gnunet-service-fs_cadet_server.c | 595 +++++++++++++++++++++++++ src/fs/gnunet-service-fs_mesh.h | 159 ------- src/fs/gnunet-service-fs_mesh_client.c | 766 -------------------------------- src/fs/gnunet-service-fs_mesh_server.c | 595 ------------------------- 6 files changed, 1520 insertions(+), 1520 deletions(-) create mode 100644 src/fs/gnunet-service-fs_cadet.h create mode 100644 src/fs/gnunet-service-fs_cadet_client.c create mode 100644 src/fs/gnunet-service-fs_cadet_server.c delete mode 100644 src/fs/gnunet-service-fs_mesh.h delete mode 100644 src/fs/gnunet-service-fs_mesh_client.c delete mode 100644 src/fs/gnunet-service-fs_mesh_server.c diff --git a/src/fs/gnunet-service-fs_cadet.h b/src/fs/gnunet-service-fs_cadet.h new file mode 100644 index 000000000..ddf367668 --- /dev/null +++ b/src/fs/gnunet-service-fs_cadet.h @@ -0,0 +1,159 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_cadet.h + * @brief non-anonymous file-transfer + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_FS_CADET_H +#define GNUNET_SERVICE_FS_CADET_H + +/** + * Handle for a request that is going out via cadet API. + */ +struct GSF_CadetRequest; + + +/** + * Function called with a reply from the cadet. + * + * @param cls closure + * @param type type of the block, ANY on error + * @param expiration expiration time for the block + * @param data_size number of bytes in 'data', 0 on error + * @param data reply block data, NULL on error + */ +typedef void (*GSF_CadetReplyProcessor)(void *cls, + enum GNUNET_BLOCK_Type type, + struct GNUNET_TIME_Absolute expiration, + size_t data_size, + const void *data); + + +/** + * Look for a block by directly contacting a particular peer. + * + * @param target peer that should have the block + * @param query hash to query for the block + * @param type desired type for the block + * @param proc function to call with result + * @param proc_cls closure for 'proc' + * @return handle to cancel the operation + */ +struct GSF_CadetRequest * +GSF_cadet_query (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_CadetReplyProcessor proc, void *proc_cls); + + +/** + * Cancel an active request; must not be called after 'proc' + * was calld. + * + * @param sr request to cancel + */ +void +GSF_cadet_query_cancel (struct GSF_CadetRequest *sr); + + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_start_server (void); + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_stop_server (void); + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_start_client (void); + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_stop_client (void); + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Query from one peer, asking the other for CHK-data. + */ +struct CadetQueryMessage +{ + + /** + * Type is GNUNET_MESSAGE_TYPE_FS_CADET_QUERY. + */ + struct GNUNET_MessageHeader header; + + /** + * Block type must be DBLOCK or IBLOCK. + */ + uint32_t type GNUNET_PACKED; + + /** + * Query hash from CHK (hash of encrypted block). + */ + struct GNUNET_HashCode query; + +}; + + +/** + * Reply to a CadetQueryMessage. + */ +struct CadetReplyMessage +{ + + /** + * Type is GNUNET_MESSAGE_TYPE_FS_CADET_REPLY. + */ + struct GNUNET_MessageHeader header; + + /** + * Block type must be DBLOCK or IBLOCK. + */ + uint32_t type GNUNET_PACKED; + + /** + * Expiration time for the block. + */ + struct GNUNET_TIME_AbsoluteNBO expiration; + + /* followed by the encrypted block */ + +}; + +GNUNET_NETWORK_STRUCT_END + + +#endif diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c new file mode 100644 index 000000000..dde7aba48 --- /dev/null +++ b/src/fs/gnunet-service-fs_cadet_client.c @@ -0,0 +1,766 @@ +/* + This file is part of GNUnet. + (C) 2012, 2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_cadet_client.c + * @brief non-anonymous file-transfer + * @author Christian Grothoff + * + * TODO: + * - PORT is set to old application type, unsure if we should keep + * it that way (fine for now) + */ +#include "platform.h" +#include "gnunet_constants.h" +#include "gnunet_util_lib.h" +#include "gnunet_cadet_service.h" +#include "gnunet_protocols.h" +#include "gnunet_applications.h" +#include "gnunet-service-fs.h" +#include "gnunet-service-fs_indexing.h" +#include "gnunet-service-fs_cadet.h" + + +/** + * After how long do we reset connections without replies? + */ +#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) + + +/** + * Handle for a cadet to another peer. + */ +struct CadetHandle; + + +/** + * Handle for a request that is going out via cadet API. + */ +struct GSF_CadetRequest +{ + + /** + * DLL. + */ + struct GSF_CadetRequest *next; + + /** + * DLL. + */ + struct GSF_CadetRequest *prev; + + /** + * Which cadet is this request associated with? + */ + struct CadetHandle *mh; + + /** + * Function to call with the result. + */ + GSF_CadetReplyProcessor proc; + + /** + * Closure for 'proc' + */ + void *proc_cls; + + /** + * Query to transmit to the other peer. + */ + struct GNUNET_HashCode query; + + /** + * Desired type for the reply. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we transmit this request already? #GNUNET_YES if we are + * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL. + */ + int was_transmitted; +}; + + +/** + * Handle for a cadet to another peer. + */ +struct CadetHandle +{ + /** + * Head of DLL of pending requests on this cadet. + */ + struct GSF_CadetRequest *pending_head; + + /** + * Tail of DLL of pending requests on this cadet. + */ + struct GSF_CadetRequest *pending_tail; + + /** + * Map from query to `struct GSF_CadetRequest`s waiting for + * a reply. + */ + struct GNUNET_CONTAINER_MultiHashMap *waiting_map; + + /** + * Channel to the other peer. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_CADET_TransmitHandle *wh; + + /** + * Which peer does this cadet go to? + */ + struct GNUNET_PeerIdentity target; + + /** + * Task to kill inactive cadets (we keep them around for + * a few seconds to give the application a chance to give + * us another query). + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Task to reset cadets that had errors (asynchronously, + * as we may not be able to do it immediately during a + * callback from the cadet API). + */ + GNUNET_SCHEDULER_TaskIdentifier reset_task; + +}; + + +/** + * Cadet channel for creating outbound channels. + */ +static struct GNUNET_CADET_Handle *cadet_handle; + +/** + * Map from peer identities to 'struct CadetHandles' with cadet + * channels to those peers. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; + + +/* ********************* client-side code ************************* */ + + +/** + * Transmit pending requests via the cadet. + * + * @param mh cadet to process + */ +static void +transmit_pending (struct CadetHandle *mh); + + +/** + * Iterator called on each entry in a waiting map to + * move it back to the pending list. + * + * @param cls the `struct CadetHandle` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to move to pending + * @return #GNUNET_YES (continue to iterate) + */ +static int +move_to_pending (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct CadetHandle *mh = cls; + struct GSF_CadetRequest *sr = value; + + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, + key, + value)); + GNUNET_CONTAINER_DLL_insert (mh->pending_head, + mh->pending_tail, + sr); + sr->was_transmitted = GNUNET_NO; + return GNUNET_YES; +} + + +/** + * We had a serious error, tear down and re-create cadet from scratch. + * + * @param mh cadet to reset + */ +static void +reset_cadet (struct CadetHandle *mh) +{ + struct GNUNET_CADET_Channel *channel = mh->channel; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Resetting cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + mh->channel = NULL; + if (NULL != channel) + GNUNET_CADET_channel_destroy (channel); + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, + &move_to_pending, + mh); + mh->channel = GNUNET_CADET_channel_create (cadet_handle, + mh, + &mh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + GNUNET_CADET_OPTION_RELIABLE); + transmit_pending (mh); +} + + +/** + * Task called when it is time to destroy an inactive cadet channel. + * + * @param cls the `struct CadetHandle` to tear down + * @param tc scheduler context, unused + */ +static void +cadet_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct CadetHandle *mh = cls; + struct GNUNET_CADET_Channel *tun; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + mh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + tun = mh->channel; + mh->channel = NULL; + GNUNET_CADET_channel_destroy (tun); +} + + +/** + * Task called when it is time to reset an cadet. + * + * @param cls the `struct CadetHandle` to tear down + * @param tc scheduler context, unused + */ +static void +reset_cadet_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct CadetHandle *mh = cls; + + mh->reset_task = GNUNET_SCHEDULER_NO_TASK; + reset_cadet (mh); +} + + +/** + * We had a serious error, tear down and re-create cadet from scratch, + * but do so asynchronously. + * + * @param mh cadet to reset + */ +static void +reset_cadet_async (struct CadetHandle *mh) +{ + if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task) + GNUNET_SCHEDULER_cancel (mh->reset_task); + mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, + mh); +} + + +/** + * Functions of this signature are called whenever we are ready to transmit + * query via a cadet. + * + * @param cls the struct CadetHandle for which we did the write call + * @param size the number of bytes that can be written to @a buf + * @param buf where to write the message + * @return number of bytes written to @a buf + */ +static size_t +transmit_sqm (void *cls, + size_t size, + void *buf) +{ + struct CadetHandle *mh = cls; + struct CadetQueryMessage sqm; + struct GSF_CadetRequest *sr; + + mh->wh = NULL; + if (NULL == buf) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cadet channel to %s failed during transmission attempt, rebuilding\n", + GNUNET_i2s (&mh->target)); + reset_cadet_async (mh); + return 0; + } + sr = mh->pending_head; + if (NULL == sr) + return 0; + GNUNET_assert (size >= sizeof (struct CadetQueryMessage)); + GNUNET_CONTAINER_DLL_remove (mh->pending_head, + mh->pending_tail, + sr); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (mh->waiting_map, + &sr->query, + sr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); + sr->was_transmitted = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&mh->target)); + sqm.header.size = htons (sizeof (sqm)); + sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); + sqm.type = htonl (sr->type); + sqm.query = sr->query; + memcpy (buf, &sqm, sizeof (sqm)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Successfully transmitted %u bytes via cadet to %s\n", + (unsigned int) size, + GNUNET_i2s (&mh->target)); + transmit_pending (mh); + return sizeof (sqm); +} + + +/** + * Transmit pending requests via the cadet. + * + * @param mh cadet to process + */ +static void +transmit_pending (struct CadetHandle *mh) +{ + if (NULL == mh->channel) + return; + if (NULL != mh->wh) + return; + mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */, + GNUNET_TIME_UNIT_FOREVER_REL, + sizeof (struct CadetQueryMessage), + &transmit_sqm, mh); +} + + +/** + * Closure for handle_reply(). + */ +struct HandleReplyClosure +{ + + /** + * Reply payload. + */ + const void *data; + + /** + * Expiration time for the block. + */ + struct GNUNET_TIME_Absolute expiration; + + /** + * Number of bytes in 'data'. + */ + size_t data_size; + + /** + * Type of the block. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we have a matching query? + */ + int found; +}; + + +/** + * Iterator called on each entry in a waiting map to + * process a result. + * + * @param cls the `struct HandleReplyClosure` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to handle result for + * @return #GNUNET_YES (continue to iterate) + */ +static int +handle_reply (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct HandleReplyClosure *hrc = cls; + struct GSF_CadetRequest *sr = value; + + sr->proc (sr->proc_cls, + hrc->type, + hrc->expiration, + hrc->data_size, + hrc->data); + sr->proc = NULL; + GSF_cadet_query_cancel (sr); + hrc->found = GNUNET_YES; + return GNUNET_YES; +} + + +/** + * Functions with this signature are called whenever a complete reply + * is received. + * + * @param cls closure with the `struct CadetHandle` + * @param channel channel handle + * @param channel_ctx channel context + * @param message the actual message + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + */ +static int +reply_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + void **channel_ctx, + const struct GNUNET_MessageHeader *message) +{ + struct CadetHandle *mh = *channel_ctx; + const struct CadetReplyMessage *srm; + struct HandleReplyClosure hrc; + uint16_t msize; + enum GNUNET_BLOCK_Type type; + struct GNUNET_HashCode query; + + msize = ntohs (message->size); + if (sizeof (struct CadetReplyMessage) > msize) + { + GNUNET_break_op (0); + reset_cadet_async (mh); + return GNUNET_SYSERR; + } + srm = (const struct CadetReplyMessage *) message; + msize -= sizeof (struct CadetReplyMessage); + type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); + if (GNUNET_YES != + GNUNET_BLOCK_get_key (GSF_block_ctx, + type, + &srm[1], msize, &query)) + { + GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", + type, + msize, + GNUNET_i2s (&mh->target)); + reset_cadet_async (mh); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received reply `%s' via cadet from peer %s\n", + GNUNET_h2s (&query), + GNUNET_i2s (&mh->target)); + GNUNET_CADET_receive_done (channel); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via cadet"), 1, + GNUNET_NO); + hrc.data = &srm[1]; + hrc.data_size = msize; + hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); + hrc.type = type; + hrc.found = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, + &query, + &handle_reply, + &hrc); + if (GNUNET_NO == hrc.found) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via cadet dropped"), 1, + GNUNET_NO); + return GNUNET_OK; + } + return GNUNET_OK; +} + + +/** + * Get (or create) a cadet to talk to the given peer. + * + * @param target peer we want to communicate with + */ +static struct CadetHandle * +get_cadet (const struct GNUNET_PeerIdentity *target) +{ + struct CadetHandle *mh; + + mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, + target); + if (NULL != mh) + { + if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task) + { + GNUNET_SCHEDULER_cancel (mh->timeout_task); + mh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + return mh; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating cadet channel to %s\n", + GNUNET_i2s (target)); + mh = GNUNET_new (struct CadetHandle); + mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, + &reset_cadet_task, + mh); + mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); + mh->target = *target; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_put (cadet_map, + &mh->target, + mh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + mh->channel = GNUNET_CADET_channel_create (cadet_handle, + mh, + &mh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + GNUNET_CADET_OPTION_RELIABLE); + GNUNET_assert (mh == + GNUNET_CONTAINER_multipeermap_get (cadet_map, + target)); + return mh; +} + + +/** + * Look for a block by directly contacting a particular peer. + * + * @param target peer that should have the block + * @param query hash to query for the block + * @param type desired type for the block + * @param proc function to call with result + * @param proc_cls closure for @a proc + * @return handle to cancel the operation + */ +struct GSF_CadetRequest * +GSF_cadet_query (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_CadetReplyProcessor proc, void *proc_cls) +{ + struct CadetHandle *mh; + struct GSF_CadetRequest *sr; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to send query for %s via cadet to %s\n", + GNUNET_h2s (query), + GNUNET_i2s (target)); + mh = get_cadet (target); + sr = GNUNET_new (struct GSF_CadetRequest); + sr->mh = mh; + sr->proc = proc; + sr->proc_cls = proc_cls; + sr->type = type; + sr->query = *query; + GNUNET_CONTAINER_DLL_insert (mh->pending_head, + mh->pending_tail, + sr); + transmit_pending (mh); + return sr; +} + + +/** + * Cancel an active request; must not be called after 'proc' + * was calld. + * + * @param sr request to cancel + */ +void +GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) +{ + struct CadetHandle *mh = sr->mh; + GSF_CadetReplyProcessor p; + + p = sr->proc; + sr->proc = NULL; + if (NULL != p) + { + /* signal failure / cancellation to callback */ + p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, + GNUNET_TIME_UNIT_ZERO_ABS, + 0, NULL); + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelled query for %s via cadet to %s\n", + GNUNET_h2s (&sr->query), + GNUNET_i2s (&sr->mh->target)); + if (GNUNET_YES == sr->was_transmitted) + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, + &sr->query, + sr)); + else + GNUNET_CONTAINER_DLL_remove (mh->pending_head, + mh->pending_tail, + sr); + GNUNET_free (sr); + if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && + (NULL == mh->pending_head) ) + mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &cadet_timeout, + mh); +} + + +/** + * Iterator called on each entry in a waiting map to + * call the 'proc' continuation and release associated + * resources. + * + * @param cls the `struct CadetHandle` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to clean up + * @return #GNUNET_YES (continue to iterate) + */ +static int +free_waiting_entry (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GSF_CadetRequest *sr = value; + + GSF_cadet_query_cancel (sr); + return GNUNET_YES; +} + + +/** + * Function called by cadet when a client disconnects. + * Cleans up our `struct CadetClient` of that channel. + * + * @param cls NULL + * @param channel channel of the disconnecting client + * @param channel_ctx our `struct CadetClient` + */ +static void +cleaner_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + void *channel_ctx) +{ + struct CadetHandle *mh = channel_ctx; + struct GSF_CadetRequest *sr; + + if (NULL == mh->channel) + return; /* being destroyed elsewhere */ + GNUNET_assert (channel == mh->channel); + mh->channel = NULL; + while (NULL != (sr = mh->pending_head)) + GSF_cadet_query_cancel (sr); + /* first remove `mh` from the `cadet_map`, so that if the + callback from `free_waiting_entry()` happens to re-issue + the request, we don't immediately have it back in the + `waiting_map`. */ + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (cadet_map, + &mh->target, + mh)); + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, + &free_waiting_entry, + mh); + if (NULL != mh->wh) + GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); + if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task) + GNUNET_SCHEDULER_cancel (mh->timeout_task); + if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task) + GNUNET_SCHEDULER_cancel (mh->reset_task); + GNUNET_assert (0 == + GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); + GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); + GNUNET_free (mh); +} + + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_start_client () +{ + static const struct GNUNET_CADET_MessageHandler handlers[] = { + { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 }, + { NULL, 0, 0 } + }; + + cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); + cadet_handle = GNUNET_CADET_connect (GSF_cfg, + NULL, + NULL, + &cleaner_cb, + handlers, + NULL); +} + + +/** + * Function called on each active cadets to shut them down. + * + * @param cls NULL + * @param key target peer, unused + * @param value the `struct CadetHandle` to destroy + * @return #GNUNET_YES (continue to iterate) + */ +static int +release_cadets (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + struct CadetHandle *mh = value; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout on cadet channel to %s\n", + GNUNET_i2s (&mh->target)); + if (NULL != mh->channel) + GNUNET_CADET_channel_destroy (mh->channel); + return GNUNET_YES; +} + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_stop_client () +{ + GNUNET_CONTAINER_multipeermap_iterate (cadet_map, + &release_cadets, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (cadet_map); + cadet_map = NULL; + if (NULL != cadet_handle) + { + GNUNET_CADET_disconnect (cadet_handle); + cadet_handle = NULL; + } +} + + +/* end of gnunet-service-fs_cadet_client.c */ diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c new file mode 100644 index 000000000..77add757d --- /dev/null +++ b/src/fs/gnunet-service-fs_cadet_server.c @@ -0,0 +1,595 @@ +/* + This file is part of GNUnet. + (C) 2012, 2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_cadet_server.c + * @brief non-anonymous file-transfer + * @author Christian Grothoff + * + * TODO: + * - PORT is set to old application type, unsure if we should keep + * it that way (fine for now) + */ +#include "platform.h" +#include "gnunet_constants.h" +#include "gnunet_util_lib.h" +#include "gnunet_cadet_service.h" +#include "gnunet_protocols.h" +#include "gnunet_applications.h" +#include "gnunet-service-fs.h" +#include "gnunet-service-fs_indexing.h" +#include "gnunet-service-fs_cadet.h" + +/** + * After how long do we termiante idle connections? + */ +#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) + + +/** + * A message in the queue to be written to the cadet. + */ +struct WriteQueueItem +{ + /** + * Kept in a DLL. + */ + struct WriteQueueItem *next; + + /** + * Kept in a DLL. + */ + struct WriteQueueItem *prev; + + /** + * Number of bytes of payload, allocated at the end of this struct. + */ + size_t msize; +}; + + +/** + * Information we keep around for each active cadeting client. + */ +struct CadetClient +{ + /** + * DLL + */ + struct CadetClient *next; + + /** + * DLL + */ + struct CadetClient *prev; + + /** + * Channel for communication. + */ + struct GNUNET_CADET_Channel *channel; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_CADET_TransmitHandle *wh; + + /** + * Head of write queue. + */ + struct WriteQueueItem *wqi_head; + + /** + * Tail of write queue. + */ + struct WriteQueueItem *wqi_tail; + + /** + * Current active request to the datastore, if we have one pending. + */ + struct GNUNET_DATASTORE_QueueEntry *qe; + + /** + * Task that is scheduled to asynchronously terminate the connection. + */ + GNUNET_SCHEDULER_TaskIdentifier terminate_task; + + /** + * Task that is scheduled to terminate idle connections. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Size of the last write that was initiated. + */ + size_t reply_size; + +}; + + +/** + * Listen channel for incoming requests. + */ +static struct GNUNET_CADET_Handle *listen_channel; + +/** + * Head of DLL of cadet clients. + */ +static struct CadetClient *sc_head; + +/** + * Tail of DLL of cadet clients. + */ +static struct CadetClient *sc_tail; + +/** + * Number of active cadet clients in the 'sc_*'-DLL. + */ +static unsigned int sc_count; + +/** + * Maximum allowed number of cadet clients. + */ +static unsigned long long sc_count_max; + + + +/** + * Task run to asynchronously terminate the cadet due to timeout. + * + * @param cls the 'struct CadetClient' + * @param tc scheduler context + */ +static void +timeout_cadet_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct CadetClient *sc = cls; + struct GNUNET_CADET_Channel *tun; + + sc->timeout_task = GNUNET_SCHEDULER_NO_TASK; + tun = sc->channel; + sc->channel = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout for inactive cadet client %p\n", + sc); + GNUNET_CADET_channel_destroy (tun); +} + + +/** + * Reset the timeout for the cadet client (due to activity). + * + * @param sc client handle to reset timeout for + */ +static void +refresh_timeout_task (struct CadetClient *sc) +{ + if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) + GNUNET_SCHEDULER_cancel (sc->timeout_task); + sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT, + &timeout_cadet_task, + sc); +} + + +/** + * We're done handling a request from a client, read the next one. + * + * @param sc client to continue reading requests from + */ +static void +continue_reading (struct CadetClient *sc) +{ + refresh_timeout_task (sc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Finished processing cadet request from client %p, ready to receive the next one\n", + sc); + GNUNET_CADET_receive_done (sc->channel); +} + + +/** + * Transmit the next entry from the write queue. + * + * @param sc where to process the write queue + */ +static void +continue_writing (struct CadetClient *sc); + + +/** + * Send a reply now, cadet is ready. + * + * @param cls closure with the `struct CadetClient` which sent the query + * @param size number of bytes available in @a buf + * @param buf where to write the message + * @return number of bytes written to @a buf + */ +static size_t +write_continuation (void *cls, + size_t size, + void *buf) +{ + struct CadetClient *sc = cls; + struct GNUNET_CADET_Channel *tun; + struct WriteQueueItem *wqi; + size_t ret; + + sc->wh = NULL; + if (NULL == (wqi = sc->wqi_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write queue empty, reading more requests\n"); + return 0; + } + if ( (0 == size) || + (size < wqi->msize) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission of reply failed, terminating cadet\n"); + tun = sc->channel; + sc->channel = NULL; + GNUNET_CADET_channel_destroy (tun); + return 0; + } + GNUNET_CONTAINER_DLL_remove (sc->wqi_head, + sc->wqi_tail, + wqi); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitted %u byte reply via cadet to %p\n", + (unsigned int) size, + sc); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Blocks transferred via cadet"), 1, + GNUNET_NO); + memcpy (buf, &wqi[1], ret = wqi->msize); + GNUNET_free (wqi); + continue_writing (sc); + return ret; +} + + +/** + * Transmit the next entry from the write queue. + * + * @param sc where to process the write queue + */ +static void +continue_writing (struct CadetClient *sc) +{ + struct WriteQueueItem *wqi; + struct GNUNET_CADET_Channel *tun; + + if (NULL != sc->wh) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write pending, waiting for it to complete\n"); + return; /* write already pending */ + } + if (NULL == (wqi = sc->wqi_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write queue empty, reading more requests\n"); + continue_reading (sc); + return; + } + sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO, + GNUNET_TIME_UNIT_FOREVER_REL, + wqi->msize, + &write_continuation, + sc); + if (NULL == sc->wh) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write failed; terminating cadet\n"); + tun = sc->channel; + sc->channel = NULL; + GNUNET_CADET_channel_destroy (tun); + return; + } +} + + +/** + * Process a datum that was stored in the datastore. + * + * @param cls closure with the `struct CadetClient` which sent the query + * @param key key for the content + * @param size number of bytes in @a data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +handle_datastore_reply (void *cls, + const struct GNUNET_HashCode *key, + size_t size, const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) +{ + struct CadetClient *sc = cls; + size_t msize = size + sizeof (struct CadetReplyMessage); + struct WriteQueueItem *wqi; + struct CadetReplyMessage *srm; + + sc->qe = NULL; + if (NULL == data) + { + /* no result, this should not really happen, as for + non-anonymous routing only peers that HAVE the + answers should be queried; OTOH, this is not a + hard error as we might have had the answer in the + past and the user might have unindexed it. Hence + we log at level "INFO" for now. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Have no answer for query `%s'\n", + GNUNET_h2s (key)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# queries received via cadet not answered"), 1, + GNUNET_NO); + continue_writing (sc); + return; + } + if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Performing on-demand encoding for query %s\n", + GNUNET_h2s (key)); + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, + size, data, type, + priority, anonymity, + expiration, uid, + &handle_datastore_reply, + sc)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "On-demand encoding request failed\n"); + continue_writing (sc); + } + return; + } + if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + continue_writing (sc); + return; + } + GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n", + (unsigned int) size, + (unsigned int) type, + GNUNET_h2s (key), + sc); + wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); + wqi->msize = msize; + srm = (struct CadetReplyMessage *) &wqi[1]; + srm->header.size = htons ((uint16_t) msize); + srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY); + srm->type = htonl (type); + srm->expiration = GNUNET_TIME_absolute_hton (expiration); + memcpy (&srm[1], data, size); + sc->reply_size = msize; + GNUNET_CONTAINER_DLL_insert (sc->wqi_head, + sc->wqi_tail, + wqi); + continue_writing (sc); +} + + +/** + * Functions with this signature are called whenever a + * complete query message is received. + * + * Do not call #GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure with the 'struct CadetClient' + * @param channel channel handle + * @param channel_ctx channel context + * @param message the actual message + * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + */ +static int +request_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + void **channel_ctx, + const struct GNUNET_MessageHeader *message) +{ + struct CadetClient *sc = *channel_ctx; + const struct CadetQueryMessage *sqm; + + sqm = (const struct CadetQueryMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received query for `%s' via cadet from client %p\n", + GNUNET_h2s (&sqm->query), + sc); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# queries received via cadet"), 1, + GNUNET_NO); + refresh_timeout_task (sc); + sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + 0, + &sqm->query, + ntohl (sqm->type), + 0 /* priority */, + GSF_datastore_queue_size, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_datastore_reply, sc); + if (NULL == sc->qe) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queueing request with datastore failed (queue full?)\n"); + continue_writing (sc); + } + return GNUNET_OK; +} + + +/** + * Functions of this type are called upon new cadet connection from other peers. + * + * @param cls the closure from GNUNET_CADET_connect + * @param channel the channel representing the cadet + * @param initiator the identity of the peer who wants to establish a cadet + * with us; NULL on binding error + * @param port cadet port used for the incoming connection + * @param options channel option flags + * @return initial channel context (our 'struct CadetClient') + */ +static void * +accept_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + uint32_t port, enum GNUNET_CADET_ChannelOption options) +{ + struct CadetClient *sc; + + GNUNET_assert (NULL != channel); + if (sc_count >= sc_count_max) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# cadet client connections rejected"), 1, + GNUNET_NO); + GNUNET_CADET_channel_destroy (channel); + return NULL; + } + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# cadet connections active"), 1, + GNUNET_NO); + sc = GNUNET_new (struct CadetClient); + sc->channel = channel; + GNUNET_CONTAINER_DLL_insert (sc_head, + sc_tail, + sc); + sc_count++; + refresh_timeout_task (sc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting inbound cadet connection from `%s' as client %p\n", + GNUNET_i2s (initiator), + sc); + return sc; +} + + +/** + * Function called by cadet when a client disconnects. + * Cleans up our 'struct CadetClient' of that channel. + * + * @param cls NULL + * @param channel channel of the disconnecting client + * @param channel_ctx our 'struct CadetClient' + */ +static void +cleaner_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + void *channel_ctx) +{ + struct CadetClient *sc = channel_ctx; + struct WriteQueueItem *wqi; + + if (NULL == sc) + return; + sc->channel = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Terminating cadet connection with client %p\n", + sc); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# cadet connections active"), -1, + GNUNET_NO); + if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) + GNUNET_SCHEDULER_cancel (sc->terminate_task); + if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) + GNUNET_SCHEDULER_cancel (sc->timeout_task); + if (NULL != sc->wh) + GNUNET_CADET_notify_transmit_ready_cancel (sc->wh); + if (NULL != sc->qe) + GNUNET_DATASTORE_cancel (sc->qe); + while (NULL != (wqi = sc->wqi_head)) + { + GNUNET_CONTAINER_DLL_remove (sc->wqi_head, + sc->wqi_tail, + wqi); + GNUNET_free (wqi); + } + GNUNET_CONTAINER_DLL_remove (sc_head, + sc_tail, + sc); + sc_count--; + GNUNET_free (sc); +} + + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_start_server () +{ + static const struct GNUNET_CADET_MessageHandler handlers[] = { + { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)}, + { NULL, 0, 0 } + }; + static const uint32_t ports[] = { + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + 0 + }; + + if (GNUNET_YES != + GNUNET_CONFIGURATION_get_value_number (GSF_cfg, + "fs", + "MAX_CADET_CLIENTS", + &sc_count_max)) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Initializing cadet FS server with a limit of %llu connections\n", + sc_count_max); + listen_channel = GNUNET_CADET_connect (GSF_cfg, + NULL, + &accept_cb, + &cleaner_cb, + handlers, + ports); +} + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_cadet_stop_server () +{ + if (NULL != listen_channel) + { + GNUNET_CADET_disconnect (listen_channel); + listen_channel = NULL; + } + GNUNET_assert (NULL == sc_head); + GNUNET_assert (0 == sc_count); +} + +/* end of gnunet-service-fs_cadet.c */ diff --git a/src/fs/gnunet-service-fs_mesh.h b/src/fs/gnunet-service-fs_mesh.h deleted file mode 100644 index ddf367668..000000000 --- a/src/fs/gnunet-service-fs_mesh.h +++ /dev/null @@ -1,159 +0,0 @@ -/* - This file is part of GNUnet. - (C) 2012 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ - -/** - * @file fs/gnunet-service-fs_cadet.h - * @brief non-anonymous file-transfer - * @author Christian Grothoff - */ -#ifndef GNUNET_SERVICE_FS_CADET_H -#define GNUNET_SERVICE_FS_CADET_H - -/** - * Handle for a request that is going out via cadet API. - */ -struct GSF_CadetRequest; - - -/** - * Function called with a reply from the cadet. - * - * @param cls closure - * @param type type of the block, ANY on error - * @param expiration expiration time for the block - * @param data_size number of bytes in 'data', 0 on error - * @param data reply block data, NULL on error - */ -typedef void (*GSF_CadetReplyProcessor)(void *cls, - enum GNUNET_BLOCK_Type type, - struct GNUNET_TIME_Absolute expiration, - size_t data_size, - const void *data); - - -/** - * Look for a block by directly contacting a particular peer. - * - * @param target peer that should have the block - * @param query hash to query for the block - * @param type desired type for the block - * @param proc function to call with result - * @param proc_cls closure for 'proc' - * @return handle to cancel the operation - */ -struct GSF_CadetRequest * -GSF_cadet_query (const struct GNUNET_PeerIdentity *target, - const struct GNUNET_HashCode *query, - enum GNUNET_BLOCK_Type type, - GSF_CadetReplyProcessor proc, void *proc_cls); - - -/** - * Cancel an active request; must not be called after 'proc' - * was calld. - * - * @param sr request to cancel - */ -void -GSF_cadet_query_cancel (struct GSF_CadetRequest *sr); - - -/** - * Initialize subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_start_server (void); - - -/** - * Shutdown subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_stop_server (void); - -/** - * Initialize subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_start_client (void); - - -/** - * Shutdown subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_stop_client (void); - - -GNUNET_NETWORK_STRUCT_BEGIN - -/** - * Query from one peer, asking the other for CHK-data. - */ -struct CadetQueryMessage -{ - - /** - * Type is GNUNET_MESSAGE_TYPE_FS_CADET_QUERY. - */ - struct GNUNET_MessageHeader header; - - /** - * Block type must be DBLOCK or IBLOCK. - */ - uint32_t type GNUNET_PACKED; - - /** - * Query hash from CHK (hash of encrypted block). - */ - struct GNUNET_HashCode query; - -}; - - -/** - * Reply to a CadetQueryMessage. - */ -struct CadetReplyMessage -{ - - /** - * Type is GNUNET_MESSAGE_TYPE_FS_CADET_REPLY. - */ - struct GNUNET_MessageHeader header; - - /** - * Block type must be DBLOCK or IBLOCK. - */ - uint32_t type GNUNET_PACKED; - - /** - * Expiration time for the block. - */ - struct GNUNET_TIME_AbsoluteNBO expiration; - - /* followed by the encrypted block */ - -}; - -GNUNET_NETWORK_STRUCT_END - - -#endif diff --git a/src/fs/gnunet-service-fs_mesh_client.c b/src/fs/gnunet-service-fs_mesh_client.c deleted file mode 100644 index dde7aba48..000000000 --- a/src/fs/gnunet-service-fs_mesh_client.c +++ /dev/null @@ -1,766 +0,0 @@ -/* - This file is part of GNUnet. - (C) 2012, 2013 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ - -/** - * @file fs/gnunet-service-fs_cadet_client.c - * @brief non-anonymous file-transfer - * @author Christian Grothoff - * - * TODO: - * - PORT is set to old application type, unsure if we should keep - * it that way (fine for now) - */ -#include "platform.h" -#include "gnunet_constants.h" -#include "gnunet_util_lib.h" -#include "gnunet_cadet_service.h" -#include "gnunet_protocols.h" -#include "gnunet_applications.h" -#include "gnunet-service-fs.h" -#include "gnunet-service-fs_indexing.h" -#include "gnunet-service-fs_cadet.h" - - -/** - * After how long do we reset connections without replies? - */ -#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) - - -/** - * Handle for a cadet to another peer. - */ -struct CadetHandle; - - -/** - * Handle for a request that is going out via cadet API. - */ -struct GSF_CadetRequest -{ - - /** - * DLL. - */ - struct GSF_CadetRequest *next; - - /** - * DLL. - */ - struct GSF_CadetRequest *prev; - - /** - * Which cadet is this request associated with? - */ - struct CadetHandle *mh; - - /** - * Function to call with the result. - */ - GSF_CadetReplyProcessor proc; - - /** - * Closure for 'proc' - */ - void *proc_cls; - - /** - * Query to transmit to the other peer. - */ - struct GNUNET_HashCode query; - - /** - * Desired type for the reply. - */ - enum GNUNET_BLOCK_Type type; - - /** - * Did we transmit this request already? #GNUNET_YES if we are - * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL. - */ - int was_transmitted; -}; - - -/** - * Handle for a cadet to another peer. - */ -struct CadetHandle -{ - /** - * Head of DLL of pending requests on this cadet. - */ - struct GSF_CadetRequest *pending_head; - - /** - * Tail of DLL of pending requests on this cadet. - */ - struct GSF_CadetRequest *pending_tail; - - /** - * Map from query to `struct GSF_CadetRequest`s waiting for - * a reply. - */ - struct GNUNET_CONTAINER_MultiHashMap *waiting_map; - - /** - * Channel to the other peer. - */ - struct GNUNET_CADET_Channel *channel; - - /** - * Handle for active write operation, or NULL. - */ - struct GNUNET_CADET_TransmitHandle *wh; - - /** - * Which peer does this cadet go to? - */ - struct GNUNET_PeerIdentity target; - - /** - * Task to kill inactive cadets (we keep them around for - * a few seconds to give the application a chance to give - * us another query). - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - /** - * Task to reset cadets that had errors (asynchronously, - * as we may not be able to do it immediately during a - * callback from the cadet API). - */ - GNUNET_SCHEDULER_TaskIdentifier reset_task; - -}; - - -/** - * Cadet channel for creating outbound channels. - */ -static struct GNUNET_CADET_Handle *cadet_handle; - -/** - * Map from peer identities to 'struct CadetHandles' with cadet - * channels to those peers. - */ -static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; - - -/* ********************* client-side code ************************* */ - - -/** - * Transmit pending requests via the cadet. - * - * @param mh cadet to process - */ -static void -transmit_pending (struct CadetHandle *mh); - - -/** - * Iterator called on each entry in a waiting map to - * move it back to the pending list. - * - * @param cls the `struct CadetHandle` - * @param key the key of the entry in the map (the query) - * @param value the `struct GSF_CadetRequest` to move to pending - * @return #GNUNET_YES (continue to iterate) - */ -static int -move_to_pending (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct CadetHandle *mh = cls; - struct GSF_CadetRequest *sr = value; - - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, - key, - value)); - GNUNET_CONTAINER_DLL_insert (mh->pending_head, - mh->pending_tail, - sr); - sr->was_transmitted = GNUNET_NO; - return GNUNET_YES; -} - - -/** - * We had a serious error, tear down and re-create cadet from scratch. - * - * @param mh cadet to reset - */ -static void -reset_cadet (struct CadetHandle *mh) -{ - struct GNUNET_CADET_Channel *channel = mh->channel; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resetting cadet channel to %s\n", - GNUNET_i2s (&mh->target)); - mh->channel = NULL; - if (NULL != channel) - GNUNET_CADET_channel_destroy (channel); - GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, - &move_to_pending, - mh); - mh->channel = GNUNET_CADET_channel_create (cadet_handle, - mh, - &mh->target, - GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, - GNUNET_CADET_OPTION_RELIABLE); - transmit_pending (mh); -} - - -/** - * Task called when it is time to destroy an inactive cadet channel. - * - * @param cls the `struct CadetHandle` to tear down - * @param tc scheduler context, unused - */ -static void -cadet_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct CadetHandle *mh = cls; - struct GNUNET_CADET_Channel *tun; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout on cadet channel to %s\n", - GNUNET_i2s (&mh->target)); - mh->timeout_task = GNUNET_SCHEDULER_NO_TASK; - tun = mh->channel; - mh->channel = NULL; - GNUNET_CADET_channel_destroy (tun); -} - - -/** - * Task called when it is time to reset an cadet. - * - * @param cls the `struct CadetHandle` to tear down - * @param tc scheduler context, unused - */ -static void -reset_cadet_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct CadetHandle *mh = cls; - - mh->reset_task = GNUNET_SCHEDULER_NO_TASK; - reset_cadet (mh); -} - - -/** - * We had a serious error, tear down and re-create cadet from scratch, - * but do so asynchronously. - * - * @param mh cadet to reset - */ -static void -reset_cadet_async (struct CadetHandle *mh) -{ - if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task) - GNUNET_SCHEDULER_cancel (mh->reset_task); - mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, - mh); -} - - -/** - * Functions of this signature are called whenever we are ready to transmit - * query via a cadet. - * - * @param cls the struct CadetHandle for which we did the write call - * @param size the number of bytes that can be written to @a buf - * @param buf where to write the message - * @return number of bytes written to @a buf - */ -static size_t -transmit_sqm (void *cls, - size_t size, - void *buf) -{ - struct CadetHandle *mh = cls; - struct CadetQueryMessage sqm; - struct GSF_CadetRequest *sr; - - mh->wh = NULL; - if (NULL == buf) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cadet channel to %s failed during transmission attempt, rebuilding\n", - GNUNET_i2s (&mh->target)); - reset_cadet_async (mh); - return 0; - } - sr = mh->pending_head; - if (NULL == sr) - return 0; - GNUNET_assert (size >= sizeof (struct CadetQueryMessage)); - GNUNET_CONTAINER_DLL_remove (mh->pending_head, - mh->pending_tail, - sr); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (mh->waiting_map, - &sr->query, - sr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - sr->was_transmitted = GNUNET_YES; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending query for %s via cadet to %s\n", - GNUNET_h2s (&sr->query), - GNUNET_i2s (&mh->target)); - sqm.header.size = htons (sizeof (sqm)); - sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); - sqm.type = htonl (sr->type); - sqm.query = sr->query; - memcpy (buf, &sqm, sizeof (sqm)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Successfully transmitted %u bytes via cadet to %s\n", - (unsigned int) size, - GNUNET_i2s (&mh->target)); - transmit_pending (mh); - return sizeof (sqm); -} - - -/** - * Transmit pending requests via the cadet. - * - * @param mh cadet to process - */ -static void -transmit_pending (struct CadetHandle *mh) -{ - if (NULL == mh->channel) - return; - if (NULL != mh->wh) - return; - mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */, - GNUNET_TIME_UNIT_FOREVER_REL, - sizeof (struct CadetQueryMessage), - &transmit_sqm, mh); -} - - -/** - * Closure for handle_reply(). - */ -struct HandleReplyClosure -{ - - /** - * Reply payload. - */ - const void *data; - - /** - * Expiration time for the block. - */ - struct GNUNET_TIME_Absolute expiration; - - /** - * Number of bytes in 'data'. - */ - size_t data_size; - - /** - * Type of the block. - */ - enum GNUNET_BLOCK_Type type; - - /** - * Did we have a matching query? - */ - int found; -}; - - -/** - * Iterator called on each entry in a waiting map to - * process a result. - * - * @param cls the `struct HandleReplyClosure` - * @param key the key of the entry in the map (the query) - * @param value the `struct GSF_CadetRequest` to handle result for - * @return #GNUNET_YES (continue to iterate) - */ -static int -handle_reply (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct HandleReplyClosure *hrc = cls; - struct GSF_CadetRequest *sr = value; - - sr->proc (sr->proc_cls, - hrc->type, - hrc->expiration, - hrc->data_size, - hrc->data); - sr->proc = NULL; - GSF_cadet_query_cancel (sr); - hrc->found = GNUNET_YES; - return GNUNET_YES; -} - - -/** - * Functions with this signature are called whenever a complete reply - * is received. - * - * @param cls closure with the `struct CadetHandle` - * @param channel channel handle - * @param channel_ctx channel context - * @param message the actual message - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing - */ -static int -reply_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - void **channel_ctx, - const struct GNUNET_MessageHeader *message) -{ - struct CadetHandle *mh = *channel_ctx; - const struct CadetReplyMessage *srm; - struct HandleReplyClosure hrc; - uint16_t msize; - enum GNUNET_BLOCK_Type type; - struct GNUNET_HashCode query; - - msize = ntohs (message->size); - if (sizeof (struct CadetReplyMessage) > msize) - { - GNUNET_break_op (0); - reset_cadet_async (mh); - return GNUNET_SYSERR; - } - srm = (const struct CadetReplyMessage *) message; - msize -= sizeof (struct CadetReplyMessage); - type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); - if (GNUNET_YES != - GNUNET_BLOCK_get_key (GSF_block_ctx, - type, - &srm[1], msize, &query)) - { - GNUNET_break_op (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", - type, - msize, - GNUNET_i2s (&mh->target)); - reset_cadet_async (mh); - return GNUNET_SYSERR; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received reply `%s' via cadet from peer %s\n", - GNUNET_h2s (&query), - GNUNET_i2s (&mh->target)); - GNUNET_CADET_receive_done (channel); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received via cadet"), 1, - GNUNET_NO); - hrc.data = &srm[1]; - hrc.data_size = msize; - hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); - hrc.type = type; - hrc.found = GNUNET_NO; - GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, - &query, - &handle_reply, - &hrc); - if (GNUNET_NO == hrc.found) - { - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# replies received via cadet dropped"), 1, - GNUNET_NO); - return GNUNET_OK; - } - return GNUNET_OK; -} - - -/** - * Get (or create) a cadet to talk to the given peer. - * - * @param target peer we want to communicate with - */ -static struct CadetHandle * -get_cadet (const struct GNUNET_PeerIdentity *target) -{ - struct CadetHandle *mh; - - mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, - target); - if (NULL != mh) - { - if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task) - { - GNUNET_SCHEDULER_cancel (mh->timeout_task); - mh->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } - return mh; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating cadet channel to %s\n", - GNUNET_i2s (target)); - mh = GNUNET_new (struct CadetHandle); - mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, - &reset_cadet_task, - mh); - mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); - mh->target = *target; - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (cadet_map, - &mh->target, - mh, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - mh->channel = GNUNET_CADET_channel_create (cadet_handle, - mh, - &mh->target, - GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, - GNUNET_CADET_OPTION_RELIABLE); - GNUNET_assert (mh == - GNUNET_CONTAINER_multipeermap_get (cadet_map, - target)); - return mh; -} - - -/** - * Look for a block by directly contacting a particular peer. - * - * @param target peer that should have the block - * @param query hash to query for the block - * @param type desired type for the block - * @param proc function to call with result - * @param proc_cls closure for @a proc - * @return handle to cancel the operation - */ -struct GSF_CadetRequest * -GSF_cadet_query (const struct GNUNET_PeerIdentity *target, - const struct GNUNET_HashCode *query, - enum GNUNET_BLOCK_Type type, - GSF_CadetReplyProcessor proc, void *proc_cls) -{ - struct CadetHandle *mh; - struct GSF_CadetRequest *sr; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Preparing to send query for %s via cadet to %s\n", - GNUNET_h2s (query), - GNUNET_i2s (target)); - mh = get_cadet (target); - sr = GNUNET_new (struct GSF_CadetRequest); - sr->mh = mh; - sr->proc = proc; - sr->proc_cls = proc_cls; - sr->type = type; - sr->query = *query; - GNUNET_CONTAINER_DLL_insert (mh->pending_head, - mh->pending_tail, - sr); - transmit_pending (mh); - return sr; -} - - -/** - * Cancel an active request; must not be called after 'proc' - * was calld. - * - * @param sr request to cancel - */ -void -GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) -{ - struct CadetHandle *mh = sr->mh; - GSF_CadetReplyProcessor p; - - p = sr->proc; - sr->proc = NULL; - if (NULL != p) - { - /* signal failure / cancellation to callback */ - p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, - GNUNET_TIME_UNIT_ZERO_ABS, - 0, NULL); - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cancelled query for %s via cadet to %s\n", - GNUNET_h2s (&sr->query), - GNUNET_i2s (&sr->mh->target)); - if (GNUNET_YES == sr->was_transmitted) - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, - &sr->query, - sr)); - else - GNUNET_CONTAINER_DLL_remove (mh->pending_head, - mh->pending_tail, - sr); - GNUNET_free (sr); - if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && - (NULL == mh->pending_head) ) - mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, - &cadet_timeout, - mh); -} - - -/** - * Iterator called on each entry in a waiting map to - * call the 'proc' continuation and release associated - * resources. - * - * @param cls the `struct CadetHandle` - * @param key the key of the entry in the map (the query) - * @param value the `struct GSF_CadetRequest` to clean up - * @return #GNUNET_YES (continue to iterate) - */ -static int -free_waiting_entry (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct GSF_CadetRequest *sr = value; - - GSF_cadet_query_cancel (sr); - return GNUNET_YES; -} - - -/** - * Function called by cadet when a client disconnects. - * Cleans up our `struct CadetClient` of that channel. - * - * @param cls NULL - * @param channel channel of the disconnecting client - * @param channel_ctx our `struct CadetClient` - */ -static void -cleaner_cb (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) -{ - struct CadetHandle *mh = channel_ctx; - struct GSF_CadetRequest *sr; - - if (NULL == mh->channel) - return; /* being destroyed elsewhere */ - GNUNET_assert (channel == mh->channel); - mh->channel = NULL; - while (NULL != (sr = mh->pending_head)) - GSF_cadet_query_cancel (sr); - /* first remove `mh` from the `cadet_map`, so that if the - callback from `free_waiting_entry()` happens to re-issue - the request, we don't immediately have it back in the - `waiting_map`. */ - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_remove (cadet_map, - &mh->target, - mh)); - GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, - &free_waiting_entry, - mh); - if (NULL != mh->wh) - GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); - if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task) - GNUNET_SCHEDULER_cancel (mh->timeout_task); - if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task) - GNUNET_SCHEDULER_cancel (mh->reset_task); - GNUNET_assert (0 == - GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); - GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); - GNUNET_free (mh); -} - - -/** - * Initialize subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_start_client () -{ - static const struct GNUNET_CADET_MessageHandler handlers[] = { - { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 }, - { NULL, 0, 0 } - }; - - cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); - cadet_handle = GNUNET_CADET_connect (GSF_cfg, - NULL, - NULL, - &cleaner_cb, - handlers, - NULL); -} - - -/** - * Function called on each active cadets to shut them down. - * - * @param cls NULL - * @param key target peer, unused - * @param value the `struct CadetHandle` to destroy - * @return #GNUNET_YES (continue to iterate) - */ -static int -release_cadets (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) -{ - struct CadetHandle *mh = value; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout on cadet channel to %s\n", - GNUNET_i2s (&mh->target)); - if (NULL != mh->channel) - GNUNET_CADET_channel_destroy (mh->channel); - return GNUNET_YES; -} - - -/** - * Shutdown subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_stop_client () -{ - GNUNET_CONTAINER_multipeermap_iterate (cadet_map, - &release_cadets, - NULL); - GNUNET_CONTAINER_multipeermap_destroy (cadet_map); - cadet_map = NULL; - if (NULL != cadet_handle) - { - GNUNET_CADET_disconnect (cadet_handle); - cadet_handle = NULL; - } -} - - -/* end of gnunet-service-fs_cadet_client.c */ diff --git a/src/fs/gnunet-service-fs_mesh_server.c b/src/fs/gnunet-service-fs_mesh_server.c deleted file mode 100644 index 77add757d..000000000 --- a/src/fs/gnunet-service-fs_mesh_server.c +++ /dev/null @@ -1,595 +0,0 @@ -/* - This file is part of GNUnet. - (C) 2012, 2013 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. -*/ - -/** - * @file fs/gnunet-service-fs_cadet_server.c - * @brief non-anonymous file-transfer - * @author Christian Grothoff - * - * TODO: - * - PORT is set to old application type, unsure if we should keep - * it that way (fine for now) - */ -#include "platform.h" -#include "gnunet_constants.h" -#include "gnunet_util_lib.h" -#include "gnunet_cadet_service.h" -#include "gnunet_protocols.h" -#include "gnunet_applications.h" -#include "gnunet-service-fs.h" -#include "gnunet-service-fs_indexing.h" -#include "gnunet-service-fs_cadet.h" - -/** - * After how long do we termiante idle connections? - */ -#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) - - -/** - * A message in the queue to be written to the cadet. - */ -struct WriteQueueItem -{ - /** - * Kept in a DLL. - */ - struct WriteQueueItem *next; - - /** - * Kept in a DLL. - */ - struct WriteQueueItem *prev; - - /** - * Number of bytes of payload, allocated at the end of this struct. - */ - size_t msize; -}; - - -/** - * Information we keep around for each active cadeting client. - */ -struct CadetClient -{ - /** - * DLL - */ - struct CadetClient *next; - - /** - * DLL - */ - struct CadetClient *prev; - - /** - * Channel for communication. - */ - struct GNUNET_CADET_Channel *channel; - - /** - * Handle for active write operation, or NULL. - */ - struct GNUNET_CADET_TransmitHandle *wh; - - /** - * Head of write queue. - */ - struct WriteQueueItem *wqi_head; - - /** - * Tail of write queue. - */ - struct WriteQueueItem *wqi_tail; - - /** - * Current active request to the datastore, if we have one pending. - */ - struct GNUNET_DATASTORE_QueueEntry *qe; - - /** - * Task that is scheduled to asynchronously terminate the connection. - */ - GNUNET_SCHEDULER_TaskIdentifier terminate_task; - - /** - * Task that is scheduled to terminate idle connections. - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - /** - * Size of the last write that was initiated. - */ - size_t reply_size; - -}; - - -/** - * Listen channel for incoming requests. - */ -static struct GNUNET_CADET_Handle *listen_channel; - -/** - * Head of DLL of cadet clients. - */ -static struct CadetClient *sc_head; - -/** - * Tail of DLL of cadet clients. - */ -static struct CadetClient *sc_tail; - -/** - * Number of active cadet clients in the 'sc_*'-DLL. - */ -static unsigned int sc_count; - -/** - * Maximum allowed number of cadet clients. - */ -static unsigned long long sc_count_max; - - - -/** - * Task run to asynchronously terminate the cadet due to timeout. - * - * @param cls the 'struct CadetClient' - * @param tc scheduler context - */ -static void -timeout_cadet_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct CadetClient *sc = cls; - struct GNUNET_CADET_Channel *tun; - - sc->timeout_task = GNUNET_SCHEDULER_NO_TASK; - tun = sc->channel; - sc->channel = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout for inactive cadet client %p\n", - sc); - GNUNET_CADET_channel_destroy (tun); -} - - -/** - * Reset the timeout for the cadet client (due to activity). - * - * @param sc client handle to reset timeout for - */ -static void -refresh_timeout_task (struct CadetClient *sc) -{ - if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) - GNUNET_SCHEDULER_cancel (sc->timeout_task); - sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT, - &timeout_cadet_task, - sc); -} - - -/** - * We're done handling a request from a client, read the next one. - * - * @param sc client to continue reading requests from - */ -static void -continue_reading (struct CadetClient *sc) -{ - refresh_timeout_task (sc); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Finished processing cadet request from client %p, ready to receive the next one\n", - sc); - GNUNET_CADET_receive_done (sc->channel); -} - - -/** - * Transmit the next entry from the write queue. - * - * @param sc where to process the write queue - */ -static void -continue_writing (struct CadetClient *sc); - - -/** - * Send a reply now, cadet is ready. - * - * @param cls closure with the `struct CadetClient` which sent the query - * @param size number of bytes available in @a buf - * @param buf where to write the message - * @return number of bytes written to @a buf - */ -static size_t -write_continuation (void *cls, - size_t size, - void *buf) -{ - struct CadetClient *sc = cls; - struct GNUNET_CADET_Channel *tun; - struct WriteQueueItem *wqi; - size_t ret; - - sc->wh = NULL; - if (NULL == (wqi = sc->wqi_head)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write queue empty, reading more requests\n"); - return 0; - } - if ( (0 == size) || - (size < wqi->msize) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of reply failed, terminating cadet\n"); - tun = sc->channel; - sc->channel = NULL; - GNUNET_CADET_channel_destroy (tun); - return 0; - } - GNUNET_CONTAINER_DLL_remove (sc->wqi_head, - sc->wqi_tail, - wqi); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitted %u byte reply via cadet to %p\n", - (unsigned int) size, - sc); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# Blocks transferred via cadet"), 1, - GNUNET_NO); - memcpy (buf, &wqi[1], ret = wqi->msize); - GNUNET_free (wqi); - continue_writing (sc); - return ret; -} - - -/** - * Transmit the next entry from the write queue. - * - * @param sc where to process the write queue - */ -static void -continue_writing (struct CadetClient *sc) -{ - struct WriteQueueItem *wqi; - struct GNUNET_CADET_Channel *tun; - - if (NULL != sc->wh) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write pending, waiting for it to complete\n"); - return; /* write already pending */ - } - if (NULL == (wqi = sc->wqi_head)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write queue empty, reading more requests\n"); - continue_reading (sc); - return; - } - sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - wqi->msize, - &write_continuation, - sc); - if (NULL == sc->wh) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write failed; terminating cadet\n"); - tun = sc->channel; - sc->channel = NULL; - GNUNET_CADET_channel_destroy (tun); - return; - } -} - - -/** - * Process a datum that was stored in the datastore. - * - * @param cls closure with the `struct CadetClient` which sent the query - * @param key key for the content - * @param size number of bytes in @a data - * @param data content stored - * @param type type of the content - * @param priority priority of the content - * @param anonymity anonymity-level for the content - * @param expiration expiration time for the content - * @param uid unique identifier for the datum; - * maybe 0 if no unique identifier is available - */ -static void -handle_datastore_reply (void *cls, - const struct GNUNET_HashCode *key, - size_t size, const void *data, - enum GNUNET_BLOCK_Type type, - uint32_t priority, - uint32_t anonymity, - struct GNUNET_TIME_Absolute expiration, - uint64_t uid) -{ - struct CadetClient *sc = cls; - size_t msize = size + sizeof (struct CadetReplyMessage); - struct WriteQueueItem *wqi; - struct CadetReplyMessage *srm; - - sc->qe = NULL; - if (NULL == data) - { - /* no result, this should not really happen, as for - non-anonymous routing only peers that HAVE the - answers should be queried; OTOH, this is not a - hard error as we might have had the answer in the - past and the user might have unindexed it. Hence - we log at level "INFO" for now. */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Have no answer for query `%s'\n", - GNUNET_h2s (key)); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# queries received via cadet not answered"), 1, - GNUNET_NO); - continue_writing (sc); - return; - } - if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Performing on-demand encoding for query %s\n", - GNUNET_h2s (key)); - if (GNUNET_OK != - GNUNET_FS_handle_on_demand_block (key, - size, data, type, - priority, anonymity, - expiration, uid, - &handle_datastore_reply, - sc)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "On-demand encoding request failed\n"); - continue_writing (sc); - } - return; - } - if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - continue_writing (sc); - return; - } - GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n", - (unsigned int) size, - (unsigned int) type, - GNUNET_h2s (key), - sc); - wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); - wqi->msize = msize; - srm = (struct CadetReplyMessage *) &wqi[1]; - srm->header.size = htons ((uint16_t) msize); - srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY); - srm->type = htonl (type); - srm->expiration = GNUNET_TIME_absolute_hton (expiration); - memcpy (&srm[1], data, size); - sc->reply_size = msize; - GNUNET_CONTAINER_DLL_insert (sc->wqi_head, - sc->wqi_tail, - wqi); - continue_writing (sc); -} - - -/** - * Functions with this signature are called whenever a - * complete query message is received. - * - * Do not call #GNUNET_SERVER_mst_destroy in callback - * - * @param cls closure with the 'struct CadetClient' - * @param channel channel handle - * @param channel_ctx channel context - * @param message the actual message - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing - */ -static int -request_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - void **channel_ctx, - const struct GNUNET_MessageHeader *message) -{ - struct CadetClient *sc = *channel_ctx; - const struct CadetQueryMessage *sqm; - - sqm = (const struct CadetQueryMessage *) message; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received query for `%s' via cadet from client %p\n", - GNUNET_h2s (&sqm->query), - sc); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# queries received via cadet"), 1, - GNUNET_NO); - refresh_timeout_task (sc); - sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, - 0, - &sqm->query, - ntohl (sqm->type), - 0 /* priority */, - GSF_datastore_queue_size, - GNUNET_TIME_UNIT_FOREVER_REL, - &handle_datastore_reply, sc); - if (NULL == sc->qe) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Queueing request with datastore failed (queue full?)\n"); - continue_writing (sc); - } - return GNUNET_OK; -} - - -/** - * Functions of this type are called upon new cadet connection from other peers. - * - * @param cls the closure from GNUNET_CADET_connect - * @param channel the channel representing the cadet - * @param initiator the identity of the peer who wants to establish a cadet - * with us; NULL on binding error - * @param port cadet port used for the incoming connection - * @param options channel option flags - * @return initial channel context (our 'struct CadetClient') - */ -static void * -accept_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - uint32_t port, enum GNUNET_CADET_ChannelOption options) -{ - struct CadetClient *sc; - - GNUNET_assert (NULL != channel); - if (sc_count >= sc_count_max) - { - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# cadet client connections rejected"), 1, - GNUNET_NO); - GNUNET_CADET_channel_destroy (channel); - return NULL; - } - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# cadet connections active"), 1, - GNUNET_NO); - sc = GNUNET_new (struct CadetClient); - sc->channel = channel; - GNUNET_CONTAINER_DLL_insert (sc_head, - sc_tail, - sc); - sc_count++; - refresh_timeout_task (sc); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Accepting inbound cadet connection from `%s' as client %p\n", - GNUNET_i2s (initiator), - sc); - return sc; -} - - -/** - * Function called by cadet when a client disconnects. - * Cleans up our 'struct CadetClient' of that channel. - * - * @param cls NULL - * @param channel channel of the disconnecting client - * @param channel_ctx our 'struct CadetClient' - */ -static void -cleaner_cb (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) -{ - struct CadetClient *sc = channel_ctx; - struct WriteQueueItem *wqi; - - if (NULL == sc) - return; - sc->channel = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Terminating cadet connection with client %p\n", - sc); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# cadet connections active"), -1, - GNUNET_NO); - if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) - GNUNET_SCHEDULER_cancel (sc->terminate_task); - if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) - GNUNET_SCHEDULER_cancel (sc->timeout_task); - if (NULL != sc->wh) - GNUNET_CADET_notify_transmit_ready_cancel (sc->wh); - if (NULL != sc->qe) - GNUNET_DATASTORE_cancel (sc->qe); - while (NULL != (wqi = sc->wqi_head)) - { - GNUNET_CONTAINER_DLL_remove (sc->wqi_head, - sc->wqi_tail, - wqi); - GNUNET_free (wqi); - } - GNUNET_CONTAINER_DLL_remove (sc_head, - sc_tail, - sc); - sc_count--; - GNUNET_free (sc); -} - - -/** - * Initialize subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_start_server () -{ - static const struct GNUNET_CADET_MessageHandler handlers[] = { - { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)}, - { NULL, 0, 0 } - }; - static const uint32_t ports[] = { - GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, - 0 - }; - - if (GNUNET_YES != - GNUNET_CONFIGURATION_get_value_number (GSF_cfg, - "fs", - "MAX_CADET_CLIENTS", - &sc_count_max)) - return; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Initializing cadet FS server with a limit of %llu connections\n", - sc_count_max); - listen_channel = GNUNET_CADET_connect (GSF_cfg, - NULL, - &accept_cb, - &cleaner_cb, - handlers, - ports); -} - - -/** - * Shutdown subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_stop_server () -{ - if (NULL != listen_channel) - { - GNUNET_CADET_disconnect (listen_channel); - listen_channel = NULL; - } - GNUNET_assert (NULL == sc_head); - GNUNET_assert (0 == sc_count); -} - -/* end of gnunet-service-fs_cadet.c */ -- cgit v1.2.3