From 858f4546a0044b3ea9abdfbe6dda2e95d7c04dc1 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 25 Sep 2011 22:16:20 +0000 Subject: wild hxing --- src/dht/gnunet-service-dht-new.c | 175 +------ src/dht/gnunet-service-dht.h | 43 ++ src/dht/gnunet-service-dht_clients.c | 27 +- src/dht/gnunet-service-dht_clients.h | 1 + src/dht/gnunet-service-dht_datacache.c | 315 +++++++++++++ src/dht/gnunet-service-dht_datacache.h | 84 ++++ src/dht/gnunet-service-dht_neighbours.c | 782 ++++++++++++++++++++++++++------ src/dht/gnunet-service-dht_neighbours.h | 35 +- 8 files changed, 1145 insertions(+), 317 deletions(-) create mode 100644 src/dht/gnunet-service-dht.h create mode 100644 src/dht/gnunet-service-dht_datacache.c create mode 100644 src/dht/gnunet-service-dht_datacache.h (limited to 'src/dht') diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c index 2fb06457e..87cf97a40 100644 --- a/src/dht/gnunet-service-dht-new.c +++ b/src/dht/gnunet-service-dht-new.c @@ -390,11 +390,6 @@ static struct FindPeerMessageContext find_peer_context; */ static unsigned int newly_found_peers; -/** - * Handle to the datacache service (for inserting/retrieving data) - */ -static struct GNUNET_DATACACHE_Handle *datacache; - /** * Handle for the statistics service. */ @@ -1260,130 +1255,6 @@ route_result_message (struct GNUNET_MessageHeader *msg, } -/** - * Iterator for local get request results, - * - * @param cls closure for iterator, a DatacacheGetContext - * @param exp when does this value expire? - * @param key the key this data is stored under - * @param size the size of the data identified by key - * @param data the actual data - * @param type the type of the data - * - * @return GNUNET_OK to continue iteration, anything else - * to stop iteration. - */ -static int -datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, size_t size, - const char *data, enum GNUNET_BLOCK_Type type) -{ - struct DHT_MessageContext *msg_ctx = cls; - struct DHT_MessageContext new_msg_ctx; - struct GNUNET_DHT_GetResultMessage *get_result; - enum GNUNET_BLOCK_EvaluationResult eval; - const struct DHTPutEntry *put_entry; - int get_size; - char *path_offset; - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Received `%s' response from datacache\n", my_short_id, - "DHT", "GET"); -#endif - - put_entry = (const struct DHTPutEntry *) data; - - if (size != - sizeof (struct DHTPutEntry) + put_entry->data_size + - (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity))) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Path + data size doesn't add up for data inserted into datacache!\nData size %d, path length %d, expected %d, got %d\n", - put_entry->data_size, put_entry->path_length, - sizeof (struct DHTPutEntry) + put_entry->data_size + - (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)), - size); - msg_ctx->do_forward = GNUNET_NO; - return GNUNET_OK; - } - - eval = - GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf, - msg_ctx->reply_bf_mutator, msg_ctx->xquery, - msg_ctx->xquery_size, &put_entry[1], - put_entry->data_size); - - switch (eval) - { - case GNUNET_BLOCK_EVALUATION_OK_LAST: - msg_ctx->do_forward = GNUNET_NO; - case GNUNET_BLOCK_EVALUATION_OK_MORE: - memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext)); - if (GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) - { - new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE; - } - - get_size = - sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size + - (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)); - get_result = GNUNET_malloc (get_size); - get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); - get_result->header.size = htons (get_size); - get_result->expiration = GNUNET_TIME_absolute_hton (exp); - get_result->type = htons (type); - get_result->put_path_length = htons (put_entry->path_length); - path_offset = (char *) &put_entry[1]; - path_offset += put_entry->data_size; - /* Copy the actual data and the path_history to the end of the get result */ - memcpy (&get_result[1], &put_entry[1], - put_entry->data_size + - (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity))); - new_msg_ctx.peer = my_identity; - new_msg_ctx.bloom = NULL; - new_msg_ctx.hop_count = 0; - new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */ - new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT; - increment_stats (STAT_GET_RESPONSE_START); - route_result_message (&get_result->header, &new_msg_ctx); - GNUNET_free (get_result); - break; - case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n", - my_short_id, "DHT"); -#endif - break; - case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n", - my_short_id, "DHT"); -#endif - break; - case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Valid request, no results.\n", my_short_id, "DHT"); -#endif - GNUNET_break (0); - break; - case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: - GNUNET_break_op (0); - msg_ctx->do_forward = GNUNET_NO; - break; - case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "`%s:%s': Unsupported block type (%u) in response!\n", - my_short_id, "DHT", type); -#endif - /* msg_ctx->do_forward = GNUNET_NO; // not sure... */ - break; - } - return GNUNET_OK; -} /** @@ -1464,10 +1335,6 @@ handle_dht_get (const struct GNUNET_MessageHeader *msg, increment_stats (STAT_GETS); results = 0; msg_ctx->do_forward = GNUNET_YES; - if (datacache != NULL) - results = - GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type, - &datacache_get_iterator, msg_ctx); #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Found %d results for `%s' request uid %llu\n", @@ -1826,36 +1693,6 @@ handle_dht_put (const struct GNUNET_MessageHeader *msg, #endif increment_stats (STAT_PUTS_INSERTED); - if (datacache != NULL) - { - /* Put size is actual data size plus struct overhead plus path length (if any) */ - put_size = - data_size + sizeof (struct DHTPutEntry) + - (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - put_entry = GNUNET_malloc (put_size); - put_entry->data_size = data_size; - put_entry->path_length = msg_ctx->path_history_len; - /* Copy data to end of put entry */ - memcpy (&put_entry[1], &put_msg[1], data_size); - if (msg_ctx->path_history_len > 0) - { - /* Copy path after data */ - path_offset = (char *) &put_entry[1]; - path_offset += data_size; - memcpy (path_offset, msg_ctx->path_history, - msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity)); - } - - ret = - GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size, - (const char *) put_entry, put_type, - GNUNET_TIME_absolute_ntoh (put_msg->expiration)); - GNUNET_free (put_entry); - } - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': %s request received, but have no datacache!\n", - my_short_id, "DHT", "PUT"); route_message (msg, msg_ctx); } @@ -2366,6 +2203,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) transport_handle = NULL; } GDS_NEIGHBOURS_done (); + GDS_DATACACHE_done (); GDS_NSE_done (); for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++) { @@ -2380,15 +2218,6 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) delete_peer (pos, bucket_count); } } - if (datacache != NULL) - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n", - my_short_id, "DHT"); -#endif - GNUNET_DATACACHE_destroy (datacache); - datacache = NULL; - } if (stats != NULL) { GNUNET_STATISTICS_destroy (stats, GNUNET_YES); @@ -2418,7 +2247,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, unsigned long long temp_config_num; cfg = c; - datacache = GNUNET_DATACACHE_create (cfg, "dhtcache"); + GDS_DATACACHE_init (); coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */ DEFAULT_CORE_QUEUE_SIZE, /* queue size */ NULL, /* Closure passed to DHT functions */ diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h new file mode 100644 index 000000000..c2e16151d --- /dev/null +++ b/src/dht/gnunet-service-dht.h @@ -0,0 +1,43 @@ +/* + This file is part of GNUnet. + (C) 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file dht/gnunet-service-dht.h + * @brief GNUnet DHT globals + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_DHT_H +#define GNUNET_SERVICE_DHT_H + +#include "gnunet_util_lib.h" + +/** + * Configuration we use. + */ +extern struct GNUNET_ConfigurationHandle *GDS_cfg; + + +/** + * Our handle to the BLOCK library. + */ +extern struct GNUNET_BLOCK_Context *GDS_block_context; + + +#endif diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index 75506534b..95a0d68d0 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -39,6 +39,7 @@ #include "dht_new.h" #include #include "gnunet-service-dht_clients.h" +#include "gnunet-service-dht_datacache.h" #include "gnunet-service-dht_neighbours.h" @@ -403,6 +404,22 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); } dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; + /* give to local clients */ + GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), + &dht_msg->key, + 0, NULL, + 0, NULL, + ntohl (dht_msg->type), + size - sizeof (struct GNUNET_DHT_ClientPutMessage), + &dht_msg[1]); + /* store locally */ + GST_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), + &dht_msg->key, + 0, NULL, + ntohl (dht_msg->type), + size - sizeof (struct GNUNET_DHT_ClientPutMessage), + &dht_msg[1]); + /* route to other peers */ GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options), ntohl (dht_msg->desired_replication_level), @@ -446,6 +463,7 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, get = (const struct GNUNET_DHT_ClientGetMessage *) message; xquery = (const char*) &get[1]; + cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); cqr->key = get->key; cqr->client = find_active_client (client); @@ -458,12 +476,19 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, cqr->xquery_size = xquery_size; cqr->replication = ntohl (get->desired_replication_level); cqr->msg_options = ntohl (get->options); - cqr->msg_type = ntohl (get->type); + cqr->msg_type = ntohl (get->type); GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + /* start remote requests */ if (GNUNET_SCHEDULER_NO_TASK != retry_task) GNUNET_SCHEDULER_cancel (retry_task); retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL); + /* perform local lookup */ + GDS_DATACACHE_handle_get (&get->key, + cqr->msg_type, + cqr->xquery, + xquery_size, + NULL, 0); GNUNET_SERVER_receive_done (client, GNUNET_OK); } diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h index 931ca1a93..db4f0b9fe 100644 --- a/src/dht/gnunet-service-dht_clients.h +++ b/src/dht/gnunet-service-dht_clients.h @@ -27,6 +27,7 @@ #ifndef GNUNET_SERVICE_DHT_CLIENTS_H #define GNUNET_SERVICE_DHT_CLIENTS_H + /** * Handle a reply we've received from another peer. If the reply * matches any of our pending queries, forward it to the respective diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c new file mode 100644 index 000000000..b2dd05ac9 --- /dev/null +++ b/src/dht/gnunet-service-dht_datacache.c @@ -0,0 +1,315 @@ +/* + This file is part of GNUnet. + (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file dht/gnunet-service-dht_datacache.c + * @brief GNUnet DHT service's datacache integration + * @author Christian Grothoff + * @author Nathan Evans + */ +#include "gnunet-service-dht_datacache.h" + + +/** + * Handle to the datacache service (for inserting/retrieving data) + */ +static struct GNUNET_DATACACHE_Handle *datacache; + + +/** + * Entry for inserting data into datacache from the DHT. + */ +struct DHTPutEntry +{ + /** + * Size of data. + */ + uint16_t data_size; + + /** + * Length of recorded path. + */ + uint16_t path_length; + + /* PATH ENTRIES */ + + /* PUT DATA */ + +}; + + +/** + * Handle a datum we've received from another peer. Cache if + * possible. + * + * @param expiration when will the reply expire + * @param key the query this reply is for + * @param put_path_length number of peers in 'put_path' + * @param put_path path the reply took on put + * @param type type of the reply + * @param data_size number of bytes in 'data' + * @param data application payload data + */ +void +GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration, + const GNUNET_HashCode *key, + unsigned int put_path_length, + const struct GNUNET_PeerIdentity *put_path, + uint32_t type, + size_t data_size, + const void *data) +{ + size_t plen = data_size + put_path_length * sizeof(struct GNUNET_PeerIdentity) + sizeof(struct DHTPutEntry); + char buf[plen]; + struct DHTPutEntry *pe; + struct GNUNET_PeerIdentity *pp; + char *path_offset; + + if (datacache == NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%s request received, but have no datacache!\n", + "PUT"); + return; + } + if (data_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return; + } + /* Put size is actual data size plus struct overhead plus path length (if any) */ + pe = (struct DHTPutEntry *) buf; + pe->data_size = htons (data_size); + pe->path_length = htons ((uint16_t) put_path_length); + pp = (struct GNUNET_PeerIdentity *) &pe[1]; + memcpy (pp, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity)); + memcpy (&pp[put_path_length], + data, data_size); + (void) GNUNET_DATACACHE_put (datacache, key, + plen, (const char *) pe, type, + expiration); +} + + +/** + * Context containing information about a GET request. + */ +struct GetRequestContext +{ + /** + * extended query (see gnunet_block_lib.h). + */ + const void *xquery; + + /** + * Bloomfilter to filter out duplicate replies (updated) + */ + struct GNUNET_CONTAINER_BloomFilter **reply_bf; + + /** + * The key this request was about + */ + GNUNET_HashCode key; + + /** + * Number of bytes in xquery. + */ + size_t xquery_size; + + /** + * Mutator value for the reply_bf, see gnunet_block_lib.h + */ + uint32_t reply_bf_mutator; + +}; + + +/** + * Iterator for local get request results, + * + * @param cls closure for iterator, a DatacacheGetContext + * @param exp when does this value expire? + * @param key the key this data is stored under + * @param size the size of the data identified by key + * @param data the actual data + * @param type the type of the data + * + * @return GNUNET_OK to continue iteration, anything else + * to stop iteration. + */ +static int +datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, size_t size, + const char *data, enum GNUNET_BLOCK_Type type) +{ + struct GetRequestContext *ctx = cls; + const struct DHTPutEntry *pe; + const struct GNUNET_PeerIdentity *pp; + const char *data; + size_t data_size; + uint16_t put_path_length; + enum GNUNET_BLOCK_EvaluationResult eval; + + pe = (const struct DHTPutEntry *) data; + put_path_length = ntohs (pe->path_length); + data_size = ntohs (pe->data_size); + + if (size != + sizeof (struct DHTPutEntry) + data_size + + (put_path_length * sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_break (0); + return GNUNET_OK; + } + pp = (const struct GNUNET_PeerIdentity *) &pe[1]; + data = (const char*) &pp[put_path_length]; + eval = + GNUNET_BLOCK_evaluate (block_context, type, key, + ctx->reply_bf, + ctx->reply_bf_mutator, + ctx->xquery, + ctx->xquery_size, + data, + data_size); + switch (eval) + { + case GNUNET_BLOCK_EVALUATION_OK_LAST: + case GNUNET_BLOCK_EVALUATION_OK_MORE: + /* forward to local clients */ + GDS_CLIENT_handle_reply (exp, + key, + 0, NULL, + put_path_length, pp, + type, data_size, data); + /* forward to other peers */ + GDS_NEIGHBOURS_handle_reply (type, exp, + key, put_path_length, pp, + 0, NULL, data, data_size); + break; + case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: + break; + case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: + break; + case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: + GNUNET_break (0); + break; + case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: + GNUNET_break_op (0); + return GNUNET_SYSERR; + case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Unsupported block type (%u) in local response!\n", + type); + break; + } + return GNUNET_OK; +} + + +/** + * Context containing information about a GET request. + */ +struct GetRequestContext +{ + /** + * extended query (see gnunet_block_lib.h). + */ + const void *xquery; + + /** + * Bloomfilter to filter out duplicate replies (updated) + */ + struct GNUNET_CONTAINER_BloomFilter **reply_bf; + + /** + * The key this request was about + */ + GNUNET_HashCode key; + + /** + * Number of bytes in xquery. + */ + size_t xquery_size; + + /** + * Mutator value for the reply_bf, see gnunet_block_lib.h + */ + uint32_t reply_bf_mutator; + +}; + + +/** + * Handle a GET request we've received from another peer. + * + * @param key the query + * @param type requested data type + * @param xquery extended query + * @param xquery_size number of bytes in xquery + * @param reply_bf where the reply bf is (to be) stored, possibly updated, can be NULL + * @param reply_bf_mutator mutation value for reply_bf + */ +void +GDS_DATACACHE_handle_get (const GNUNET_HashCode *key, + uint32_t type, + const void *xquery, + size_t xquery_size, + struct GNUNET_CONTAINER_BloomFilter **reply_bf, + uint32_t reply_bf_mutator) +{ + struct GetRequestContext ctx; + + if (datacache == NULL) + return; + ctx.key = *key; + ctx.xquery = xquery; + ctx.xquery_size = xquery_size; + ctx.reply_bf = reply_bf; + ctx.reply_bf_mutator = reply_bf_mutator; + (void) GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type, + &datacache_get_iterator, &ctx); +} + + +/** + * Initialize datacache subsystem. + */ +void +GDS_DATACACHE_init () +{ + datacache = GNUNET_DATACACHE_create (cfg, "dhtcache"); +} + + +/** + * Shutdown datacache subsystem. + */ +void +GDS_DATACACHE_done () +{ + if (datacache != NULL) + { + GNUNET_DATACACHE_destroy (datacache); + datacache = NULL; + } +} + + +/* end of gnunet-service-dht_datacache.c */ diff --git a/src/dht/gnunet-service-dht_datacache.h b/src/dht/gnunet-service-dht_datacache.h new file mode 100644 index 000000000..0501e9e4c --- /dev/null +++ b/src/dht/gnunet-service-dht_datacache.h @@ -0,0 +1,84 @@ +/* + This file is part of GNUnet. + (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file dht/gnunet-service-dht_datacache.h + * @brief GNUnet DHT service's datacache integration + * @author Christian Grothoff + * @author Nathan Evans + */ +#ifndef GNUNET_SERVICE_DHT_DATACACHE_H +#define GNUNET_SERVICE_DHT_DATACACHE_H + +/** + * Handle a datum we've received from another peer. Cache if + * possible. + * + * @param expiration when will the reply expire + * @param key the query this reply is for + * @param put_path_length number of peers in 'put_path' + * @param put_path path the reply took on put + * @param type type of the reply + * @param data_size number of bytes in 'data' + * @param data application payload data + */ +void +GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration, + const GNUNET_HashCode *key, + unsigned int put_path_length, + const struct GNUNET_PeerIdentity *put_path, + uint32_t type, + size_t data_size, + const void *data); + + +/** + * Handle a GET request we've received from another peer. + * + * @param key the query + * @param type requested data type + * @param xquery extended query + * @param xquery_size number of bytes in xquery + * @param reply_bf where the reply bf is (to be) stored, possibly updated!, can be NULL + * @param reply_bf_mutator mutation value for reply_bf + */ +void +GDS_DATACACHE_handle_get (const GNUNET_HashCode *key, + uint32_t type, + const void *xquery, + size_t xquery_size, + struct GNUNET_CONTAINER_BloomFilter **reply_bf, + uint32_t reply_bf_mutator); + + +/** + * Initialize datacache subsystem. + */ +void +GDS_DATACACHE_init (void); + + +/** + * Shutdown datacache subsystem. + */ +void +GDS_DATACACHE_done (void); + +#endif diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index 8c87314e5..b7cc2048e 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -37,6 +37,7 @@ #include "gnunet_dht_service.h" #include "gnunet_statistics_service.h" #include "dht.h" +#include "gnunet-service-dht_datacache.h" #include /** @@ -86,11 +87,14 @@ struct PeerPutMessage uint32_t desired_replication_level GNUNET_PACKED; /** - * Generic route path length for a message in the - * DHT that arrived at a peer and generated - * a reply. Copied to the end of this message. + * Length of the PUT path that follows (if tracked). */ - uint32_t outgoing_path_length GNUNET_PACKED; + uint32_t put_path_length GNUNET_PACKED; + + /** + * When does the content expire? + */ + struct GNUNET_TIME_AbsoluteNBO expiration_time; /** * Bloomfilter (for peer identities) to stop circular routes @@ -109,13 +113,52 @@ struct PeerPutMessage }; +/** + * P2P Result message + */ +struct PeerResultMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Length of the PUT path that follows (if tracked). + */ + uint32_t put_path_length GNUNET_PACKED; + + /** + * Length of the GET path that follows (if tracked). + */ + uint32_t get_path_length GNUNET_PACKED; + + /** + * The key of the corresponding GET request. + */ + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* get path (if tracked) */ + + /* Payload */ + +}; + + /** * P2P GET message */ struct PeerGetMessage { /** - * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET */ struct GNUNET_MessageHeader header; @@ -182,19 +225,14 @@ struct P2PPendingMessage struct P2PPendingMessage *prev; /** - * Message importance level. + * Message importance level. FIXME: used? useful? */ unsigned int importance; /** - * Time when this request was scheduled to be sent. - */ - struct GNUNET_TIME_Absolute scheduled; - - /** - * How long to wait before sending message. + * When does this message time out? */ - struct GNUNET_TIME_Relative timeout; + struct GNUNET_TIME_Absolute timeout; /** * Actual message to be sent, allocated at the end of the struct: @@ -222,7 +260,8 @@ struct PeerInfo struct PeerInfo *prev; /** - * Count of outstanding messages for peer. + * Count of outstanding messages for peer. FIXME: NEEDED? + * FIXME: bound queue size!? */ unsigned int pending_count; @@ -467,13 +506,340 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) /** - * Perform a PUT operation. // FIXME: document if this is only - * routing or also storage and/or even local client notification! + * Called when core is ready to send a message we asked for + * out to the destination. + * + * @param cls the 'struct PeerInfo' of the target peer + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +core_transmit_notify (void *cls, size_t size, void *buf) +{ + struct PeerInfo *peer = cls; + char *cbuf = buf; + struct P2PPendingMessage *pending; + size_t off; + size_t msize; + + peer->th = NULL; + if (buf == NULL) + { + /* client disconnected */ + return 0; + } + if (peer->head == NULL) + { + /* no messages pending */ + return 0; + } + off = 0; + while ( (NULL != (pending = peer->head)) && + (size - off >= (msize = ntohs (pending->msg->size))) ) + { + memcpy (&cbuf[off], pending->msg, msize); + off += msize; + peer->pending_count--; + GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending); + GNUNET_free (pending); + } + if (peer->head != NULL) + peer->th + = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES, + pending->importance, + pending->timeout, &peer->id, msize, + &core_transmit_notify, peer); + + return off; +} + + +/** + * Transmit all messages in the peer's message queue. + * + * @param peer message queue to process + */ +static void +process_peer_queue (struct PeerInfo *peer) +{ + struct P2PPendingMessage *pending; + + if (NULL != (pending = peer->head)) + return; + if (NULL != peer->th) + return; + peer->th + = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES, + pending->importance, + pending->timeout, &peer->id, + ntohs (pending->msg->size), + &core_transmit_notify, peer); +} + + +/** + * To how many peers should we (on average) forward the request to + * obtain the desired target_replication count (on average). + * + * FIXME: double-check that this is fine + * + * @param hop_count number of hops the message has traversed + * @param target_replication the number of total paths desired + * @return Some number of peers to forward the message to + */ +static unsigned int +get_forward_count (uint32_t hop_count, + uint32_t target_replication) +{ + uint32_t random_value; + uint32_t forward_count; + float target_value; + + /* bound by system-wide maximum */ + target_replication = GNUNET_MIN (16 /* FIXME: use named constant */, + target_replication); + if (hop_count > log_of_network_size_estimate * 2.0) + { + /* Once we have reached our ideal number of hops, only forward to 1 peer */ + return 1; + } + target_value = + 1 + (target_replication - 1.0) / (log_of_network_size_estimate + + ((float) (target_replication - 1.0) * + hop_count)); + /* Set forward count to floor of target_value */ + forward_count = (uint32_t) target_value; + /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */ + target_value = target_value - forward_count; + random_value = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX); + if (random_value < (target_value * UINT32_MAX)) + forward_count++; + return forward_count; +} + + +/** + * Check whether my identity is closer than any known peers. If a + * non-null bloomfilter is given, check if this is the closest peer + * that hasn't already been routed to. + * FIXME: needed? + * + * @param key hash code to check closeness to + * @param bloom bloomfilter, exclude these entries from the decision + * @return GNUNET_YES if node location is closest, + * GNUNET_NO otherwise. + */ +static int +am_closest_peer (const GNUNET_HashCode *key, + const struct GNUNET_CONTAINER_BloomFilter *bloom) +{ + int bits; + int other_bits; + int bucket_num; + int count; + struct PeerInfo *pos; + unsigned int my_distance; + + if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode))) + return GNUNET_YES; + bucket_num = find_current_bucket (key); + bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key); + my_distance = distance (&my_identity.hashPubKey, key); + pos = k_buckets[bucket_num].head; + count = 0; + while ((pos != NULL) && (count < bucket_size)) + { + if ((bloom != NULL) && + (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))) + { + pos = pos->next; + continue; /* Skip already checked entries */ + } + other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key); + if (other_bits > bits) + return GNUNET_NO; + if (other_bits == bits) /* We match the same number of bits */ + return GNUNET_YES; + pos = pos->next; + } + /* No peers closer, we are the closest! */ + return GNUNET_YES; +} + + +/** + * Select a peer from the routing table that would be a good routing + * destination for sending a message for "key". The resulting peer + * must not be in the set of blocked peers.

+ * + * Note that we should not ALWAYS select the closest peer to the + * target, peers further away from the target should be chosen with + * exponentially declining probability. + * + * FIXME: double-check that this is fine + * + * + * @param key the key we are selecting a peer to route to + * @param bloom a bloomfilter containing entries this request has seen already + * @param hops how many hops has this message traversed thus far + * @return Peer to route to, or NULL on error + */ +static struct PeerInfo * +select_peer (const GNUNET_HashCode *key, + const struct GNUNET_CONTAINER_BloomFilter *bloom, + uint32_t hops) +{ + unsigned int bc; + unsigned int count; + unsigned int selected; + struct PeerInfo *pos; + unsigned int distance; + unsigned int largest_distance; + struct PeerInfo *chosen; + + if (hops >= log_of_network_size_estimate) + { + /* greedy selection (closest peer that is not in bloomfilter) */ + largest_distance = 0; + chosen = NULL; + for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) + { + pos = k_buckets[bc].head; + count = 0; + while ((pos != NULL) && (count < bucket_size)) + { + /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */ + if (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) + { + distance = inverse_distance (key, &pos->id.hashPubKey); + if (distance > largest_distance) + { + chosen = pos; + largest_distance = distance; + } + } + count++; + pos = pos->next; + } + } + return chosen; + } + + /* select "random" peer */ + /* count number of peers that are available and not filtered */ + count = 0; + for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) + { + pos = k_buckets[bc].head; + while ((pos != NULL) && (count < bucket_size)) + { + if (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) + { + pos = pos->next; + continue; /* Ignore bloomfiltered peers */ + } + count++; + pos = pos->next; + } + } + if (count == 0) /* No peers to select from! */ + { + return NULL; + } + /* Now actually choose a peer */ + selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count); + count = 0; + for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++) + { + pos = k_buckets[bc].head; + while ((pos != NULL) && (count < bucket_size)) + { + if (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)) + { + pos = pos->next; + continue; /* Ignore bloomfiltered peers */ + } + if (0 == selected--) + return pos; + pos = pos->next; + } + } + GNUNET_break (0); + return NULL; +} + + +/** + * Compute the set of peers that the given request should be + * forwarded to. + * + * @param key routing key + * @param bloom bloom filter excluding peers as targets, all selected + * peers will be added to the bloom filter + * @param hop_count number of hops the request has traversed so far + * @param target_replication desired number of replicas + * @param targets where to store an array of target peers (to be + * free'd by the caller) + * @return number of peers returned in 'targets'. + */ +static unsigned int +get_target_peers (const GNUNET_HashCode *key, + struct GNUNET_CONTAINER_BloomFilter *bloom, + uint32_t hop_count, + uint32_t target_replication, + struct PeerInfo ***targets) +{ + unsigned int ret; + unsigned int off; + struct PeerInfo **rtargets; + struct PeerInfo *nxt; + + ret = get_forward_count (hop_count, target_replication); + if (ret == 0) + { + *targets = NULL; + return 0; + } + rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret); + off = 0; + while (ret-- > 0) + { + nxt = select_peer (key, bloom, hop_count); + if (nxt == NULL) + break; + rtargets[off++] = nxt; + GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey); + } + if (0 == off) + { + GNUNET_free (rtargets); + *targets = NULL; + return 0; + } + *targets = rtargets; + return off; +} + + +/** + * Perform a PUT operation. Forwards the given request to other + * peers. Does not store the data locally. Does not give the + * data to local clients. May do nothing if this is the only + * peer in the network (or if we are the closest peer in the + * network). * * @param type type of the block * @param options routing options * @param desired_replication_level desired replication count * @param expiration_time when does the content expire + * @param hop_count how many hops has this message traversed so far + * @param bf Bloom filter of peers this PUT has already traversed * @param key key for the content * @param put_path_length number of entries in put_path * @param put_path peers this request has traversed so far (if tracked) @@ -481,27 +847,87 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) * @param data_size number of bytes in data */ void -GST_NEIGHBOURS_handle_put (uint32_t type, +GDS_NEIGHBOURS_handle_put (uint32_t type, uint32_t options, uint32_t desired_replication_level, GNUNET_TIME_Absolute expiration_time, + uint32_t hop_count, + struct GNUNET_CONTAINER_BloomFilter *bf, const GNUNET_HashCode *key, unsigned int put_path_length, struct GNUNET_PeerIdentity *put_path, const void *data, size_t data_size) { - // FIXME + unsigned int target_count; + unsigned int i; + struct PeerInfo **targets; + struct PeerInfo *target; + struct P2PPendingMessage *pending; + size_t msize; + struct PeerPutMessage *ppm; + struct GNUNET_PeerIdentity *pp; + + target_count = get_target_peers (key, bf, hop_count, + desired_replication_level, + &targets); + if (0 == target_count) + return; + msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage); + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + put_path_length = 0; + msize = data_size + sizeof (struct PeerPutMessage); + } + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return; + } + for (i=0;iimportance = 0; /* FIXME */ + pending->timeout = expiration_time; + ppm = (struct PeerPutMessage*) &pending[1]; + pending->msg = &ppm->header; + ppm->header.size = htons (msize); + ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT); + ppm->options = htonl (options); + ppm->type = htonl (type); + ppm->hop_count = htonl (hop_count + 1); + ppm->desired_replication_level = htonl (desired_replication_level); + ppm->put_path_length = htonl (put_path_length); + ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (bf, + ppm->bloomfilter, + DHT_BLOOM_SIZE)); + ppm->key = *key; + pp = (const struct GNUNET_PeerIdentity*) &ppm[1]; + memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length); + memcpy (&pp[put_path_length], data, data_size); + GNUNET_CONTAINER_DLL_insert (target->head, + target->tail, + pending); + target->pending_count++; + process_peer_queue (target); + } + GNUNET_free (targets); } /** - * Perform a GET operation. // FIXME: document if this is only - * routing or also state-tracking and/or even local lookup! + * Perform a GET operation. Forwards the given request to other + * peers. Does not lookup the key locally. May do nothing if this is + * the only peer in the network (or if we are the closest peer in the + * network). * * @param type type of the block * @param options routing options * @param desired_replication_level desired replication count + * @param hop_count how many hops did this request traverse so far? * @param key key for the content * @param xquery extended query * @param xquery_size number of bytes in xquery @@ -510,9 +936,10 @@ GST_NEIGHBOURS_handle_put (uint32_t type, * @param peer_bf filter for peers not to select (again) */ void -GST_NEIGHBOURS_handle_get (uint32_t type, +GDS_NEIGHBOURS_handle_get (uint32_t type, uint32_t options, uint32_t desired_replication_level, + uint32_t hop_count, const GNUNET_HashCode *key, const void *xquery, size_t xquery_size, @@ -520,13 +947,69 @@ GST_NEIGHBOURS_handle_get (uint32_t type, uint32_t reply_bf_mutator, const struct GNUNET_CONTAINER_BloomFilter *peer_bf) { - // FIXME + unsigned int target_count; + unsigned int i; + struct PeerInfo **targets; + struct PeerInfo *target; + struct P2PPendingMessage *pending; + size_t msize; + struct PeerGetMessage *pgm; + char *xq; + size_t reply_bf_size; + + target_count = get_target_peers (key, peer_bf, hop_count, + desired_replication_level, + &targets); + if (0 == target_count) + return; + reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf); + msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size; + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return; + } + for (i=0;iimportance = 0; /* FIXME */ + pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */ + pgm = (struct PeerGetMessage*) &pending[1]; + pending->msg = &pgm->header; + pgm->header.size = htons (msize); + pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET); + pgm->options = htonl (options); + pgm->type = htonl (type); + pgm->hop_count = htonl (hop_count + 1); + pgm->desired_replication_level = htonl (desired_replication_level); + pgm->xquery_size = htonl (xquery_size); + pgm->bf_mutator = reply_bf_mutator; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf, + pgm->bloomfilter, + DHT_BLOOM_SIZE)); + pgm->key = *key; + xq = (const struct GNUNET_PeerIdentity*) &ppm[1]; + memcpy (xq, xquery, xquery_size); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf, + &xq[xquery_size], + reply_bf_size)); + GNUNET_CONTAINER_DLL_insert (target->head, + target->tail, + pending); + target->pending_count++; + process_peer_queue (target); + } + GNUNET_free (targets); } /** - * Handle a reply (route to origin). FIXME: should this be here? - * (reply-routing table might be better done elsewhere). + * Handle a reply (route to origin). Only forwards the reply back to + * other peers waiting for it. Does not do local caching or + * forwarding to local clients. * * @param type type of the block * @param options routing options @@ -540,7 +1023,7 @@ GST_NEIGHBOURS_handle_get (uint32_t type, * @param data_size number of bytes in data */ void -GST_NEIGHBOURS_handle_reply (uint32_t type, +GDS_NEIGHBOURS_handle_reply (uint32_t type, uint32_t options, GNUNET_TIME_Absolute expiration_time, const GNUNET_HashCode *key, @@ -555,11 +1038,28 @@ GST_NEIGHBOURS_handle_reply (uint32_t type, } +/** + * Closure for 'add_known_to_bloom'. + */ +struct BloomConstructorContext +{ + /** + * Bloom filter under construction. + */ + struct GNUNET_CONTAINER_BloomFilter *bloom; + + /** + * Mutator to use. + */ + uint32_t bf_mutator; +}; + + /** * Add each of the peers we already know to the bloom filter of * the request so that we don't get duplicate HELLOs. * - * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building + * @param cls the 'struct BloomConstructorContext'. * @param key peer identity to add to the bloom filter * @param value value the peer information (unused) * @return GNUNET_YES (we should continue to iterate) @@ -567,9 +1067,11 @@ GST_NEIGHBOURS_handle_reply (uint32_t type, static int add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value) { - struct GNUNET_CONTAINER_BloomFilter *bloom = cls; + struct BloomConstructorContext *ctx = cls; + GNUNET_HashCode mh; - GNUNET_CONTAINER_bloomfilter_add (bloom, key); + GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh); + GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh); return GNUNET_YES; } @@ -589,7 +1091,7 @@ send_find_peer_message (void *cls, struct GNUNET_DHT_FindPeerMessage *find_peer_msg; struct DHT_MessageContext msg_ctx; struct GNUNET_TIME_Relative next_send_time; - struct GNUNET_CONTAINER_BloomFilter *temp_bloom; + struct BloomConstructorContext bcc; find_peer_task = GNUNET_SCHEDULER_NO_TASK; if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) @@ -602,38 +1104,21 @@ send_find_peer_message (void *cls, newly_found_peers = 0; return; } - - // FIXME: build message... - find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage)); - find_peer_msg->header.size = - htons (sizeof (struct GNUNET_DHT_FindPeerMessage)); - find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); - temp_bloom = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom, - temp_bloom); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom, - find_peer_msg-> - bloomfilter, - DHT_BLOOM_SIZE)); - GNUNET_CONTAINER_bloomfilter_free (temp_bloom); - - memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); - memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode)); - msg_ctx.unique_id = - GNUNET_ntohll (GNUNET_CRYPTO_random_u64 - (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX)); - msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION; - msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE; - msg_ctx.network_size = log_of_network_size_estimate; - msg_ctx.peer = my_identity; - msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE; - msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT; - // FIXME: transmit message... - demultiplex_message (&find_peer_msg->header, &msg_ctx); - GNUNET_free (find_peer_msg); - + bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); + bcc.bloom = + GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); + GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, + &add_known_to_bloom, + &bcc); + // FIXME: pass priority!? + GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO, + GNUNET_DHT_RO_FIND_PEER, + 16 /* FIXME: replication level? */, + 0, + &my_identity.hashPubKey, + NULL, 0, + bcc.bloom, bcc.bf_mutator, NULL); + GNUNET_CONTAINER_bloomfilter_free (bcc.bloom); /* schedule next round */ newly_found_peers = 0; next_send_time.rel_value = @@ -674,9 +1159,10 @@ core_init (void *cls, struct GNUNET_CORE_Handle *server, /** - * Core handler for p2p get requests. + * Core handler for p2p put requests. * * @param cls closure + * @param peer sender of the request * @param message message * @param peer peer identity this notification is about * @param atsi performance data @@ -684,84 +1170,105 @@ core_init (void *cls, struct GNUNET_CORE_Handle *server, * GNUNET_SYSERR to close it (signal serious error) */ static int -handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, +handle_dht_p2p_put (void *cls, + const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { - struct GNUNET_DHT_P2PRouteMessage *incoming = - (struct GNUNET_DHT_P2PRouteMessage *) message; - struct GNUNET_MessageHeader *enc_msg = - (struct GNUNET_MessageHeader *) &incoming[1]; - struct DHT_MessageContext *msg_ctx; - char *route_path; - int path_size; - - if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) + const struct PeerPutMessage *put; + const struct GNUNET_PeerIdentity *put_path; + const void *payload; + uint32_t putlen; + uint16_t msize; + size_t payload_size; + struct GNUNET_CONTAINER_BloomFilter *bf; + GNUNET_HashCode test_key; + + msize = ntohs (message->size); + if (msize < sizeof (struct PeerPutMessage)) { GNUNET_break_op (0); return GNUNET_YES; } - - if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending of previous replies took too long, backing off!\n"); - increment_stats ("# route requests dropped due to high load"); - decrease_max_send_delay (get_max_send_delay ()); - return GNUNET_YES; - } - msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); - msg_ctx->bloom = - GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, - DHT_BLOOM_K); - GNUNET_assert (msg_ctx->bloom != NULL); - msg_ctx->hop_count = ntohl (incoming->hop_count); - memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); - msg_ctx->replication = ntohl (incoming->desired_replication_level); - msg_ctx->msg_options = ntohl (incoming->options); - if (GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) + put = (const struct PeerPutMessage*) message; + putlen = ntohl (put->put_path_length); + if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) || + (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + put_path = (const struct GNUNET_PeerIdentity*) &put[1]; + payload = &put_path[putlen]; + payload_size = msize - (sizeof (struct PeerPutMessage) + + putlen * sizeof (struct GNUNET_PeerIdentity)); + switch (GNUNET_BLOCK_get_key (block_context, + ntohl (put->type), + payload, payload_size, + &test_key)) { - path_size = - ntohl (incoming->outgoing_path_length) * - sizeof (struct GNUNET_PeerIdentity); - if (ntohs (message->size) != - (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) + - path_size)) + case GNUNET_YES: + if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode))) { GNUNET_break_op (0); - GNUNET_free (msg_ctx); return GNUNET_YES; } - route_path = (char *) &incoming[1]; - route_path = route_path + ntohs (enc_msg->size); - msg_ctx->path_history = - GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size); - memcpy (msg_ctx->path_history, route_path, path_size); - memcpy (&msg_ctx->path_history[path_size], &my_identity, - sizeof (struct GNUNET_PeerIdentity)); - msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1; + break; + case GNUNET_NO: + GNUNET_break_op (0); + return GNUNET_YES; + case GNUNET_SYSERR: + /* cannot verify, good luck */ + break; } - msg_ctx->network_size = ntohl (incoming->network_size); - msg_ctx->peer = *peer; - msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE; - msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; - demultiplex_message (enc_msg, msg_ctx); - if (msg_ctx->bloom != NULL) + bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, + DHT_BLOOM_SIZE, + DHT_BLOOM_K); { - GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); - msg_ctx->bloom = NULL; + struct GNUNET_PeerIdentity pp[putlen+1]; + + /* extend 'put path' by sender */ + memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity)); + pp[putlen] = *sender; + + /* give to local clients */ + GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time), + &put->key, + 0, NULL, + putlen + 1, + pp, + ntohl (put->type), + payload_size, + payload); + /* store locally */ + GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time), + &put->key, + putlen + 1, pp, + ntohl (put->type), + payload_size, + payload); + /* route to other peers */ + GDS_NEIGHBOURS_handle_put (ntohl (put->type), + ntohl (put->options), + ntohl (put->desired_replication_level), + GNUNET_TIME_absolute_ntoh (put->expiration_time), + ntohl (put->hop_count), + bf, + putlen + 1, pp, + payload, + payload_size); } - GNUNET_free (msg_ctx); + GNUNET_CONTAINER_bloomfilter_free (bf); return GNUNET_YES; } /** - * Core handler for p2p put requests. + * Core handler for p2p get requests. * * @param cls closure + * @param peer sender of the request * @param message message * @param peer peer identity this notification is about * @param atsi performance data @@ -769,11 +1276,18 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, * GNUNET_SYSERR to close it (signal serious error) */ static int -handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, +handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { + // 1) validate GET + // 2) store in routing table + // 3) check options (i.e. FIND PEER) + // 4) local lookup (=> need eval result!) + // 5) p2p forwarding + + struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *) message; struct GNUNET_MessageHeader *enc_msg = @@ -782,6 +1296,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, char *route_path; int path_size; + // FIXME if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) { GNUNET_break_op (0); @@ -844,7 +1359,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, /** - * Core handler for p2p route results. + * Core handler for p2p result messages. * * @param cls closure * @param message message @@ -858,12 +1373,17 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_TRANSPORT_ATS_Information *atsi) { + // 1) validate result format + // 2) append 'peer' to put path + // 3) forward to local clients + // 4) p2p routing const struct GNUNET_DHT_P2PRouteResultMessage *incoming = (const struct GNUNET_DHT_P2PRouteResultMessage *) message; struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *) &incoming[1]; struct DHT_MessageContext msg_ctx; + // FIXME if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) { GNUNET_break_op (0); @@ -903,7 +1423,7 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, * Initialize neighbours subsystem. */ int -GST_NEIGHBOURS_init () +GDS_NEIGHBOURS_init () { static struct GNUNET_CORE_MessageHandler core_handlers[] = { {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, @@ -918,18 +1438,16 @@ GST_NEIGHBOURS_init () GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size", &temp_config_num)) bucket_size = (unsigned int) temp_config_num; - coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */ - DEFAULT_CORE_QUEUE_SIZE, /* queue size */ - NULL, /* Closure passed to DHT functions */ - &core_init, /* Call core_init once connected */ - &handle_core_connect, /* Handle connects */ - &handle_core_disconnect, /* remove peers on disconnects */ + coreAPI = GNUNET_CORE_connect (GDS_cfg, + DEFAULT_CORE_QUEUE_SIZE, + NULL, + &core_init, + &handle_core_connect, + &handle_core_disconnect, NULL, /* Do we care about "status" updates? */ - NULL, /* Don't want notified about all incoming messages */ - GNUNET_NO, /* For header only inbound notification */ - NULL, /* Don't want notified about all outbound messages */ - GNUNET_NO, /* For header only outbound notification */ - core_handlers); /* Register these handlers */ + NULL, GNUNET_NO, + NULL, GNUNET_NO, + core_handlers); if (coreAPI == NULL) return GNUNET_SYSERR; all_known_peers = GNUNET_CONTAINER_multihashmap_create (256); @@ -941,7 +1459,7 @@ GST_NEIGHBOURS_init () * Shutdown neighbours subsystem. */ void -GST_NEIGHBOURS_done () +GDS_NEIGHBOURS_done () { GNUNET_assert (coreAPI != NULL); GNUNET_CORE_disconnect (coreAPI); diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h index 1f2ae08e6..2c20df2c4 100644 --- a/src/dht/gnunet-service-dht_neighbours.h +++ b/src/dht/gnunet-service-dht_neighbours.h @@ -29,12 +29,18 @@ /** - * Perform a PUT operation. + * Perform a PUT operation. Forwards the given request to other + * peers. Does not store the data locally. Does not give the + * data to local clients. May do nothing if this is the only + * peer in the network (or if we are the closest peer in the + * network). * * @param type type of the block * @param options routing options - * @param desired_replication_level desired replication count + * @param desired_replication_level desired replication level * @param expiration_time when does the content expire + * @param hop_count how many hops has this message traversed so far + * @param bf Bloom filter of peers this PUT has already traversed * @param key key for the content * @param put_path_length number of entries in put_path * @param put_path peers this request has traversed so far (if tracked) @@ -42,10 +48,12 @@ * @param data_size number of bytes in data */ void -GST_NEIGHBOURS_handle_put (uint32_t type, +GDS_NEIGHBOURS_handle_put (uint32_t type, uint32_t options, uint32_t desired_replication_level, GNUNET_TIME_Absolute expiration_time, + uint32_t hop_count, + struct GNUNET_CONTAINER_BloomFilter *bf, const GNUNET_HashCode *key, unsigned int put_path_length, struct GNUNET_PeerIdentity *put_path, @@ -54,11 +62,15 @@ GST_NEIGHBOURS_handle_put (uint32_t type, /** - * Perform a GET operation. + * Perform a GET operation. Forwards the given request to other + * peers. Does not lookup the key locally. May do nothing if this is + * the only peer in the network (or if we are the closest peer in the + * network). * * @param type type of the block * @param options routing options * @param desired_replication_level desired replication count + * @param hop_count how many hops did this request traverse so far? * @param key key for the content * @param xquery extended query * @param xquery_size number of bytes in xquery @@ -67,9 +79,10 @@ GST_NEIGHBOURS_handle_put (uint32_t type, * @param peer_bf filter for peers not to select (again) */ void -GST_NEIGHBOURS_handle_get (uint32_t type, +GDS_NEIGHBOURS_handle_get (uint32_t type, uint32_t options, uint32_t desired_replication_level, + uint32_t hop_count, const GNUNET_HashCode *key, const void *xquery, size_t xquery_size, @@ -79,10 +92,11 @@ GST_NEIGHBOURS_handle_get (uint32_t type, /** - * Handle a reply (route to origin). + * Handle a reply (route to origin). Only forwards the reply back to + * other peers waiting for it. Does not do local caching or + * forwarding to local clients. * * @param type type of the block - * @param options routing options * @param expiration_time when does the content expire * @param key key for the content * @param put_path_length number of entries in put_path @@ -93,8 +107,7 @@ GST_NEIGHBOURS_handle_get (uint32_t type, * @param data_size number of bytes in data */ void -GST_NEIGHBOURS_handle_reply (uint32_t type, - uint32_t options, +GDS_NEIGHBOURS_handle_reply (uint32_t type, GNUNET_TIME_Absolute expiration_time, const GNUNET_HashCode *key, unsigned int put_path_length, @@ -109,13 +122,13 @@ GST_NEIGHBOURS_handle_reply (uint32_t type, * Initialize neighbours subsystem. */ void -GST_NEIGHBOURS_init (void); +GDS_NEIGHBOURS_init (void); /** * Shutdown neighbours subsystem. */ void -GST_NEIGHBOURS_done (void); +GDS_NEIGHBOURS_done (void); #endif -- cgit v1.2.3