aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorNathan S. Evans <evans@in.tum.de>2010-07-20 14:50:01 +0000
committerNathan S. Evans <evans@in.tum.de>2010-07-20 14:50:01 +0000
commit27ee160b9c2bfd30a192db7f86585da8bc34e1cd (patch)
tree893c277e336a735a4d970fba1f13957a63394ee3 /src/dht
parent429d32f4c389b2dd84aaeaacc8e6e2b9b1ff0a6c (diff)
downloadgnunet-27ee160b9c2bfd30a192db7f86585da8bc34e1cd.tar.gz
gnunet-27ee160b9c2bfd30a192db7f86585da8bc34e1cd.zip
missing file
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/Makefile.am36
-rw-r--r--src/dht/dhtlog.c91
-rw-r--r--src/dht/gnunet-service-dht.c1941
-rw-r--r--src/dht/test_dhtlog_data.conf72
4 files changed, 1972 insertions, 168 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am
index 592d6ea52..e2d80a308 100644
--- a/src/dht/Makefile.am
+++ b/src/dht/Makefile.am
@@ -4,7 +4,7 @@ if MINGW
4endif 4endif
5 5
6plugindir = $(libdir)/gnunet 6plugindir = $(libdir)/gnunet
7 7HAVE_MYSQL = 0
8if HAVE_MYSQL 8if HAVE_MYSQL
9 MYSQL_PLUGIN = libgnunet_plugin_dhtlog_mysql.la 9 MYSQL_PLUGIN = libgnunet_plugin_dhtlog_mysql.la
10endif 10endif
@@ -32,6 +32,14 @@ libgnunet_plugin_dhtlog_dummy_la_LIBADD = \
32libgnunet_plugin_dhtlog_dummy_la_LDFLAGS = \ 32libgnunet_plugin_dhtlog_dummy_la_LDFLAGS = \
33 $(GN_PLUGIN_LDFLAGS) 33 $(GN_PLUGIN_LDFLAGS)
34 34
35libgnunetdhtlog_la_SOURCES = \
36 dhtlog.c dhtlog.h
37libgnunetdhtlog_la_LIBADD = \
38 $(top_builddir)/src/util/libgnunetutil.la
39libgnunetdhtlog_la_LDFLAGS = \
40 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
41 -version-info 0:0:0
42
35if HAVE_MYSQL 43if HAVE_MYSQL
36libgnunet_plugin_dhtlog_mysql_la_SOURCES = \ 44libgnunet_plugin_dhtlog_mysql_la_SOURCES = \
37 plugin_dhtlog_mysql.c 45 plugin_dhtlog_mysql.c
@@ -41,14 +49,6 @@ libgnunet_plugin_dhtlog_mysql_la_LIBADD = \
41libgnunet_plugin_dhtlog_mysql_la_LDFLAGS = \ 49libgnunet_plugin_dhtlog_mysql_la_LDFLAGS = \
42 $(GN_PLUGIN_LDFLAGS) -lmysqlclient $(ZLIB_LNK) 50 $(GN_PLUGIN_LDFLAGS) -lmysqlclient $(ZLIB_LNK)
43endif 51endif
44
45libgnunetdhtlog_la_SOURCES = \
46 dhtlog.c dhtlog.h
47libgnunetdhtlog_la_LIBADD = \
48 $(top_builddir)/src/util/libgnunetutil.la
49libgnunetdhtlog_la_LDFLAGS = \
50 $(GN_LIB_LDFLAGS) $(WINFLAGS) \
51 -version-info 0:0:0
52 52
53libgnunetdht_la_SOURCES = \ 53libgnunetdht_la_SOURCES = \
54 dht_api.c dht.h 54 dht_api.c dht.h
@@ -115,8 +115,8 @@ check_PROGRAMS = \
115 test_dht_api \ 115 test_dht_api \
116 test_dht_twopeer \ 116 test_dht_twopeer \
117 test_dht_twopeer_put_get \ 117 test_dht_twopeer_put_get \
118 test_dht_multipeer 118 test_dht_multipeer \
119# test_dhtlog 119 test_dhtlog
120 120
121TESTS = test_dht_api $(check_SCRIPTS) 121TESTS = test_dht_api $(check_SCRIPTS)
122 122
@@ -148,13 +148,13 @@ test_dht_twopeer_put_get_LDADD = \
148 $(top_builddir)/src/testing/libgnunettesting.la \ 148 $(top_builddir)/src/testing/libgnunettesting.la \
149 $(top_builddir)/src/dht/libgnunetdht.la 149 $(top_builddir)/src/dht/libgnunetdht.la
150 150
151#test_dhtlog_SOURCES = \ 151test_dhtlog_SOURCES = \
152# test_dhtlog.c 152 test_dhtlog.c
153#test_dhtlog_LDADD = \ 153test_dhtlog_LDADD = \
154# $(top_builddir)/src/util/libgnunetutil.la \ 154 $(top_builddir)/src/util/libgnunetutil.la \
155# $(top_builddir)/src/testing/libgnunettesting.la \ 155 $(top_builddir)/src/testing/libgnunettesting.la \
156# $(top_builddir)/src/dht/libgnunetdht.la \ 156 $(top_builddir)/src/dht/libgnunetdht.la \
157# $(top_builddir)/src/dht/libgnunetdhtlog.la 157 $(top_builddir)/src/dht/libgnunetdhtlog.la
158 158
159EXTRA_DIST = \ 159EXTRA_DIST = \
160 $(check_SCRIPTS) \ 160 $(check_SCRIPTS) \
diff --git a/src/dht/dhtlog.c b/src/dht/dhtlog.c
new file mode 100644
index 000000000..37e9c2d95
--- /dev/null
+++ b/src/dht/dhtlog.c
@@ -0,0 +1,91 @@
1/*
2 This file is part of GNUnet.
3 (C) 2006 - 2009 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file src/dht/dhtlog.c
23 * @brief Plugin loaded to load logging
24 * to record DHT operations
25 * @author Nathan Evans
26 *
27 * Database: Loaded by plugin MySQL
28 */
29
30#include "platform.h"
31#include "gnunet_util_lib.h"
32#include "dhtlog.h"
33
34static char *libname;
35
36/*
37 * Provides the dhtlog api
38 *
39 * @param c the configuration to use to connect to a server
40 *
41 * @return the handle to the server, or NULL on error
42 */
43struct GNUNET_DHTLOG_Handle *
44GNUNET_DHTLOG_connect (const struct GNUNET_CONFIGURATION_Handle *c)
45{
46 struct GNUNET_DHTLOG_Plugin *plugin;
47 struct GNUNET_DHTLOG_Handle *api;
48 char *plugin_name;
49
50 plugin = GNUNET_malloc(sizeof(struct GNUNET_DHTLOG_Plugin));
51 plugin->cfg = c;
52 if (GNUNET_OK ==
53 GNUNET_CONFIGURATION_get_value_string (c,
54 "DHTLOG", "PLUGIN", &plugin_name))
55 {
56 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
57 _("Loading `%s' dhtlog plugin\n"), plugin_name);
58 GNUNET_asprintf (&libname, "libgnunet_plugin_dhtlog_%s", plugin_name);
59 GNUNET_PLUGIN_load (libname, plugin);
60 }
61
62 if (plugin->dhtlog_api == NULL)
63 {
64 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
65 _("Failed to load dhtlog plugin for `%s'\n"), plugin_name);
66 GNUNET_free (plugin_name);
67 GNUNET_free (plugin);
68 }
69
70 api = plugin->dhtlog_api;
71 GNUNET_free(plugin);
72 return api;
73}
74
75/**
76 * Shutdown the module.
77 */
78void
79GNUNET_DHTLOG_disconnect (struct GNUNET_DHTLOG_Handle *api)
80{
81#if DEBUG_DHTLOG
82 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
83 "MySQL DHT Logger: database shutdown\n");
84#endif
85 if (api != NULL)
86 {
87 GNUNET_PLUGIN_unload(libname, api);
88 }
89}
90
91/* end of dhtlog.c */
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c
index d16f784f8..eb577de47 100644
--- a/src/dht/gnunet-service-dht.c
+++ b/src/dht/gnunet-service-dht.c
@@ -37,53 +37,147 @@
37#include "gnunet_datacache_lib.h" 37#include "gnunet_datacache_lib.h"
38#include "gnunet_transport_service.h" 38#include "gnunet_transport_service.h"
39#include "gnunet_hello_lib.h" 39#include "gnunet_hello_lib.h"
40#include "gnunet_dht_service.h"
41#include "dhtlog.h"
40#include "dht.h" 42#include "dht.h"
41 43
42/** 44/**
43 * Handle to the datacache service (for inserting/retrieving data) 45 * How many buckets will we allow total.
44 */ 46 */
45static struct GNUNET_DATACACHE_Handle *datacache; 47#define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
46 48
47/** 49/**
48 * The main scheduler to use for the DHT service 50 * What is the maximum number of peers in a given bucket.
49 */ 51 */
50static struct GNUNET_SCHEDULER_Handle *sched; 52#define DEFAULT_BUCKET_SIZE 8
51 53
52/** 54/**
53 * The configuration the DHT service is running with 55 * Minimum number of peers we need for "good" routing,
56 * any less than this and we will allow messages to
57 * travel much further through the network!
54 */ 58 */
55static const struct GNUNET_CONFIGURATION_Handle *cfg; 59#define MINIMUM_PEER_THRESHOLD 20
56 60
57/** 61/**
58 * Handle to the core service 62 * Real maximum number of hops, at which point we refuse
63 * to forward the message.
59 */ 64 */
60static struct GNUNET_CORE_Handle *coreAPI; 65#define MAX_HOPS 20
61 66
62/** 67/**
63 * Handle to the transport service, for getting our hello 68 * Linked list of messages to send to clients.
64 */ 69 */
65static struct GNUNET_TRANSPORT_Handle *transport_handle; 70struct P2PPendingMessage
71{
72 /**
73 * Pointer to next item in the list
74 */
75 struct P2PPendingMessage *next;
66 76
67/** 77 /**
68 * The identity of our peer. 78 * Pointer to previous item in the list
69 */ 79 */
70static struct GNUNET_PeerIdentity my_identity; 80 struct P2PPendingMessage *prev;
71 81
72/** 82 /**
73 * Short id of the peer, for printing 83 * Message importance level.
74 */ 84 */
75static char *my_short_id; 85 unsigned int importance;
86
87 /**
88 * How long to wait before sending message.
89 */
90 struct GNUNET_TIME_Relative timeout;
91
92 /**
93 * Actual message to be sent; // avoid allocation
94 */
95 const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
96
97};
76 98
77/** 99/**
78 * Our HELLO 100 * Per-peer information.
79 */ 101 */
80static struct GNUNET_MessageHeader *my_hello; 102struct PeerInfo
103{
104 /**
105 * Next peer entry (DLL)
106 */
107 struct PeerInfo *next;
108
109 /**
110 * Prev peer entry (DLL)
111 */
112 struct PeerInfo *prev;
113
114 /**
115 * Head of pending messages to be sent to this peer.
116 */
117 struct P2PPendingMessage *head;
118
119 /**
120 * Tail of pending messages to be sent to this peer.
121 */
122 struct P2PPendingMessage *tail;
123
124 /**
125 * Core handle for sending messages to this peer.
126 */
127 struct GNUNET_CORE_TransmitHandle *th;
128
129 /**
130 * Task for scheduling message sends.
131 */
132 GNUNET_SCHEDULER_TaskIdentifier send_task;
133
134 /**
135 * What is the average latency for replies received?
136 */
137 struct GNUNET_TIME_Relative latency;
138
139 /**
140 * Number of responses received
141 */
142 unsigned long long response_count;
143
144 /**
145 * Number of requests sent
146 */
147 unsigned long long request_count;
148
149 /**
150 * What is the identity of the peer?
151 */
152 struct GNUNET_PeerIdentity id;
153
154 /**
155 * Transport level distance to peer.
156 */
157 unsigned int distance;
158
159};
81 160
82/** 161/**
83 * Task to run when we shut down, cleaning up all our trash 162 * Peers are grouped into buckets.
84 */ 163 */
85static GNUNET_SCHEDULER_TaskIdentifier cleanup_task; 164struct PeerBucket
165{
166 /**
167 * Head of DLL
168 */
169 struct PeerInfo *head;
86 170
171 /**
172 * Tail of DLL
173 */
174 struct PeerInfo *tail;
175
176 /**
177 * Number of peers in the bucket.
178 */
179 unsigned int peers_size;
180};
87 181
88/** 182/**
89 * Linked list of messages to send to clients. 183 * Linked list of messages to send to clients.
@@ -142,63 +236,752 @@ struct ClientList
142 236
143}; 237};
144 238
239
145/** 240/**
146 * Context for handling results from a get request. 241 * Context containing information about a DHT message received.
147 */ 242 */
148struct DatacacheGetContext 243struct DHT_MessageContext
149{ 244{
150 /** 245 /**
151 * The client to send the result to. 246 * The client this request was received from.
247 * (NULL if received from another peer)
152 */ 248 */
153 struct ClientList *client; 249 struct ClientList *client;
154 250
155 /** 251 /**
156 * The unique id of this request 252 * The peer this request was received from.
253 * (NULL if received from local client)
157 */ 254 */
158 unsigned long long unique_id; 255 const struct GNUNET_PeerIdentity *peer;
256
257 /**
258 * The key this request was about
259 */
260 const GNUNET_HashCode *key;
261
262 /**
263 * The unique identifier of this request
264 */
265 uint64_t unique_id;
266
267 /**
268 * Desired replication level
269 */
270 uint32_t replication;
271
272 /**
273 * Network size estimate, either ours or the sum of
274 * those routed to thus far. =~ Log of number of peers
275 * chosen from for this request.
276 */
277 uint32_t network_size;
278
279 /**
280 * Any message options for this request
281 */
282 uint32_t msg_options;
283
284 /**
285 * How many hops has the message already traversed?
286 */
287 uint32_t hop_count;
288
289 /**
290 * Bloomfilter for this routing request.
291 */
292 struct GNUNET_CONTAINER_BloomFilter *bloom;
293
294 /**
295 * Did we forward this message? (may need to remember it!)
296 */
297 int forwarded;
298
299 /**
300 * Are we the closest known peer to this key (out of our neighbors?)
301 */
302 int closest;
159}; 303};
160 304
161/** 305/**
162 * Context containing information about a DHT message received. 306 * Record used for remembering what peers are waiting for what
307 * responses (based on search key).
163 */ 308 */
164struct DHT_MessageContext 309struct DHTRouteSource
165{ 310{
311
166 /** 312 /**
167 * The client this request was received from. 313 * This is a DLL.
314 */
315 struct DHTRouteSource *next;
316
317 /**
318 * This is a DLL.
319 */
320 struct DHTRouteSource *prev;
321
322 /**
323 * Source of the request. Replies should be forwarded to
324 * this peer.
325 */
326 struct GNUNET_PeerIdentity source;
327
328 /**
329 * If this was a local request, remember the client; otherwise NULL.
168 */ 330 */
169 struct ClientList *client; 331 struct ClientList *client;
170 332
171 /** 333 /**
172 * The key this request was about 334 * Pointer to this nodes heap location (for removal)
173 */ 335 */
174 const GNUNET_HashCode *key; 336 struct GNUNET_CONTAINER_HeapNode *hnode;
175 337
176 /** 338 /**
177 * The unique identifier of this request 339 * Back pointer to the record storing this information.
178 */ 340 */
179 unsigned long long unique_id; 341 struct DHTQueryRecord *record;
180 342
181 /** 343 /**
182 * Desired replication level 344 * Task to remove this entry on timeout.
183 */ 345 */
184 size_t replication; 346 GNUNET_SCHEDULER_TaskIdentifier delete_task;
347};
185 348
349/**
350 * Entry in the DHT routing table.
351 */
352struct DHTQueryRecord
353{
186 /** 354 /**
187 * Any message options for this request 355 * Head of DLL for result forwarding.
356 */
357 struct DHTRouteSource *head;
358
359 /**
360 * Tail of DLL for result forwarding.
361 */
362 struct DHTRouteSource *tail;
363
364 /**
365 * Key that the record concerns.
366 */
367 GNUNET_HashCode key;
368
369 /**
370 * GET message of this record (what we already forwarded?).
371 */
372 //DV_DHT_MESSAGE get; Try to get away with not saving this.
373
374 /**
375 * Bloomfilter of the peers we've replied to so far
376 */
377 //struct GNUNET_BloomFilter *bloom_results; Don't think we need this, just remove from DLL on response.
378
379};
380
381/**
382 * DHT Routing results structure
383 */
384struct DHTResults
385{
386 /*
387 * Min heap for removal upon reaching limit
188 */ 388 */
189 size_t msg_options; 389 struct GNUNET_CONTAINER_Heap *minHeap;
390
391 /*
392 * Hashmap for fast key based lookup
393 */
394 struct GNUNET_CONTAINER_MultiHashMap *hashmap;
395
190}; 396};
191 397
192/** 398/**
399 * Routing option to end routing when closest peer found.
400 */
401static int stop_on_closest;
402
403/**
404 * Routing option to end routing when data is found.
405 */
406static int stop_on_found;
407
408/**
409 * Container of active queries we should remember
410 */
411static struct DHTResults forward_list;
412
413/**
414 * Handle to the datacache service (for inserting/retrieving data)
415 */
416static struct GNUNET_DATACACHE_Handle *datacache;
417
418/**
419 * The main scheduler to use for the DHT service
420 */
421static struct GNUNET_SCHEDULER_Handle *sched;
422
423/**
424 * The configuration the DHT service is running with
425 */
426static const struct GNUNET_CONFIGURATION_Handle *cfg;
427
428/**
429 * Handle to the core service
430 */
431static struct GNUNET_CORE_Handle *coreAPI;
432
433/**
434 * Handle to the transport service, for getting our hello
435 */
436static struct GNUNET_TRANSPORT_Handle *transport_handle;
437
438/**
439 * The identity of our peer.
440 */
441static struct GNUNET_PeerIdentity my_identity;
442
443/**
444 * Short id of the peer, for printing
445 */
446static char *my_short_id;
447
448/**
449 * Our HELLO
450 */
451static struct GNUNET_MessageHeader *my_hello;
452
453/**
454 * Task to run when we shut down, cleaning up all our trash
455 */
456static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
457
458/**
459 * The lowest currently used bucket.
460 */
461static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
462
463/**
464 * The buckets (Kademlia routing table, complete with growth).
465 * Array of size MAX_BUCKET_SIZE.
466 */
467static struct PeerBucket k_buckets[MAX_BUCKETS]; /* From 0 to MAX_BUCKETS - 1 */
468
469/**
470 * Maximum size for each bucket.
471 */
472static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; /* Initially equal to DEFAULT_BUCKET_SIZE */
473
474/**
193 * List of active clients. 475 * List of active clients.
194 */ 476 */
195static struct ClientList *client_list; 477static struct ClientList *client_list;
196 478
197/** 479/**
480 * Handle to the DHT logger.
481 */
482static struct GNUNET_DHTLOG_Handle *dhtlog_handle;
483
484/*
485 * Whether or not to send routing debugging information
486 * to the dht logging server
487 */
488static unsigned int debug_routes;
489
490/*
491 * Whether or not to send FULL route information to
492 * logging server
493 */
494static unsigned int debug_routes_extended;
495
496/**
198 * Forward declaration. 497 * Forward declaration.
199 */ 498 */
200static size_t send_generic_reply (void *cls, size_t size, void *buf); 499static size_t send_generic_reply (void *cls, size_t size, void *buf);
201 500
501/* Declare here so retry_core_send is aware of it */
502size_t core_transmit_notify (void *cls,
503 size_t size, void *buf);
504
505/**
506 * Try to send another message from our core send list
507 */
508static void
509try_core_send (void *cls,
510 const struct GNUNET_SCHEDULER_TaskContext *tc)
511{
512 struct PeerInfo *peer = cls;
513 struct P2PPendingMessage *pending;
514 size_t ssize;
515
516 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
517
518 if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
519 return;
520
521 if (peer->th != NULL)
522 return; /* Message send already in progress */
523
524 pending = peer->head;
525 if (pending != NULL)
526 {
527 ssize = ntohs(pending->msg->size);
528#if DEBUG_DHT > 1
529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n", my_short_id,
531 "DHT", ssize, GNUNET_i2s(&peer->id));
532#endif
533 peer->th = GNUNET_CORE_notify_transmit_ready(coreAPI, pending->importance,
534 pending->timeout, &peer->id,
535 ssize, &core_transmit_notify, peer);
536 }
537}
538
539/**
540 * Function called to send a request out to another peer.
541 * Called both for locally initiated requests and those
542 * received from other peers.
543 *
544 * @param cls DHT service closure argument
545 * @param msg the encapsulated message
546 * @param peer the peer to forward the message to
547 * @param msg_ctx the context of the message (hop count, bloom, etc.)
548 */
549static void forward_result_message (void *cls,
550 const struct GNUNET_MessageHeader *msg,
551 struct PeerInfo *peer,
552 struct DHT_MessageContext *msg_ctx)
553{
554 struct GNUNET_DHT_P2PRouteResultMessage *result_message;
555 struct P2PPendingMessage *pending;
556 size_t msize;
557 size_t psize;
558
559 msize = sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs(msg->size);
560 GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
561 psize = sizeof(struct P2PPendingMessage) + msize;
562 pending = GNUNET_malloc(psize);
563 pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
564 pending->importance = DHT_SEND_PRIORITY;
565 pending->timeout = GNUNET_TIME_relative_get_forever();
566 result_message = (struct GNUNET_DHT_P2PRouteResultMessage *)pending->msg;
567 result_message->header.size = htons(msize);
568 result_message->header.type = htons(GNUNET_MESSAGE_TYPE_P2P_DHT_ROUTE_RESULT);
569 result_message->options = htonl(msg_ctx->msg_options);
570 result_message->hop_count = htonl(msg_ctx->hop_count + 1);
571 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, result_message->bloomfilter, DHT_BLOOM_SIZE));
572 result_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
573 memcpy(&result_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
574 memcpy(&result_message[1], msg, ntohs(msg->size));
575#if DEBUG_DHT > 1
576 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
577#endif
578 GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
579 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
580 peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
581}
582/**
583 * Called when core is ready to send a message we asked for
584 * out to the destination.
585 *
586 * @param cls closure (NULL)
587 * @param size number of bytes available in buf
588 * @param buf where the callee should write the message
589 * @return number of bytes written to buf
590 */
591size_t core_transmit_notify (void *cls,
592 size_t size, void *buf)
593{
594 struct PeerInfo *peer = cls;
595 char *cbuf = buf;
596 struct P2PPendingMessage *pending;
597
598 size_t off;
599 size_t msize;
600
601 if (buf == NULL)
602 {
603 /* client disconnected */
604#if DEBUG_DHT
605 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n", my_short_id, "DHT");
606#endif
607 return 0;
608 }
609
610 if (peer->head == NULL)
611 return 0;
612
613 peer->th = NULL;
614 off = 0;
615 pending = peer->head;
616 msize = ntohs(pending->msg->size);
617 if (msize <= size)
618 {
619 off = msize;
620 memcpy (cbuf, pending->msg, msize);
621 GNUNET_CONTAINER_DLL_remove (peer->head,
622 peer->tail,
623 pending);
624#if DEBUG_DHT > 1
625 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Removing pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
626#endif
627 GNUNET_free (pending);
628 }
629#if SMART
630 while (NULL != pending &&
631 (size - off >= (msize = ntohs (pending->msg->size))))
632 {
633#if DEBUG_DHT_ROUTING
634 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d\n", my_short_id, "dht service", msize, size);
635#endif
636 memcpy (&cbuf[off], pending->msg, msize);
637 off += msize;
638 GNUNET_CONTAINER_DLL_remove (peer->head,
639 peer->tail,
640 pending);
641 GNUNET_free (pending);
642 pending = peer->head;
643 }
644#endif
645 if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
646 peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
647#if DEBUG_DHT > 1
648 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "`%s:%s' : transmit_notify (core) called with size %d, available %d, returning %d\n", my_short_id, "dht service", msize, size, off);
649#endif
650 return off;
651}
652
653/**
654 * Determine how many low order bits match in two
655 * GNUNET_HashCodes. i.e. - 010011 and 011111 share
656 * the first two lowest order bits, and therefore the
657 * return value is two (NOT XOR distance, nor how many
658 * bits match absolutely!).
659 *
660 * @param first the first hashcode
661 * @param second the hashcode to compare first to
662 *
663 * @return the number of bits that match
664 */
665static unsigned int matching_bits(const GNUNET_HashCode *first, const GNUNET_HashCode *second)
666{
667 unsigned int i;
668
669 for (i = 0; i < sizeof (GNUNET_HashCode) * 8; i++)
670 if (GNUNET_CRYPTO_hash_get_bit (first, i) != GNUNET_CRYPTO_hash_get_bit (second, i))
671 return i;
672 return sizeof (GNUNET_HashCode) * 8;
673}
674
675/**
676 * Compute the distance between have and target as a 32-bit value.
677 * Differences in the lower bits must count stronger than differences
678 * in the higher bits.
679 *
680 * @return 0 if have==target, otherwise a number
681 * that is larger as the distance between
682 * the two hash codes increases
683 */
684static unsigned int
685distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
686{
687 unsigned int bucket;
688 unsigned int msb;
689 unsigned int lsb;
690 unsigned int i;
691
692 /* We have to represent the distance between two 2^9 (=512)-bit
693 numbers as a 2^5 (=32)-bit number with "0" being used for the
694 two numbers being identical; furthermore, we need to
695 guarantee that a difference in the number of matching
696 bits is always represented in the result.
697
698 We use 2^32/2^9 numerical values to distinguish between
699 hash codes that have the same LSB bit distance and
700 use the highest 2^9 bits of the result to signify the
701 number of (mis)matching LSB bits; if we have 0 matching
702 and hence 512 mismatching LSB bits we return -1 (since
703 512 itself cannot be represented with 9 bits) */
704
705 /* first, calculate the most significant 9 bits of our
706 result, aka the number of LSBs */
707 bucket = matching_bits (target, have);
708 /* bucket is now a value between 0 and 512 */
709 if (bucket == 512)
710 return 0; /* perfect match */
711 if (bucket == 0)
712 return (unsigned int) -1; /* LSB differs; use max (if we did the bit-shifting
713 below, we'd end up with max+1 (overflow)) */
714
715 /* calculate the most significant bits of the final result */
716 msb = (512 - bucket) << (32 - 9);
717 /* calculate the 32-9 least significant bits of the final result by
718 looking at the differences in the 32-9 bits following the
719 mismatching bit at 'bucket' */
720 lsb = 0;
721 for (i = bucket + 1;
722 (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
723 {
724 if (GNUNET_CRYPTO_hash_get_bit (target, i) != GNUNET_CRYPTO_hash_get_bit (have, i))
725 lsb |= (1 << (bucket + 32 - 9 - i)); /* first bit set will be 10,
726 last bit set will be 31 -- if
727 i does not reach 512 first... */
728 }
729 return msb | lsb;
730}
731
732/**
733 * Return a number that is larger the closer the
734 * "have" GNUNET_hash code is to the "target".
735 *
736 * @return inverse distance metric, non-zero.
737 * Must fudge the value if NO bits match.
738 */
739static unsigned int
740inverse_distance (const GNUNET_HashCode * target,
741 const GNUNET_HashCode * have)
742{
743 if (matching_bits(target, have) == 0)
744 return 1; /* Never return 0! */
745 return ((unsigned int) -1) - distance (target, have);
746}
747
748/**
749 * Find the optimal bucket for this key, regardless
750 * of the current number of buckets in use.
751 *
752 * @param hc the hashcode to compare our identity to
753 *
754 * @return the proper bucket index, or GNUNET_SYSERR
755 * on error (same hashcode)
756 */
757static int find_bucket(const GNUNET_HashCode *hc)
758{
759 unsigned int bits;
760
761 bits = matching_bits(&my_identity.hashPubKey, hc);
762 if (bits == MAX_BUCKETS)
763 return GNUNET_SYSERR;
764 return MAX_BUCKETS - bits - 1;
765}
766
767/**
768 * Find which k-bucket this peer should go into,
769 * taking into account the size of the k-bucket
770 * array. This means that if more bits match than
771 * there are currently buckets, lowest_bucket will
772 * be returned.
773 *
774 * @param hc GNUNET_HashCode we are finding the bucket for.
775 *
776 * @return the proper bucket index for this key,
777 * or GNUNET_SYSERR on error (same hashcode)
778 */
779static int find_current_bucket(const GNUNET_HashCode *hc)
780{
781 int actual_bucket;
782 actual_bucket = find_bucket(hc);
783
784 if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
785 return GNUNET_SYSERR;
786 else if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
787 return lowest_bucket;
788 else
789 return actual_bucket;
790}
791
792/**
793 * Find a routing table entry from a peer identity
794 *
795 * @param peer the peer identity to look up
796 *
797 * @return the routing table entry, or NULL if not found
798 */
799static struct PeerInfo *
800find_peer_by_id(const struct GNUNET_PeerIdentity *peer)
801{
802 int bucket;
803 struct PeerInfo *pos;
804 bucket = find_current_bucket(&peer->hashPubKey);
805
806 if (bucket == GNUNET_SYSERR)
807 return NULL;
808
809 pos = k_buckets[bucket].head;
810 while (pos != NULL)
811 {
812 if (0 == memcmp(&pos->id, peer, sizeof(struct GNUNET_PeerIdentity)))
813 return pos;
814 pos = pos->next;
815 }
816 return NULL; /* No such peer. */
817}
818
819/**
820 * Really add a peer to a bucket (only do assertions
821 * on size, etc.)
822 *
823 * @param peer GNUNET_PeerIdentity of the peer to add
824 * @param bucket the already figured out bucket to add
825 * the peer to
826 * @param latency the core reported latency of this peer
827 * @param distance the transport level distance to this peer
828 */
829static void add_peer(const struct GNUNET_PeerIdentity *peer,
830 unsigned int bucket,
831 struct GNUNET_TIME_Relative latency,
832 unsigned int distance)
833{
834 struct PeerInfo *new_peer;
835 GNUNET_assert(bucket < MAX_BUCKETS);
836 GNUNET_assert(peer != NULL);
837 new_peer = GNUNET_malloc(sizeof(struct PeerInfo));
838 new_peer->latency = latency;
839 new_peer->distance = distance;
840 memcpy(&new_peer->id, peer, sizeof(struct GNUNET_PeerIdentity));
841
842 GNUNET_CONTAINER_DLL_insert_after(k_buckets[bucket].head,
843 k_buckets[bucket].tail,
844 k_buckets[bucket].tail,
845 new_peer);
846 k_buckets[bucket].peers_size++;
847}
848
849/**
850 * Given a peer and its corresponding bucket,
851 * remove it from that bucket. Does not free
852 * the PeerInfo struct, nor cancel messages
853 * or free messages waiting to be sent to this
854 * peer!
855 *
856 * @param peer the peer to remove
857 * @param bucket the bucket the peer belongs to
858 */
859static void remove_peer (struct PeerInfo *peer,
860 unsigned int bucket)
861{
862 GNUNET_assert(k_buckets[bucket].peers_size > 0);
863 GNUNET_CONTAINER_DLL_remove(k_buckets[bucket].head,
864 k_buckets[bucket].tail,
865 peer);
866 k_buckets[bucket].peers_size--;
867}
868
869/**
870 * Removes peer from a bucket, then frees associated
871 * resources and frees peer.
872 *
873 * @param peer peer to be removed and freed
874 * @param bucket which bucket this peer belongs to
875 */
876static void delete_peer (struct PeerInfo *peer,
877 unsigned int bucket)
878{
879 struct P2PPendingMessage *pos;
880 struct P2PPendingMessage *next;
881 remove_peer(peer, bucket); /* First remove the peer from its bucket */
882
883 if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
884 GNUNET_SCHEDULER_cancel(sched, peer->send_task);
885 if (peer->th != NULL)
886 GNUNET_CORE_notify_transmit_ready_cancel(peer->th);
887
888 pos = peer->head;
889 while (pos != NULL) /* Remove any pending messages for this peer */
890 {
891 next = pos->next;
892 GNUNET_free(pos);
893 pos = next;
894 }
895 GNUNET_free(peer);
896}
897
898/**
899 * The current lowest bucket is full, so change the lowest
900 * bucket to the next lower down, and move any appropriate
901 * entries in the current lowest bucket to the new bucket.
902 */
903static void enable_next_bucket()
904{
905 unsigned int new_bucket;
906 unsigned int to_remove;
907 int i;
908 struct PeerInfo *to_remove_list[bucket_size]; /* We either use CPU by making a list, or memory with array. Use memory. */
909 struct PeerInfo *pos;
910 GNUNET_assert(lowest_bucket > 0);
911
912 pos = k_buckets[lowest_bucket].head;
913 memset(to_remove_list, 0, sizeof(to_remove_list));
914 to_remove = 0;
915 /* Populate the array of peers which should be in the next lowest bucket */
916 while (pos->next != NULL)
917 {
918 if (find_bucket(&pos->id.hashPubKey) < lowest_bucket)
919 {
920 to_remove_list[to_remove] = pos;
921 to_remove++;
922 }
923 pos = pos->next;
924 }
925 new_bucket = lowest_bucket - 1;
926
927 /* Remove peers from lowest bucket, insert into next lowest bucket */
928 for (i = 0; i < bucket_size; i++)
929 {
930 if (to_remove_list[i] != NULL)
931 {
932 remove_peer(to_remove_list[i], lowest_bucket);
933 GNUNET_CONTAINER_DLL_insert_after(k_buckets[new_bucket].head,
934 k_buckets[new_bucket].tail,
935 k_buckets[new_bucket].tail,
936 to_remove_list[i]);
937 k_buckets[new_bucket].peers_size++;
938 }
939 else
940 break;
941 }
942 lowest_bucket = new_bucket;
943}
944/**
945 * Attempt to add a peer to our k-buckets.
946 *
947 * @param peer, the peer identity of the peer being added
948 *
949 * @return GNUNET_YES if the peer was added,
950 * GNUNET_NO if not,
951 * GNUNET_SYSERR on err (peer is us!)
952 */
953static int try_add_peer(const struct GNUNET_PeerIdentity *peer,
954 unsigned int bucket,
955 struct GNUNET_TIME_Relative latency,
956 unsigned int distance)
957{
958 int peer_bucket;
959
960 peer_bucket = find_current_bucket(&peer->hashPubKey);
961 if (peer_bucket == GNUNET_SYSERR)
962 return GNUNET_SYSERR;
963
964 GNUNET_assert(peer_bucket >= lowest_bucket);
965 if ((k_buckets[peer_bucket].peers_size) < bucket_size)
966 {
967 add_peer(peer, peer_bucket, latency, distance);
968 return GNUNET_YES;
969 }
970 else if ((peer_bucket == lowest_bucket) && (lowest_bucket > 0))
971 {
972 enable_next_bucket();
973 return try_add_peer(peer, bucket, latency, distance); /* Recurse, if proper bucket still full ping peers */
974 }
975 else if ((k_buckets[peer_bucket].peers_size) == bucket_size)
976 {
977 /* TODO: implement ping_oldest_peer */
978 //ping_oldest_peer(bucket, peer, latency, distance); /* Find oldest peer, ping it. If no response, remove and add new peer! */
979 return GNUNET_NO;
980 }
981 GNUNET_break(0);
982 return GNUNET_NO;
983}
984
202 985
203/** 986/**
204 * Task run to check for messages that need to be sent to a client. 987 * Task run to check for messages that need to be sent to a client.
@@ -300,7 +1083,7 @@ send_reply_to_client (struct ClientList *client,
300 size_t tsize; 1083 size_t tsize;
301#if DEBUG_DHT 1084#if DEBUG_DHT
302 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1085 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
303 "`%s': Sending reply to client.\n", "DHT"); 1086 "`%s:%s': Sending reply to client.\n", my_short_id, "DHT");
304#endif 1087#endif
305 msize = ntohs (message->size); 1088 msize = ntohs (message->size);
306 tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize; 1089 tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
@@ -323,6 +1106,135 @@ send_reply_to_client (struct ClientList *client,
323 1106
324 1107
325/** 1108/**
1109 * Main function that handles whether or not to route a result
1110 * message to other peers, or to send to our local client.
1111 *
1112 * @param msg the result message to be routed
1113 * @return the number of peers the message was routed to,
1114 * GNUNET_SYSERR on failure
1115 */
1116static int route_result_message(void *cls,
1117 struct GNUNET_MessageHeader *msg,
1118 struct DHT_MessageContext *message_context)
1119{
1120 struct DHTQueryRecord *record;
1121 struct DHTRouteSource *pos;
1122 struct PeerInfo *peer_info;
1123
1124 record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, message_context->key);
1125 if (record == NULL) /* No record of this message! */
1126 {
1127#if DEBUG_DHT
1128 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129 "`%s:%s': Have no record of response key %s uid %llu\n", my_short_id,
1130 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1131#endif
1132#if DEBUG_DHT_ROUTING
1133
1134 /*if ((debug_routes) && (dhtlog_handle != NULL))
1135 {
1136 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_RESULT,
1137 message_context->hop_count, GNUNET_SYSERR,
1138 &my_identity, message_context->key);
1139 }*/
1140
1141 if ((debug_routes_extended) && (dhtlog_handle != NULL))
1142 {
1143 dhtlog_handle->insert_route (NULL,
1144 message_context->unique_id,
1145 DHTLOG_RESULT,
1146 message_context->hop_count,
1147 GNUNET_SYSERR,
1148 &my_identity,
1149 message_context->key,
1150 message_context->peer, NULL);
1151 }
1152#endif
1153 if (message_context->bloom != NULL)
1154 {
1155 GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1156 message_context->bloom = NULL;
1157 }
1158 return 0;
1159 }
1160
1161 pos = record->head;
1162 while (pos != NULL)
1163 {
1164 if (0 == memcmp(&pos->source, &my_identity, sizeof(struct GNUNET_PeerIdentity))) /* Local client initiated request! */
1165 {
1166#if DEBUG_DHT
1167 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1168 "`%s:%s': Sending response key %s uid %llu to client\n", my_short_id,
1169 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
1170#endif
1171#if DEBUG_DHT_ROUTING
1172 /*
1173 if ((debug_routes) && (dhtlog_handle != NULL))
1174 {
1175 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_RESULT,
1176 message_context->hop_count, GNUNET_YES,
1177 &my_identity, message_context->key);
1178 }*/
1179
1180 if ((debug_routes_extended) && (dhtlog_handle != NULL))
1181 {
1182 dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_RESULT,
1183 message_context->hop_count,
1184 GNUNET_YES, &my_identity, message_context->key,
1185 message_context->peer, NULL);
1186 }
1187#endif
1188 send_reply_to_client(pos->client, msg, message_context->unique_id);
1189 }
1190 else /* Send to peer */
1191 {
1192 peer_info = find_peer_by_id(&pos->source);
1193 if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1194 {
1195 pos = pos->next;
1196 continue;
1197 }
1198
1199 if (message_context->bloom == NULL)
1200 message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1201 GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1202 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (message_context->bloom, &peer_info->id.hashPubKey))
1203 {
1204#if DEBUG_DHT
1205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1206 "`%s:%s': Forwarding response key %s uid %llu to peer %s\n", my_short_id,
1207 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1208#endif
1209#if DEBUG_DHT_ROUTING
1210 if ((debug_routes_extended) && (dhtlog_handle != NULL))
1211 {
1212 dhtlog_handle->insert_route (NULL, message_context->unique_id,
1213 DHTLOG_RESULT,
1214 message_context->hop_count,
1215 GNUNET_NO, &my_identity, message_context->key,
1216 message_context->peer, &pos->source);
1217 }
1218#endif
1219 forward_result_message(cls, msg, peer_info, message_context);
1220 }
1221 else
1222 {
1223#if DEBUG_DHT
1224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225 "`%s:%s': NOT Forwarding response (bloom match) key %s uid %llu to peer %s\n", my_short_id,
1226 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&peer_info->id));
1227#endif
1228 }
1229 }
1230 pos = pos->next;
1231 }
1232 if (message_context->bloom != NULL)
1233 GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
1234 return 0;
1235}
1236
1237/**
326 * Iterator for local get request results, 1238 * Iterator for local get request results,
327 * 1239 *
328 * @param cls closure for iterator, a DatacacheGetContext 1240 * @param cls closure for iterator, a DatacacheGetContext
@@ -341,35 +1253,46 @@ datacache_get_iterator (void *cls,
341 const GNUNET_HashCode * key, 1253 const GNUNET_HashCode * key,
342 uint32_t size, const char *data, uint32_t type) 1254 uint32_t size, const char *data, uint32_t type)
343{ 1255{
344 struct DatacacheGetContext *datacache_get_ctx = cls; 1256 struct DHT_MessageContext *msg_ctx = cls;
1257 struct DHT_MessageContext *new_msg_ctx;
345 struct GNUNET_DHT_GetResultMessage *get_result; 1258 struct GNUNET_DHT_GetResultMessage *get_result;
346#if DEBUG_DHT 1259#if DEBUG_DHT
347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
348 "`%s': Received `%s' response from datacache\n", "DHT", "GET"); 1261 "`%s:%s': Received `%s' response from datacache\n", my_short_id, "DHT", "GET");
349#endif 1262#endif
1263 new_msg_ctx = GNUNET_malloc(sizeof(struct DHT_MessageContext));
1264 memcpy(new_msg_ctx, msg_ctx, sizeof(struct DHT_MessageContext));
350 get_result = 1265 get_result =
351 GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size); 1266 GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
352 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT); 1267 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
353 get_result->header.size = 1268 get_result->header.size =
354 htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size); 1269 htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
355 get_result->expiration = GNUNET_TIME_absolute_hton (exp); 1270 get_result->expiration = GNUNET_TIME_absolute_hton(exp);
356 get_result->type = htons (type); 1271 get_result->type = htons (type);
357 memcpy (&get_result[1], data, size); 1272 memcpy (&get_result[1], data, size);
358 send_reply_to_client (datacache_get_ctx->client, &get_result->header, 1273 new_msg_ctx->peer = &my_identity;
359 datacache_get_ctx->unique_id); 1274 new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1275 new_msg_ctx->hop_count = 0;
1276 route_result_message(cls, &get_result->header, new_msg_ctx);
1277 GNUNET_free(new_msg_ctx);
1278 //send_reply_to_client (datacache_get_ctx->client, &get_result->header,
1279 // datacache_get_ctx->unique_id);
360 GNUNET_free (get_result); 1280 GNUNET_free (get_result);
361 return GNUNET_OK; 1281 return GNUNET_OK;
362} 1282}
363 1283
364 1284
365/** 1285/**
366 * Server handler for initiating local dht get requests 1286 * Server handler for all dht get requests, look for data,
1287 * if found, send response either to clients or other peers.
367 * 1288 *
368 * @param cls closure for service 1289 * @param cls closure for service
369 * @param msg the actual get message 1290 * @param msg the actual get message
370 * @param message_context struct containing pertinent information about the get request 1291 * @param message_context struct containing pertinent information about the get request
1292 *
1293 * @return number of items found for GET request
371 */ 1294 */
372static void 1295static unsigned int
373handle_dht_get (void *cls, 1296handle_dht_get (void *cls,
374 const struct GNUNET_MessageHeader *msg, 1297 const struct GNUNET_MessageHeader *msg,
375 struct DHT_MessageContext *message_context) 1298 struct DHT_MessageContext *message_context)
@@ -377,32 +1300,66 @@ handle_dht_get (void *cls,
377 const struct GNUNET_DHT_GetMessage *get_msg; 1300 const struct GNUNET_DHT_GetMessage *get_msg;
378 uint16_t get_type; 1301 uint16_t get_type;
379 unsigned int results; 1302 unsigned int results;
380 struct DatacacheGetContext datacache_get_context;
381 1303
382 get_msg = (const struct GNUNET_DHT_GetMessage *) msg; 1304 get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
383 if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage)) 1305 if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
384 { 1306 {
385 GNUNET_break (0); 1307 GNUNET_break (0);
386 return; 1308 return 0;
387 } 1309 }
388 1310
389 get_type = ntohs (get_msg->type); 1311 get_type = ntohs (get_msg->type);
390#if DEBUG_DHT 1312#if DEBUG_DHT
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "`%s': Received `%s' request from client, message type %u, key %s, uid %llu\n", 1314 "`%s:%s': Received `%s' request from client, message type %u, key %s, uid %llu\n", my_short_id,
393 "DHT", "GET", get_type, GNUNET_h2s (message_context->key), 1315 "DHT", "GET", get_type, GNUNET_h2s (message_context->key),
394 message_context->unique_id); 1316 message_context->unique_id);
395#endif 1317#endif
396 datacache_get_context.client = message_context->client; 1318
397 datacache_get_context.unique_id = message_context->unique_id;
398 results = 0; 1319 results = 0;
399 if (datacache != NULL) 1320 if (datacache != NULL)
400 results = 1321 results =
401 GNUNET_DATACACHE_get (datacache, message_context->key, get_type, 1322 GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
402 &datacache_get_iterator, &datacache_get_context); 1323 &datacache_get_iterator, message_context);
403 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1324
404 "`%s': Found %d results for local `%s' request\n", "DHT", 1325 if (results >= 1)
405 results, "GET"); 1326 {
1327#if DEBUG_DHT
1328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1329 "`%s:%s': Found %d results for `%s' request uid %llu\n", my_short_id, "DHT",
1330 results, "GET", message_context->unique_id);
1331#endif
1332#if DEBUG_DHT_ROUTING
1333 if ((debug_routes) && (dhtlog_handle != NULL))
1334 {
1335 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1336 message_context->hop_count, GNUNET_YES, &my_identity,
1337 message_context->key);
1338 }
1339
1340 if ((debug_routes_extended) && (dhtlog_handle != NULL))
1341 {
1342 dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1343 message_context->hop_count, GNUNET_YES,
1344 &my_identity, message_context->key, message_context->peer,
1345 NULL);
1346 }
1347#endif
1348 }
1349
1350 if (message_context->hop_count == 0) /* Locally initiated request */
1351 {
1352#if DEBUG_DHT_ROUTING
1353 if ((debug_routes) && (dhtlog_handle != NULL))
1354 {
1355 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_GET,
1356 message_context->hop_count, GNUNET_NO, &my_identity,
1357 message_context->key);
1358 }
1359#endif
1360 }
1361
1362 return results;
406} 1363}
407 1364
408 1365
@@ -425,12 +1382,12 @@ handle_dht_find_peer (void *cls,
425 1382
426#if DEBUG_DHT 1383#if DEBUG_DHT
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1384 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
428 "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", 1385 "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
429 "DHT", "FIND PEER", GNUNET_h2s (message_context->key), 1386 my_short_id, "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
430 ntohs (find_msg->size), 1387 ntohs (find_msg->size),
431 sizeof (struct GNUNET_MessageHeader)); 1388 sizeof (struct GNUNET_MessageHeader));
432#endif 1389#endif
433 if (my_hello == NULL) 1390 if ((my_hello == NULL) || (message_context->closest != GNUNET_YES))
434 { 1391 {
435#if DEBUG_DHT 1392#if DEBUG_DHT
436 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -457,7 +1414,13 @@ handle_dht_find_peer (void *cls,
457 "`%s': Sending hello size %d to client.\n", 1414 "`%s': Sending hello size %d to client.\n",
458 "DHT", hello_size); 1415 "DHT", hello_size);
459#endif 1416#endif
460 send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id); 1417 if (message_context->bloom != NULL)
1418 GNUNET_CONTAINER_bloomfilter_clear(message_context->bloom);
1419
1420 message_context->hop_count = 0;
1421 message_context->peer = &my_identity;
1422 route_result_message(cls, find_peer_result, message_context);
1423 //send_reply_to_client(message_context->client, find_peer_result, message_context->unique_id);
461 GNUNET_free(find_peer_result); 1424 GNUNET_free(find_peer_result);
462} 1425}
463 1426
@@ -485,19 +1448,609 @@ handle_dht_put (void *cls,
485 data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage); 1448 data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
486#if DEBUG_DHT 1449#if DEBUG_DHT
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
488 "`%s': Received `%s' request from client, message type %d, key %s\n", 1451 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
489 "DHT", "PUT", put_type, GNUNET_h2s (message_context->key)); 1452 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (message_context->key), message_context->unique_id);
1453#endif
1454#if DEBUG_DHT_ROUTING
1455
1456 if ((debug_routes) && (dhtlog_handle != NULL))
1457 {
1458 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
1459 message_context->hop_count, GNUNET_YES, &my_identity,
1460 message_context->key);
1461 }
490#endif 1462#endif
1463
491 if (datacache != NULL) 1464 if (datacache != NULL)
492 GNUNET_DATACACHE_put (datacache, message_context->key, data_size, 1465 GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
493 (char *) &put_msg[1], put_type, 1466 (char *) &put_msg[1], put_type,
494 GNUNET_TIME_absolute_ntoh(put_msg->expiration)); 1467 GNUNET_TIME_absolute_ntoh(put_msg->expiration));
495 else 1468 else
496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
497 "`%s': %s request received locally, but have no datacache!\n", 1470 "`%s:%s': %s request received, but have no datacache!\n",
498 "DHT", "PUT"); 1471 my_short_id, "DHT", "PUT");
499} 1472}
500 1473
1474/**
1475 * Estimate the diameter of the network based
1476 * on how many buckets are currently in use.
1477 * Concept here is that the diameter of the network
1478 * is roughly the distance a message must travel in
1479 * order to reach its intended destination. Since
1480 * at each hop we expect to get one bit closer, and
1481 * we have one bit per bucket, the number of buckets
1482 * in use should be the largest number of hops for
1483 * a sucessful message. (of course, this assumes we
1484 * know all peers in the network!)
1485 *
1486 * @return ballpark diameter figure
1487 */
1488static unsigned int estimate_diameter()
1489{
1490 return MAX_BUCKETS - lowest_bucket;
1491}
1492
1493/**
1494 * To how many peers should we (on average)
1495 * forward the request to obtain the desired
1496 * target_replication count (on average).
1497 *
1498 * Always 0, 1 or 2 (don't send, send once, split)
1499 */
1500static unsigned int
1501get_forward_count (unsigned int hop_count, size_t target_replication)
1502{
1503 double target_count;
1504 unsigned int target_value;
1505 unsigned int diameter;
1506
1507 /* FIXME: the smaller we think the network is the more lenient we should be for
1508 * routing right? The estimation below only works if we think we have reasonably
1509 * full routing tables, which for our RR topologies may not be the case!
1510 */
1511 diameter = estimate_diameter ();
1512 if ((hop_count > (diameter + 1) * 2) && (MINIMUM_PEER_THRESHOLD < estimate_diameter() * bucket_size))
1513 {
1514#if DEBUG_DHT
1515 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1516 "`%s:%s': Hop count too high (est %d, lowest %d), NOT Forwarding request\n", my_short_id,
1517 "DHT", estimate_diameter(), lowest_bucket);
1518#endif
1519 return 0;
1520 }
1521 else if (hop_count > MAX_HOPS)
1522 {
1523#if DEBUG_DHT
1524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525 "`%s:%s': Hop count too high (greater than max)\n", my_short_id,
1526 "DHT");
1527#endif
1528 return 0;
1529 }
1530 target_count = /* target_count is ALWAYS < 1 unless replication is < 1 */
1531 target_replication / (target_replication * (hop_count + 1) + diameter);
1532 target_value = 0;
1533
1534#if NONSENSE
1535 while (target_value < target_count)
1536 target_value++; /* target_value is ALWAYS 1 after this "loop" */
1537#else
1538 target_value = 1;
1539#endif
1540 if ((target_count + 1 - target_value) >
1541 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1542 RAND_MAX) / RAND_MAX)
1543 target_value++;
1544 return target_value;
1545}
1546
1547/**
1548 * Find the closest peer in our routing table to the
1549 * given hashcode.
1550 *
1551 * @return The closest peer in our routing table to the
1552 * key, or NULL on error.
1553 */
1554static struct PeerInfo *
1555find_closest_peer (const GNUNET_HashCode *hc)
1556{
1557 struct PeerInfo *pos;
1558 struct PeerInfo *current_closest;
1559 unsigned int lowest_distance;
1560 unsigned int temp_distance;
1561 int bucket;
1562
1563 lowest_distance = -1;
1564
1565 if (k_buckets[lowest_bucket].peers_size == 0)
1566 return NULL;
1567
1568 for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1569 {
1570 pos = k_buckets[bucket].head;
1571 while (pos != NULL)
1572 {
1573 temp_distance = distance(&pos->id.hashPubKey, hc);
1574 if (temp_distance <= lowest_distance)
1575 {
1576 lowest_distance = temp_distance;
1577 current_closest = pos;
1578 }
1579 pos = pos->next;
1580 }
1581 }
1582 GNUNET_assert(current_closest != NULL);
1583 return current_closest;
1584}
1585
1586/*
1587 * Check whether my identity is closer than any known peers.
1588 *
1589 * @param target hash code to check closeness to
1590 *
1591 * Return GNUNET_YES if node location is closest, GNUNET_NO
1592 * otherwise.
1593 */
1594int
1595am_closest_peer (const GNUNET_HashCode * target)
1596{
1597 int bits;
1598 int other_bits;
1599 int bucket_num;
1600 struct PeerInfo *pos;
1601 unsigned int my_distance;
1602
1603 bucket_num = find_current_bucket(target);
1604 if (bucket_num == GNUNET_SYSERR) /* Same key! */
1605 return GNUNET_YES;
1606
1607 bits = matching_bits(&my_identity.hashPubKey, target);
1608 my_distance = distance(&my_identity.hashPubKey, target);
1609
1610 pos = k_buckets[bucket_num].head;
1611 while (pos != NULL)
1612 {
1613 other_bits = matching_bits(&pos->id.hashPubKey, target);
1614 if (other_bits > bits)
1615 return GNUNET_NO;
1616 else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
1617 {
1618 if (distance(&pos->id.hashPubKey, target) < my_distance)
1619 return GNUNET_NO;
1620 }
1621 pos = pos->next;
1622 }
1623
1624#if DEBUG_TABLE
1625 GNUNET_GE_LOG (coreAPI->ectx,
1626 GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1627 GNUNET_GE_BULK, "closest peer\n");
1628 printPeerBits (&closest);
1629 GNUNET_GE_LOG (coreAPI->ectx,
1630 GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1631 GNUNET_GE_BULK, "me\n");
1632 printPeerBits (coreAPI->my_identity);
1633 GNUNET_GE_LOG (coreAPI->ectx,
1634 GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1635 GNUNET_GE_BULK, "key\n");
1636 printKeyBits (target);
1637 GNUNET_GE_LOG (coreAPI->ectx,
1638 GNUNET_GE_WARNING | GNUNET_GE_ADMIN | GNUNET_GE_USER |
1639 GNUNET_GE_BULK,
1640 "closest peer inverse distance is %u, mine is %u\n",
1641 inverse_distance (target, &closest.hashPubKey),
1642 inverse_distance (target,
1643 &coreAPI->my_identity->hashPubKey));
1644#endif
1645
1646 /* No peers closer, we are the closest! */
1647 return GNUNET_YES;
1648
1649}
1650
1651
1652/**
1653 * Select a peer from the routing table that would be a good routing
1654 * destination for sending a message for "target". The resulting peer
1655 * must not be in the set of blocked peers.<p>
1656 *
1657 * Note that we should not ALWAYS select the closest peer to the
1658 * target, peers further away from the target should be chosen with
1659 * exponentially declining probability.
1660 *
1661 * @param target the key we are selecting a peer to route to
1662 * @param bloom a bloomfilter containing entries this request has seen already
1663 *
1664 * @return Peer to route to, or NULL on error
1665 */
1666static struct PeerInfo *
1667select_peer (const GNUNET_HashCode * target,
1668 struct GNUNET_CONTAINER_BloomFilter *bloom)
1669{
1670 unsigned int distance;
1671 unsigned int bc;
1672 struct PeerInfo *pos;
1673#if USE_KADEMLIA
1674 const struct PeerInfo *chosen;
1675 unsigned long long largest_distance;
1676#else
1677 unsigned long long total_distance;
1678 unsigned long long selected;
1679#endif
1680
1681#if USE_KADEMLIA
1682 largest_distance = 0;
1683 chosen = NULL;
1684 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1685 {
1686 pos = k_buckets[bc].head;
1687 while (pos != NULL)
1688 {
1689 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1690 {
1691 distance = inverse_distance (target, &pos->id.hashPubKey);
1692 if (distance > largest_distance)
1693 {
1694 chosen = pos;
1695 largest_distance = distance;
1696 }
1697 }
1698 pos = pos->next;
1699 }
1700 }
1701
1702 if ((largest_distance > 0) && (chosen != NULL))
1703 {
1704 GNUNET_CONTAINER_bloomfilter_add(bloom, &chosen->id.hashPubKey);
1705 return chosen;
1706 }
1707 else
1708 {
1709 return NULL;
1710 }
1711#else
1712 /* GNUnet-style */
1713 total_distance = 0;
1714 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1715 {
1716 pos = k_buckets[bc].head;
1717 while (pos != NULL)
1718 {
1719 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1720 total_distance += (unsigned long long)inverse_distance (target, &pos->id.hashPubKey);
1721#if DEBUG_DHT > 1
1722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1723 "`%s:%s': Total distance is %llu, distance from %s to %s is %u\n",
1724 my_short_id, "DHT", total_distance, GNUNET_i2s(&pos->id), GNUNET_h2s(target) , inverse_distance(target, &pos->id.hashPubKey));
1725#endif
1726 pos = pos->next;
1727 }
1728 }
1729 if (total_distance == 0)
1730 {
1731 return NULL;
1732 }
1733
1734 selected = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, total_distance);
1735 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1736 {
1737 pos = k_buckets[bc].head;
1738 while (pos != NULL)
1739 {
1740 if (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1741 {
1742 distance = inverse_distance (target, &pos->id.hashPubKey);
1743 if (distance > selected)
1744 return pos;
1745 selected -= distance;
1746 }
1747 else
1748 {
1749#if DEBUG_DHT
1750 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1751 "`%s:%s': peer %s matches bloomfilter.\n",
1752 my_short_id, "DHT", GNUNET_i2s(&pos->id));
1753#endif
1754 }
1755 pos = pos->next;
1756 }
1757 }
1758#if DEBUG_DHT
1759 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1760 "`%s:%s': peer %s matches bloomfilter.\n",
1761 my_short_id, "DHT", GNUNET_i2s(&pos->id));
1762#endif
1763 return NULL;
1764#endif
1765}
1766
1767/**
1768 * Function called to send a request out to another peer.
1769 * Called both for locally initiated requests and those
1770 * received from other peers.
1771 *
1772 * @param cls DHT service closure argument
1773 * @param msg the encapsulated message
1774 * @param peer the peer to forward the message to
1775 * @param msg_ctx the context of the message (hop count, bloom, etc.)
1776 */
1777static void forward_message (void *cls,
1778 const struct GNUNET_MessageHeader *msg,
1779 struct PeerInfo *peer,
1780 struct DHT_MessageContext *msg_ctx)
1781{
1782 struct GNUNET_DHT_P2PRouteMessage *route_message;
1783 struct P2PPendingMessage *pending;
1784 size_t msize;
1785 size_t psize;
1786
1787 msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1788 GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1789 psize = sizeof(struct P2PPendingMessage) + msize;
1790 pending = GNUNET_malloc(psize);
1791 pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1792 pending->importance = DHT_SEND_PRIORITY;
1793 pending->timeout = GNUNET_TIME_relative_get_forever();
1794 route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1795 route_message->header.size = htons(msize);
1796 route_message->header.type = htons(GNUNET_MESSAGE_TYPE_P2P_DHT_ROUTE);
1797 route_message->options = htonl(msg_ctx->msg_options);
1798 route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1799 route_message->network_size = htonl(msg_ctx->network_size);
1800 route_message->desired_replication_level = htonl(msg_ctx->replication);
1801 route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1802 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1803 memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1804 memcpy(&route_message[1], msg, ntohs(msg->size));
1805#if DEBUG_DHT > 1
1806 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1807#endif
1808 GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1809 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1810 peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1811}
1812
1813/**
1814 * Task used to remove forwarding entries, either
1815 * after timeout, when full, or on shutdown.
1816 *
1817 * @param cls the entry to remove
1818 * @param tc context, reason, etc.
1819 */
1820static void
1821remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1822{
1823 struct DHTRouteSource *source_info = cls;
1824 struct DHTQueryRecord *record;
1825 source_info = GNUNET_CONTAINER_heap_remove_node(forward_list.minHeap, source_info->hnode);
1826 record = source_info->record;
1827 GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1828
1829 if (record->head == NULL) /* No more entries in DLL */
1830 {
1831 GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1832 GNUNET_free(record);
1833 }
1834 GNUNET_free(source_info);
1835}
1836
1837/**
1838 * Remember this routing request so that if a reply is
1839 * received we can either forward it to the correct peer
1840 * or return the result locally.
1841 *
1842 * @param cls DHT service closure
1843 * @param msg_ctx Context of the route request
1844 *
1845 * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1846 */
1847static int cache_response(void *cls, struct DHT_MessageContext *msg_ctx)
1848{
1849 struct DHTQueryRecord *record;
1850 struct DHTRouteSource *source_info;
1851 struct DHTRouteSource *pos;
1852 struct GNUNET_TIME_Absolute now;
1853 unsigned int current_size;
1854
1855 current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1856 while (current_size >= MAX_OUTSTANDING_FORWARDS)
1857 {
1858 source_info = GNUNET_CONTAINER_heap_remove_root(forward_list.minHeap);
1859 record = source_info->record;
1860 GNUNET_CONTAINER_DLL_remove(record->head, record->tail, source_info);
1861 if (record->head == NULL) /* No more entries in DLL */
1862 {
1863 GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(forward_list.hashmap, &record->key, record));
1864 GNUNET_free(record);
1865 }
1866 GNUNET_SCHEDULER_cancel(sched, source_info->delete_task);
1867 GNUNET_free(source_info);
1868 current_size = GNUNET_CONTAINER_multihashmap_size(forward_list.hashmap);
1869 }
1870 now = GNUNET_TIME_absolute_get();
1871 record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, msg_ctx->key);
1872 if (record != NULL) /* Already know this request! */
1873 {
1874 pos = record->head;
1875 while (pos != NULL)
1876 {
1877 if (0 == memcmp(msg_ctx->peer, &pos->source, sizeof(struct GNUNET_PeerIdentity)))
1878 break; /* Already have this peer in reply list! */
1879 pos = pos->next;
1880 }
1881 if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
1882 {
1883 GNUNET_CONTAINER_heap_update_cost(forward_list.minHeap, pos->hnode, now.value);
1884 return GNUNET_NO;
1885 }
1886 }
1887 else
1888 {
1889 record = GNUNET_malloc(sizeof (struct DHTQueryRecord));
1890 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(forward_list.hashmap, msg_ctx->key, record, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1891 memcpy(&record->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1892 }
1893
1894 source_info = GNUNET_malloc(sizeof(struct DHTRouteSource));
1895 source_info->record = record;
1896 source_info->delete_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_FORWARD_TIMEOUT, &remove_forward_entry, source_info);
1897 memcpy(&source_info->source, msg_ctx->peer, sizeof(struct GNUNET_PeerIdentity));
1898 GNUNET_CONTAINER_DLL_insert_after(record->head, record->tail, record->tail, source_info);
1899 if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
1900 {
1901 source_info->client = msg_ctx->client;
1902 now = GNUNET_TIME_absolute_get_forever();
1903 }
1904 source_info->hnode = GNUNET_CONTAINER_heap_insert(forward_list.minHeap, source_info, now.value);
1905#if DEBUG_DHT > 1
1906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1907 "`%s:%s': Created new forward source info for %s uid %llu\n", my_short_id,
1908 "DHT", GNUNET_h2s (msg_ctx->key), msg_ctx->unique_id);
1909#endif
1910 return GNUNET_YES;
1911}
1912
1913
1914/**
1915 * Main function that handles whether or not to route a message to other
1916 * peers.
1917 *
1918 * @param msg the message to be routed
1919 *
1920 * @return the number of peers the message was routed to,
1921 * GNUNET_SYSERR on failure
1922 */
1923static int route_message(void *cls,
1924 const struct GNUNET_MessageHeader *msg,
1925 struct DHT_MessageContext *message_context)
1926{
1927 int i;
1928 struct PeerInfo *selected;
1929 struct PeerInfo *nearest;
1930 unsigned int forward_count;
1931#if DEBUG_DHT
1932 char *nearest_buf;
1933#endif
1934#if DEBUG_DHT_ROUTING
1935 int ret;
1936#endif
1937
1938 message_context->closest = am_closest_peer(message_context->key);
1939 forward_count = get_forward_count(message_context->hop_count, message_context->replication);
1940 nearest = find_closest_peer(message_context->key);
1941
1942 if (message_context->bloom == NULL)
1943 message_context->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1944 GNUNET_CONTAINER_bloomfilter_add (message_context->bloom, &my_identity.hashPubKey);
1945
1946 if ((stop_on_closest == GNUNET_YES) && (message_context->closest == GNUNET_YES) && (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_PUT))
1947 forward_count = 0;
1948
1949#if DEBUG_DHT_ROUTING
1950 if (forward_count == 0)
1951 ret = GNUNET_SYSERR;
1952 else
1953 ret = GNUNET_NO;
1954
1955 if ((debug_routes_extended) && (dhtlog_handle != NULL))
1956 {
1957 dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1958 message_context->hop_count, ret,
1959 &my_identity, message_context->key, message_context->peer,
1960 NULL);
1961 }
1962#endif
1963
1964 switch (ntohs(msg->type))
1965 {
1966 case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
1967 cache_response (cls, message_context);
1968 if ((handle_dht_get (cls, msg, message_context) > 0) && (stop_on_found == GNUNET_YES))
1969 forward_count = 0;
1970 break;
1971 case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. FIXME: thresholding?*/
1972 if (message_context->closest == GNUNET_YES)
1973 {
1974#if DEBUG_DHT_ROUTING
1975 if ((debug_routes_extended) && (dhtlog_handle != NULL))
1976 {
1977 dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
1978 message_context->hop_count, GNUNET_YES,
1979 &my_identity, message_context->key, message_context->peer,
1980 NULL);
1981 }
1982#endif
1983 handle_dht_put (cls, msg, message_context);
1984 }
1985#if DEBUG_DHT_ROUTING
1986 if (message_context->hop_count == 0) /* Locally initiated request */
1987 {
1988 if ((debug_routes) && (dhtlog_handle != NULL))
1989 {
1990 dhtlog_handle->insert_query (NULL, message_context->unique_id, DHTLOG_PUT,
1991 message_context->hop_count, GNUNET_NO, &my_identity,
1992 message_context->key);
1993 }
1994 }
1995#endif
1996 break;
1997 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest, check options, add to requests seen */
1998 cache_response (cls, message_context);
1999 if ((message_context->closest == GNUNET_YES) || (message_context->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2000 handle_dht_find_peer (cls, msg, message_context);
2001 break;
2002 default:
2003 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2004 "`%s': Message type (%d) not handled\n", "DHT", ntohs(msg->type));
2005 }
2006
2007 for (i = 0; i < forward_count; i++)
2008 {
2009 selected = select_peer(message_context->key, message_context->bloom);
2010
2011 if (selected != NULL)
2012 {
2013 GNUNET_CONTAINER_bloomfilter_add(message_context->bloom, &selected->id.hashPubKey);
2014#if DEBUG_DHT_ROUTING > 1
2015 nearest_buf = GNUNET_strdup(GNUNET_i2s(&nearest->id));
2016 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2017 "`%s:%s': Forwarding request key %s uid %llu to peer %s (closest %s, bits %d, distance %u)\n", my_short_id,
2018 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id, GNUNET_i2s(&selected->id), nearest_buf, matching_bits(&nearest->id.hashPubKey, message_context->key), distance(&nearest->id.hashPubKey, message_context->key));
2019 GNUNET_free(nearest_buf);
2020#endif
2021 /* FIXME: statistics */
2022 if ((debug_routes_extended) && (dhtlog_handle != NULL))
2023 {
2024 dhtlog_handle->insert_route (NULL, message_context->unique_id, DHTLOG_ROUTE,
2025 message_context->hop_count, GNUNET_NO,
2026 &my_identity, message_context->key, message_context->peer,
2027 &selected->id);
2028 }
2029 forward_message(cls, msg, selected, message_context);
2030 }
2031 else
2032 {
2033#if DEBUG_DHT
2034 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2035 "`%s:%s': No peers selected for forwarding.\n", my_short_id,
2036 "DHT");
2037#endif
2038 }
2039 }
2040#if DEBUG_DHT_ROUTING > 1
2041 if (forward_count == 0)
2042 {
2043 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2044 "`%s:%s': NOT Forwarding request key %s uid %llu to any peers\n", my_short_id,
2045 "DHT", GNUNET_h2s (message_context->key), message_context->unique_id);
2046 }
2047#endif
2048
2049 if (message_context->bloom != NULL)
2050 GNUNET_CONTAINER_bloomfilter_free(message_context->bloom);
2051
2052 return forward_count;
2053}
501 2054
502/** 2055/**
503 * Find a client if it exists, add it otherwise. 2056 * Find a client if it exists, add it otherwise.
@@ -536,69 +2089,38 @@ find_active_client (struct GNUNET_SERVER_Client *client)
536 * @param message the actual message received 2089 * @param message the actual message received
537 */ 2090 */
538static void 2091static void
539handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client, 2092handle_dht_local_route_request (void *cls, struct GNUNET_SERVER_Client *client,
540 const struct GNUNET_MessageHeader *message) 2093 const struct GNUNET_MessageHeader *message)
541{ 2094{
542 const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message; 2095 const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct GNUNET_DHT_RouteMessage *) message;
543 const struct GNUNET_MessageHeader *enc_msg; 2096 const struct GNUNET_MessageHeader *enc_msg;
544 struct DHT_MessageContext *message_context; 2097 struct DHT_MessageContext message_context;
545 int handle_locally;
546 size_t enc_type; 2098 size_t enc_type;
547 2099
548 enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1]; 2100 enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
549 enc_type = ntohs (enc_msg->type); 2101 enc_type = ntohs (enc_msg->type);
550 2102
551
552#if DEBUG_DHT 2103#if DEBUG_DHT
553 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2104 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554 "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", 2105 "`%s:%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
555 "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key), 2106 my_short_id, "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
556 GNUNET_ntohll (dht_msg->unique_id)); 2107 GNUNET_ntohll (dht_msg->unique_id));
557#endif 2108#endif
2109#if DEBUG_DHT_ROUTING
2110 if (dhtlog_handle != NULL)
2111 dhtlog_handle->insert_dhtkey (NULL, &dht_msg->key);
2112#endif
2113 memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2114 message_context.client = find_active_client (client);
2115 message_context.key = &dht_msg->key;
2116 message_context.unique_id = GNUNET_ntohll (dht_msg->unique_id);
2117 message_context.replication = ntohl (dht_msg->desired_replication_level);
2118 message_context.msg_options = ntohl (dht_msg->options);
2119 message_context.network_size = estimate_diameter();
2120 message_context.peer = &my_identity;
558 2121
559 message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext)); 2122 route_message(cls, enc_msg, &message_context);
560 message_context->client = find_active_client (client);
561 message_context->key = &dht_msg->key;
562 message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id);
563 message_context->replication = ntohl (dht_msg->desired_replication_level);
564 message_context->msg_options = ntohl (dht_msg->options);
565 2123
566 /* TODO: Steps to be added by students */
567 /* FIXME: Implement *remote* DHT operations here (forward request) */
568 /* Implement generic route function and call here. */
569 /* FIXME: *IF* handling should be local, then do this: */
570 /* 1. find if this peer is closest based on whatever metric the DHT uses
571 * 2. if this peer is closest _OR_ the message options indicate it should
572 * be processed everywhere _AND_ we want it processed everywhere, then
573 * handle it locally.
574 */
575 handle_locally = GNUNET_NO;
576 if (handle_locally == GNUNET_YES)
577 {
578 switch (enc_type)
579 {
580 case GNUNET_MESSAGE_TYPE_DHT_GET:
581 handle_dht_get (cls, enc_msg,
582 message_context);
583 break;
584 case GNUNET_MESSAGE_TYPE_DHT_PUT:
585 handle_dht_put (cls, enc_msg,
586 message_context);
587 break;
588 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
589 handle_dht_find_peer (cls,
590 enc_msg,
591 message_context);
592 break;
593 default:
594 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
595 "`%s': Message type (%d) not handled\n", "DHT", enc_type);
596 }
597 }
598 else
599 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
600 "`%s': Message type (%d) not handled\n", "DHT", enc_type);
601 GNUNET_free (message_context);
602 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2124 GNUNET_SERVER_receive_done (client, GNUNET_OK);
603 2125
604} 2126}
@@ -611,25 +2133,40 @@ handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
611 * @param client the client we received this message from 2133 * @param client the client we received this message from
612 * @param message the actual message received 2134 * @param message the actual message received
613 * 2135 *
614 * TODO: once messages are remembered by unique id, add code to
615 * forget them here
616 */ 2136 */
617static void 2137static void
618handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client, 2138handle_dht_local_route_stop(void *cls, struct GNUNET_SERVER_Client *client,
619 const struct GNUNET_MessageHeader *message) 2139 const struct GNUNET_MessageHeader *message)
620{ 2140{
2141
621 const struct GNUNET_DHT_StopMessage *dht_stop_msg = 2142 const struct GNUNET_DHT_StopMessage *dht_stop_msg =
622 (const struct GNUNET_DHT_StopMessage *) message; 2143 (const struct GNUNET_DHT_StopMessage *) message;
2144 struct DHTQueryRecord *record;
2145 struct DHTRouteSource *pos;
623 uint64_t uid; 2146 uint64_t uid;
624#if DEBUG_DHT 2147#if DEBUG_DHT
625 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2148 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
626 "`%s': Received `%s' request from client, uid %llu\n", "DHT", 2149 "`%s:%s': Received `%s' request from client, uid %llu\n", my_short_id, "DHT",
627 "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id)); 2150 "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
628#endif 2151#endif
629 2152
630 uid = GNUNET_ntohll(dht_stop_msg->unique_id); 2153 uid = GNUNET_ntohll(dht_stop_msg->unique_id);
631 /* TODO: actually stop... free associated resources for the request 2154
632 * lookup request by uid and remove state. */ 2155 record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap, &dht_stop_msg->key);
2156 if (record != NULL)
2157 {
2158 pos = record->head;
2159
2160 while (pos != NULL)
2161 {
2162 if ((pos->client != NULL) && (pos->client->client_handle == client))
2163 {
2164 GNUNET_SCHEDULER_cancel(sched, pos->delete_task);
2165 GNUNET_SCHEDULER_add_now(sched, &remove_forward_entry, pos);
2166 }
2167 pos = pos->next;
2168 }
2169 }
633 2170
634 GNUNET_SERVER_receive_done (client, GNUNET_OK); 2171 GNUNET_SERVER_receive_done (client, GNUNET_OK);
635} 2172}
@@ -646,11 +2183,30 @@ handle_dht_p2p_route_request (void *cls,
646{ 2183{
647#if DEBUG_DHT 2184#if DEBUG_DHT
648 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2185 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
649 "`%s': Received `%s' request from another peer\n", "DHT", 2186 "`%s:%s': Received P2P request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
650 "GET");
651#endif 2187#endif
652 // FIXME: setup tracking for sending replies to peer (with timeout) 2188 struct GNUNET_DHT_P2PRouteMessage *incoming = (struct GNUNET_DHT_P2PRouteMessage *)message;
653 // FIXME: call code from handle_dht_start_message (refactor...) 2189 struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2190 struct DHT_MessageContext *message_context;
2191
2192 if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2193 {
2194 GNUNET_break_op(0);
2195 return GNUNET_YES;
2196 }
2197 //memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2198 message_context = GNUNET_malloc(sizeof (struct DHT_MessageContext));
2199 message_context->bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2200 GNUNET_assert(message_context->bloom != NULL);
2201 message_context->hop_count = ntohl(incoming->hop_count);
2202 message_context->key = &incoming->key;
2203 message_context->replication = ntohl(incoming->desired_replication_level);
2204 message_context->unique_id = GNUNET_ntohll(incoming->unique_id);
2205 message_context->msg_options = ntohl(incoming->options);
2206 message_context->network_size = ntohl(incoming->network_size);
2207 message_context->peer = peer;
2208 route_message(cls, enc_msg, message_context);
2209 GNUNET_free(message_context);
654 return GNUNET_YES; 2210 return GNUNET_YES;
655} 2211}
656 2212
@@ -666,11 +2222,26 @@ handle_dht_p2p_route_result (void *cls,
666{ 2222{
667#if DEBUG_DHT 2223#if DEBUG_DHT
668 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
669 "`%s': Received `%s' request from another peer\n", "DHT", 2225 "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", GNUNET_i2s(peer));
670 "GET");
671#endif 2226#endif
672 // FIXME: setup tracking for sending replies to peer 2227 struct GNUNET_DHT_P2PRouteResultMessage *incoming = (struct GNUNET_DHT_P2PRouteResultMessage *)message;
673 // FIXME: possibly call code from handle_dht_stop_message? (unique result?) (refactor...) 2228 struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2229 struct DHT_MessageContext message_context;
2230
2231 if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2232 {
2233 GNUNET_break_op(0);
2234 return GNUNET_YES;
2235 }
2236 memset(&message_context, 0, sizeof(struct DHT_MessageContext));
2237 message_context.bloom = GNUNET_CONTAINER_bloomfilter_init(incoming->bloomfilter, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2238 GNUNET_assert(message_context.bloom != NULL);
2239 message_context.key = &incoming->key;
2240 message_context.unique_id = GNUNET_ntohll(incoming->unique_id);
2241 message_context.msg_options = ntohl(incoming->options);
2242 message_context.hop_count = ntohl(incoming->hop_count);
2243 message_context.peer = peer;
2244 route_result_message(cls, enc_msg, &message_context);
674 return GNUNET_YES; 2245 return GNUNET_YES;
675} 2246}
676 2247
@@ -706,18 +2277,49 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message)
706static void 2277static void
707shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 2278shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
708{ 2279{
2280 int bucket_count;
2281 struct PeerInfo *pos;
2282
709 if (transport_handle != NULL) 2283 if (transport_handle != NULL)
710 { 2284 {
711 GNUNET_free_non_null(my_hello); 2285 GNUNET_free_non_null(my_hello);
712 GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL); 2286 GNUNET_TRANSPORT_get_hello_cancel(transport_handle, &process_hello, NULL);
713 GNUNET_TRANSPORT_disconnect(transport_handle); 2287 GNUNET_TRANSPORT_disconnect(transport_handle);
714 } 2288 }
2289
2290 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2291 {
2292 while (k_buckets[bucket_count].head != NULL)
2293 {
2294 pos = k_buckets[bucket_count].head;
2295#if DEBUG_DHT
2296 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2297 "%s:%s Removing peer %s from bucket %d!\n", my_short_id, "DHT", GNUNET_i2s(&pos->id), bucket_count);
2298#endif
2299 delete_peer(pos, bucket_count);
2300 }
2301 }
715 if (coreAPI != NULL) 2302 if (coreAPI != NULL)
716 GNUNET_CORE_disconnect (coreAPI); 2303 {
2304#if DEBUG_DHT
2305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2306 "%s:%s Disconnecting core!\n", my_short_id, "DHT");
2307#endif
2308 GNUNET_CORE_disconnect (coreAPI);
2309 }
717 if (datacache != NULL) 2310 if (datacache != NULL)
718 GNUNET_DATACACHE_destroy (datacache); 2311 {
719 if (my_short_id != NULL) 2312#if DEBUG_DHT
720 GNUNET_free(my_short_id); 2313 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2314 "%s:%s Destroying datacache!\n", my_short_id, "DHT");
2315#endif
2316 GNUNET_DATACACHE_destroy (datacache);
2317 }
2318
2319 if (dhtlog_handle != NULL)
2320 GNUNET_DHTLOG_disconnect(dhtlog_handle);
2321
2322 GNUNET_free_non_null(my_short_id);
721} 2323}
722 2324
723 2325
@@ -757,12 +2359,15 @@ core_init (void *cls,
757 my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity)); 2359 my_short_id = GNUNET_strdup(GNUNET_i2s(&my_identity));
758 /* Set the server to local variable */ 2360 /* Set the server to local variable */
759 coreAPI = server; 2361 coreAPI = server;
2362
2363 if (dhtlog_handle != NULL)
2364 dhtlog_handle->insert_node (NULL, &my_identity);
760} 2365}
761 2366
762 2367
763static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { 2368static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
764 {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_LOCAL_DHT_ROUTE, 0}, 2369 {&handle_dht_local_route_request, NULL, GNUNET_MESSAGE_TYPE_LOCAL_DHT_ROUTE, 0},
765 {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE_STOP, 0}, 2370 {&handle_dht_local_route_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE_STOP, 0},
766 {NULL, NULL, 0, 0} 2371 {NULL, NULL, 0, 0}
767}; 2372};
768 2373
@@ -787,28 +2392,20 @@ void handle_core_connect (void *cls,
787 uint32_t distance) 2392 uint32_t distance)
788{ 2393{
789 int ret; 2394 int ret;
790 char *data;
791 size_t data_size = 42;
792 data = GNUNET_malloc (data_size);
793 memset (data, 43, data_size);
794 2395
795#if DEBUG_DHT 2396#if DEBUG_DHT
796 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2397 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
797 "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance); 2398 "%s:%s Receives core connect message for peer %s distance %d!\n", my_short_id, "dht", GNUNET_i2s(peer), distance);
798#endif 2399#endif
799 if (datacache != NULL) 2400
800 { 2401 ret = try_add_peer(peer,
801 ret = GNUNET_DATACACHE_put (datacache, &peer->hashPubKey, data_size, 2402 find_current_bucket(&peer->hashPubKey),
802 data, 130, 2403 latency,
803 GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get(), 2404 distance);
804 GNUNET_TIME_UNIT_MINUTES));
805#if DEBUG_DHT 2405#if DEBUG_DHT
806 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
807 "%s Inserting data %s, type %d into datacache, return value was %d\n", my_short_id, GNUNET_h2s(&peer->hashPubKey), 130, ret); 2407 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == GNUNET_YES ? "PEER ADDED" : "NOT ADDED");
808#endif 2408#endif
809 }
810 else
811 GNUNET_free(data);
812} 2409}
813 2410
814/** 2411/**
@@ -832,16 +2429,17 @@ run (void *cls,
832 coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */ 2429 coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
833 cfg, /* Main configuration */ 2430 cfg, /* Main configuration */
834 GNUNET_TIME_UNIT_FOREVER_REL, 2431 GNUNET_TIME_UNIT_FOREVER_REL,
835 NULL, /* FIXME: anything we want to pass around? */ 2432 NULL, /* Closure passed to DHT functionas around? */
836 &core_init, /* Call core_init once connected */ 2433 &core_init, /* Call core_init once connected */
837 &handle_core_connect, /* Don't care about connects */ 2434 &handle_core_connect, /* Don't care about connects */
838 NULL, /* Don't care about disconnects */ 2435 NULL, /* FIXME: remove peers on disconnects */
839 NULL, /* Don't care about peer status changes */ 2436 NULL, /* Do we care about "status" updates? */
840 NULL, /* Don't want notified about all incoming messages */ 2437 NULL, /* Don't want notified about all incoming messages */
841 GNUNET_NO, /* For header only inbound notification */ 2438 GNUNET_NO, /* For header only inbound notification */
842 NULL, /* Don't want notified about all outbound messages */ 2439 NULL, /* Don't want notified about all outbound messages */
843 GNUNET_NO, /* For header only outbound notification */ 2440 GNUNET_NO, /* For header only outbound notification */
844 core_handlers); /* Register these handlers */ 2441 core_handlers); /* Register these handlers */
2442
845 if (coreAPI == NULL) 2443 if (coreAPI == NULL)
846 return; 2444 return;
847 transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, NULL, NULL, NULL, NULL); 2445 transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, NULL, NULL, NULL, NULL);
@@ -849,7 +2447,50 @@ run (void *cls,
849 GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL); 2447 GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
850 else 2448 else
851 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n"); 2449 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
2450
2451 lowest_bucket = MAX_BUCKETS - 1;
2452 forward_list.hashmap = GNUNET_CONTAINER_multihashmap_create(MAX_OUTSTANDING_FORWARDS / 10);
2453 forward_list.minHeap = GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
852 /* Scheduled the task to clean up when shutdown is called */ 2454 /* Scheduled the task to clean up when shutdown is called */
2455
2456 if (GNUNET_YES == GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing", "mysql_logging"))
2457 {
2458 debug_routes = GNUNET_YES;
2459 }
2460
2461 if (GNUNET_YES ==
2462 GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2463 "stop_on_closest"))
2464 {
2465 stop_on_closest = GNUNET_YES;
2466 }
2467
2468 if (GNUNET_YES ==
2469 GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht",
2470 "stop_found"))
2471 {
2472 stop_on_found = GNUNET_YES;
2473 }
2474
2475 if (GNUNET_YES ==
2476 GNUNET_CONFIGURATION_get_value_yesno(cfg, "dht_testing",
2477 "mysql_logging_extended"))
2478 {
2479 debug_routes = GNUNET_YES;
2480 debug_routes_extended = GNUNET_YES;
2481 }
2482
2483 if (GNUNET_YES == debug_routes)
2484 {
2485 dhtlog_handle = GNUNET_DHTLOG_connect(cfg);
2486 if (dhtlog_handle == NULL)
2487 {
2488 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2489 "Could not connect to mysql logging server, logging will not happen!");
2490 return;
2491 }
2492 }
2493
853 cleanup_task = GNUNET_SCHEDULER_add_delayed (sched, 2494 cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
854 GNUNET_TIME_UNIT_FOREVER_REL, 2495 GNUNET_TIME_UNIT_FOREVER_REL,
855 &shutdown_task, NULL); 2496 &shutdown_task, NULL);
diff --git a/src/dht/test_dhtlog_data.conf b/src/dht/test_dhtlog_data.conf
new file mode 100644
index 000000000..391f90f86
--- /dev/null
+++ b/src/dht/test_dhtlog_data.conf
@@ -0,0 +1,72 @@
1[fs]
2ACCEPT_FROM6 = ::1;
3ACCEPT_FROM = 127.0.0.1;
4BINARY = gnunet-service-fs
5CONFIG = $DEFAULTCONFIG
6HOME = $SERVICEHOME
7HOSTNAME = localhost
8PORT = 2094
9INDEXDB = $SERVICEHOME/idxinfo.lst
10
11[dhtcache]
12QUOTA = 1000000
13DATABASE = sqlite
14
15[transport]
16PLUGINS = tcp
17DEBUG = NO
18ACCEPT_FROM6 = ::1;
19ACCEPT_FROM = 127.0.0.1;
20NEIGHBOUR_LIMIT = 50
21BINARY = gnunet-service-transport
22CONFIG = $DEFAULTCONFIG
23HOME = $SERVICEHOME
24HOSTNAME = localhost
25PORT = 12365
26
27[core]
28TOTAL_QUOTA_OUT = 3932160
29TOTAL_QUOTA_IN = 3932160
30ACCEPT_FROM6 = ::1;
31ACCEPT_FROM = 127.0.0.1;
32BINARY = gnunet-service-core
33CONFIG = $DEFAULTCONFIG
34HOME = $SERVICEHOME
35HOSTNAME = localhost
36PORT = 12092
37
38[arm]
39DEFAULTSERVICES = core dht
40ACCEPT_FROM6 = ::1;
41ACCEPT_FROM = 127.0.0.1;
42BINARY = gnunet-service-arm
43CONFIG = $DEFAULTCONFIG
44HOME = $SERVICEHOME
45HOSTNAME = localhost
46PORT = 12366
47DEBUG = NO
48
49[transport-tcp]
50TIMEOUT = 300000
51PORT = 12368
52
53[TESTING]
54WEAKRANDOM = YES
55
56[gnunetd]
57HOSTKEY = $SERVICEHOME/.hostkey
58
59[PATHS]
60DEFAULTCONFIG = test_dhtlog_data.conf
61SERVICEHOME = /tmp/test-dhtlog/
62
63[DHTLOG]
64PLUGIN = dummy
65#PLUGIN = mysql
66
67[MYSQL]
68DATABASE = dhttests
69USER = dht
70PASSWORD = dht
71SERVER =
72MYSQL_PORT = 3306