From 3756b235a8ef6a680a1bc7197c1ce45d778af09c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 18 Sep 2010 19:10:12 +0000 Subject: plane hacking --- TODO | 7 +- src/block/Makefile.am | 21 ++- src/block/plugin_block_dht.c | 163 ++++++++++++++++++++ src/block/plugin_block_fs.c | 35 +++-- src/block/plugin_block_test.c | 154 ++++++++++++++++++ src/block/test_block.c | 8 +- src/datastore/gnunet-service-datastore.c | 4 +- src/dht/dht_api.c | 6 +- src/fs/Makefile.am | 1 + src/fs/fs.h | 26 ++++ src/fs/fs_download.c | 8 +- src/fs/fs_namespace.c | 4 +- src/fs/fs_publish.c | 6 +- src/fs/fs_search.c | 14 +- src/fs/fs_tree.c | 4 +- src/fs/fs_unindex.c | 2 +- src/fs/gnunet-service-fs.c | 257 +++++++++++++++++++------------ src/fs/gnunet-service-fs_indexing.c | 2 +- src/include/Makefile.am | 1 + src/include/gnunet_block_lib.h | 25 +-- src/include/gnunet_dht_service.h | 17 +- src/include/gnunet_load_lib.h | 106 +++++++++++++ src/include/gnunet_protocols.h | 5 + src/util/Makefile.am | 1 + src/util/load.c | 176 +++++++++++++++++++++ 25 files changed, 888 insertions(+), 165 deletions(-) create mode 100644 src/block/plugin_block_dht.c create mode 100644 src/block/plugin_block_test.c create mode 100644 src/include/gnunet_load_lib.h create mode 100644 src/util/load.c diff --git a/TODO b/TODO index e95522d87..04947882b 100644 --- a/TODO +++ b/TODO @@ -1,12 +1,13 @@ 0.9.0pre2: FS: - - move FS serivce to new block API - - integrate with DHT - measure latencies (core, datastore) => trust economy - refuse content migration message (or solicit?) - FS performance benchmarking + - integrate with DHT (need DHT API to fit block API better first; also, get rid of the continuation!) * DHT: [Nate] - - use new block lib + - use new block lib in service + - provide block-lib compatible API in gnunet_dht_service.h + - eliminate continuations in DHT API (not needed, we have auto-retransmit!) * CORE: - derived key generation [Nils] - Jun 27 11:51:54 core-7670 ERROR Assertion failed at gnunet-service-core.c:3616. diff --git a/src/block/Makefile.am b/src/block/Makefile.am index 2fc0c0081..b37c7705e 100644 --- a/src/block/Makefile.am +++ b/src/block/Makefile.am @@ -13,8 +13,19 @@ endif lib_LTLIBRARIES = libgnunetblock.la plugin_LTLIBRARIES = \ + libgnunet_plugin_block_dht.la \ libgnunet_plugin_block_fs.la \ - libgnunet_plugin_block_template.la + libgnunet_plugin_block_template.la \ + libgnunet_plugin_block_test.la + +libgnunet_plugin_block_dht_la_SOURCES = \ + plugin_block_dht.c +libgnunet_plugin_block_dht_la_LIBADD = \ + $(top_builddir)/src/hello/libgnunethello.la \ + $(top_builddir)/src/block/libgnunetblock.la \ + $(top_builddir)/src/util/libgnunetutil.la +libgnunet_plugin_block_dht_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) libgnunet_plugin_block_fs_la_SOURCES = \ plugin_block_fs.c @@ -31,6 +42,14 @@ libgnunet_plugin_block_template_la_LIBADD = \ libgnunet_plugin_block_template_la_LDFLAGS = \ $(GN_PLUGIN_LDFLAGS) +libgnunet_plugin_block_test_la_SOURCES = \ + plugin_block_test.c +libgnunet_plugin_block_test_la_LIBADD = \ + $(top_builddir)/src/block/libgnunetblock.la \ + $(top_builddir)/src/util/libgnunetutil.la +libgnunet_plugin_block_test_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) + libgnunetblock_la_SOURCES = \ block.c plugin_block.h diff --git a/src/block/plugin_block_dht.c b/src/block/plugin_block_dht.c new file mode 100644 index 000000000..8312a69b5 --- /dev/null +++ b/src/block/plugin_block_dht.c @@ -0,0 +1,163 @@ +/* + This file is part of GNUnet + (C) 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file block/plugin_block_dht.c + * @brief block plugin for DHT internals (right now, find-peer requests only); + * other plugins should be used to store "useful" data in the + * DHT (see fs block plugin) + * @author Christian Grothoff + */ + +#include "platform.h" +#include "gnunet_hello_lib.h" +#include "plugin_block.h" + +#define DEBUG_DHT GNUNET_NO + + +/** + * Function called to validate a reply or a request. For + * request evaluation, simply pass "NULL" for the reply_block. + * + * @param cls closure + * @param type block type + * @param query original query (hash) + * @param bf pointer to bloom filter associated with query; possibly updated (!) + * @param bf_mutator mutation value for bf + * @param xquery extrended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in xquery + * @param reply_block response to validate + * @param reply_block_size number of bytes in reply block + * @return characterization of result + */ +static enum GNUNET_BLOCK_EvaluationResult +block_plugin_dht_evaluate (void *cls, + enum GNUNET_BLOCK_Type type, + const GNUNET_HashCode *query, + struct GNUNET_CONTAINER_BloomFilter **bf, + int32_t bf_mutator, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) +{ + if (type != GNUNET_BLOCK_TYPE_DHT_HELLO) + return GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED; + if (xquery_size != 0) + return GNUNET_BLOCK_EVALUATION_REQUEST_INVALID; + if (reply_block_size == 0) + return GNUNET_BLOCK_EVALUATION_REQUEST_VALID; + GNUNET_break (NULL == *bf); + return GNUNET_BLOCK_EVALUATION_OK_LAST; +} + + +/** + * Function called to obtain the key for a block. + * + * @param cls closure + * @param type block type + * @param block block to get the key for + * @param block_size number of bytes in block + * @param key set to the key (query) for the given block + * @return GNUNET_OK on success, GNUNET_SYSERR if type not supported + * (or if extracting a key from a block of this type does not work) + */ +static int +block_plugin_dht_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + GNUNET_HashCode *key) +{ + const struct GNUNET_MessageHeader *msg; + const struct GNUNET_HELLO_Message *hello; + struct GNUNET_PeerIdentity *pid; + + if (type != GNUNET_BLOCK_TYPE_DHT_HELLO) + return GNUNET_SYSERR; + if (block_size < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, + "block-dht", + _("Block not of type %u\n"), + GNUNET_BLOCK_TYPE_DHT_HELLO); + return GNUNET_SYSERR; + } + msg = block; + if (block_size != ntohs (msg->size)) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, + "block-dht", + _("Size mismatch for block\n"), + GNUNET_BLOCK_TYPE_DHT_HELLO); + return GNUNET_SYSERR; + } + hello = block; + pid = (struct GNUNET_PeerIdentity*) key; + if (GNUNET_OK != + GNUNET_HELLO_get_id (hello, + pid)) + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, + "block-dht", + _("Block of type %u is malformed\n"), + GNUNET_BLOCK_TYPE_DHT_HELLO); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Entry point for the plugin. + */ +void * +gnunet_plugin_block_dht_init (void *cls) +{ + static enum GNUNET_BLOCK_Type types[] = + { + GNUNET_BLOCK_TYPE_DHT_HELLO, + GNUNET_BLOCK_TYPE_ANY /* end of list */ + }; + struct GNUNET_BLOCK_PluginFunctions *api; + + api = GNUNET_malloc (sizeof (struct GNUNET_BLOCK_PluginFunctions)); + api->evaluate = &block_plugin_dht_evaluate; + api->get_key = &block_plugin_dht_get_key; + api->types = types; + return api; +} + + +/** + * Exit point from the plugin. + */ +void * +gnunet_plugin_block_dht_done (void *cls) +{ + struct GNUNET_TRANSPORT_PluginFunctions *api = cls; + + GNUNET_free (api); + return NULL; +} + +/* end of plugin_block_dht.c */ diff --git a/src/block/plugin_block_fs.c b/src/block/plugin_block_fs.c index 3ad15a0c6..d4e510abe 100644 --- a/src/block/plugin_block_fs.c +++ b/src/block/plugin_block_fs.c @@ -74,8 +74,8 @@ block_plugin_fs_evaluate (void *cls, switch (type) { - case GNUNET_BLOCK_TYPE_DBLOCK: - case GNUNET_BLOCK_TYPE_IBLOCK: + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: if (xquery_size != 0) { GNUNET_break_op (0); @@ -84,8 +84,8 @@ block_plugin_fs_evaluate (void *cls, if (reply_block == NULL) return GNUNET_BLOCK_EVALUATION_REQUEST_VALID; return GNUNET_BLOCK_EVALUATION_OK_LAST; - case GNUNET_BLOCK_TYPE_KBLOCK: - case GNUNET_BLOCK_TYPE_NBLOCK: + case GNUNET_BLOCK_TYPE_FS_KBLOCK: + case GNUNET_BLOCK_TYPE_FS_NBLOCK: if (xquery_size != 0) { GNUNET_break_op (0); @@ -111,7 +111,7 @@ block_plugin_fs_evaluate (void *cls, } GNUNET_CONTAINER_bloomfilter_add (*bf, &mhash); return GNUNET_BLOCK_EVALUATION_OK_MORE; - case GNUNET_BLOCK_TYPE_SBLOCK: + case GNUNET_BLOCK_TYPE_FS_SBLOCK: if (xquery_size != sizeof (GNUNET_HashCode)) { GNUNET_break_op (0); @@ -133,8 +133,9 @@ block_plugin_fs_evaluate (void *cls, &sh, sizeof (GNUNET_HashCode))) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Reply mismatched in terms of namespace. Discarded.\n")); + GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, + "block-fs", + _("Reply mismatched in terms of namespace. Discarded.\n")); return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; } GNUNET_CRYPTO_hash (reply_block, @@ -185,11 +186,11 @@ block_plugin_fs_get_key (void *cls, switch (type) { - case GNUNET_BLOCK_TYPE_DBLOCK: - case GNUNET_BLOCK_TYPE_IBLOCK: + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: GNUNET_CRYPTO_hash (block, block_size, key); return GNUNET_OK; - case GNUNET_BLOCK_TYPE_KBLOCK: + case GNUNET_BLOCK_TYPE_FS_KBLOCK: if (block_size < sizeof (struct KBlock)) { GNUNET_break_op (0); @@ -218,7 +219,7 @@ block_plugin_fs_get_key (void *cls, sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), key); return GNUNET_OK; - case GNUNET_BLOCK_TYPE_SBLOCK: + case GNUNET_BLOCK_TYPE_FS_SBLOCK: if (block_size < sizeof (struct SBlock)) { GNUNET_break_op (0); @@ -243,7 +244,7 @@ block_plugin_fs_get_key (void *cls, if (key != NULL) *key = sb->identifier; return GNUNET_OK; - case GNUNET_BLOCK_TYPE_NBLOCK: + case GNUNET_BLOCK_TYPE_FS_NBLOCK: if (block_size < sizeof (struct NBlock)) { GNUNET_break_op (0); @@ -303,11 +304,11 @@ libgnunet_plugin_block_fs_init (void *cls) { static enum GNUNET_BLOCK_Type types[] = { - GNUNET_BLOCK_TYPE_DBLOCK, - GNUNET_BLOCK_TYPE_IBLOCK, - GNUNET_BLOCK_TYPE_KBLOCK, - GNUNET_BLOCK_TYPE_SBLOCK, - GNUNET_BLOCK_TYPE_NBLOCK, + GNUNET_BLOCK_TYPE_FS_DBLOCK, + GNUNET_BLOCK_TYPE_FS_IBLOCK, + GNUNET_BLOCK_TYPE_FS_KBLOCK, + GNUNET_BLOCK_TYPE_FS_SBLOCK, + GNUNET_BLOCK_TYPE_FS_NBLOCK, GNUNET_BLOCK_TYPE_ANY /* end of list */ }; struct GNUNET_BLOCK_PluginFunctions *api; diff --git a/src/block/plugin_block_test.c b/src/block/plugin_block_test.c new file mode 100644 index 000000000..1cf8c701d --- /dev/null +++ b/src/block/plugin_block_test.c @@ -0,0 +1,154 @@ +/* + This file is part of GNUnet + (C) 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file block/plugin_block_test.c + * @brief block plugin to test the DHT as a simple key-value store; + * this plugin simply accepts any (new) response for any key + * @author Christian Grothoff + */ + +#include "platform.h" +#include "plugin_block.h" + +#define DEBUG_TEST GNUNET_NO + + +/** + * Number of bits we set per entry in the bloomfilter. + * Do not change! + */ +#define BLOOMFILTER_K 16 + +/** + * Function called to validate a reply or a request. For + * request evaluation, simply pass "NULL" for the reply_block. + * + * @param cls closure + * @param type block type + * @param query original query (hash) + * @param bf pointer to bloom filter associated with query; possibly updated (!) + * @param bf_mutator mutation value for bf + * @param xquery extrended query data (can be NULL, depending on type) + * @param xquery_size number of bytes in xquery + * @param reply_block response to validate + * @param reply_block_size number of bytes in reply block + * @return characterization of result + */ +static enum GNUNET_BLOCK_EvaluationResult +block_plugin_test_evaluate (void *cls, + enum GNUNET_BLOCK_Type type, + const GNUNET_HashCode *query, + struct GNUNET_CONTAINER_BloomFilter **bf, + int32_t bf_mutator, + const void *xquery, + size_t xquery_size, + const void *reply_block, + size_t reply_block_size) +{ + GNUNET_HashCode chash; + GNUNET_HashCode mhash; + + if (type != GNUNET_BLOCK_TYPE_TEST) + return GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED; + if (xquery_size != 0) + GNUNET_BLOCK_EVALUATION_REQUEST_INVALID; + if (reply_block_size == 0) + GNUNET_BLOCK_EVALUATION_REQUEST_VALID; + + GNUNET_CRYPTO_hash (reply_block, + reply_block_size, + &chash); + GNUNET_BLOCK_mingle_hash (&chash, bf_mutator, &mhash); + if (NULL != *bf) + { + if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (*bf, + &mhash)) + return GNUNET_BLOCK_EVALUATION_OK_DUPLICATE; + } + else + { + *bf = GNUNET_CONTAINER_bloomfilter_init (NULL, + 8, + BLOOMFILTER_K); + } + GNUNET_CONTAINER_bloomfilter_add (*bf, &mhash); + return GNUNET_BLOCK_EVALUATION_OK_MORE; +} + + +/** + * Function called to obtain the key for a block. + * + * @param cls closure + * @param type block type + * @param block block to get the key for + * @param block_size number of bytes in block + * @param key set to the key (query) for the given block + * @return GNUNET_OK on success, GNUNET_SYSERR if type not supported + * (or if extracting a key from a block of this type does not work) + */ +static int +block_plugin_test_get_key (void *cls, + enum GNUNET_BLOCK_Type type, + const void *block, + size_t block_size, + GNUNET_HashCode *key) +{ + /* always fails since there is no fixed relationship between + keys and values for test values */ + return GNUNET_SYSERR; +} + + +/** + * Entry point for the plugin. + */ +void * +gnunet_plugin_block_test_init (void *cls) +{ + static enum GNUNET_BLOCK_Type types[] = + { + GNUNET_BLOCK_TYPE_TEST, + GNUNET_BLOCK_TYPE_ANY /* end of list */ + }; + struct GNUNET_BLOCK_PluginFunctions *api; + + api = GNUNET_malloc (sizeof (struct GNUNET_BLOCK_PluginFunctions)); + api->evaluate = &block_plugin_test_evaluate; + api->get_key = &block_plugin_test_get_key; + api->types = types; + return api; +} + + +/** + * Exit point from the plugin. + */ +void * +gnunet_plugin_block_test_done (void *cls) +{ + struct GNUNET_TRANSPORT_PluginFunctions *api = cls; + + GNUNET_free (api); + return NULL; +} + +/* end of plugin_block_test.c */ diff --git a/src/block/test_block.c b/src/block/test_block.c index 02719e5aa..bb9a1f01b 100644 --- a/src/block/test_block.c +++ b/src/block/test_block.c @@ -38,14 +38,14 @@ test_fs (struct GNUNET_BLOCK_Context *ctx) memset (block, 1, sizeof (block)); if (GNUNET_OK != GNUNET_BLOCK_get_key (ctx, - GNUNET_BLOCK_TYPE_DBLOCK, + GNUNET_BLOCK_TYPE_FS_DBLOCK, block, sizeof (block), &key)) return 1; if (GNUNET_BLOCK_EVALUATION_OK_LAST != GNUNET_BLOCK_evaluate (ctx, - GNUNET_BLOCK_TYPE_DBLOCK, + GNUNET_BLOCK_TYPE_FS_DBLOCK, &key, NULL, 0, NULL, 0, @@ -53,7 +53,7 @@ test_fs (struct GNUNET_BLOCK_Context *ctx) return 2; if (GNUNET_BLOCK_EVALUATION_REQUEST_VALID != GNUNET_BLOCK_evaluate (ctx, - GNUNET_BLOCK_TYPE_DBLOCK, + GNUNET_BLOCK_TYPE_FS_DBLOCK, &key, NULL, 0, NULL, 0, @@ -62,7 +62,7 @@ test_fs (struct GNUNET_BLOCK_Context *ctx) GNUNET_log_skip (1, GNUNET_NO); if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID != GNUNET_BLOCK_evaluate (ctx, - GNUNET_BLOCK_TYPE_DBLOCK, + GNUNET_BLOCK_TYPE_FS_DBLOCK, &key, NULL, 0, "bogus", 5, diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 3e9e2e480..8a896a7fb 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -1038,8 +1038,8 @@ check_present (void *cls, GNUNET_free (pc); return GNUNET_SYSERR; } - if ( (GNUNET_BLOCK_TYPE_DBLOCK == type) || - (GNUNET_BLOCK_TYPE_IBLOCK == type) || + if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) || + (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) || ( (size == ntohl(dm->size)) && (0 == memcmp (&dm[1], data, diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 413ecb83a..15faba6c9 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -1132,7 +1132,7 @@ GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_TIME_Relative timeout, - uint32_t type, + enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, GNUNET_DHT_GetIterator iter, void *iter_cls, @@ -1366,7 +1366,7 @@ GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, void GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, - uint32_t type, + enum GNUNET_BLOCK_Type type, uint32_t size, const char *data, struct GNUNET_TIME_Absolute exp, @@ -1422,3 +1422,5 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, GNUNET_free (put_msg); } + +/* end of dht_api.c */ diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index bcf5f460c..542c776f3 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -96,6 +96,7 @@ gnunet_service_fs_SOURCES = \ gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h gnunet_service_fs_LDADD = \ $(top_builddir)/src/fs/libgnunetfs.la \ + $(top_builddir)/src/dht/libgnunetdht.la \ $(top_builddir)/src/block/libgnunetblock.la \ $(top_builddir)/src/datastore/libgnunetdatastore.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ diff --git a/src/fs/fs.h b/src/fs/fs.h index 605e6c84b..edbde19da 100644 --- a/src/fs/fs.h +++ b/src/fs/fs.h @@ -2222,6 +2222,32 @@ struct PutMessage }; +/** + * Message send by a peer that wants to be excluded + * from migration for a while. + */ +struct MigrationStopMessage +{ + /** + * Message type will be + * GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP. + */ + struct GNUNET_MessageHeader header; + + /** + * Always zero. + */ + uint32_t reserved GNUNET_PACKED; + + /** + * How long should the block last? + */ + struct GNUNET_TIME_RelativeNBO duration; + +}; + + + #endif /* end of fs.h */ diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index 80758ebc7..847509358 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -304,8 +304,8 @@ encrypt_existing_match (struct GNUNET_FS_DownloadContext *dc, prc.data = enc; prc.size = len; prc.type = (dc->treedepth == depth) - ? GNUNET_BLOCK_TYPE_DBLOCK - : GNUNET_BLOCK_TYPE_IBLOCK; + ? GNUNET_BLOCK_TYPE_FS_DBLOCK + : GNUNET_BLOCK_TYPE_FS_IBLOCK; prc.query = chk->query; prc.do_store = do_store; process_result_with_request (&prc, @@ -1423,9 +1423,9 @@ transmit_download_request (void *cls, else sm->options = htonl (0); if (dc->pending->depth == dc->treedepth) - sm->type = htonl (GNUNET_BLOCK_TYPE_DBLOCK); + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); else - sm->type = htonl (GNUNET_BLOCK_TYPE_IBLOCK); + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); sm->anonymity_level = htonl (dc->anonymity); sm->target = dc->target.hashPubKey; sm->query = dc->pending->chk.query; diff --git a/src/fs/fs_namespace.c b/src/fs/fs_namespace.c index 73cbc5f9e..5c1137eb7 100644 --- a/src/fs/fs_namespace.c +++ b/src/fs/fs_namespace.c @@ -416,7 +416,7 @@ advertisement_cont (void *cls, &query, ac->pt_size + sizeof (struct NBlock), ac->nb, - GNUNET_BLOCK_TYPE_NBLOCK, + GNUNET_BLOCK_TYPE_FS_NBLOCK, ac->priority, ac->anonymity, ac->expiration, @@ -982,7 +982,7 @@ GNUNET_FS_publish_sks (struct GNUNET_FS_Handle *h, &sb_enc->identifier, size, sb_enc, - GNUNET_BLOCK_TYPE_SBLOCK, + GNUNET_BLOCK_TYPE_FS_SBLOCK, priority, anonymity, expirationTime, diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c index 0b0617340..6262dc604 100644 --- a/src/fs/fs_publish.c +++ b/src/fs/fs_publish.c @@ -555,7 +555,7 @@ block_proc (void *cls, dpc_cls->p = p; if ( (! p->is_directory) && (GNUNET_YES == p->data.file.do_index) && - (type == GNUNET_BLOCK_TYPE_DBLOCK) ) + (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ) { #if DEBUG_PUBLISH GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -571,7 +571,7 @@ block_proc (void *cls, query, sizeof(struct OnDemandBlock), &odb, - GNUNET_BLOCK_TYPE_ONDEMAND, + GNUNET_BLOCK_TYPE_FS_ONDEMAND, p->priority, p->anonymity, p->expirationTime, @@ -1596,7 +1596,7 @@ publish_ksk_cont (void *cls, sizeof (struct KBlock) + pkc->slen, pkc->cpy, - GNUNET_BLOCK_TYPE_KBLOCK, + GNUNET_BLOCK_TYPE_FS_KBLOCK, pkc->priority, pkc->anonymity, pkc->expirationTime, diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c index a6418f786..f077bdac3 100644 --- a/src/fs/fs_search.c +++ b/src/fs/fs_search.c @@ -814,7 +814,7 @@ process_result (struct GNUNET_FS_SearchContext *sc, } switch (type) { - case GNUNET_BLOCK_TYPE_KBLOCK: + case GNUNET_BLOCK_TYPE_FS_KBLOCK: if (! GNUNET_FS_uri_test_ksk (sc->uri)) { GNUNET_break (0); @@ -827,7 +827,7 @@ process_result (struct GNUNET_FS_SearchContext *sc, } process_kblock (sc, data, size); break; - case GNUNET_BLOCK_TYPE_SBLOCK: + case GNUNET_BLOCK_TYPE_FS_SBLOCK: if (! GNUNET_FS_uri_test_sks (sc->uri)) { GNUNET_break (0); @@ -840,7 +840,7 @@ process_result (struct GNUNET_FS_SearchContext *sc, } process_sblock (sc, data, size); break; - case GNUNET_BLOCK_TYPE_NBLOCK: + case GNUNET_BLOCK_TYPE_FS_NBLOCK: if (! GNUNET_FS_uri_test_ksk (sc->uri)) { GNUNET_break (0); @@ -854,9 +854,9 @@ process_result (struct GNUNET_FS_SearchContext *sc, process_nblock (sc, data, size); break; case GNUNET_BLOCK_TYPE_ANY: - case GNUNET_BLOCK_TYPE_DBLOCK: - case GNUNET_BLOCK_TYPE_ONDEMAND: - case GNUNET_BLOCK_TYPE_IBLOCK: + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_ONDEMAND: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: GNUNET_break (0); break; default: @@ -981,7 +981,7 @@ transmit_search_request (void *cls, sm->options = htonl (1); else sm->options = htonl (0); - sm->type = htonl (GNUNET_BLOCK_TYPE_SBLOCK); + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_SBLOCK); sm->anonymity_level = htonl (sc->anonymity); sm->target = sc->uri->data.sks.namespace; identifier = sc->uri->data.sks.identifier; diff --git a/src/fs/fs_tree.c b/src/fs/fs_tree.c index 760a7e039..55f7b0e09 100644 --- a/src/fs/fs_tree.c +++ b/src/fs/fs_tree.c @@ -385,8 +385,8 @@ void GNUNET_FS_tree_encoder_next (struct GNUNET_FS_TreeEncoder * te) &mychk->query, te->publish_offset, (te->current_depth == te->chk_tree_depth) - ? GNUNET_BLOCK_TYPE_DBLOCK - : GNUNET_BLOCK_TYPE_IBLOCK, + ? GNUNET_BLOCK_TYPE_FS_DBLOCK + : GNUNET_BLOCK_TYPE_FS_IBLOCK, enc, pt_size); if (NULL != te->progress) diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c index 375b833fb..e4ac4efd6 100644 --- a/src/fs/fs_unindex.c +++ b/src/fs/fs_unindex.c @@ -207,7 +207,7 @@ unindex_process (void *cls, const void *data; struct OnDemandBlock odb; - if (type != GNUNET_BLOCK_TYPE_DBLOCK) + if (type != GNUNET_BLOCK_TYPE_FS_DBLOCK) { size = block_size; data = block; diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index d40cdb744..9fce6478c 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -24,8 +24,9 @@ * @author Christian Grothoff * * TODO: + * - track per-peer request latency (using new load API) + * - consider more precise latency estimation (per-peer & request) -- again load API? * - implement test_load_too_high, make decision priority-based, implement forwarding, etc. - * - consider more precise latency estimation (per-peer & request) * - introduce random latency in processing * - tell other peers to stop migration if our PUTs fail (or if * we don't support migration per configuration?) @@ -35,7 +36,9 @@ #include #include "gnunet_constants.h" #include "gnunet_core_service.h" +#include "gnunet_dht_service.h" #include "gnunet_datastore_service.h" +#include "gnunet_load_lib.h" #include "gnunet_peer_lib.h" #include "gnunet_protocols.h" #include "gnunet_signatures.h" @@ -51,6 +54,13 @@ */ #define MAX_QUEUE_PER_PEER 16 +/** + * Size for the hash map for DHT requests from the FS + * service. Should be about the number of concurrent + * DHT requests we plan to make. + */ +#define FS_DHT_HT_SIZE 1024 + /** * How often do we flush trust values to disk? */ @@ -173,6 +183,12 @@ struct ConnectedPeer */ struct GNUNET_TIME_Relative avg_delay; + /** + * Point in time until which this peer does not want us to migrate content + * to it. + */ + struct GNUNET_TIME_Absolute migration_blocked; + /** * Handle for an active request for transmission to this * peer, or NULL. @@ -708,6 +724,11 @@ static GNUNET_SCHEDULER_TaskIdentifier mig_task; */ static struct GNUNET_TIME_Relative min_migration_delay; +/** + * Handle for DHT operations. + */ +static struct GNUNET_DHT_Handle *dht_handle; + /** * Size of the doubly-linked list of migration blocks. */ @@ -730,6 +751,29 @@ static int active_migration; */ static double current_priorities; +/** + * Datastore load tracking. + */ +static struct GNUNET_LOAD_Value *datastore_load; + + +/** + * We've just now completed a datastore request. Update our + * datastore load calculations. + * + * @param start time when the datastore request was issued + */ +static void +update_datastore_delays (struct GNUNET_TIME_Absolute start) +{ + struct GNUNET_TIME_Relative delay; + + delay = GNUNET_TIME_absolute_get_duration (start); + GNUNET_LOAD_update (datastore_load, + delay.value); +} + + /** * Get the filename under which we would store the GNUNET_HELLO_Message * for the given host and protocol. @@ -768,7 +812,6 @@ transmit_to_peer (void *cls, /* ******************* clean up functions ************************ */ - /** * Delete the given migration block. * @@ -831,6 +874,8 @@ consider_migration (void *cls, unsigned int repl; /* consider 'cp' as a migration target for mb */ + if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0) + return GNUNET_YES; /* peer has requested no migration! */ if (mb != NULL) { GNUNET_PEER_resolve (cp->pid, @@ -986,7 +1031,7 @@ process_migration_content (void *cls, consider_migration_gathering (); return; } - if (type == GNUNET_BLOCK_TYPE_ONDEMAND) + if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) { if (GNUNET_OK != GNUNET_FS_handle_on_demand_block (key, size, data, @@ -994,7 +1039,9 @@ process_migration_content (void *cls, expiration, uid, &process_migration_content, NULL)) - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + { + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + } return; } #if DEBUG_FS @@ -1587,6 +1634,10 @@ shutdown_task (void *cls, while (mig_head != NULL) delete_migration_block (mig_head); GNUNET_assert (0 == mig_size); + GNUNET_DHT_disconnect (dht_handle); + dht_handle = NULL; + GNUNET_LOAD_value_free (datastore_load); + datastore_load = NULL; GNUNET_BLOCK_context_destroy (block_ctx); block_ctx = NULL; GNUNET_CONFIGURATION_destroy (block_cfg); @@ -2297,6 +2348,21 @@ forward_request_task (void *cls, } if (GNUNET_YES == pr->local_only) return; /* configured to not do P2P search */ + /* (0) try DHT */ + if (0 == pr->anonymity_level) + { +#if 0 + /* DHT API needs fixing... */ + pr->dht_get = GNUNET_DHT_get_start (dht_handle, + GNUNET_TIME_UNIT_FOREVER_REL, + pr->type, + &pr->query, + &process_dht_reply, + pr, + FIXME, + FIXME); +#endif + } /* (1) select target */ psc.pr = pr; psc.target_score = -DBL_MAX; @@ -2376,14 +2442,14 @@ transmit_reply_continuation (void *cls, switch (pr->type) { - case GNUNET_BLOCK_TYPE_DBLOCK: - case GNUNET_BLOCK_TYPE_IBLOCK: + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: /* only one reply expected, done with the request! */ destroy_pending_request (pr); break; case GNUNET_BLOCK_TYPE_ANY: - case GNUNET_BLOCK_TYPE_KBLOCK: - case GNUNET_BLOCK_TYPE_SBLOCK: + case GNUNET_BLOCK_TYPE_FS_KBLOCK: + case GNUNET_BLOCK_TYPE_FS_SBLOCK: break; default: GNUNET_break (0); @@ -2475,12 +2541,6 @@ struct ProcessReplyClosure */ size_t size; - /** - * Namespace that this reply belongs to - * (if it is of type SBLOCK). - */ - GNUNET_HashCode namespace; - /** * Type of the block. */ @@ -2491,6 +2551,11 @@ struct ProcessReplyClosure */ uint32_t priority; + /** + * Evaluation result (returned). + */ + enum GNUNET_BLOCK_EvaluationResult eval; + /** * Did we finish processing the associated request? */ @@ -2519,7 +2584,6 @@ process_reply (void *cls, struct PutMessage *pm; struct ConnectedPeer *cp; struct GNUNET_TIME_Relative cur_delay; - enum GNUNET_BLOCK_EvaluationResult eval; size_t msize; #if DEBUG_FS @@ -2565,15 +2629,15 @@ process_reply (void *cls, GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client); } } - eval = GNUNET_BLOCK_evaluate (block_ctx, - prq->type, - key, - &pr->bf, - pr->mingle, - pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, - prq->data, - prq->size); - switch (eval) + prq->eval = GNUNET_BLOCK_evaluate (block_ctx, + prq->type, + key, + &pr->bf, + pr->mingle, + pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, + prq->data, + prq->size); + switch (prq->eval) { case GNUNET_BLOCK_EVALUATION_OK_MORE: break; @@ -2636,8 +2700,21 @@ process_reply (void *cls, &pr->replies_seen[pr->replies_seen_off++]); refresh_bloomfilter (pr); } + if (NULL == prq->sender) + { +#if DEBUG_FS + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found result for query `%s' in local datastore\n", + GNUNET_h2s (key)); +#endif + GNUNET_STATISTICS_update (stats, + gettext_noop ("# results found locally"), + 1, + GNUNET_NO); + } prq->priority += pr->remaining_priority; pr->remaining_priority = 0; + pr->results_found++; if (NULL != pr->client_request_list) { GNUNET_STATISTICS_update (stats, @@ -2753,7 +2830,6 @@ handle_p2p_put (void *cls, struct GNUNET_TIME_Absolute expiration; GNUNET_HashCode query; struct ProcessReplyClosure prq; - const struct SBlock *sb; msize = ntohs (message->size); if (msize < sizeof (struct PutMessage)) @@ -2766,7 +2842,7 @@ handle_p2p_put (void *cls, type = ntohl (put->type); expiration = GNUNET_TIME_absolute_ntoh (put->expiration); - if (type == GNUNET_BLOCK_TYPE_ONDEMAND) + if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) return GNUNET_SYSERR; if (GNUNET_OK != GNUNET_BLOCK_get_key (block_ctx, @@ -2778,14 +2854,6 @@ handle_p2p_put (void *cls, GNUNET_break_op (0); return GNUNET_SYSERR; } - if (GNUNET_BLOCK_TYPE_SBLOCK == type) - { - sb = (const struct SBlock*) &put[1]; - GNUNET_CRYPTO_hash (&sb->subspace, - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &prq.namespace); - } - #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received result for query `%s' from peer `%4s'\n", @@ -2838,6 +2906,30 @@ handle_p2p_put (void *cls, } +/** + * Handle P2P "MIGRATION_STOP" message. + * + * @param cls closure, always NULL + * @param other the other peer involved (sender or receiver, NULL + * for loopback messages where we are both sender and receiver) + * @param message the actual message + * @param latency reported latency of the connection with 'other' + * @param distance reported distance (DV) to 'other' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_p2p_migration_stop (void *cls, + const struct GNUNET_PeerIdentity *other, + const struct GNUNET_MessageHeader *message, + struct GNUNET_TIME_Relative latency, + uint32_t distance) +{ + // FIXME! +} + + + /* **************************** P2P GET Handling ************************ */ @@ -2923,10 +3015,8 @@ process_local_reply (void *cls, struct PendingRequest *pr = cls; struct ProcessReplyClosure prq; struct CheckDuplicateRequestClosure cdrc; - const struct SBlock *sb; - GNUNET_HashCode dhash; - GNUNET_HashCode mhash; GNUNET_HashCode query; + unsigned int old_rf; if (NULL == key) { @@ -2973,7 +3063,7 @@ process_local_reply (void *cls, GNUNET_h2s (key), type); #endif - if (type == GNUNET_BLOCK_TYPE_ONDEMAND) + if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) { #if DEBUG_FS GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2989,54 +3079,16 @@ process_local_reply (void *cls, &process_local_reply, pr)) if (pr->qe != NULL) - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); - return; - } - - /* FIXME: use block lib here! */ - /* check for duplicates */ - GNUNET_CRYPTO_hash (data, size, &dhash); - GNUNET_BLOCK_mingle_hash (&dhash, - pr->mingle, - &mhash); - if ( (pr->bf != NULL) && - (GNUNET_YES == - GNUNET_CONTAINER_bloomfilter_test (pr->bf, - &mhash)) ) - { -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Result from datastore filtered by bloomfilter (duplicate).\n"); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# results filtered by query bloomfilter"), - 1, - GNUNET_NO); - if (pr->qe != NULL) - GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + { + GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); + } return; } -#if DEBUG_FS - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found result for query `%s' in local datastore\n", - GNUNET_h2s (key)); -#endif - GNUNET_STATISTICS_update (stats, - gettext_noop ("# results found locally"), - 1, - GNUNET_NO); - pr->results_found++; + old_rf = pr->results_found; memset (&prq, 0, sizeof (prq)); prq.data = data; prq.expiration = expiration; prq.size = size; - if (GNUNET_BLOCK_TYPE_SBLOCK == type) - { - sb = (const struct SBlock*) data; - GNUNET_CRYPTO_hash (&sb->subspace, - sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded), - &prq.namespace); - } if (GNUNET_OK != GNUNET_BLOCK_get_key (block_ctx, type, @@ -3058,12 +3110,14 @@ process_local_reply (void *cls, prq.priority = priority; prq.finished = GNUNET_NO; process_reply (&prq, key, pr); + if ( (old_rf == 0) && + (pr->results_found == 1) ) + update_datastore_delays (pr->start_time); if (prq.finished == GNUNET_YES) return; if (pr->qe == NULL) return; /* done here */ - if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) || - (type == GNUNET_BLOCK_TYPE_IBLOCK) ) + if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) { GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); return; @@ -3211,12 +3265,6 @@ handle_p2p_get (void *cls, opt = (const GNUNET_HashCode*) &gm[1]; bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); bm = ntohl (gm->hash_bitmap); - if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) && - (type != GNUNET_BLOCK_TYPE_SBLOCK) ) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } bits = 0; cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, &other->hashPubKey); @@ -3297,7 +3345,6 @@ handle_p2p_get (void *cls, pr->mingle = ntohl (gm->filter_mutator); if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]); - pr->anonymity_level = 1; pr->priority = bound_priority (ntohl (gm->priority), cps); pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority); @@ -3402,7 +3449,7 @@ handle_p2p_get (void *cls, /* calculate change in traffic preference */ cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE; /* process locally */ - if (type == GNUNET_BLOCK_TYPE_DBLOCK) + if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */ timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY, (pr->priority + 1)); @@ -3418,8 +3465,8 @@ handle_p2p_get (void *cls, /* Are multiple results possible? If so, start processing remotely now! */ switch (pr->type) { - case GNUNET_BLOCK_TYPE_DBLOCK: - case GNUNET_BLOCK_TYPE_IBLOCK: + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: /* only one result, wait for datastore */ break; default: @@ -3499,8 +3546,8 @@ handle_start_search (void *cls, client_list = cl; } /* detect duplicate KBLOCK requests */ - if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) || - (type == GNUNET_BLOCK_TYPE_NBLOCK) || + if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) || + (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) || (type == GNUNET_BLOCK_TYPE_ANY) ) { crl = cl->rl_head; @@ -3542,7 +3589,7 @@ handle_start_search (void *cls, 1, GNUNET_NO); pr = GNUNET_malloc (sizeof (struct PendingRequest) + - ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0)); + ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0)); crl = GNUNET_malloc (sizeof (struct ClientRequestList)); memset (crl, 0, sizeof (struct ClientRequestList)); crl->client_list = cl; @@ -3560,6 +3607,7 @@ handle_start_search (void *cls, sc * sizeof (GNUNET_HashCode)); pr->replies_seen_off = sc; pr->anonymity_level = ntohl (sm->anonymity_level); + pr->start_time = GNUNET_TIME_absolute_get (); refresh_bloomfilter (pr); pr->query = sm->query; if (0 == (1 & ntohl (sm->options))) @@ -3568,14 +3616,14 @@ handle_start_search (void *cls, pr->local_only = GNUNET_YES; switch (type) { - case GNUNET_BLOCK_TYPE_DBLOCK: - case GNUNET_BLOCK_TYPE_IBLOCK: + case GNUNET_BLOCK_TYPE_FS_DBLOCK: + case GNUNET_BLOCK_TYPE_FS_IBLOCK: if (0 != memcmp (&sm->target, &all_zeros, sizeof (GNUNET_HashCode))) pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target); break; - case GNUNET_BLOCK_TYPE_SBLOCK: + case GNUNET_BLOCK_TYPE_FS_SBLOCK: pr->namespace = (GNUNET_HashCode*) &pr[1]; memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode)); break; @@ -3587,7 +3635,7 @@ handle_start_search (void *cls, &sm->query, pr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - if (type == GNUNET_BLOCK_TYPE_DBLOCK) + if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK) type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */ pr->qe = GNUNET_DATASTORE_get (dsh, &sm->query, @@ -3619,6 +3667,9 @@ main_init (struct GNUNET_SCHEDULER_Handle *s, GNUNET_MESSAGE_TYPE_FS_GET, 0 }, { &handle_p2p_put, GNUNET_MESSAGE_TYPE_FS_PUT, 0 }, + { &handle_p2p_migration_stop, + GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP, + sizeof (struct MigrationStopMessage) }, { NULL, 0, 0 } }; static const struct GNUNET_SERVER_MessageHandler handlers[] = { @@ -3746,6 +3797,7 @@ run (void *cls, GNUNET_SCHEDULER_shutdown (sched); return; } + datastore_load = GNUNET_LOAD_value_init (); block_cfg = GNUNET_CONFIGURATION_create (); GNUNET_CONFIGURATION_set_value_string (block_cfg, "block", @@ -3753,16 +3805,23 @@ run (void *cls, "fs"); block_ctx = GNUNET_BLOCK_context_create (block_cfg); GNUNET_assert (NULL != block_ctx); + dht_handle = GNUNET_DHT_connect (sched, + cfg, + FS_DHT_HT_SIZE); if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) || (GNUNET_OK != main_init (sched, server, cfg)) ) { GNUNET_SCHEDULER_shutdown (sched); GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO); dsh = NULL; + GNUNET_DHT_disconnect (dht_handle); + dht_handle = NULL; GNUNET_BLOCK_context_destroy (block_ctx); block_ctx = NULL; GNUNET_CONFIGURATION_destroy (block_cfg); block_cfg = NULL; + GNUNET_LOAD_value_free (datastore_load); + datastore_load = NULL; return; } } diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c index 20350f8fc..99b5da102 100644 --- a/src/fs/gnunet-service-fs_indexing.c +++ b/src/fs/gnunet-service-fs_indexing.c @@ -682,7 +682,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, key, nsize, edata, - GNUNET_BLOCK_TYPE_DBLOCK, + GNUNET_BLOCK_TYPE_FS_DBLOCK, priority, anonymity, expiration, diff --git a/src/include/Makefile.am b/src/include/Makefile.am index 4e7fb8b7b..70fd97bd0 100644 --- a/src/include/Makefile.am +++ b/src/include/Makefile.am @@ -36,6 +36,7 @@ gnunetinclude_HEADERS = \ gnunet_fs_service.h \ gnunet_getopt_lib.h \ gnunet_hello_lib.h \ + gnunet_load_lib.h \ gnunet_nat_lib.h \ gnunet_network_lib.h \ gnunet_os_lib.h \ diff --git a/src/include/gnunet_block_lib.h b/src/include/gnunet_block_lib.h index 5154a3c08..bdbaeade5 100644 --- a/src/include/gnunet_block_lib.h +++ b/src/include/gnunet_block_lib.h @@ -50,39 +50,46 @@ enum GNUNET_BLOCK_Type /** * Data block (leaf) in the CHK tree. */ - GNUNET_BLOCK_TYPE_DBLOCK = 1, + GNUNET_BLOCK_TYPE_FS_DBLOCK = 1, /** * Inner block in the CHK tree. */ - GNUNET_BLOCK_TYPE_IBLOCK = 2, + GNUNET_BLOCK_TYPE_FS_IBLOCK = 2, /** * Type of a block representing a keyword search result. */ - GNUNET_BLOCK_TYPE_KBLOCK = 3, + GNUNET_BLOCK_TYPE_FS_KBLOCK = 3, /** * Type of a block that is used to advertise content in a namespace. */ - GNUNET_BLOCK_TYPE_SBLOCK = 4, + GNUNET_BLOCK_TYPE_FS_SBLOCK = 4, /** * Type of a block representing a block to be encoded on demand from disk. * Should never appear on the network directly. */ - GNUNET_BLOCK_TYPE_ONDEMAND = 5, + GNUNET_BLOCK_TYPE_FS_ONDEMAND = 5, /** * Type of a block that is used to advertise a namespace. */ - GNUNET_BLOCK_TYPE_NBLOCK = 6, + GNUNET_BLOCK_TYPE_FS_NBLOCK = 6, - GNUNET_BLOCK_TYPE_TEST = 9999 - - }; + /** + * Type of a block that contains a HELLO for a peer (for + * DHT find-peer operations). + */ + GNUNET_BLOCK_TYPE_DHT_HELLO = 7, + /** + * Block for testing. + */ + GNUNET_BLOCK_TYPE_TEST = 8 + }; /** diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h index ca1bebe09..a95cfafe6 100644 --- a/src/include/gnunet_dht_service.h +++ b/src/include/gnunet_dht_service.h @@ -28,6 +28,7 @@ #define GNUNET_DHT_SERVICE_H #include "gnunet_util_lib.h" +#include "gnunet_block_lib.h" #include "gnunet_hello_lib.h" #ifdef __cplusplus @@ -112,7 +113,7 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle); void GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, - uint32_t type, + enum GNUNET_BLOCK_Type type, uint32_t size, const char *data, struct GNUNET_TIME_Absolute exp, @@ -133,11 +134,11 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, * @param data pointer to the result data */ typedef void (*GNUNET_DHT_GetIterator)(void *cls, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, - uint32_t type, - uint32_t size, - const void *data); + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, + uint32_t size, + const void *data); @@ -147,7 +148,7 @@ typedef void (*GNUNET_DHT_GetIterator)(void *cls, * @param handle handle to the DHT service * @param timeout timeout for this request to be sent to the * service (this is NOT a timeout for receiving responses) - * @param type expected type of the response object (GNUNET_BLOCK_TYPE_*) + * @param type expected type of the response object (GNUNET_BLOCK_TYPE_FS_*) * @param key the key to look up * @param iter function to call on each result * @param iter_cls closure for iter @@ -160,7 +161,7 @@ typedef void (*GNUNET_DHT_GetIterator)(void *cls, struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, struct GNUNET_TIME_Relative timeout, - uint32_t type, + enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, GNUNET_DHT_GetIterator iter, void *iter_cls, diff --git a/src/include/gnunet_load_lib.h b/src/include/gnunet_load_lib.h new file mode 100644 index 000000000..7af00ccd3 --- /dev/null +++ b/src/include/gnunet_load_lib.h @@ -0,0 +1,106 @@ +/* + This file is part of GNUnet. + (C) 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file include/gnunet_load_lib.h + * @brief functions related to load calculations + * @author Christian Grothoff + */ + +#ifndef GNUNET_LOAD_LIB_H +#define GNUNET_LOAD_LIB_H + +#ifdef __cplusplus +extern "C" +{ +#if 0 /* keep Emacsens' auto-indent happy */ +} +#endif +#endif + +#include "gnunet_common.h" +#include "gnunet_time_lib.h" + +/** + * Opaque load handle. + */ +struct GNUNET_LOAD_Value; + +/** + * Create a new load value. + * + * @return the new load value + */ +struct GNUNET_LOAD_Value * +GNUNET_LOAD_value_init (void); + + +/** + * Free a load value. + * + * @param lv value to free + */ +#define GNUNET_LOAD_value_free(lv) GNUNET_free (lv) + + +/** + * Get the current load. + * + * @param load load handle + * @return zero for below-average load, otherwise + * number of std. devs we are above average; + * 100 if the latest updates were so large + * that we could not do proper calculations + */ +double +GNUNET_LOAD_get_load (const struct GNUNET_LOAD_Value *load); + + +/** + * Get the average value given to update so far. + * + * @param load load handle + * @return zero if update was never called + */ +double +GNUNET_LOAD_get_average (const struct GNUNET_LOAD_Value *load); + + +/** + * Update the current load. + * + * @param load to update + * @param data latest measurement value (for example, delay) + */ +void +GNUNET_LOAD_update (struct GNUNET_LOAD_Value *load, + uint64_t data); + + +#if 0 /* keep Emacsens' auto-indent happy */ +{ +#endif +#ifdef __cplusplus +} +#endif + +/* ifndef GNUNET_LOAD_LIB_H */ +#endif +/* end of gnunet_load_lib.h */ diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 611c56b61..784964b49 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -514,6 +514,11 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_FS_PUT 138 +/** + * Peer asks us to stop migrating content towards it for a while. + */ +#define GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP 139 + /** * DHT Message Types diff --git a/src/util/Makefile.am b/src/util/Makefile.am index 6d88537e5..2f7edd301 100644 --- a/src/util/Makefile.am +++ b/src/util/Makefile.am @@ -48,6 +48,7 @@ libgnunetutil_la_SOURCES = \ disk.h \ getopt.c \ getopt_helpers.c \ + load.c \ network.c \ os_installation.c \ os_load.c \ diff --git a/src/util/load.c b/src/util/load.c new file mode 100644 index 000000000..451aca295 --- /dev/null +++ b/src/util/load.c @@ -0,0 +1,176 @@ +/* + This file is part of GNUnet. + (C) 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file util/load.c + * @brief functions related to load calculations + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_load_lib.h" + +#define DEBUG_LOAD GNUNET_NO + +/** + * Values we track for load calculations. + */ +struct GNUNET_LOAD_Value +{ + + /** + * Sum of all datastore delays ever observed (in ms). Note that + * delays above 64k ms are excluded (to avoid overflow within + * first 4 billion requests). + */ + uint64_t cummulative_delay; + + /** + * Sum of squares of all datastore delays ever observed (in ms). Note that + * delays above 64k ms are excluded (to avoid overflow within + * first 4 billion requests). + */ + uint64_t cummulative_squared_delay; + + /** + * Total number of requests included in the cummulative datastore delay values. + */ + uint64_t cummulative_request_count; + + /** + * Current running average datastore delay. Its relation to the + * average datastore delay and it std. dev. (as calcualted from the + * cummulative values) tells us our current load. + */ + double runavg_delay; + + /** + * How high is the load? 0 for below average, otherwise + * the number of std. devs we are above average, or 100 if the + * load is so high that we currently cannot calculate it. + */ + double load; + +}; + + +/** + * Create a new load value. + * + * @return the new load value + */ +struct GNUNET_LOAD_Value * +GNUNET_LOAD_value_init () +{ + return GNUNET_malloc (sizeof (struct GNUNET_LOAD_Value)); +} + + +/** + * Get the current load. + * + * @param load load handle + * @return zero for below-average load, otherwise + * number of std. devs we are above average; + * 100 if the latest updates were so large + * that we could not do proper calculations + */ +double +GNUNET_LOAD_get_load (const struct GNUNET_LOAD_Value *load) +{ + return load->load; +} + + +/** + * Get the average value given to update so far. + * + * @param load load handle + * @return zero if update was never called + */ +double +GNUNET_LOAD_get_average (const struct GNUNET_LOAD_Value *load) +{ + double n; + double avg; + double sum_val_i; + + if (load->cummulative_request_count == 0) + return 0.0; + n = ((double) load->cummulative_request_count); + sum_val_i = (double) load->cummulative_delay; + return sum_val_i / n; +} + + +/** + * Update the current load. + * + * @param load to update + * @param data latest measurement value (for example, delay) + */ +void +GNUNET_LOAD_update (struct GNUNET_LOAD_Value *load, + uint64_t data) +{ + uint32_t dv; + double stddev; + double avgdel; + double sum_val_i; + double n; + double nm1; + + if (data > 64 * 1024) + { + /* very large */ + load->load = 100.0; + return; + } + dv = (uint32_t) data; + load->cummulative_delay += dv; + load->cummulative_squared_delay += dv * dv; + load->cummulative_request_count++; + load->runavg_delay = ((load->runavg_delay * 7.0) + dv) / 8.0; + if (load->cummulative_request_count > 1) + { + /* calcuate std dev of latency; we have for n values of "i" that: + + avg = (sum val_i) / n + stddev = (sum (val_i - avg)^2) / (n-1) + = (sum (val_i^2 - 2 avg val_i + avg^2) / (n-1) + = (sum (val_i^2) - 2 avg sum (val_i) + n * avg^2) / (n-1) + */ + sum_val_i = (double) load->cummulative_delay; + n = ((double) load->cummulative_request_count); + nm1 = n - 1.0; + avgdel = sum_val_i / n; + stddev = (((double) load->cummulative_squared_delay) - 2.0 * avgdel * sum_val_i + n * avgdel * avgdel) / nm1; + if (stddev <= 0) + stddev = 0.01; /* must have been rounding error or zero; prevent division by zero */ + /* now calculate load based on how far out we are from + std dev; or if we are below average, simply assume load zero */ + if (load->runavg_delay < avgdel) + load->load = 0.0; + else + load->load = (load->runavg_delay - avgdel) / stddev; + } +} + + +/* end of load.c */ -- cgit v1.2.3