aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-09-21 05:51:38 +0000
committerChristian Grothoff <christian@grothoff.org>2011-09-21 05:51:38 +0000
commita864897478e2ee94ab36648e7f1db6f0dd57ea43 (patch)
tree70d3026143ff9285691325ee4b243a6a2361cbd7
parentf591bfdc7b28e93b9412c2d9e031c8848ce90f55 (diff)
downloadgnunet-a864897478e2ee94ab36648e7f1db6f0dd57ea43.tar.gz
gnunet-a864897478e2ee94ab36648e7f1db6f0dd57ea43.zip
add
-rw-r--r--src/dht/dht_api_new.c909
-rw-r--r--src/dht/dht_new.h190
-rw-r--r--src/dht/gnunet-service-dht-new.c2991
-rw-r--r--src/dht/gnunet-service-dht_clients.c876
-rw-r--r--src/dht/gnunet-service-dht_clients.h72
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c353
-rw-r--r--src/dht/gnunet-service-dht_neighbours.h122
-rw-r--r--src/dht/gnunet-service-dht_nse.c84
-rw-r--r--src/dht/gnunet-service-dht_nse.h40
9 files changed, 5637 insertions, 0 deletions
diff --git a/src/dht/dht_api_new.c b/src/dht/dht_api_new.c
new file mode 100644
index 000000000..88ed4ed9d
--- /dev/null
+++ b/src/dht/dht_api_new.c
@@ -0,0 +1,909 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/dht_api.c
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27
28#include "platform.h"
29#include "gnunet_bandwidth_lib.h"
30#include "gnunet_client_lib.h"
31#include "gnunet_constants.h"
32#include "gnunet_container_lib.h"
33#include "gnunet_arm_service.h"
34#include "gnunet_hello_lib.h"
35#include "gnunet_protocols.h"
36#include "gnunet_server_lib.h"
37#include "gnunet_time_lib.h"
38#include "gnunet_dht_service.h"
39#include "dht.h"
40
41#define DEBUG_DHT_API GNUNET_NO
42
43/**
44 * Entry in our list of messages to be (re-)transmitted.
45 */
46struct PendingMessage
47{
48 /**
49 * This is a doubly-linked list.
50 */
51 struct PendingMessage *prev;
52
53 /**
54 * This is a doubly-linked list.
55 */
56 struct PendingMessage *next;
57
58 /**
59 * Message that is pending, allocated at the end
60 * of this struct.
61 */
62 const struct GNUNET_MessageHeader *msg;
63
64 /**
65 * Handle to the DHT API context.
66 */
67 struct GNUNET_DHT_Handle *handle;
68
69 /**
70 * Continuation to call when the request has been
71 * transmitted (for the first time) to the service; can be NULL.
72 */
73 GNUNET_SCHEDULER_Task cont;
74
75 /**
76 * Closure for 'cont'.
77 */
78 void *cont_cls;
79
80 /**
81 * Timeout task for this message
82 */
83 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
84
85 /**
86 * Unique ID for this request
87 */
88 uint64_t unique_id;
89
90 /**
91 * Free the saved message once sent, set to GNUNET_YES for messages
92 * that do not receive responses; GNUNET_NO if this pending message
93 * is aliased from a 'struct GNUNET_DHT_RouteHandle' and will be freed
94 * from there.
95 */
96 int free_on_send;
97
98 /**
99 * GNUNET_YES if this message is in our pending queue right now.
100 */
101 int in_pending_queue;
102
103};
104
105
106/**
107 * Handle to a route request
108 */
109struct GNUNET_DHT_RouteHandle
110{
111
112 /**
113 * Iterator to call on data receipt
114 */
115 GNUNET_DHT_ReplyProcessor iter;
116
117 /**
118 * Closure for the iterator callback
119 */
120 void *iter_cls;
121
122 /**
123 * Main handle to this DHT api
124 */
125 struct GNUNET_DHT_Handle *dht_handle;
126
127 /**
128 * The actual message sent for this request,
129 * used for retransmitting requests on service
130 * failure/reconnect. Freed on route_stop.
131 */
132 struct PendingMessage *message;
133
134 /**
135 * Key that this get request is for
136 */
137 GNUNET_HashCode key;
138
139 /**
140 * Unique identifier for this request (for key collisions). FIXME: redundant!?
141 */
142 uint64_t uid;
143
144};
145
146
147/**
148 * Connection to the DHT service.
149 */
150struct GNUNET_DHT_Handle
151{
152
153 /**
154 * Configuration to use.
155 */
156 const struct GNUNET_CONFIGURATION_Handle *cfg;
157
158 /**
159 * Socket (if available).
160 */
161 struct GNUNET_CLIENT_Connection *client;
162
163 /**
164 * Currently pending transmission request (or NULL).
165 */
166 struct GNUNET_CLIENT_TransmitHandle *th;
167
168 /**
169 * Head of linked list of messages we would like to transmit.
170 */
171 struct PendingMessage *pending_head;
172
173 /**
174 * Tail of linked list of messages we would like to transmit.
175 */
176 struct PendingMessage *pending_tail;
177
178 /**
179 * Hash map containing the current outstanding unique requests
180 * (values are of type 'struct GNUNET_DHT_RouteHandle').
181 */
182 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
183
184 /**
185 * Task for trying to reconnect.
186 */
187 GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
188
189 /**
190 * How quickly should we retry? Used for exponential back-off on
191 * connect-errors.
192 */
193 struct GNUNET_TIME_Relative retry_time;
194
195 /**
196 * Generator for unique ids.
197 */
198 uint64_t uid_gen;
199
200};
201
202
203/**
204 * Transmit the next pending message, called by notify_transmit_ready
205 */
206static size_t
207transmit_pending (void *cls, size_t size, void *buf);
208
209
210/**
211 * Handler for messages received from the DHT service
212 * a demultiplexer which handles numerous message types
213 *
214 */
215static void
216service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
217
218
219
220
221/**
222 * Try to (re)connect to the DHT service.
223 *
224 * @return GNUNET_YES on success, GNUNET_NO on failure.
225 */
226static int
227try_connect (struct GNUNET_DHT_Handle *handle)
228{
229 if (handle->client != NULL)
230 return GNUNET_OK;
231 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
232 if (handle->client == NULL)
233 {
234 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
235 _("Failed to connect to the DHT service!\n"));
236 return GNUNET_NO;
237 }
238#if DEBUG_DHT
239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
240 "Starting to process replies from DHT\n");
241#endif
242 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
243 GNUNET_TIME_UNIT_FOREVER_REL);
244 return GNUNET_YES;
245}
246
247
248/**
249 * Add the request corresponding to the given route handle
250 * to the pending queue (if it is not already in there).
251 *
252 * @param cls the 'struct GNUNET_DHT_Handle*'
253 * @param key key for the request (not used)
254 * @param value the 'struct GNUNET_DHT_RouteHandle*'
255 * @return GNUNET_YES (always)
256 */
257static int
258add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
259{
260 struct GNUNET_DHT_Handle *handle = cls;
261 struct GNUNET_DHT_RouteHandle *rh = value;
262
263 if (GNUNET_NO == rh->message->in_pending_queue)
264 {
265 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
266 rh->message);
267 rh->message->in_pending_queue = GNUNET_YES;
268 }
269 return GNUNET_YES;
270}
271
272
273/**
274 * Try to send messages from list of messages to send
275 * @param handle DHT_Handle
276 */
277static void
278process_pending_messages (struct GNUNET_DHT_Handle *handle);
279
280
281/**
282 * Try reconnecting to the dht service.
283 *
284 * @param cls GNUNET_DHT_Handle
285 * @param tc scheduler context
286 */
287static void
288try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
289{
290 struct GNUNET_DHT_Handle *handle = cls;
291
292 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
293 if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
294 handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
295 else
296 handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
297 if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
298 handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
299 handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
300 handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
301 if (handle->client == NULL)
302 {
303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
304 return;
305 }
306 GNUNET_CONTAINER_multihashmap_iterate (handle->active_requests,
307 &add_request_to_pending, handle);
308 process_pending_messages (handle);
309}
310
311
312/**
313 * Try reconnecting to the DHT service.
314 *
315 * @param handle handle to dht to (possibly) disconnect and reconnect
316 */
317static void
318do_disconnect (struct GNUNET_DHT_Handle *handle)
319{
320 if (handle->client == NULL)
321 return;
322 GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
323 if (NULL != handle->th)
324 GNUNET_CLIENT_notify_transmit_ready_cancel(handle->th);
325 handle->th = NULL;
326 GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
327 handle->client = NULL;
328 handle->reconnect_task =
329 GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
330}
331
332
333/**
334 * Try to send messages from list of messages to send
335 */
336static void
337process_pending_messages (struct GNUNET_DHT_Handle *handle)
338{
339 struct PendingMessage *head;
340
341 if (handle->client == NULL)
342 {
343 do_disconnect (handle);
344 return;
345 }
346 if (handle->th != NULL)
347 return;
348 if (NULL == (head = handle->pending_head))
349 return;
350 handle->th =
351 GNUNET_CLIENT_notify_transmit_ready (handle->client,
352 ntohs (head->msg->size),
353 GNUNET_TIME_UNIT_FOREVER_REL,
354 GNUNET_YES, &transmit_pending,
355 handle);
356 if (NULL == handle->th)
357 {
358 do_disconnect (handle);
359 return;
360 }
361}
362
363
364/**
365 * Transmit the next pending message, called by notify_transmit_ready
366 */
367static size_t
368transmit_pending (void *cls, size_t size, void *buf)
369{
370 struct GNUNET_DHT_Handle *handle = cls;
371 struct PendingMessage *head;
372 size_t tsize;
373
374 handle->th = NULL;
375 if (buf == NULL)
376 {
377 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
378 "Transmission to DHT service failed! Reconnecting!\n");
379 do_disconnect (handle);
380 return 0;
381 }
382 if (NULL == (head = handle->pending_head))
383 return 0;
384
385 tsize = ntohs (head->msg->size);
386 if (size < tsize)
387 {
388 process_pending_messages (handle);
389 return 0;
390 }
391 memcpy (buf, head->msg, tsize);
392 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
393 head);
394 if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
395 {
396 GNUNET_SCHEDULER_cancel (head->timeout_task);
397 head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
398 }
399 if (NULL != head->cont)
400 {
401 GNUNET_SCHEDULER_add_continuation (head->cont, head->cont_cls,
402 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
403 head->cont = NULL;
404 head->cont_cls = NULL;
405 }
406 head->in_pending_queue = GNUNET_NO;
407 if (GNUNET_YES == head->free_on_send)
408 GNUNET_free (head);
409 process_pending_messages (handle);
410 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
411 "Forwarded request of %u bytes to DHT service\n",
412 (unsigned int) tsize);
413 return tsize;
414}
415
416
417/**
418 * Process a given reply that might match the given
419 * request.
420 */
421static int
422process_reply (void *cls, const GNUNET_HashCode * key, void *value)
423{
424 const struct GNUNET_DHT_RouteResultMessage *dht_msg = cls;
425 struct GNUNET_DHT_RouteHandle *rh = value;
426 const struct GNUNET_MessageHeader *enc_msg;
427 size_t enc_size;
428 uint64_t uid;
429 const struct GNUNET_PeerIdentity **outgoing_path;
430 const struct GNUNET_PeerIdentity *pos;
431 uint32_t outgoing_path_length;
432 unsigned int i;
433 char *path_offset;
434
435 uid = GNUNET_ntohll (dht_msg->unique_id);
436#if HAVE_UID_FOR_TESTING
437 if (uid != rh->uid)
438 {
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
440 "Reply UID did not match request UID\n");
441 return GNUNET_YES;
442 }
443#endif
444 enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
445 enc_size = ntohs (enc_msg->size);
446 if (enc_size < sizeof (struct GNUNET_MessageHeader))
447 {
448 GNUNET_break (0);
449 return GNUNET_NO;
450 }
451 path_offset = (char *) &dht_msg[1];
452 path_offset += enc_size;
453 pos = (const struct GNUNET_PeerIdentity *) path_offset;
454 outgoing_path_length = ntohl (dht_msg->outgoing_path_length);
455 if (outgoing_path_length * sizeof (struct GNUNET_PeerIdentity) >
456 ntohs (dht_msg->header.size) - enc_size)
457 {
458 GNUNET_break (0);
459 return GNUNET_NO;
460 }
461
462 if (outgoing_path_length > 0)
463 {
464 outgoing_path =
465 GNUNET_malloc ((outgoing_path_length +
466 1) * sizeof (struct GNUNET_PeerIdentity *));
467 for (i = 0; i < outgoing_path_length; i++)
468 {
469 outgoing_path[i] = pos;
470 pos++;
471 }
472 outgoing_path[outgoing_path_length] = NULL;
473 }
474 else
475 outgoing_path = NULL;
476
477 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Processing reply.\n");
478 rh->iter (rh->iter_cls, &rh->key, outgoing_path, enc_msg);
479 GNUNET_free_non_null (outgoing_path);
480 return GNUNET_YES;
481}
482
483
484/**
485 * Handler for messages received from the DHT service
486 * a demultiplexer which handles numerous message types
487 *
488 * @param cls the 'struct GNUNET_DHT_Handle'
489 * @param msg the incoming message
490 */
491static void
492service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
493{
494 struct GNUNET_DHT_Handle *handle = cls;
495 const struct GNUNET_DHT_RouteResultMessage *dht_msg;
496
497 if (msg == NULL)
498 {
499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500 "Error receiving data from DHT service, reconnecting\n");
501 do_disconnect (handle);
502 return;
503 }
504 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT)
505 {
506 GNUNET_break (0);
507 do_disconnect (handle);
508 return;
509 }
510 if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_RouteResultMessage))
511 {
512 GNUNET_break (0);
513 do_disconnect (handle);
514 return;
515 }
516 dht_msg = (const struct GNUNET_DHT_RouteResultMessage *) msg;
517 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
518 "Comparing reply `%s' against %u pending requests.\n",
519 GNUNET_h2s (&dht_msg->key),
520 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
521 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
522 &dht_msg->key, &process_reply,
523 (void *) dht_msg);
524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
525 "Continuing to process replies from DHT\n");
526 GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
527 GNUNET_TIME_UNIT_FOREVER_REL);
528
529}
530
531
532/**
533 * Initialize the connection with the DHT service.
534 *
535 * @param cfg configuration to use
536 * @param ht_len size of the internal hash table to use for
537 * processing multiple GET/FIND requests in parallel
538 *
539 * @return handle to the DHT service, or NULL on error
540 */
541struct GNUNET_DHT_Handle *
542GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
543 unsigned int ht_len)
544{
545 struct GNUNET_DHT_Handle *handle;
546
547 handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
548 handle->cfg = cfg;
549 handle->uid_gen =
550 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
551 handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
552 if (GNUNET_NO == try_connect (handle))
553 {
554 GNUNET_DHT_disconnect (handle);
555 return NULL;
556 }
557 return handle;
558}
559
560
561/**
562 * Shutdown connection with the DHT service.
563 *
564 * @param handle handle of the DHT connection to stop
565 */
566void
567GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
568{
569 struct PendingMessage *pm;
570
571 GNUNET_assert (handle != NULL);
572 GNUNET_assert (0 ==
573 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
574 if (handle->th != NULL)
575 {
576 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
577 handle->th = NULL;
578 }
579 while (NULL != (pm = handle->pending_head))
580 {
581 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
582 pm);
583 GNUNET_assert (GNUNET_YES == pm->free_on_send);
584 if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
585 GNUNET_SCHEDULER_cancel (pm->timeout_task);
586 if (NULL != pm->cont)
587 GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
588 GNUNET_SCHEDULER_REASON_TIMEOUT);
589 pm->in_pending_queue = GNUNET_NO;
590 GNUNET_free (pm);
591 }
592 if (handle->client != NULL)
593 {
594 GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
595 handle->client = NULL;
596 }
597 if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
598 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
599 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
600 GNUNET_free (handle);
601}
602
603
604
605
606/* ***** Special low-level API providing generic routing abstraction ***** */
607
608
609/**
610 * Timeout for the transmission of a fire&forget-request. Clean it up.
611 *
612 * @param cls the 'struct PendingMessage'
613 * @param tc scheduler context
614 */
615static void
616timeout_route_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
617{
618 struct PendingMessage *pending = cls;
619 struct GNUNET_DHT_Handle *handle;
620
621 if (pending->free_on_send != GNUNET_YES)
622 {
623 /* timeouts should only apply to fire & forget requests! */
624 GNUNET_break (0);
625 return;
626 }
627 handle = pending->handle;
628 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
629 pending);
630 if (pending->cont != NULL)
631 pending->cont (pending->cont_cls, tc);
632 GNUNET_free (pending);
633}
634
635
636/**
637 * Initiate a generic DHT route operation.
638 *
639 * @param handle handle to the DHT service
640 * @param key the key to look up
641 * @param desired_replication_level how many peers should ultimately receive
642 * this message (advisory only, target may be too high for the
643 * given DHT or not hit exactly).
644 * @param options options for routing
645 * @param enc send the encapsulated message to a peer close to the key
646 * @param iter function to call on each result, NULL if no replies are expected
647 * @param iter_cls closure for iter
648 * @param timeout when to abort with an error if we fail to get
649 * a confirmation for the request (when necessary) or how long
650 * to wait for tramission to the service; only applies
651 * if 'iter' is NULL
652 * @param cont continuation to call when the request has been transmitted
653 * the first time to the service
654 * @param cont_cls closure for cont
655 * @return handle to stop the request, NULL if the request is "fire and forget"
656 */
657struct GNUNET_DHT_RouteHandle *
658GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
659 const GNUNET_HashCode * key,
660 uint32_t desired_replication_level,
661 enum GNUNET_DHT_RouteOption options,
662 const struct GNUNET_MessageHeader *enc,
663 struct GNUNET_TIME_Relative timeout,
664 GNUNET_DHT_ReplyProcessor iter, void *iter_cls,
665 GNUNET_SCHEDULER_Task cont, void *cont_cls)
666{
667 struct PendingMessage *pending;
668 struct GNUNET_DHT_RouteMessage *message;
669 struct GNUNET_DHT_RouteHandle *route_handle;
670 uint16_t msize;
671 uint16_t esize;
672
673 esize = ntohs (enc->size);
674 if (sizeof (struct GNUNET_DHT_RouteMessage) + esize >=
675 GNUNET_SERVER_MAX_MESSAGE_SIZE)
676 {
677 GNUNET_break (0);
678 return NULL;
679 }
680 msize = sizeof (struct GNUNET_DHT_RouteMessage) + esize;
681 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
682 message = (struct GNUNET_DHT_RouteMessage *) &pending[1];
683 pending->msg = &message->header;
684 pending->handle = handle;
685 pending->cont = cont;
686 pending->cont_cls = cont_cls;
687
688 message->header.size = htons (msize);
689 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE);
690 message->options = htonl ((uint32_t) options);
691 message->desired_replication_level = htonl (desired_replication_level);
692 message->reserved = 0;
693 message->key = *key;
694 handle->uid_gen++;
695 message->unique_id = GNUNET_htonll (handle->uid_gen);
696 memcpy (&message[1], enc, esize);
697
698 if (iter != NULL)
699 {
700 route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
701 route_handle->key = *key;
702 route_handle->iter = iter;
703 route_handle->iter_cls = iter_cls;
704 route_handle->dht_handle = handle;
705 route_handle->uid = handle->uid_gen;
706 route_handle->message = pending;
707 GNUNET_CONTAINER_multihashmap_put (handle->active_requests, key,
708 route_handle,
709 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
710 }
711 else
712 {
713 route_handle = NULL;
714 pending->free_on_send = GNUNET_YES;
715 pending->timeout_task =
716 GNUNET_SCHEDULER_add_delayed (timeout, &timeout_route_request, pending);
717 }
718 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
719 pending);
720 pending->in_pending_queue = GNUNET_YES;
721 process_pending_messages (handle);
722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
723 "DHT route start request processed, returning %p\n",
724 route_handle);
725 return route_handle;
726}
727
728
729/**
730 * Stop a previously issued routing request
731 *
732 * @param route_handle handle to the request to stop
733 */
734void
735GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
736{
737 struct GNUNET_DHT_Handle *handle;
738 struct PendingMessage *pending;
739 struct GNUNET_DHT_StopMessage *message;
740 size_t msize;
741
742 handle = route_handle->dht_handle;
743 if (GNUNET_NO == route_handle->message->in_pending_queue)
744 {
745 /* need to send stop message */
746 msize = sizeof (struct GNUNET_DHT_StopMessage);
747 pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
748 message = (struct GNUNET_DHT_StopMessage *) &pending[1];
749 pending->msg = &message->header;
750 message->header.size = htons (msize);
751 message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_STOP);
752 message->reserved = 0;
753 message->unique_id = GNUNET_htonll (route_handle->uid);
754 message->key = route_handle->key;
755 pending->handle = handle;
756 pending->free_on_send = GNUNET_YES;
757 pending->in_pending_queue = GNUNET_YES;
758 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
759 pending);
760 process_pending_messages (handle);
761 }
762 else
763 {
764 /* simply remove pending request from message queue before
765 * transmission, no need to transmit STOP request! */
766 GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
767 route_handle->message);
768 }
769 GNUNET_assert (GNUNET_YES ==
770 GNUNET_CONTAINER_multihashmap_remove
771 (route_handle->dht_handle->active_requests, &route_handle->key,
772 route_handle));
773 GNUNET_free (route_handle->message);
774 GNUNET_free (route_handle);
775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT route stop request processed\n");
776}
777
778
779
780/* ***** Special API for controlling DHT routing maintenance ******* */
781
782
783/**
784 * Send a control message to the DHT.
785 *
786 * @param handle handle to the DHT service
787 * @param command command
788 * @param variable variable to the command
789 * @param cont continuation to call when done (transmitting request to service)
790 * @param cont_cls closure for cont
791 */
792static void
793send_control_message (struct GNUNET_DHT_Handle *handle, uint16_t command,
794 uint16_t variable, GNUNET_SCHEDULER_Task cont,
795 void *cont_cls)
796{
797 struct GNUNET_DHT_ControlMessage *msg;
798 struct PendingMessage *pending;
799
800 pending =
801 GNUNET_malloc (sizeof (struct PendingMessage) +
802 sizeof (struct GNUNET_DHT_ControlMessage));
803 msg = (struct GNUNET_DHT_ControlMessage *) &pending[1];
804 pending->msg = &msg->header;
805 msg->header.size = htons (sizeof (struct GNUNET_DHT_ControlMessage));
806 msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CONTROL);
807 msg->command = htons (command);
808 msg->variable = htons (variable);
809 pending->free_on_send = GNUNET_YES;
810 pending->cont = cont;
811 pending->cont_cls = cont_cls;
812 pending->in_pending_queue = GNUNET_YES;
813 GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
814 pending);
815 process_pending_messages (handle);
816}
817
818
819/**
820 * Send a message to the DHT telling it to issue a single find
821 * peer request using the peers unique identifier as key. This
822 * is used to fill the routing table, and is normally controlled
823 * by the DHT itself. However, for testing and perhaps more
824 * close control over the DHT, this can be explicitly managed.
825 *
826 * @param handle handle to the DHT service
827 * @param cont continuation to call when done (transmitting request to service)
828 * @param cont_cls closure for cont
829 */
830void
831GNUNET_DHT_find_peers (struct GNUNET_DHT_Handle *handle,
832 GNUNET_SCHEDULER_Task cont, void *cont_cls)
833{
834 send_control_message (handle, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0, cont,
835 cont_cls);
836}
837
838
839
840#if HAVE_MALICIOUS
841
842/**
843 * Send a message to the DHT telling it to start issuing random GET
844 * requests every 'frequency' milliseconds.
845 *
846 * @param handle handle to the DHT service
847 * @param frequency delay between sending malicious messages
848 * @param cont continuation to call when done (transmitting request to service)
849 * @param cont_cls closure for cont
850 */
851void
852GNUNET_DHT_set_malicious_getter (struct GNUNET_DHT_Handle *handle,
853 struct GNUNET_TIME_Relative frequency,
854 GNUNET_SCHEDULER_Task cont, void *cont_cls)
855{
856 if (frequency.rel_value > UINT16_MAX)
857 {
858 GNUNET_break (0);
859 return;
860 }
861 send_control_message (handle, GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_GET,
862 frequency.rel_value, cont, cont_cls);
863}
864
865/**
866 * Send a message to the DHT telling it to start issuing random PUT
867 * requests every 'frequency' milliseconds.
868 *
869 * @param handle handle to the DHT service
870 * @param frequency delay between sending malicious messages
871 * @param cont continuation to call when done (transmitting request to service)
872 * @param cont_cls closure for cont
873 */
874void
875GNUNET_DHT_set_malicious_putter (struct GNUNET_DHT_Handle *handle,
876 struct GNUNET_TIME_Relative frequency,
877 GNUNET_SCHEDULER_Task cont, void *cont_cls)
878{
879 if (frequency.rel_value > UINT16_MAX)
880 {
881 GNUNET_break (0);
882 return;
883 }
884
885 send_control_message (handle, GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_PUT,
886 frequency.rel_value, cont, cont_cls);
887}
888
889
890/**
891 * Send a message to the DHT telling it to start dropping
892 * all requests received.
893 *
894 * @param handle handle to the DHT service
895 * @param cont continuation to call when done (transmitting request to service)
896 * @param cont_cls closure for cont
897 *
898 */
899void
900GNUNET_DHT_set_malicious_dropper (struct GNUNET_DHT_Handle *handle,
901 GNUNET_SCHEDULER_Task cont, void *cont_cls)
902{
903 send_control_message (handle, GNUNET_MESSAGE_TYPE_DHT_MALICIOUS_DROP, 0, cont,
904 cont_cls);
905}
906
907#endif
908
909/* end of dht_api.c */
diff --git a/src/dht/dht_new.h b/src/dht/dht_new.h
new file mode 100644
index 000000000..c8e2baff1
--- /dev/null
+++ b/src/dht/dht_new.h
@@ -0,0 +1,190 @@
1/*
2 This file is part of GNUnet.
3 (C) 2001, 2002, 2003, 2004, 2009, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @author Christian Grothoff
23 * @author Nathan Evans
24 * @file dht/dht.h
25 */
26
27#ifndef DHT_H_
28#define DHT_H_
29
30
31/**
32 * Message which indicates the DHT should cancel outstanding
33 * requests and discard any state.
34 */
35struct GNUNET_DHT_ClientGetStopMessage
36{
37 /**
38 * Type: GNUNET_MESSAGE_TYPE_DHT_GET_STOP
39 */
40 struct GNUNET_MessageHeader header;
41
42 /**
43 * Always zero.
44 */
45 uint32_t reserved GNUNET_PACKED;
46
47 /**
48 * Unique ID identifying this request
49 */
50 uint64_t unique_id GNUNET_PACKED;
51
52 /**
53 * Key of this request
54 */
55 GNUNET_HashCode key;
56
57};
58
59
60/**
61 * DHT GET message sent from clients to service. Indicates that a GET
62 * request should be issued.
63 */
64struct GNUNET_DHT_ClientGetMessage
65{
66 /**
67 * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET
68 */
69 struct GNUNET_MessageHeader header;
70
71 /**
72 * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
73 */
74 uint32_t options GNUNET_PACKED;
75
76 /**
77 * Replication level for this message
78 */
79 uint32_t desired_replication_level GNUNET_PACKED;
80
81 /**
82 * The type for the data for the GET request; actually an 'enum
83 * GNUNET_BLOCK_Type'.
84 */
85 uint32_t type;
86
87 /**
88 * The key to search for
89 */
90 GNUNET_HashCode key;
91
92 /**
93 * Unique ID identifying this request, if 0 then
94 * the client will not expect a response
95 */
96 uint64_t unique_id GNUNET_PACKED;
97
98 /* Possibly followed by xquery, copied to end of this dealy do */
99
100};
101
102
103/**
104 * Reply to a GET send from the service to a client.
105 */
106struct GNUNET_DHT_ClientResultMessage
107{
108 /**
109 * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT
110 */
111 struct GNUNET_MessageHeader header;
112
113 /**
114 * The type for the data.
115 */
116 uint32_t type;
117
118 /**
119 * Number of peers recorded in the outgoing path from source to the
120 * storgage location of this message.
121 */
122 uint32_t put_path_length GNUNET_PACKED;
123
124 /**
125 * The number of peer identities recorded from the storage location
126 * to this peer.
127 */
128 uint32_t get_path_length GNUNET_PACKED;
129
130 /**
131 * Unique ID of the matching GET request.
132 */
133 uint64_t unique_id GNUNET_PACKED;
134
135 /**
136 * When does this entry expire?
137 */
138 struct GNUNET_TIME_AbsoluteNBO expiration;
139
140 /**
141 * The key that was searched for
142 */
143 GNUNET_HashCode key;
144
145 /* put path, get path and actual data are copied to end of this dealy do */
146
147};
148
149
150/**
151 * Message to insert data into the DHT, sent from clients to DHT service.
152 */
153struct GNUNET_DHT_ClientPutMessage
154{
155 /**
156 * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT
157 */
158 struct GNUNET_MessageHeader header;
159
160 /**
161 * The type of data to insert.
162 */
163 uint32_t type GNUNET_PACKED;
164
165 /**
166 * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
167 */
168 uint32_t options GNUNET_PACKED;
169
170 /**
171 * Replication level for this message
172 */
173 uint32_t desired_replication_level GNUNET_PACKED;
174
175 /**
176 * How long should this data persist?
177 */
178 struct GNUNET_TIME_AbsoluteNBO expiration;
179
180 /**
181 * The key to store the value under.
182 */
183 GNUNET_HashCode key;
184
185 /* DATA copied to end of this message */
186
187};
188
189
190#endif
diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c
new file mode 100644
index 000000000..62705bf38
--- /dev/null
+++ b/src/dht/gnunet-service-dht-new.c
@@ -0,0 +1,2991 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht.c
23 * @brief GNUnet DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 *
27 * TODO:
28 * - use OPTION_MULTIPLE instead of linked list for the forward_list.hashmap
29 * - use different 'struct DHT_MessageContext' for the different types of
30 * messages (currently rather confusing, especially with things like
31 * peer bloom filters occuring when processing replies).
32 */
33
34#include "platform.h"
35#include "gnunet_block_lib.h"
36#include "gnunet_client_lib.h"
37#include "gnunet_getopt_lib.h"
38#include "gnunet_os_lib.h"
39#include "gnunet_protocols.h"
40#include "gnunet_service_lib.h"
41#include "gnunet_nse_service.h"
42#include "gnunet_core_service.h"
43#include "gnunet_signal_lib.h"
44#include "gnunet_util_lib.h"
45#include "gnunet_datacache_lib.h"
46#include "gnunet_transport_service.h"
47#include "gnunet_hello_lib.h"
48#include "gnunet_dht_service.h"
49#include "gnunet_statistics_service.h"
50#include "dht.h"
51#include <fenv.h>
52
53
54/**
55 * Defines whether find peer requests send their HELLO's outgoing,
56 * or expect replies to contain hellos.
57 */
58#define FIND_PEER_WITH_HELLO GNUNET_YES
59
60#define DEFAULT_CORE_QUEUE_SIZE 32
61
62/**
63 * Minimum number of peers we need for "good" routing,
64 * any less than this and we will allow messages to
65 * travel much further through the network!
66 */
67#define MINIMUM_PEER_THRESHOLD 20
68
69/**
70 * Number of requests we track at most (for routing replies).
71 */
72#define DHT_MAX_RECENT (1024 * 16)
73
74/**
75 * How long do we wait at most when queueing messages with core
76 * that we are sending on behalf of other peers.
77 */
78#define DHT_DEFAULT_P2P_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
79
80/**
81 * Default importance for handling messages on behalf of other peers.
82 */
83#define DHT_DEFAULT_P2P_IMPORTANCE 0
84
85/**
86 * How long to keep recent requests around by default.
87 */
88#define DEFAULT_RECENT_REMOVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
89
90/**
91 * Default time to wait to send find peer messages sent by the dht service.
92 */
93#define DHT_DEFAULT_FIND_PEER_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
94
95/**
96 * Default importance for find peer messages sent by the dht service.
97 */
98#define DHT_DEFAULT_FIND_PEER_IMPORTANCE 8
99
100/**
101 * Default replication parameter for find peer messages sent by the dht service.
102 */
103#define DHT_DEFAULT_FIND_PEER_REPLICATION 4
104
105/**
106 * How long at least to wait before sending another find peer request.
107 */
108#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
109
110/**
111 * How long at most to wait before sending another find peer request.
112 */
113#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 8)
114
115/**
116 * How often to update our preference levels for peers in our routing tables.
117 */
118#define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
119
120/**
121 * How long at most on average will we allow a reply forward to take
122 * (before we quit sending out new requests)
123 */
124#define MAX_REQUEST_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
125
126/**
127 * How many time differences between requesting a core send and
128 * the actual callback to remember.
129 */
130#define MAX_REPLY_TIMES 8
131
132
133
134
135/**
136 * Context containing information about a DHT message received.
137 */
138struct DHT_MessageContext
139{
140 /**
141 * The client this request was received from.
142 * (NULL if received from another peer)
143 */
144 struct ClientList *client;
145
146 /**
147 * The peer this request was received from.
148 */
149 struct GNUNET_PeerIdentity peer;
150
151 /**
152 * Bloomfilter for this routing request.
153 */
154 struct GNUNET_CONTAINER_BloomFilter *bloom;
155
156 /**
157 * extended query (see gnunet_block_lib.h).
158 */
159 const void *xquery;
160
161 /**
162 * Bloomfilter to filter out duplicate replies.
163 */
164 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
165
166 /**
167 * The key this request was about
168 */
169 GNUNET_HashCode key;
170
171 /**
172 * How long should we wait to transmit this request?
173 */
174 struct GNUNET_TIME_Relative timeout;
175
176 /**
177 * The unique identifier of this request
178 */
179 uint64_t unique_id;
180
181 /**
182 * Number of bytes in xquery.
183 */
184 size_t xquery_size;
185
186 /**
187 * Mutator value for the reply_bf, see gnunet_block_lib.h
188 */
189 uint32_t reply_bf_mutator;
190
191 /**
192 * Desired replication level
193 */
194 uint32_t replication;
195
196 /**
197 * Network size estimate, either ours or the sum of
198 * those routed to thus far. =~ Log of number of peers
199 * chosen from for this request.
200 */
201 uint32_t network_size;
202
203 /**
204 * Any message options for this request
205 */
206 uint32_t msg_options;
207
208 /**
209 * How many hops has the message already traversed?
210 */
211 uint32_t hop_count;
212
213 /**
214 * How many peer identities are present in the path history?
215 */
216 uint32_t path_history_len;
217
218 /**
219 * Path history.
220 */
221 char *path_history;
222
223 /**
224 * How important is this message?
225 */
226 unsigned int importance;
227
228 /**
229 * Should we (still) forward the request on to other peers?
230 */
231 int do_forward;
232
233 /**
234 * Did we forward this message? (may need to remember it!)
235 */
236 int forwarded;
237
238 /**
239 * Are we the closest known peer to this key (out of our neighbors?)
240 */
241 int closest;
242};
243
244
245/**
246 * Record used for remembering what peers are waiting for what
247 * responses (based on search key).
248 */
249struct DHTRouteSource
250{
251 /**
252 * This is a DLL.
253 */
254 struct DHTRouteSource *next;
255
256 /**
257 * This is a DLL.
258 */
259 struct DHTRouteSource *prev;
260
261 /**
262 * UID of the request, 0 if from another peer.
263 */
264 uint64_t uid;
265
266 /**
267 * Source of the request. Replies should be forwarded to
268 * this peer.
269 */
270 struct GNUNET_PeerIdentity source;
271
272 /**
273 * If this was a local request, remember the client; otherwise NULL.
274 */
275 struct ClientList *client;
276
277 /**
278 * Pointer to this nodes heap location (for removal)
279 */
280 struct GNUNET_CONTAINER_HeapNode *hnode;
281
282 /**
283 * Back pointer to the record storing this information.
284 */
285 struct DHTQueryRecord *record;
286
287 /**
288 * Task to remove this entry on timeout.
289 */
290 GNUNET_SCHEDULER_TaskIdentifier delete_task;
291
292 /**
293 * Bloomfilter of peers we have already sent back as
294 * replies to the initial request. Allows us to not
295 * forward the same peer multiple times for a find peer
296 * request.
297 */
298 struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
299
300};
301
302
303/**
304 * Entry in the DHT routing table.
305 */
306struct DHTQueryRecord
307{
308 /**
309 * Head of DLL for result forwarding.
310 */
311 struct DHTRouteSource *head;
312
313 /**
314 * Tail of DLL for result forwarding.
315 */
316 struct DHTRouteSource *tail;
317
318 /**
319 * Key that the record concerns.
320 */
321 GNUNET_HashCode key;
322
323};
324
325
326/**
327 * Context used to calculate the number of find peer messages
328 * per X time units since our last scheduled find peer message
329 * was sent. If we have seen too many messages, delay or don't
330 * send our own out.
331 */
332struct FindPeerMessageContext
333{
334 unsigned int count;
335
336 struct GNUNET_TIME_Absolute start;
337
338};
339
340
341struct RecentRequest
342{
343 /**
344 * Position of this node in the min heap.
345 */
346 struct GNUNET_CONTAINER_HeapNode *heap_node;
347
348 /**
349 * Bloomfilter containing entries for peers
350 * we forwarded this request to.
351 */
352 struct GNUNET_CONTAINER_BloomFilter *bloom;
353
354 /**
355 * Timestamp of this request, for ordering
356 * the min heap.
357 */
358 struct GNUNET_TIME_Absolute timestamp;
359
360 /**
361 * Key of this request.
362 */
363 GNUNET_HashCode key;
364
365 /**
366 * Unique identifier for this request, 0 if from another peer.
367 */
368 uint64_t uid;
369
370 /**
371 * Task to remove this entry on timeout.
372 */
373 GNUNET_SCHEDULER_TaskIdentifier remove_task;
374};
375
376
377/**
378 * Recent requests by time inserted.
379 */
380static struct GNUNET_CONTAINER_Heap *recent_heap;
381
382/**
383 * Context to use to calculate find peer rates.
384 */
385static struct FindPeerMessageContext find_peer_context;
386
387/**
388 * How many peers have we added since we sent out our last
389 * find peer request?
390 */
391static unsigned int newly_found_peers;
392
393/**
394 * Handle to the datacache service (for inserting/retrieving data)
395 */
396static struct GNUNET_DATACACHE_Handle *datacache;
397
398/**
399 * Handle for the statistics service.
400 */
401struct GNUNET_STATISTICS_Handle *stats;
402
403/**
404 * Handle to get our current HELLO.
405 */
406static struct GNUNET_TRANSPORT_GetHelloHandle *ghh;
407
408/**
409 * The configuration the DHT service is running with
410 */
411static const struct GNUNET_CONFIGURATION_Handle *cfg;
412
413/**
414 * Handle to the core service
415 */
416static struct GNUNET_CORE_Handle *coreAPI;
417
418/**
419 * Handle to the transport service, for getting our hello
420 */
421static struct GNUNET_TRANSPORT_Handle *transport_handle;
422
423/**
424 * The identity of our peer.
425 */
426static struct GNUNET_PeerIdentity my_identity;
427
428/**
429 * Short id of the peer, for printing
430 */
431static char *my_short_id;
432
433/**
434 * Our HELLO
435 */
436static struct GNUNET_MessageHeader *my_hello;
437
438/**
439 * Task to run when we shut down, cleaning up all our trash
440 */
441static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
442
443/**
444 * Recently seen find peer requests.
445 */
446static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
447
448/**
449 * Reply times for requests, if we are busy, don't send any
450 * more requests!
451 */
452static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
453
454/**
455 * Current counter for replies.
456 */
457static unsigned int reply_counter;
458
459/**
460 * Our handle to the BLOCK library.
461 */
462static struct GNUNET_BLOCK_Context *block_context;
463
464
465
466/** Declare here so retry_core_send is aware of it */
467static size_t
468core_transmit_notify (void *cls, size_t size, void *buf);
469
470
471
472/**
473 * Given the largest send delay, artificially decrease it
474 * so the next time around we may have a chance at sending
475 * again.
476 */
477static void
478decrease_max_send_delay (struct GNUNET_TIME_Relative max_time)
479{
480 unsigned int i;
481
482 for (i = 0; i < MAX_REPLY_TIMES; i++)
483 {
484 if (reply_times[i].rel_value == max_time.rel_value)
485 {
486 reply_times[i].rel_value = reply_times[i].rel_value / 2;
487 return;
488 }
489 }
490}
491
492
493/**
494 * Find the maximum send time of the recently sent values.
495 *
496 * @return the average time between asking core to send a message
497 * and when the buffer for copying it is passed
498 */
499static struct GNUNET_TIME_Relative
500get_max_send_delay ()
501{
502 unsigned int i;
503 struct GNUNET_TIME_Relative max_time;
504
505 max_time = GNUNET_TIME_relative_get_zero ();
506
507 for (i = 0; i < MAX_REPLY_TIMES; i++)
508 {
509 if (reply_times[i].rel_value > max_time.rel_value)
510 max_time.rel_value = reply_times[i].rel_value;
511 }
512#if DEBUG_DHT
513 if (max_time.rel_value > MAX_REQUEST_TIME.rel_value)
514 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Max send delay was %llu\n",
515 (unsigned long long) max_time.rel_value);
516#endif
517 return max_time;
518}
519
520
521static void
522increment_stats (const char *value)
523{
524 if (stats == NULL)
525 return;
526 GNUNET_STATISTICS_update (stats, value, 1, GNUNET_NO);
527}
528
529
530static void
531decrement_stats (const char *value)
532{
533 if (stats == NULL)
534 return;
535 GNUNET_STATISTICS_update (stats, value, -1, GNUNET_NO);
536}
537
538
539/**
540 * Try to send another message from our core send list
541 */
542static void
543try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
544{
545 struct PeerInfo *peer = cls;
546 struct P2PPendingMessage *pending;
547 size_t ssize;
548
549 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
550
551 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
552 return;
553
554 if (peer->th != NULL)
555 return; /* Message send already in progress */
556
557 pending = peer->head;
558 if (pending != NULL)
559 {
560 ssize = ntohs (pending->msg->size);
561#if DEBUG_DHT > 1
562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
563 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n",
564 my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id));
565#endif
566 pending->scheduled = GNUNET_TIME_absolute_get ();
567 reply_counter++;
568 if (reply_counter >= MAX_REPLY_TIMES)
569 reply_counter = 0;
570 peer->th =
571 GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
572 pending->importance,
573 pending->timeout, &peer->id, ssize,
574 &core_transmit_notify, peer);
575 if (peer->th == NULL)
576 increment_stats ("# notify transmit ready failed");
577 }
578}
579
580
581/**
582 * Function called to send a request out to another peer.
583 * Called both for locally initiated requests and those
584 * received from other peers.
585 *
586 * @param msg the encapsulated message
587 * @param peer the peer to forward the message to
588 * @param msg_ctx the context of the message (hop count, bloom, etc.)
589 */
590static void
591forward_result_message (const struct GNUNET_MessageHeader *msg,
592 struct PeerInfo *peer,
593 struct DHT_MessageContext *msg_ctx)
594{
595 struct GNUNET_DHT_P2PRouteResultMessage *result_message;
596 struct P2PPendingMessage *pending;
597 size_t msize;
598 size_t psize;
599 char *path_start;
600 char *path_offset;
601
602 increment_stats (STAT_RESULT_FORWARDS);
603 msize =
604 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) +
605 (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len);
606 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
607 psize = sizeof (struct P2PPendingMessage) + msize;
608 pending = GNUNET_malloc (psize);
609 pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
610 pending->importance = DHT_SEND_PRIORITY;
611 pending->timeout = GNUNET_TIME_relative_get_forever ();
612 result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg;
613 result_message->header.size = htons (msize);
614 result_message->header.type =
615 htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
616 result_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
617 if (msg_ctx->path_history_len > 0)
618 {
619 /* End of pending is where enc_msg starts */
620 path_start = (char *) &pending[1];
621 /* Offset by the size of the enc_msg */
622 path_start += ntohs (msg->size);
623 memcpy (path_start, msg_ctx->path_history,
624 msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
625 }
626 result_message->options = htonl (msg_ctx->msg_options);
627 result_message->hop_count = htonl (msg_ctx->hop_count + 1);
628 memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
629 /* Copy the enc_msg, then the path history as well! */
630 memcpy (&result_message[1], msg, ntohs (msg->size));
631 path_offset = (char *) &result_message[1];
632 path_offset += ntohs (msg->size);
633 /* If we have path history, copy it to the end of the whole thing */
634 if (msg_ctx->path_history_len > 0)
635 memcpy (path_offset, msg_ctx->path_history,
636 msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
637#if DEBUG_DHT > 1
638 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
639 "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
640 "DHT", msize, GNUNET_i2s (&peer->id));
641#endif
642 peer->pending_count++;
643 increment_stats ("# pending messages scheduled");
644 GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
645 pending);
646 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
647 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
648}
649
650
651/**
652 * Called when core is ready to send a message we asked for
653 * out to the destination.
654 *
655 * @param cls closure (NULL)
656 * @param size number of bytes available in buf
657 * @param buf where the callee should write the message
658 * @return number of bytes written to buf
659 */
660static size_t
661core_transmit_notify (void *cls, size_t size, void *buf)
662{
663 struct PeerInfo *peer = cls;
664 char *cbuf = buf;
665 struct P2PPendingMessage *pending;
666
667 size_t off;
668 size_t msize;
669
670 peer->th = NULL;
671 if (buf == NULL)
672 {
673 /* client disconnected */
674#if DEBUG_DHT
675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n",
676 my_short_id, "DHT");
677#endif
678 return 0;
679 }
680
681 if (peer->head == NULL)
682 return 0;
683
684 off = 0;
685 pending = peer->head;
686 while (NULL != pending &&
687 (size - off >= (msize = ntohs (pending->msg->size))))
688 {
689 memcpy (&cbuf[off], pending->msg, msize);
690 off += msize;
691 peer->pending_count--;
692 increment_stats ("# pending messages sent");
693 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
694 GNUNET_free (pending);
695 pending = peer->head;
696 }
697 if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
698 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
699
700 return off;
701}
702
703
704/**
705 * Compute the distance between have and target as a 32-bit value.
706 * Differences in the lower bits must count stronger than differences
707 * in the higher bits.
708 *
709 * @return 0 if have==target, otherwise a number
710 * that is larger as the distance between
711 * the two hash codes increases
712 */
713static unsigned int
714distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
715{
716 unsigned int bucket;
717 unsigned int msb;
718 unsigned int lsb;
719 unsigned int i;
720
721 /* We have to represent the distance between two 2^9 (=512)-bit
722 * numbers as a 2^5 (=32)-bit number with "0" being used for the
723 * two numbers being identical; furthermore, we need to
724 * guarantee that a difference in the number of matching
725 * bits is always represented in the result.
726 *
727 * We use 2^32/2^9 numerical values to distinguish between
728 * hash codes that have the same LSB bit distance and
729 * use the highest 2^9 bits of the result to signify the
730 * number of (mis)matching LSB bits; if we have 0 matching
731 * and hence 512 mismatching LSB bits we return -1 (since
732 * 512 itself cannot be represented with 9 bits) */
733
734 /* first, calculate the most significant 9 bits of our
735 * result, aka the number of LSBs */
736 bucket = GNUNET_CRYPTO_hash_matching_bits (target, have);
737 /* bucket is now a value between 0 and 512 */
738 if (bucket == 512)
739 return 0; /* perfect match */
740 if (bucket == 0)
741 return (unsigned int) -1; /* LSB differs; use max (if we did the bit-shifting
742 * below, we'd end up with max+1 (overflow)) */
743
744 /* calculate the most significant bits of the final result */
745 msb = (512 - bucket) << (32 - 9);
746 /* calculate the 32-9 least significant bits of the final result by
747 * looking at the differences in the 32-9 bits following the
748 * mismatching bit at 'bucket' */
749 lsb = 0;
750 for (i = bucket + 1;
751 (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++)
752 {
753 if (GNUNET_CRYPTO_hash_get_bit (target, i) !=
754 GNUNET_CRYPTO_hash_get_bit (have, i))
755 lsb |= (1 << (bucket + 32 - 9 - i)); /* first bit set will be 10,
756 * last bit set will be 31 -- if
757 * i does not reach 512 first... */
758 }
759 return msb | lsb;
760}
761
762
763/**
764 * Return a number that is larger the closer the
765 * "have" GNUNET_hash code is to the "target".
766 *
767 * @return inverse distance metric, non-zero.
768 * Must fudge the value if NO bits match.
769 */
770static unsigned int
771inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
772{
773 if (GNUNET_CRYPTO_hash_matching_bits (target, have) == 0)
774 return 1; /* Never return 0! */
775 return ((unsigned int) -1) - distance (target, have);
776}
777
778
779/**
780 * Find the optimal bucket for this key, regardless
781 * of the current number of buckets in use.
782 *
783 * @param hc the hashcode to compare our identity to
784 *
785 * @return the proper bucket index, or GNUNET_SYSERR
786 * on error (same hashcode)
787 */
788static int
789find_bucket (const GNUNET_HashCode * hc)
790{
791 unsigned int bits;
792
793 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
794 if (bits == MAX_BUCKETS)
795 return GNUNET_SYSERR;
796 return MAX_BUCKETS - bits - 1;
797}
798
799
800/**
801 * Find which k-bucket this peer should go into,
802 * taking into account the size of the k-bucket
803 * array. This means that if more bits match than
804 * there are currently buckets, lowest_bucket will
805 * be returned.
806 *
807 * @param hc GNUNET_HashCode we are finding the bucket for.
808 *
809 * @return the proper bucket index for this key,
810 * or GNUNET_SYSERR on error (same hashcode)
811 */
812static int
813find_current_bucket (const GNUNET_HashCode * hc)
814{
815 int actual_bucket;
816
817 actual_bucket = find_bucket (hc);
818 if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
819 return lowest_bucket;
820 if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
821 return lowest_bucket;
822 return actual_bucket;
823}
824
825
826/**
827 * Find a routing table entry from a peer identity
828 *
829 * @param peer the peer identity to look up
830 *
831 * @return the routing table entry, or NULL if not found
832 */
833static struct PeerInfo *
834find_peer_by_id (const struct GNUNET_PeerIdentity *peer)
835{
836 int bucket;
837 struct PeerInfo *pos;
838
839 bucket = find_current_bucket (&peer->hashPubKey);
840
841 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
842 return NULL;
843
844 pos = k_buckets[bucket].head;
845 while (pos != NULL)
846 {
847 if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity)))
848 return pos;
849 pos = pos->next;
850 }
851 return NULL; /* No such peer. */
852}
853
854/* Forward declaration */
855static void
856update_core_preference (void *cls,
857 const struct GNUNET_SCHEDULER_TaskContext *tc);
858
859
860/**
861 * Function called with statistics about the given peer.
862 *
863 * @param cls closure
864 * @param peer identifies the peer
865 * @param bpm_out set to the current bandwidth limit (sending) for this peer
866 * @param amount set to the amount that was actually reserved or unreserved;
867 * either the full requested amount or zero (no partial reservations)
868 * @param res_delay if the reservation could not be satisfied (amount was 0), how
869 * long should the client wait until re-trying?
870 * @param preference current traffic preference for the given peer
871 */
872static void
873update_core_preference_finish (void *cls,
874 const struct GNUNET_PeerIdentity *peer,
875 struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
876 int32_t amount,
877 struct GNUNET_TIME_Relative res_delay,
878 uint64_t preference)
879{
880 struct PeerInfo *peer_info = cls;
881
882 peer_info->info_ctx = NULL;
883 GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL,
884 &update_core_preference, peer_info);
885}
886
887static void
888update_core_preference (void *cls,
889 const struct GNUNET_SCHEDULER_TaskContext *tc)
890{
891 struct PeerInfo *peer = cls;
892 uint64_t preference;
893 unsigned int matching;
894
895 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
896 {
897 return;
898 }
899 matching =
900 GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
901 &peer->id.hashPubKey);
902 if (matching >= 64)
903 {
904#if DEBUG_DHT
905 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
906 "Peer identifier matches by %u bits, only shifting as much as we can!\n",
907 matching);
908#endif
909 matching = 63;
910 }
911 preference = 1LL << matching;
912 peer->info_ctx =
913 GNUNET_CORE_peer_change_preference (coreAPI, &peer->id,
914 GNUNET_TIME_UNIT_FOREVER_REL,
915 GNUNET_BANDWIDTH_VALUE_MAX, 0,
916 preference,
917 &update_core_preference_finish, peer);
918}
919
920
921/**
922 * Given a peer and its corresponding bucket,
923 * remove it from that bucket. Does not free
924 * the PeerInfo struct, nor cancel messages
925 * or free messages waiting to be sent to this
926 * peer!
927 *
928 * @param peer the peer to remove
929 * @param bucket the bucket the peer belongs to
930 */
931static void
932remove_peer (struct PeerInfo *peer, unsigned int bucket)
933{
934 GNUNET_assert (k_buckets[bucket].peers_size > 0);
935 GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail,
936 peer);
937 k_buckets[bucket].peers_size--;
938 if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) &&
939 (lowest_bucket < MAX_BUCKETS - 1))
940 lowest_bucket++;
941}
942
943/**
944 * Removes peer from a bucket, then frees associated
945 * resources and frees peer.
946 *
947 * @param peer peer to be removed and freed
948 * @param bucket which bucket this peer belongs to
949 */
950static void
951delete_peer (struct PeerInfo *peer, unsigned int bucket)
952{
953 struct P2PPendingMessage *pos;
954 struct P2PPendingMessage *next;
955
956 remove_peer (peer, bucket); /* First remove the peer from its bucket */
957 if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
958 GNUNET_SCHEDULER_cancel (peer->send_task);
959 if ((peer->th != NULL) && (coreAPI != NULL))
960 GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
961
962 pos = peer->head;
963 while (pos != NULL) /* Remove any pending messages for this peer */
964 {
965 increment_stats
966 ("# dht pending messages discarded (due to disconnect/shutdown)");
967 next = pos->next;
968 GNUNET_free (pos);
969 pos = next;
970 }
971
972 GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
973 (all_known_peers, &peer->id.hashPubKey));
974 GNUNET_assert (GNUNET_YES ==
975 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
976 &peer->id.hashPubKey,
977 peer));
978 GNUNET_free (peer);
979 decrement_stats (STAT_PEERS_KNOWN);
980}
981
982
983/**
984 * Iterator over hash map entries.
985 *
986 * @param cls closure
987 * @param key current key code
988 * @param value PeerInfo of the peer to move to new lowest bucket
989 * @return GNUNET_YES if we should continue to
990 * iterate,
991 * GNUNET_NO if not.
992 */
993static int
994move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value)
995{
996 struct PeerInfo *peer = value;
997 int new_bucket;
998
999 GNUNET_assert (lowest_bucket > 0);
1000 new_bucket = lowest_bucket - 1;
1001 remove_peer (peer, lowest_bucket);
1002 GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head,
1003 k_buckets[new_bucket].tail,
1004 k_buckets[new_bucket].tail, peer);
1005 k_buckets[new_bucket].peers_size++;
1006 return GNUNET_YES;
1007}
1008
1009
1010/**
1011 * The current lowest bucket is full, so change the lowest
1012 * bucket to the next lower down, and move any appropriate
1013 * entries in the current lowest bucket to the new bucket.
1014 */
1015static void
1016enable_next_bucket ()
1017{
1018 struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1019 struct PeerInfo *pos;
1020
1021 GNUNET_assert (lowest_bucket > 0);
1022 to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size);
1023 pos = k_buckets[lowest_bucket].head;
1024
1025 /* Populate the array of peers which should be in the next lowest bucket */
1026 while (pos != NULL)
1027 {
1028 if (find_bucket (&pos->id.hashPubKey) < lowest_bucket)
1029 GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos,
1030 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1031 pos = pos->next;
1032 }
1033
1034 /* Remove peers from lowest bucket, insert into next lowest bucket */
1035 GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL);
1036 GNUNET_CONTAINER_multihashmap_destroy (to_remove);
1037 lowest_bucket = lowest_bucket - 1;
1038}
1039
1040
1041/**
1042 * Find the closest peer in our routing table to the
1043 * given hashcode.
1044 *
1045 * @return The closest peer in our routing table to the
1046 * key, or NULL on error.
1047 */
1048static struct PeerInfo *
1049find_closest_peer (const GNUNET_HashCode * hc)
1050{
1051 struct PeerInfo *pos;
1052 struct PeerInfo *current_closest;
1053 unsigned int lowest_distance;
1054 unsigned int temp_distance;
1055 int bucket;
1056 int count;
1057
1058 lowest_distance = -1;
1059
1060 if (k_buckets[lowest_bucket].peers_size == 0)
1061 return NULL;
1062
1063 current_closest = NULL;
1064 for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1065 {
1066 pos = k_buckets[bucket].head;
1067 count = 0;
1068 while ((pos != NULL) && (count < bucket_size))
1069 {
1070 temp_distance = distance (&pos->id.hashPubKey, hc);
1071 if (temp_distance <= lowest_distance)
1072 {
1073 lowest_distance = temp_distance;
1074 current_closest = pos;
1075 }
1076 pos = pos->next;
1077 count++;
1078 }
1079 }
1080 GNUNET_assert (current_closest != NULL);
1081 return current_closest;
1082}
1083
1084
1085/**
1086 * Function called to send a request out to another peer.
1087 * Called both for locally initiated requests and those
1088 * received from other peers.
1089 *
1090 * @param msg the encapsulated message
1091 * @param peer the peer to forward the message to
1092 * @param msg_ctx the context of the message (hop count, bloom, etc.)
1093 */
1094static void
1095forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer,
1096 struct DHT_MessageContext *msg_ctx)
1097{
1098 struct GNUNET_DHT_P2PRouteMessage *route_message;
1099 struct P2PPendingMessage *pending;
1100 size_t msize;
1101 size_t psize;
1102 char *route_path;
1103
1104 increment_stats (STAT_ROUTE_FORWARDS);
1105 GNUNET_assert (peer != NULL);
1106 if ((msg_ctx->closest != GNUNET_YES) &&
1107 (peer == find_closest_peer (&msg_ctx->key)))
1108 increment_stats (STAT_ROUTE_FORWARDS_CLOSEST);
1109
1110 msize =
1111 sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) +
1112 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1113 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1114 psize = sizeof (struct P2PPendingMessage) + msize;
1115 pending = GNUNET_malloc (psize);
1116 pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
1117 pending->importance = msg_ctx->importance;
1118 pending->timeout = msg_ctx->timeout;
1119 route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg;
1120 route_message->header.size = htons (msize);
1121 route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1122 route_message->options = htonl (msg_ctx->msg_options);
1123 route_message->hop_count = htonl (msg_ctx->hop_count + 1);
1124 route_message->network_size = htonl (msg_ctx->network_size);
1125 route_message->desired_replication_level = htonl (msg_ctx->replication);
1126 if (msg_ctx->bloom != NULL)
1127 GNUNET_assert (GNUNET_OK ==
1128 GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
1129 route_message->
1130 bloomfilter,
1131 DHT_BLOOM_SIZE));
1132 memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1133 memcpy (&route_message[1], msg, ntohs (msg->size));
1134 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1135 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1136 {
1137 route_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
1138 /* Set pointer to start of enc_msg */
1139 route_path = (char *) &route_message[1];
1140 /* Offset to the end of the enc_msg */
1141 route_path += ntohs (msg->size);
1142 /* Copy the route_path after enc_msg */
1143 memcpy (route_path, msg_ctx->path_history,
1144 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1145 }
1146#if DEBUG_DHT > 1
1147 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1148 "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1149 "DHT", msize, GNUNET_i2s (&peer->id));
1150#endif
1151 peer->pending_count++;
1152 increment_stats ("# pending messages scheduled");
1153 GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1154 pending);
1155 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1156 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1157}
1158
1159
1160
1161
1162/**
1163 * Called when a reply needs to be sent to a client, as
1164 * a result it found to a GET or FIND PEER request.
1165 *
1166 * @param client the client to send the reply to
1167 * @param message the encapsulated message to send
1168 * @param msg_ctx the context of the received message
1169 */
1170static void
1171send_reply_to_client (struct ClientList *client,
1172 const struct GNUNET_MessageHeader *message,
1173 struct DHT_MessageContext *msg_ctx)
1174{
1175 struct GNUNET_DHT_RouteResultMessage *reply;
1176 struct PendingMessage *pending_message;
1177 uint16_t msize;
1178 size_t tsize;
1179 char *reply_offset;
1180
1181#if DEBUG_DHT
1182 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n",
1183 my_short_id, "DHT");
1184#endif
1185 msize = ntohs (message->size);
1186 tsize =
1187 sizeof (struct GNUNET_DHT_RouteResultMessage) + msize +
1188 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1189 if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1190 {
1191 GNUNET_break_op (0);
1192 return;
1193 }
1194 pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1195 pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1];
1196 reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1];
1197 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1198 reply->header.size = htons (tsize);
1199 reply->outgoing_path_length = htonl (msg_ctx->path_history_len);
1200 reply->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1201 memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1202 reply_offset = (char *) &reply[1];
1203 memcpy (&reply[1], message, msize);
1204 if (msg_ctx->path_history_len > 0)
1205 {
1206 reply_offset += msize;
1207 memcpy (reply_offset, msg_ctx->path_history,
1208 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1209 }
1210 add_pending_message (client, pending_message);
1211}
1212
1213/**
1214 * Consider whether or not we would like to have this peer added to
1215 * our routing table. Check whether bucket for this peer is full,
1216 * if so return negative; if not return positive. Since peers are
1217 * only added on CORE level connect, this doesn't actually add the
1218 * peer to the routing table.
1219 *
1220 * @param peer the peer we are considering adding
1221 *
1222 * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1223 * already full)
1224 */
1225static int
1226consider_peer (struct GNUNET_PeerIdentity *peer)
1227{
1228 int bucket;
1229
1230 if ((GNUNET_YES ==
1231 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
1232 &peer->hashPubKey)) ||
1233 (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))))
1234 return GNUNET_NO; /* We already know this peer (are connected even!) */
1235 bucket = find_current_bucket (&peer->hashPubKey);
1236
1237 if ((k_buckets[bucket].peers_size < bucket_size) ||
1238 ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1239 return GNUNET_YES;
1240
1241 return GNUNET_NO;
1242}
1243
1244
1245/**
1246 * Task used to remove forwarding entries, either
1247 * after timeout, when full, or on shutdown.
1248 *
1249 * @param cls the entry to remove
1250 * @param tc context, reason, etc.
1251 */
1252static void
1253remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1254{
1255 struct DHTRouteSource *source_info = cls;
1256 struct DHTQueryRecord *record;
1257
1258 source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode);
1259 record = source_info->record;
1260 GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1261
1262 if (record->head == NULL) /* No more entries in DLL */
1263 {
1264 GNUNET_assert (GNUNET_YES ==
1265 GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1266 &record->key, record));
1267 GNUNET_free (record);
1268 }
1269 if (source_info->find_peers_responded != NULL)
1270 GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1271 GNUNET_free (source_info);
1272}
1273
1274/**
1275 * Main function that handles whether or not to route a result
1276 * message to other peers, or to send to our local client.
1277 *
1278 * @param msg the result message to be routed
1279 * @param msg_ctx context of the message we are routing
1280 *
1281 * @return the number of peers the message was routed to,
1282 * GNUNET_SYSERR on failure
1283 */
1284static int
1285route_result_message (struct GNUNET_MessageHeader *msg,
1286 struct DHT_MessageContext *msg_ctx)
1287{
1288 struct GNUNET_PeerIdentity new_peer;
1289 struct DHTQueryRecord *record;
1290 struct DHTRouteSource *pos;
1291 struct PeerInfo *peer_info;
1292 const struct GNUNET_MessageHeader *hello_msg;
1293
1294#if DEBUG_DHT > 1
1295 unsigned int i;
1296#endif
1297
1298 increment_stats (STAT_RESULTS);
1299 /**
1300 * If a find peer result message is received and contains a valid
1301 * HELLO for another peer, offer it to the transport service.
1302 */
1303 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1304 {
1305 if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader))
1306 GNUNET_break_op (0);
1307
1308 hello_msg = &msg[1];
1309 if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1310 (GNUNET_SYSERR ==
1311 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg,
1312 &new_peer)))
1313 {
1314 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1315 "%s:%s Received non-HELLO message type in find peer result message!\n",
1316 my_short_id, "DHT");
1317 GNUNET_break_op (0);
1318 return GNUNET_NO;
1319 }
1320 else /* We have a valid hello, and peer id stored in new_peer */
1321 {
1322 find_peer_context.count++;
1323 increment_stats (STAT_FIND_PEER_REPLY);
1324 if (GNUNET_YES == consider_peer (&new_peer))
1325 {
1326 increment_stats (STAT_HELLOS_PROVIDED);
1327 GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL);
1328 GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL);
1329 }
1330 }
1331 }
1332
1333 record =
1334 GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1335
1336 if (record == NULL) /* No record of this message! */
1337 {
1338#if DEBUG_DHT
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340 "`%s:%s': Have no record of response key %s uid %llu\n",
1341 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1342 msg_ctx->unique_id);
1343#endif
1344 return 0;
1345 }
1346
1347 pos = record->head;
1348 while (pos != NULL)
1349 {
1350 if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
1351 {
1352#if DEBUG_DHT
1353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354 "`%s:%s': Sending response key %s uid %llu to client\n",
1355 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1356 msg_ctx->unique_id);
1357#endif
1358 increment_stats (STAT_RESULTS_TO_CLIENT);
1359 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
1360 increment_stats (STAT_GET_REPLY);
1361#if DEBUG_DHT > 1
1362 for (i = 0; i < msg_ctx->path_history_len; i++)
1363 {
1364 char *path_offset;
1365
1366 path_offset =
1367 &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1368 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1369 "(before client) Key %s Found peer %d:%s\n",
1370 GNUNET_h2s (&msg_ctx->key), i,
1371 GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1372 }
1373#endif
1374 send_reply_to_client (pos->client, msg, msg_ctx);
1375 }
1376 else /* Send to peer */
1377 {
1378 peer_info = find_peer_by_id (&pos->source);
1379 if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1380 {
1381 pos = pos->next;
1382 continue;
1383 }
1384#if DEBUG_DHT
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386 "`%s:%s': Forwarding response key %s uid %llu to peer %s\n",
1387 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1388 msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
1389#endif
1390 forward_result_message (msg, peer_info, msg_ctx);
1391 /* Try removing forward entries after sending once, only allows ONE response per request */
1392 if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
1393 {
1394 GNUNET_SCHEDULER_cancel (pos->delete_task);
1395 pos->delete_task =
1396 GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);
1397 }
1398 }
1399 pos = pos->next;
1400 }
1401 return 0;
1402}
1403
1404
1405/**
1406 * Iterator for local get request results,
1407 *
1408 * @param cls closure for iterator, a DatacacheGetContext
1409 * @param exp when does this value expire?
1410 * @param key the key this data is stored under
1411 * @param size the size of the data identified by key
1412 * @param data the actual data
1413 * @param type the type of the data
1414 *
1415 * @return GNUNET_OK to continue iteration, anything else
1416 * to stop iteration.
1417 */
1418static int
1419datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
1420 const GNUNET_HashCode * key, size_t size,
1421 const char *data, enum GNUNET_BLOCK_Type type)
1422{
1423 struct DHT_MessageContext *msg_ctx = cls;
1424 struct DHT_MessageContext new_msg_ctx;
1425 struct GNUNET_DHT_GetResultMessage *get_result;
1426 enum GNUNET_BLOCK_EvaluationResult eval;
1427 const struct DHTPutEntry *put_entry;
1428 int get_size;
1429 char *path_offset;
1430
1431#if DEBUG_DHT
1432 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433 "`%s:%s': Received `%s' response from datacache\n", my_short_id,
1434 "DHT", "GET");
1435#endif
1436
1437 put_entry = (const struct DHTPutEntry *) data;
1438
1439 if (size !=
1440 sizeof (struct DHTPutEntry) + put_entry->data_size +
1441 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)))
1442 {
1443 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1444 "Path + data size doesn't add up for data inserted into datacache!\nData size %d, path length %d, expected %d, got %d\n",
1445 put_entry->data_size, put_entry->path_length,
1446 sizeof (struct DHTPutEntry) + put_entry->data_size +
1447 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)),
1448 size);
1449 msg_ctx->do_forward = GNUNET_NO;
1450 return GNUNET_OK;
1451 }
1452
1453 eval =
1454 GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf,
1455 msg_ctx->reply_bf_mutator, msg_ctx->xquery,
1456 msg_ctx->xquery_size, &put_entry[1],
1457 put_entry->data_size);
1458
1459 switch (eval)
1460 {
1461 case GNUNET_BLOCK_EVALUATION_OK_LAST:
1462 msg_ctx->do_forward = GNUNET_NO;
1463 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1464 memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1465 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1466 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1467 {
1468 new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1469 }
1470
1471 get_size =
1472 sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size +
1473 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity));
1474 get_result = GNUNET_malloc (get_size);
1475 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1476 get_result->header.size = htons (get_size);
1477 get_result->expiration = GNUNET_TIME_absolute_hton (exp);
1478 get_result->type = htons (type);
1479 get_result->put_path_length = htons (put_entry->path_length);
1480 path_offset = (char *) &put_entry[1];
1481 path_offset += put_entry->data_size;
1482 /* Copy the actual data and the path_history to the end of the get result */
1483 memcpy (&get_result[1], &put_entry[1],
1484 put_entry->data_size +
1485 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)));
1486 new_msg_ctx.peer = my_identity;
1487 new_msg_ctx.bloom = NULL;
1488 new_msg_ctx.hop_count = 0;
1489 new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
1490 new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1491 increment_stats (STAT_GET_RESPONSE_START);
1492 route_result_message (&get_result->header, &new_msg_ctx);
1493 GNUNET_free (get_result);
1494 break;
1495 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1496#if DEBUG_DHT
1497 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n",
1498 my_short_id, "DHT");
1499#endif
1500 break;
1501 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1502#if DEBUG_DHT
1503 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n",
1504 my_short_id, "DHT");
1505#endif
1506 break;
1507 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1508#if DEBUG_DHT
1509 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1510 "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
1511#endif
1512 GNUNET_break (0);
1513 break;
1514 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1515 GNUNET_break_op (0);
1516 msg_ctx->do_forward = GNUNET_NO;
1517 break;
1518 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1519#if DEBUG_DHT
1520 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1521 "`%s:%s': Unsupported block type (%u) in response!\n",
1522 my_short_id, "DHT", type);
1523#endif
1524 /* msg_ctx->do_forward = GNUNET_NO; // not sure... */
1525 break;
1526 }
1527 return GNUNET_OK;
1528}
1529
1530
1531/**
1532 * Main function that handles whether or not to route a message to other
1533 * peers.
1534 *
1535 * @param msg the message to be routed
1536 * @param msg_ctx the context containing all pertinent information about the message
1537 */
1538static void
1539route_message (const struct GNUNET_MessageHeader *msg,
1540 struct DHT_MessageContext *msg_ctx);
1541
1542
1543/**
1544 * Server handler for all dht get requests, look for data,
1545 * if found, send response either to clients or other peers.
1546 *
1547 * @param msg the actual get message
1548 * @param msg_ctx struct containing pertinent information about the get request
1549 *
1550 * @return number of items found for GET request
1551 */
1552static unsigned int
1553handle_dht_get (const struct GNUNET_MessageHeader *msg,
1554 struct DHT_MessageContext *msg_ctx)
1555{
1556 const struct GNUNET_DHT_GetMessage *get_msg;
1557 uint16_t msize;
1558 uint16_t bf_size;
1559 unsigned int results;
1560 const char *end;
1561 enum GNUNET_BLOCK_Type type;
1562
1563 msize = ntohs (msg->size);
1564 if (msize < sizeof (struct GNUNET_DHT_GetMessage))
1565 {
1566 GNUNET_break (0);
1567 return 0;
1568 }
1569 get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
1570 bf_size = ntohs (get_msg->bf_size);
1571 msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
1572 msg_ctx->reply_bf_mutator = get_msg->bf_mutator;
1573 if (msize !=
1574 sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
1575 {
1576 GNUNET_break_op (0);
1577 return 0;
1578 }
1579 end = (const char *) &get_msg[1];
1580 if (msg_ctx->xquery_size == 0)
1581 {
1582 msg_ctx->xquery = NULL;
1583 }
1584 else
1585 {
1586 msg_ctx->xquery = (const void *) end;
1587 end += msg_ctx->xquery_size;
1588 }
1589 if (bf_size == 0)
1590 {
1591 msg_ctx->reply_bf = NULL;
1592 }
1593 else
1594 {
1595 msg_ctx->reply_bf =
1596 GNUNET_CONTAINER_bloomfilter_init (end, bf_size,
1597 GNUNET_DHT_GET_BLOOMFILTER_K);
1598 }
1599 type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
1600#if DEBUG_DHT
1601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1602 "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
1603 my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key),
1604 msg_ctx->unique_id);
1605#endif
1606 increment_stats (STAT_GETS);
1607 results = 0;
1608 msg_ctx->do_forward = GNUNET_YES;
1609 if (datacache != NULL)
1610 results =
1611 GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
1612 &datacache_get_iterator, msg_ctx);
1613#if DEBUG_DHT
1614 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1615 "`%s:%s': Found %d results for `%s' request uid %llu\n",
1616 my_short_id, "DHT", results, "GET", msg_ctx->unique_id);
1617#endif
1618 if (results >= 1)
1619 {
1620 }
1621 else
1622 {
1623 /* check query valid */
1624 if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID ==
1625 GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key,
1626 &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator,
1627 msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0))
1628 {
1629 GNUNET_break_op (0);
1630 msg_ctx->do_forward = GNUNET_NO;
1631 }
1632 }
1633
1634 if (msg_ctx->do_forward == GNUNET_YES)
1635 route_message (msg, msg_ctx);
1636 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
1637 return results;
1638}
1639
1640
1641static void
1642remove_recent_find_peer (void *cls,
1643 const struct GNUNET_SCHEDULER_TaskContext *tc)
1644{
1645 GNUNET_HashCode *key = cls;
1646
1647 GNUNET_assert (GNUNET_YES ==
1648 GNUNET_CONTAINER_multihashmap_remove
1649 (recent_find_peer_requests, key, NULL));
1650 GNUNET_free (key);
1651}
1652
1653
1654/**
1655 * Server handler for initiating local dht find peer requests
1656 *
1657 * @param find_msg the actual find peer message
1658 * @param msg_ctx struct containing pertinent information about the request
1659 *
1660 */
1661static void
1662handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
1663 struct DHT_MessageContext *msg_ctx)
1664{
1665 struct GNUNET_MessageHeader *find_peer_result;
1666 struct GNUNET_DHT_FindPeerMessage *find_peer_message;
1667 struct DHT_MessageContext *new_msg_ctx;
1668 struct GNUNET_CONTAINER_BloomFilter *incoming_bloom;
1669 size_t hello_size;
1670 size_t tsize;
1671 GNUNET_HashCode *recent_hash;
1672 struct GNUNET_MessageHeader *other_hello;
1673 size_t other_hello_size;
1674 struct GNUNET_PeerIdentity peer_id;
1675
1676 find_peer_message = (struct GNUNET_DHT_FindPeerMessage *) find_msg;
1677 GNUNET_break_op (ntohs (find_msg->size) >=
1678 (sizeof (struct GNUNET_DHT_FindPeerMessage)));
1679 if (ntohs (find_msg->size) < sizeof (struct GNUNET_DHT_FindPeerMessage))
1680 return;
1681 other_hello = NULL;
1682 other_hello_size = 0;
1683 if (ntohs (find_msg->size) > sizeof (struct GNUNET_DHT_FindPeerMessage))
1684 {
1685 other_hello_size =
1686 ntohs (find_msg->size) - sizeof (struct GNUNET_DHT_FindPeerMessage);
1687 other_hello = GNUNET_malloc (other_hello_size);
1688 memcpy (other_hello, &find_peer_message[1], other_hello_size);
1689 if ((GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) other_hello) == 0)
1690 || (GNUNET_OK !=
1691 GNUNET_HELLO_get_id ((struct GNUNET_HELLO_Message *) other_hello,
1692 &peer_id)))
1693 {
1694 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1695 "Received invalid HELLO message in find peer request!\n");
1696 GNUNET_free (other_hello);
1697 return;
1698 }
1699#if FIND_PEER_WITH_HELLO
1700 if (GNUNET_YES == consider_peer (&peer_id))
1701 {
1702 increment_stats (STAT_HELLOS_PROVIDED);
1703 GNUNET_TRANSPORT_offer_hello (transport_handle, other_hello, NULL, NULL);
1704 GNUNET_CORE_peer_request_connect (coreAPI, &peer_id, NULL, NULL);
1705 route_message (find_msg, msg_ctx);
1706 GNUNET_free (other_hello);
1707 return;
1708 }
1709 else /* We don't want this peer! */
1710 {
1711 route_message (find_msg, msg_ctx);
1712 GNUNET_free (other_hello);
1713 return;
1714 }
1715#endif
1716 }
1717
1718#if DEBUG_DHT
1719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1720 "`%s:%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
1721 my_short_id, "DHT", "FIND PEER", GNUNET_h2s (&msg_ctx->key),
1722 ntohs (find_msg->size), sizeof (struct GNUNET_MessageHeader));
1723#endif
1724 if (my_hello == NULL)
1725 {
1726#if DEBUG_DHT
1727 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1728 "`%s': Our HELLO is null, can't return.\n", "DHT");
1729#endif
1730 GNUNET_free_non_null (other_hello);
1731 route_message (find_msg, msg_ctx);
1732 return;
1733 }
1734
1735 incoming_bloom =
1736 GNUNET_CONTAINER_bloomfilter_init (find_peer_message->bloomfilter,
1737 DHT_BLOOM_SIZE, DHT_BLOOM_K);
1738 if (GNUNET_YES ==
1739 GNUNET_CONTAINER_bloomfilter_test (incoming_bloom,
1740 &my_identity.hashPubKey))
1741 {
1742 increment_stats (STAT_BLOOM_FIND_PEER);
1743 GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
1744 GNUNET_free_non_null (other_hello);
1745 route_message (find_msg, msg_ctx);
1746 return; /* We match the bloomfilter, do not send a response to this peer (they likely already know us!) */
1747 }
1748 GNUNET_CONTAINER_bloomfilter_free (incoming_bloom);
1749
1750 /**
1751 * Ignore any find peer requests from a peer we have seen very recently.
1752 */
1753 if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (recent_find_peer_requests, &msg_ctx->key)) /* We have recently responded to a find peer request for this peer! */
1754 {
1755 increment_stats ("# dht find peer requests ignored (recently seen!)");
1756 GNUNET_free_non_null (other_hello);
1757 return;
1758 }
1759
1760 /**
1761 * Use this check to only allow the peer to respond to find peer requests if
1762 * it would be beneficial to have the requesting peer in this peers routing
1763 * table. Can be used to thwart peers flooding the network with find peer
1764 * requests that we don't care about. However, if a new peer is joining
1765 * the network and has no other peers this is a problem (assume all buckets
1766 * full, no one will respond!).
1767 */
1768 memcpy (&peer_id.hashPubKey, &msg_ctx->key, sizeof (GNUNET_HashCode));
1769 if (GNUNET_NO == consider_peer (&peer_id))
1770 {
1771 increment_stats ("# dht find peer requests ignored (do not need!)");
1772 GNUNET_free_non_null (other_hello);
1773 route_message (find_msg, msg_ctx);
1774 return;
1775 }
1776
1777 recent_hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
1778 memcpy (recent_hash, &msg_ctx->key, sizeof (GNUNET_HashCode));
1779 if (GNUNET_SYSERR !=
1780 GNUNET_CONTAINER_multihashmap_put (recent_find_peer_requests,
1781 &msg_ctx->key, NULL,
1782 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
1783 {
1784#if DEBUG_DHT
1785 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786 "Adding recent remove task for key `%s`!\n",
1787 GNUNET_h2s (&msg_ctx->key));
1788#endif
1789 /* Only add a task if there wasn't one for this key already! */
1790 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
1791 (GNUNET_TIME_UNIT_SECONDS, 30),
1792 &remove_recent_find_peer, recent_hash);
1793 }
1794 else
1795 {
1796 GNUNET_free (recent_hash);
1797#if DEBUG_DHT
1798 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1799 "Received duplicate find peer request too soon!\n");
1800#endif
1801 }
1802
1803 /* Simplistic find_peer functionality, always return our hello */
1804 hello_size = ntohs (my_hello->size);
1805 tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
1806
1807 if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1808 {
1809 GNUNET_break_op (0);
1810 GNUNET_free_non_null (other_hello);
1811 return;
1812 }
1813
1814 find_peer_result = GNUNET_malloc (tsize);
1815 find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
1816 find_peer_result->size = htons (tsize);
1817 memcpy (&find_peer_result[1], my_hello, hello_size);
1818#if DEBUG_DHT
1819 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1820 "`%s': Sending hello size %d to requesting peer.\n", "DHT",
1821 hello_size);
1822#endif
1823
1824 new_msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
1825 memcpy (new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1826 new_msg_ctx->peer = my_identity;
1827 new_msg_ctx->bloom =
1828 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1829 new_msg_ctx->hop_count = 0;
1830 new_msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make find peer requests a higher priority */
1831 new_msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
1832 increment_stats (STAT_FIND_PEER_ANSWER);
1833 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1834 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1835 {
1836 new_msg_ctx->msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1837 new_msg_ctx->path_history_len = msg_ctx->path_history_len;
1838 /* Assign to previous msg_ctx path history, caller should free after our return */
1839 new_msg_ctx->path_history = msg_ctx->path_history;
1840 }
1841 route_result_message (find_peer_result, new_msg_ctx);
1842 GNUNET_free (new_msg_ctx);
1843 GNUNET_free_non_null (other_hello);
1844 GNUNET_free (find_peer_result);
1845 route_message (find_msg, msg_ctx);
1846}
1847
1848
1849/**
1850 * Server handler for initiating local dht put requests
1851 *
1852 * @param msg the actual put message
1853 * @param msg_ctx struct containing pertinent information about the request
1854 */
1855static void
1856handle_dht_put (const struct GNUNET_MessageHeader *msg,
1857 struct DHT_MessageContext *msg_ctx)
1858{
1859 const struct GNUNET_DHT_PutMessage *put_msg;
1860 struct DHTPutEntry *put_entry;
1861 unsigned int put_size;
1862 char *path_offset;
1863 enum GNUNET_BLOCK_Type put_type;
1864 size_t data_size;
1865 int ret;
1866 GNUNET_HashCode key;
1867 struct DHTQueryRecord *record;
1868
1869 GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage));
1870
1871 put_msg = (const struct GNUNET_DHT_PutMessage *) msg;
1872 put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
1873 data_size =
1874 ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
1875 ret =
1876 GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size,
1877 &key);
1878 if (GNUNET_NO == ret)
1879 {
1880 /* invalid reply */
1881 GNUNET_break_op (0);
1882 return;
1883 }
1884 if ((GNUNET_YES == ret) &&
1885 (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode))))
1886 {
1887 /* invalid wrapper: key mismatch! */
1888 GNUNET_break_op (0);
1889 return;
1890 }
1891 /* ret == GNUNET_SYSERR means that there is no known relationship between
1892 * data and the key, so we cannot check it */
1893#if DEBUG_DHT
1894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1895 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1896 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1897 msg_ctx->unique_id);
1898#endif
1899
1900 record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
1901 &msg_ctx->key);
1902 if (NULL != record)
1903 {
1904 struct DHTRouteSource *pos;
1905 struct GNUNET_DHT_GetResultMessage *get_result;
1906 struct DHT_MessageContext new_msg_ctx;
1907 size_t get_size;
1908
1909 pos = record->head;
1910 while (pos != NULL)
1911 {
1912 /* TODO: do only for local started requests? or also for remote peers? */
1913 /* TODO: include this in statistics? under what? */
1914 /* TODO: reverse order of path_history? */
1915 if (NULL == pos->client)
1916 {
1917 pos = pos->next;
1918 continue;
1919 }
1920
1921 memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1922 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1923 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1924 {
1925 new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1926 }
1927
1928 get_size =
1929 sizeof (struct GNUNET_DHT_GetResultMessage) + data_size +
1930 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1931 get_result = GNUNET_malloc (get_size);
1932 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1933 get_result->header.size = htons (get_size);
1934 get_result->expiration = put_msg->expiration;
1935 get_result->type = put_msg->type;
1936 get_result->put_path_length = htons (msg_ctx->path_history_len);
1937
1938 /* Copy the actual data and the path_history to the end of the get result */
1939 memcpy (&get_result[1], &put_msg[1], data_size);
1940 path_offset = (char *) &get_result[1];
1941 path_offset += data_size;
1942 memcpy (path_offset, msg_ctx->path_history,
1943 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1944 new_msg_ctx.peer = my_identity;
1945 new_msg_ctx.bloom = NULL;
1946 new_msg_ctx.hop_count = 0;
1947 /* Make result routing a higher priority */
1948 new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;
1949 new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1950 new_msg_ctx.unique_id = pos->uid;
1951 send_reply_to_client(pos->client, &get_result->header, &new_msg_ctx);
1952 GNUNET_free (get_result);
1953 pos = pos->next;
1954 }
1955 }
1956
1957 if (msg_ctx->closest != GNUNET_YES)
1958 {
1959 route_message (msg, msg_ctx);
1960 return;
1961 }
1962
1963#if DEBUG_DHT
1964 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1965 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1966 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1967 msg_ctx->unique_id);
1968#endif
1969
1970 increment_stats (STAT_PUTS_INSERTED);
1971 if (datacache != NULL)
1972 {
1973 /* Put size is actual data size plus struct overhead plus path length (if any) */
1974 put_size =
1975 data_size + sizeof (struct DHTPutEntry) +
1976 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1977 put_entry = GNUNET_malloc (put_size);
1978 put_entry->data_size = data_size;
1979 put_entry->path_length = msg_ctx->path_history_len;
1980 /* Copy data to end of put entry */
1981 memcpy (&put_entry[1], &put_msg[1], data_size);
1982 if (msg_ctx->path_history_len > 0)
1983 {
1984 /* Copy path after data */
1985 path_offset = (char *) &put_entry[1];
1986 path_offset += data_size;
1987 memcpy (path_offset, msg_ctx->path_history,
1988 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1989 }
1990
1991 ret =
1992 GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size,
1993 (const char *) put_entry, put_type,
1994 GNUNET_TIME_absolute_ntoh (put_msg->expiration));
1995 GNUNET_free (put_entry);
1996 }
1997 else
1998 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1999 "`%s:%s': %s request received, but have no datacache!\n",
2000 my_short_id, "DHT", "PUT");
2001
2002 route_message (msg, msg_ctx);
2003}
2004
2005
2006/**
2007 * To how many peers should we (on average)
2008 * forward the request to obtain the desired
2009 * target_replication count (on average).
2010 *
2011 * returns: target_replication / (est. hops) + (target_replication * hop_count)
2012 * where est. hops is typically 2 * the routing table depth
2013 *
2014 * @param hop_count number of hops the message has traversed
2015 * @param target_replication the number of total paths desired
2016 *
2017 * @return Some number of peers to forward the message to
2018 */
2019static unsigned int
2020get_forward_count (unsigned int hop_count, size_t target_replication)
2021{
2022 uint32_t random_value;
2023 unsigned int forward_count;
2024 float target_value;
2025
2026 if (hop_count > log_of_network_size_estimate * 4.0)
2027 {
2028 /* forcefully terminate */
2029 return 0;
2030 }
2031
2032 if (hop_count > log_of_network_size_estimate * 2.0)
2033 {
2034 /* keep forwarding, but no more replication */
2035 return 1;
2036 }
2037
2038 target_value =
2039 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
2040 ((float) (target_replication - 1.0) *
2041 hop_count));
2042 /* Set forward count to floor of target_value */
2043 forward_count = (unsigned int) target_value;
2044 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
2045 target_value = target_value - forward_count;
2046 random_value =
2047 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
2048 if (random_value < (target_value * UINT32_MAX))
2049 forward_count++;
2050 return forward_count;
2051}
2052
2053
2054/**
2055 * Check whether my identity is closer than any known peers.
2056 * If a non-null bloomfilter is given, check if this is the closest
2057 * peer that hasn't already been routed to.
2058 *
2059 * @param target hash code to check closeness to
2060 * @param bloom bloomfilter, exclude these entries from the decision
2061 * @return GNUNET_YES if node location is closest,
2062 * GNUNET_NO otherwise.
2063 */
2064static int
2065am_closest_peer (const GNUNET_HashCode * target,
2066 struct GNUNET_CONTAINER_BloomFilter *bloom)
2067{
2068 int bits;
2069 int other_bits;
2070 int bucket_num;
2071 int count;
2072 struct PeerInfo *pos;
2073 unsigned int my_distance;
2074
2075 if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode)))
2076 return GNUNET_YES;
2077
2078 bucket_num = find_current_bucket (target);
2079
2080 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target);
2081 my_distance = distance (&my_identity.hashPubKey, target);
2082 pos = k_buckets[bucket_num].head;
2083 count = 0;
2084 while ((pos != NULL) && (count < bucket_size))
2085 {
2086 if ((bloom != NULL) &&
2087 (GNUNET_YES ==
2088 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
2089 {
2090 pos = pos->next;
2091 continue; /* Skip already checked entries */
2092 }
2093
2094 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target);
2095 if (other_bits > bits)
2096 return GNUNET_NO;
2097 else if (other_bits == bits) /* We match the same number of bits */
2098 {
2099 if (distance (&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
2100 return GNUNET_NO;
2101 }
2102 pos = pos->next;
2103 }
2104
2105 /* No peers closer, we are the closest! */
2106 return GNUNET_YES;
2107}
2108
2109
2110/**
2111 * Select a peer from the routing table that would be a good routing
2112 * destination for sending a message for "target". The resulting peer
2113 * must not be in the set of blocked peers.<p>
2114 *
2115 * Note that we should not ALWAYS select the closest peer to the
2116 * target, peers further away from the target should be chosen with
2117 * exponentially declining probability.
2118 *
2119 * @param target the key we are selecting a peer to route to
2120 * @param bloom a bloomfilter containing entries this request has seen already
2121 * @param hops how many hops has this message traversed thus far
2122 *
2123 * @return Peer to route to, or NULL on error
2124 */
2125static struct PeerInfo *
2126select_peer (const GNUNET_HashCode * target,
2127 struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
2128{
2129 unsigned int bc;
2130 unsigned int count;
2131 unsigned int selected;
2132 struct PeerInfo *pos;
2133 unsigned int distance;
2134 unsigned int largest_distance;
2135 struct PeerInfo *chosen;
2136
2137 if (hops >= log_of_network_size_estimate)
2138 {
2139 /* greedy selection (closest peer that is not in bloomfilter) */
2140 largest_distance = 0;
2141 chosen = NULL;
2142 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2143 {
2144 pos = k_buckets[bc].head;
2145 count = 0;
2146 while ((pos != NULL) && (count < bucket_size))
2147 {
2148 /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
2149 if (GNUNET_NO ==
2150 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2151 {
2152 distance = inverse_distance (target, &pos->id.hashPubKey);
2153 if (distance > largest_distance)
2154 {
2155 chosen = pos;
2156 largest_distance = distance;
2157 }
2158 }
2159 count++;
2160 pos = pos->next;
2161 }
2162 }
2163 if ((largest_distance > 0) && (chosen != NULL))
2164 {
2165 GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey);
2166 return chosen;
2167 }
2168 return NULL; /* no peer available or we are the closest */
2169 }
2170
2171
2172 /* select "random" peer */
2173 /* count number of peers that are available and not filtered */
2174 count = 0;
2175 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2176 {
2177 pos = k_buckets[bc].head;
2178 while ((pos != NULL) && (count < bucket_size))
2179 {
2180 if (GNUNET_YES ==
2181 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2182 {
2183 pos = pos->next;
2184 increment_stats ("# peer blocked from selection by Bloom filter");
2185 continue; /* Ignore bloomfiltered peers */
2186 }
2187 count++;
2188 pos = pos->next;
2189 }
2190 }
2191 if (count == 0) /* No peers to select from! */
2192 {
2193 increment_stats ("# failed to select peer");
2194 return NULL;
2195 }
2196 /* Now actually choose a peer */
2197 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
2198 count = 0;
2199 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
2200 {
2201 pos = k_buckets[bc].head;
2202 while ((pos != NULL) && (count < bucket_size))
2203 {
2204 if (GNUNET_YES ==
2205 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
2206 {
2207 pos = pos->next;
2208 continue; /* Ignore bloomfiltered peers */
2209 }
2210 if (0 == selected--)
2211 return pos;
2212 pos = pos->next;
2213 }
2214 }
2215 GNUNET_break (0);
2216 return NULL;
2217}
2218
2219
2220/**
2221 * Remember this routing request so that if a reply is
2222 * received we can either forward it to the correct peer
2223 * or return the result locally.
2224 *
2225 * @param msg_ctx Context of the route request
2226 *
2227 * @return GNUNET_YES if this response was cached, GNUNET_NO if not
2228 */
2229static int
2230cache_response (struct DHT_MessageContext *msg_ctx)
2231{
2232 struct DHTQueryRecord *record;
2233 struct DHTRouteSource *source_info;
2234 struct DHTRouteSource *pos;
2235 struct GNUNET_TIME_Absolute now;
2236 unsigned int current_size;
2237
2238 current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
2239
2240 while (current_size >= MAX_OUTSTANDING_FORWARDS)
2241 {
2242 source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
2243 GNUNET_assert (source_info != NULL);
2244 record = source_info->record;
2245 GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
2246 if (record->head == NULL) /* No more entries in DLL */
2247 {
2248 GNUNET_assert (GNUNET_YES ==
2249 GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
2250 &record->key,
2251 record));
2252 GNUNET_free (record);
2253 }
2254 if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
2255 {
2256 GNUNET_SCHEDULER_cancel (source_info->delete_task);
2257 source_info->delete_task = GNUNET_SCHEDULER_NO_TASK;
2258 }
2259 if (source_info->find_peers_responded != NULL)
2260 GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
2261 GNUNET_free (source_info);
2262 current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
2263 }
2264
2265 /** Non-local request and have too many outstanding forwards, discard! */
2266 if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
2267 return GNUNET_NO;
2268
2269 now = GNUNET_TIME_absolute_get ();
2270 record =
2271 GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
2272 if (record != NULL) /* Already know this request! */
2273 {
2274 pos = record->head;
2275 while (pos != NULL)
2276 {
2277 if (0 ==
2278 memcmp (&msg_ctx->peer, &pos->source,
2279 sizeof (struct GNUNET_PeerIdentity)))
2280 break; /* Already have this peer in reply list! */
2281 pos = pos->next;
2282 }
2283 if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
2284 {
2285 GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode,
2286 now.abs_value);
2287 return GNUNET_NO;
2288 }
2289 }
2290 else
2291 {
2292 record = GNUNET_malloc (sizeof (struct DHTQueryRecord));
2293 GNUNET_assert (GNUNET_OK ==
2294 GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap,
2295 &msg_ctx->key, record,
2296 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2297 memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
2298 }
2299
2300 source_info = GNUNET_malloc (sizeof (struct DHTRouteSource));
2301 source_info->record = record;
2302 source_info->delete_task =
2303 GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry,
2304 source_info);
2305 source_info->find_peers_responded =
2306 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2307 source_info->source = msg_ctx->peer;
2308 GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail,
2309 source_info);
2310 if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
2311 {
2312 source_info->client = msg_ctx->client;
2313 now = GNUNET_TIME_absolute_get_forever ();
2314 }
2315 source_info->hnode =
2316 GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info,
2317 now.abs_value);
2318 source_info->uid = msg_ctx->unique_id;
2319#if DEBUG_DHT > 1
2320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321 "`%s:%s': Created new forward source info for %s uid %llu\n",
2322 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2323 msg_ctx->unique_id);
2324#endif
2325 return GNUNET_YES;
2326}
2327
2328
2329/**
2330 * Main function that handles whether or not to route a message to other
2331 * peers.
2332 *
2333 * @param msg the message to be routed
2334 * @param msg_ctx the context containing all pertinent information about the message
2335 */
2336static void
2337route_message (const struct GNUNET_MessageHeader *msg,
2338 struct DHT_MessageContext *msg_ctx)
2339{
2340 int i;
2341 struct PeerInfo *selected;
2342 unsigned int target_forward_count;
2343 unsigned int forward_count;
2344 struct RecentRequest *recent_req;
2345 char *stat_forward_count;
2346 char *temp_stat_str;
2347
2348 increment_stats (STAT_ROUTES);
2349 target_forward_count =
2350 get_forward_count (msg_ctx->hop_count, msg_ctx->replication);
2351 GNUNET_asprintf (&stat_forward_count, "# forward counts of %d",
2352 target_forward_count);
2353 increment_stats (stat_forward_count);
2354 GNUNET_free (stat_forward_count);
2355 if (msg_ctx->bloom == NULL)
2356 msg_ctx->bloom =
2357 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2358
2359 if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT)
2360 {
2361 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2362 GNUNET_assert (recent_req != NULL);
2363 GNUNET_SCHEDULER_cancel (recent_req->remove_task);
2364 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2365 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2366 GNUNET_free (recent_req);
2367 }
2368
2369 recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
2370 recent_req->uid = msg_ctx->unique_id;
2371 memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
2372 recent_req->heap_node =
2373 GNUNET_CONTAINER_heap_insert (recent_heap, recent_req,
2374 GNUNET_TIME_absolute_get ().abs_value);
2375 recent_req->bloom =
2376 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2377
2378 forward_count = 0;
2379 for (i = 0; i < target_forward_count; i++)
2380 {
2381 selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
2382 if (selected == NULL)
2383 break;
2384 forward_count++;
2385 if (GNUNET_CRYPTO_hash_matching_bits
2386 (&selected->id.hashPubKey,
2387 &msg_ctx->key) >=
2388 GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
2389 &msg_ctx->key))
2390 GNUNET_asprintf (&temp_stat_str,
2391 "# requests routed to close(r) peer hop %u",
2392 msg_ctx->hop_count);
2393 else
2394 GNUNET_asprintf (&temp_stat_str,
2395 "# requests routed to less close peer hop %u",
2396 msg_ctx->hop_count);
2397 if (temp_stat_str != NULL)
2398 {
2399 increment_stats (temp_stat_str);
2400 GNUNET_free (temp_stat_str);
2401 }
2402 GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
2403 &selected->id.hashPubKey);
2404 forward_message (msg, selected, msg_ctx);
2405 }
2406
2407 if (msg_ctx->bloom != NULL)
2408 {
2409 GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom,
2410 DHT_BLOOM_SIZE);
2411 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2412 msg_ctx->bloom = NULL;
2413 }
2414}
2415
2416
2417/**
2418 * Main function that handles whether or not to route a message to other
2419 * peers.
2420 *
2421 * @param msg the message to be routed
2422 * @param msg_ctx the context containing all pertinent information about the message
2423 */
2424static void
2425demultiplex_message (const struct GNUNET_MessageHeader *msg,
2426 struct DHT_MessageContext *msg_ctx)
2427{
2428 /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */
2429 msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom);
2430
2431 switch (ntohs (msg->type))
2432 {
2433 case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
2434 cache_response (msg_ctx);
2435 handle_dht_get (msg, msg_ctx);
2436 break;
2437 case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. */
2438 increment_stats (STAT_PUTS);
2439 handle_dht_put (msg, msg_ctx);
2440 break;
2441 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
2442 increment_stats (STAT_FIND_PEER);
2443 if (((msg_ctx->hop_count > 0) &&
2444 (0 !=
2445 memcmp (&msg_ctx->peer, &my_identity,
2446 sizeof (struct GNUNET_PeerIdentity)))) ||
2447 (msg_ctx->client != NULL))
2448 {
2449 cache_response (msg_ctx);
2450 if ((msg_ctx->closest == GNUNET_YES) ||
2451 (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2452 handle_dht_find_peer (msg, msg_ctx);
2453 }
2454 else
2455 route_message (msg, msg_ctx);
2456 break;
2457 default:
2458 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2459 "`%s': Message type (%d) not handled, forwarding anyway!\n",
2460 "DHT", ntohs (msg->type));
2461 route_message (msg, msg_ctx);
2462 }
2463}
2464
2465
2466/**
2467 * Iterator over hash map entries.
2468 *
2469 * @param cls closure
2470 * @param key current key code
2471 * @param value value in the hash map
2472 * @return GNUNET_YES if we should continue to
2473 * iterate,
2474 * GNUNET_NO if not.
2475 */
2476static int
2477add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
2478{
2479 struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
2480
2481 GNUNET_CONTAINER_bloomfilter_add (bloom, key);
2482 return GNUNET_YES;
2483}
2484
2485/**
2486 * Task to send a find peer message for our own peer identifier
2487 * so that we can find the closest peers in the network to ourselves
2488 * and attempt to connect to them.
2489 *
2490 * @param cls closure for this task
2491 * @param tc the context under which the task is running
2492 */
2493static void
2494send_find_peer_message (void *cls,
2495 const struct GNUNET_SCHEDULER_TaskContext *tc)
2496{
2497 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
2498 struct DHT_MessageContext msg_ctx;
2499 struct GNUNET_TIME_Relative next_send_time;
2500 struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
2501
2502 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
2503 return;
2504
2505 if (newly_found_peers > bucket_size) /* If we are finding peers already, no need to send out our request right now! */
2506 {
2507 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2508 "Have %d newly found peers since last find peer message sent!\n",
2509 newly_found_peers);
2510 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
2511 &send_find_peer_message, NULL);
2512 newly_found_peers = 0;
2513 return;
2514 }
2515
2516 increment_stats (STAT_FIND_PEER_START);
2517#if FIND_PEER_WITH_HELLO
2518 find_peer_msg =
2519 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) +
2520 GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *)
2521 my_hello));
2522 find_peer_msg->header.size =
2523 htons (sizeof (struct GNUNET_DHT_FindPeerMessage) +
2524 GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
2525 memcpy (&find_peer_msg[1], my_hello,
2526 GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
2527#else
2528 find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
2529 find_peer_msg->header.size =
2530 htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
2531#endif
2532 find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
2533 temp_bloom =
2534 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2535 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
2536 temp_bloom);
2537 GNUNET_assert (GNUNET_OK ==
2538 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
2539 find_peer_msg->
2540 bloomfilter,
2541 DHT_BLOOM_SIZE));
2542 GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
2543 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
2544 memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
2545 msg_ctx.unique_id =
2546 GNUNET_ntohll (GNUNET_CRYPTO_random_u64
2547 (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
2548 msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
2549 msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
2550 msg_ctx.network_size = log_of_network_size_estimate;
2551 msg_ctx.peer = my_identity;
2552 msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
2553 msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
2554
2555 demultiplex_message (&find_peer_msg->header, &msg_ctx);
2556 GNUNET_free (find_peer_msg);
2557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2558 "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id,
2559 "DHT", "FIND PEER");
2560 if (newly_found_peers < bucket_size)
2561 {
2562 next_send_time.rel_value =
2563 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
2564 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2565 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
2566 }
2567 else
2568 {
2569 next_send_time.rel_value =
2570 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
2571 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2572 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value -
2573 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
2574 }
2575
2576 GNUNET_assert (next_send_time.rel_value != 0);
2577 find_peer_context.count = 0;
2578 newly_found_peers = 0;
2579 find_peer_context.start = GNUNET_TIME_absolute_get ();
2580 GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
2581 NULL);
2582}
2583
2584
2585/**
2586 * Core handler for p2p route requests.
2587 *
2588 * @param cls closure
2589 * @param message message
2590 * @param peer peer identity this notification is about
2591 * @param atsi performance data
2592 * @return GNUNET_OK to keep the connection open,
2593 * GNUNET_SYSERR to close it (signal serious error)
2594 */
2595static int
2596handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer,
2597 const struct GNUNET_MessageHeader *message,
2598 const struct GNUNET_TRANSPORT_ATS_Information
2599 *atsi)
2600{
2601#if DEBUG_DHT
2602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2603 "`%s:%s': Received P2P request from peer %s\n", my_short_id,
2604 "DHT", GNUNET_i2s (peer));
2605#endif
2606 struct GNUNET_DHT_P2PRouteMessage *incoming =
2607 (struct GNUNET_DHT_P2PRouteMessage *) message;
2608 struct GNUNET_MessageHeader *enc_msg =
2609 (struct GNUNET_MessageHeader *) &incoming[1];
2610 struct DHT_MessageContext *msg_ctx;
2611 char *route_path;
2612 int path_size;
2613
2614 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
2615 {
2616 GNUNET_break_op (0);
2617 return GNUNET_YES;
2618 }
2619
2620 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
2621 {
2622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2623 "Sending of previous replies took too long, backing off!\n");
2624 increment_stats ("# route requests dropped due to high load");
2625 decrease_max_send_delay (get_max_send_delay ());
2626 return GNUNET_YES;
2627 }
2628 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
2629 msg_ctx->bloom =
2630 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
2631 DHT_BLOOM_K);
2632 GNUNET_assert (msg_ctx->bloom != NULL);
2633 msg_ctx->hop_count = ntohl (incoming->hop_count);
2634 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
2635 msg_ctx->replication = ntohl (incoming->desired_replication_level);
2636 msg_ctx->msg_options = ntohl (incoming->options);
2637 if (GNUNET_DHT_RO_RECORD_ROUTE ==
2638 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2639 {
2640 path_size =
2641 ntohl (incoming->outgoing_path_length) *
2642 sizeof (struct GNUNET_PeerIdentity);
2643 if (ntohs (message->size) !=
2644 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
2645 path_size))
2646 {
2647 GNUNET_break_op (0);
2648 GNUNET_free (msg_ctx);
2649 return GNUNET_YES;
2650 }
2651 route_path = (char *) &incoming[1];
2652 route_path = route_path + ntohs (enc_msg->size);
2653 msg_ctx->path_history =
2654 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
2655 memcpy (msg_ctx->path_history, route_path, path_size);
2656 memcpy (&msg_ctx->path_history[path_size], &my_identity,
2657 sizeof (struct GNUNET_PeerIdentity));
2658 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
2659 }
2660 msg_ctx->network_size = ntohl (incoming->network_size);
2661 msg_ctx->peer = *peer;
2662 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
2663 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2664 demultiplex_message (enc_msg, msg_ctx);
2665 if (msg_ctx->bloom != NULL)
2666 {
2667 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2668 msg_ctx->bloom = NULL;
2669 }
2670 GNUNET_free (msg_ctx);
2671 return GNUNET_YES;
2672}
2673
2674
2675/**
2676 * Core handler for p2p route results.
2677 *
2678 * @param cls closure
2679 * @param message message
2680 * @param peer peer identity this notification is about
2681 * @param atsi performance data
2682 *
2683 */
2684static int
2685handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer,
2686 const struct GNUNET_MessageHeader *message,
2687 const struct GNUNET_TRANSPORT_ATS_Information
2688 *atsi)
2689{
2690#if DEBUG_DHT
2691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2692 "`%s:%s': Received request from peer %s\n", my_short_id, "DHT",
2693 GNUNET_i2s (peer));
2694#endif
2695 const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
2696 (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
2697 struct GNUNET_MessageHeader *enc_msg =
2698 (struct GNUNET_MessageHeader *) &incoming[1];
2699 struct DHT_MessageContext msg_ctx;
2700
2701 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
2702 {
2703 GNUNET_break_op (0);
2704 return GNUNET_YES;
2705 }
2706
2707 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
2708 memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
2709 msg_ctx.msg_options = ntohl (incoming->options);
2710 msg_ctx.hop_count = ntohl (incoming->hop_count);
2711 msg_ctx.peer = *peer;
2712 msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
2713 msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
2714 if ((GNUNET_DHT_RO_RECORD_ROUTE ==
2715 (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
2716 (ntohl (incoming->outgoing_path_length) > 0))
2717 {
2718 if (ntohs (message->size) -
2719 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
2720 ntohs (enc_msg->size) !=
2721 ntohl (incoming->outgoing_path_length) *
2722 sizeof (struct GNUNET_PeerIdentity))
2723 {
2724#if DEBUG_DHT
2725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2726 "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n",
2727 ntohs (message->size), ntohs (enc_msg->size),
2728 ntohs (message->size) -
2729 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
2730 ntohs (enc_msg->size),
2731 ntohl (incoming->outgoing_path_length) *
2732 sizeof (struct GNUNET_PeerIdentity));
2733#endif
2734 GNUNET_break_op (0);
2735 return GNUNET_NO;
2736 }
2737 msg_ctx.path_history = (char *) &incoming[1];
2738 msg_ctx.path_history += ntohs (enc_msg->size);
2739 msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
2740 }
2741 route_result_message (enc_msg, &msg_ctx);
2742 return GNUNET_YES;
2743}
2744
2745
2746/**
2747 * Receive the HELLO from transport service,
2748 * free current and replace if necessary.
2749 *
2750 * @param cls NULL
2751 * @param message HELLO message of peer
2752 */
2753static void
2754process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2755{
2756#if DEBUG_DHT
2757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2758 "Received our `%s' from transport service\n", "HELLO");
2759#endif
2760
2761 GNUNET_assert (message != NULL);
2762 GNUNET_free_non_null (my_hello);
2763 my_hello = GNUNET_malloc (ntohs (message->size));
2764 memcpy (my_hello, message, ntohs (message->size));
2765}
2766
2767
2768/**
2769 * Task run during shutdown.
2770 *
2771 * @param cls unused
2772 * @param tc unused
2773 */
2774static void
2775shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2776{
2777 int bucket_count;
2778 struct PeerInfo *pos;
2779
2780 if (NULL != ghh)
2781 {
2782 GNUNET_TRANSPORT_get_hello_cancel (ghh);
2783 ghh = NULL;
2784 }
2785 if (transport_handle != NULL)
2786 {
2787 GNUNET_free_non_null (my_hello);
2788 GNUNET_TRANSPORT_disconnect (transport_handle);
2789 transport_handle = NULL;
2790 }
2791 GDS_NSE_done ();
2792 if (coreAPI != NULL)
2793 {
2794#if DEBUG_DHT
2795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n",
2796 my_short_id, "DHT");
2797#endif
2798 GNUNET_CORE_disconnect (coreAPI);
2799 coreAPI = NULL;
2800 }
2801 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2802 {
2803 while (k_buckets[bucket_count].head != NULL)
2804 {
2805 pos = k_buckets[bucket_count].head;
2806#if DEBUG_DHT
2807 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2808 "%s:%s Removing peer %s from bucket %d!\n", my_short_id,
2809 "DHT", GNUNET_i2s (&pos->id), bucket_count);
2810#endif
2811 delete_peer (pos, bucket_count);
2812 }
2813 }
2814 if (datacache != NULL)
2815 {
2816#if DEBUG_DHT
2817 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n",
2818 my_short_id, "DHT");
2819#endif
2820 GNUNET_DATACACHE_destroy (datacache);
2821 datacache = NULL;
2822 }
2823 if (stats != NULL)
2824 {
2825 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
2826 stats = NULL;
2827 }
2828 if (block_context != NULL)
2829 {
2830 GNUNET_BLOCK_context_destroy (block_context);
2831 block_context = NULL;
2832 }
2833 GNUNET_free_non_null (my_short_id);
2834 my_short_id = NULL;
2835}
2836
2837
2838/**
2839 * To be called on core init/fail.
2840 *
2841 * @param cls service closure
2842 * @param server handle to the server for this service
2843 * @param identity the public identity of this peer
2844 * @param publicKey the public key of this peer
2845 */
2846static void
2847core_init (void *cls, struct GNUNET_CORE_Handle *server,
2848 const struct GNUNET_PeerIdentity *identity,
2849 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
2850{
2851
2852 if (server == NULL)
2853 {
2854#if DEBUG_DHT
2855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n",
2856 "dht", GNUNET_i2s (identity));
2857#endif
2858 GNUNET_SCHEDULER_cancel (cleanup_task);
2859 GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
2860 return;
2861 }
2862#if DEBUG_DHT
2863 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2864 "%s: Core connection initialized, I am peer: %s\n", "dht",
2865 GNUNET_i2s (identity));
2866#endif
2867
2868 /* Copy our identity so we can use it */
2869 memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
2870 if (my_short_id != NULL)
2871 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2872 "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n",
2873 "DHT SERVICE");
2874 my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity));
2875}
2876
2877
2878static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2879 {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
2880 {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
2881 {NULL, 0, 0}
2882};
2883
2884
2885
2886
2887/**
2888 * Process dht requests.
2889 *
2890 * @param cls closure
2891 * @param server the initialized server
2892 * @param c configuration to use
2893 */
2894static void
2895run (void *cls, struct GNUNET_SERVER_Handle *server,
2896 const struct GNUNET_CONFIGURATION_Handle *c)
2897{
2898 struct GNUNET_TIME_Relative next_send_time;
2899 unsigned long long temp_config_num;
2900
2901 cfg = c;
2902 datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
2903 coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */
2904 DEFAULT_CORE_QUEUE_SIZE, /* queue size */
2905 NULL, /* Closure passed to DHT functions */
2906 &core_init, /* Call core_init once connected */
2907 &handle_core_connect, /* Handle connects */
2908 &handle_core_disconnect, /* remove peers on disconnects */
2909 NULL, /* Do we care about "status" updates? */
2910 NULL, /* Don't want notified about all incoming messages */
2911 GNUNET_NO, /* For header only inbound notification */
2912 NULL, /* Don't want notified about all outbound messages */
2913 GNUNET_NO, /* For header only outbound notification */
2914 core_handlers); /* Register these handlers */
2915
2916 if (coreAPI == NULL)
2917 return;
2918 transport_handle =
2919 GNUNET_TRANSPORT_connect (cfg, NULL, NULL, NULL, NULL, NULL);
2920 if (transport_handle != NULL)
2921 ghh = GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
2922 else
2923 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2924 "Failed to connect to transport service!\n");
2925 block_context = GNUNET_BLOCK_context_create (cfg);
2926 lowest_bucket = MAX_BUCKETS - 1;
2927 all_known_peers = GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2928 GNUNET_assert (all_known_peers != NULL);
2929
2930 if (GNUNET_OK ==
2931 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
2932 &temp_config_num))
2933 {
2934 bucket_size = (unsigned int) temp_config_num;
2935 }
2936
2937 stats = GNUNET_STATISTICS_create ("dht", cfg);
2938 next_send_time.rel_value =
2939 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
2940 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2941 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
2942 2) -
2943 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
2944 find_peer_context.start = GNUNET_TIME_absolute_get ();
2945 GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
2946 &find_peer_context);
2947
2948 /* Scheduled the task to clean up when shutdown is called */
2949 cleanup_task =
2950 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2951 &shutdown_task, NULL);
2952}
2953
2954
2955/**
2956 * The main function for the dht service.
2957 *
2958 * @param argc number of arguments from the command line
2959 * @param argv command line arguments
2960 * @return 0 ok, 1 on error
2961 */
2962int
2963main (int argc, char *const *argv)
2964{
2965 int ret;
2966 struct RecentRequest *recent_req;
2967
2968 recent_heap =
2969 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2970 recent_find_peer_requests =
2971 GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2972 ret =
2973 (GNUNET_OK ==
2974 GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
2975 NULL)) ? 0 : 1;
2976 while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0)
2977 {
2978 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2979 GNUNET_assert (recent_req != NULL);
2980 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2981 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2982 GNUNET_free (recent_req);
2983 }
2984 GNUNET_CONTAINER_heap_destroy (recent_heap);
2985 recent_heap = NULL;
2986 GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
2987 recent_find_peer_requests = NULL;
2988 return ret;
2989}
2990
2991/* end of gnunet-service-dht.c */
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
new file mode 100644
index 000000000..75506534b
--- /dev/null
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -0,0 +1,876 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27
28#include "platform.h"
29#include "gnunet_block_lib.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_protocols.h"
32#include "gnunet_nse_service.h"
33#include "gnunet_core_service.h"
34#include "gnunet_datacache_lib.h"
35#include "gnunet_transport_service.h"
36#include "gnunet_hello_lib.h"
37#include "gnunet_dht_service.h"
38#include "gnunet_statistics_service.h"
39#include "dht_new.h"
40#include <fenv.h>
41#include "gnunet-service-dht_clients.h"
42#include "gnunet-service-dht_neighbours.h"
43
44
45/**
46 * Linked list of messages to send to clients.
47 */
48struct PendingMessage
49{
50 /**
51 * Pointer to next item in the list
52 */
53 struct PendingMessage *next;
54
55 /**
56 * Pointer to previous item in the list
57 */
58 struct PendingMessage *prev;
59
60 /**
61 * Actual message to be sent, allocated at the end of the struct:
62 * // msg = (cast) &pm[1];
63 * // memcpy (&pm[1], data, len);
64 */
65 const struct GNUNET_MessageHeader *msg;
66
67};
68
69
70/**
71 * Struct containing information about a client,
72 * handle to connect to it, and any pending messages
73 * that need to be sent to it.
74 */
75struct ClientList
76{
77 /**
78 * Linked list of active clients
79 */
80 struct ClientList *next;
81
82 /**
83 * Linked list of active clients
84 */
85 struct ClientList *prev;
86
87 /**
88 * The handle to this client
89 */
90 struct GNUNET_SERVER_Client *client_handle;
91
92 /**
93 * Handle to the current transmission request, NULL
94 * if none pending.
95 */
96 struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
97
98 /**
99 * Linked list of pending messages for this client
100 */
101 struct PendingMessage *pending_head;
102
103 /**
104 * Tail of linked list of pending messages for this client
105 */
106 struct PendingMessage *pending_tail;
107
108};
109
110
111/**
112 * Entry in the DHT routing table for a client's GET request.
113 */
114struct ClientQueryRecord
115{
116
117 /**
118 * The key this request was about
119 */
120 GNUNET_HashCode key;
121
122 /**
123 * Client responsible for the request.
124 */
125 struct ClientList *client;
126
127 /**
128 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
129 */
130 const void *xquery;
131
132 /**
133 * Replies we have already seen for this request.
134 */
135 GNUNET_HashCode *seen_replies;
136
137 /**
138 * Pointer to this nodes heap location in the retry-heap (for fast removal)
139 */
140 struct GNUNET_CONTAINER_HeapNode *hnode;
141
142 /**
143 * What's the delay between re-try operations that we currently use for this
144 * request?
145 */
146 struct GNUNET_TIME_Relative retry_frequency;
147
148 /**
149 * What's the next time we should re-try this request?
150 */
151 struct GNUNET_TIME_Absolute retry_time;
152
153 /**
154 * The unique identifier of this request
155 */
156 uint64_t unique_id;
157
158 /**
159 * Number of bytes in xquery.
160 */
161 size_t xquery_size;
162
163 /**
164 * Number of entries in 'seen_replies'.
165 */
166 unsigned int seen_replies_count;
167
168 /**
169 * Desired replication level
170 */
171 uint32_t replication;
172
173 /**
174 * Any message options for this request
175 */
176 uint32_t msg_options;
177
178 /**
179 * The type for the data for the GET request; actually an 'enum
180 * GNUNET_BLOCK_Type'.
181 */
182 uint32_t msg_type;
183
184};
185
186
187/**
188 * List of active clients.
189 */
190static struct ClientList *client_head;
191
192/**
193 * List of active clients.
194 */
195static struct ClientList *client_tail;
196
197/**
198 * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
199 */
200static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
201
202/**
203 * Heap with all of our client's request, sorted by retry time (earliest on top).
204 */
205static struct GNUNET_CONTAINER_Heap *retry_heap;
206
207/**
208 * Task that re-transmits requests (using retry_heap).
209 */
210static GNUNET_SCHEDULER_TaskIdentifier retry_task;
211
212
213/**
214 * Find a client if it exists, add it otherwise.
215 *
216 * @param client the server handle to the client
217 *
218 * @return the client if found, a new client otherwise
219 */
220static struct ClientList *
221find_active_client (struct GNUNET_SERVER_Client *client)
222{
223 struct ClientList *pos = client_list;
224 struct ClientList *ret;
225
226 while (pos != NULL)
227 {
228 if (pos->client_handle == client)
229 return pos;
230 pos = pos->next;
231 }
232 ret = GNUNET_malloc (sizeof (struct ClientList));
233 ret->client_handle = client;
234 GNUNET_CONTAINER_DLL_insert (client_head,
235 client_tail,
236 ret);
237 return ret;
238}
239
240
241/**
242 * Iterator over hash map entries that frees all entries
243 * associated with the given client.
244 *
245 * @param cls client to search for in source routes
246 * @param key current key code (ignored)
247 * @param value value in the hash map, a ClientQueryRecord
248 * @return GNUNET_YES (we should continue to iterate)
249 */
250static int
251remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
252{
253 struct ClientList *client = cls;
254 struct ClientQueryRecord *record = value;
255
256 if (record->client != client)
257 return GNUNET_YES;
258 GNUNET_assert (GNUNET_YES ==
259 GNUNET_CONTAINER_multihashmap_remove (forward_map,
260 key, record));
261 GNUNET_CONTAINER_heap_remove_node (record->hnode);
262 GNUNET_ARRAY_append (record->seen_replies,
263 record->seen_replies_count,
264 0);
265 GNUNET_free (record);
266 return GNUNET_YES;
267}
268
269
270/**
271 * Functions with this signature are called whenever a client
272 * is disconnected on the network level.
273 *
274 * @param cls closure (NULL for dht)
275 * @param client identification of the client; NULL
276 * for the last call when the server is destroyed
277 */
278static void
279handle_client_disconnect (void *cls,
280 struct GNUNET_SERVER_Client *client)
281{
282 struct ClientList *pos = client_list;
283 struct ClientList *found;
284 struct PendingMessage *reply;
285
286 found = NULL;
287 while (pos != NULL)
288 {
289 if (pos->client_handle == client)
290 {
291 GNUNET_CONTAINER_DLL_remove (client_head,
292 client_tail,
293 pos);
294 found = pos;
295 break;
296 }
297 pos = pos->next;
298 }
299 if (found == NULL)
300 return;
301 if (found->transmit_handle != NULL)
302 GNUNET_CONNECTION_notify_transmit_ready_cancel (found->transmit_handle);
303 while (NULL != (reply = found->pending_head))
304 {
305 GNUNET_CONTAINER_DLL_remove (found->pending_head, found->pending_tail,
306 reply);
307 GNUNET_free (reply);
308 }
309 GNUNET_CONTAINER_multihashmap_iterate (forward_list.hashmap,
310 &remove_client_records, found);
311 GNUNET_free (found);
312}
313
314
315/**
316 * Route the given request via the DHT. This includes updating
317 * the bloom filter and retransmission times, building the P2P
318 * message and initiating the routing operation.
319 */
320static void
321transmit_request (struct ClientQueryRecord *cqr)
322{
323 int32_t reply_bf_mutator;
324
325 reply_bf_mutator = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
326 UINT32_MAX);
327 reply_bf = GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator,
328 cqr->seen_replies,
329 cqr->seen_replies_count);
330 GST_NEIGHBOURS_handle_get (cqr->msg_type,
331 cqr->msg_options,
332 cqr->replication,
333 &cqr->key,
334 cqr->xquery,
335 cqr->xquery_size,
336 reply_bf,
337 reply_bf_mutator,
338 NULL /* no peers blocked initially */);
339 GNUNET_CONTAINER_bloomfilter_destroy (reply_bf);
340
341 /* exponential back-off for retries, max 1h */
342 cqr->retry_frequency =
343 GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_HOURS,
344 GNUNET_TIME_relative_multiply (cqr->retry_frequency, 2));
345 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
346}
347
348
349/**
350 * Task that looks at the 'retry_heap' and transmits all of the requests
351 * on the heap that are ready for transmission. Then re-schedules
352 * itself (unless the heap is empty).
353 *
354 * @param cls unused
355 * @param tc scheduler context
356 */
357static void
358transmit_next_request_task (void *cls,
359 const struct GNUNET_SCHEDULER_TaskContext *tc)
360{
361 struct ClientQueryRecord *cqr;
362 struct GNUNET_TIME_Relative delay;
363
364 retry_task = GNUNET_SCHEDULER_NO_TASK;
365 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
366 return;
367 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
368 {
369 cqr->hnode = NULL;
370 delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
371 if (delay.value > 0)
372 {
373 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
374 cqr->retry_time.abs_value);
375 retry_task = GNUNET_SCHEDULER_add_delayed (delay,
376 &transmit_next_request_task,
377 NULL);
378 return;
379 }
380 transmit_request (cqr);
381 }
382}
383
384
385/**
386 * Handler for PUT messages.
387 *
388 * @param cls closure for the service
389 * @param client the client we received this message from
390 * @param message the actual message received
391 */
392static void
393handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
394 const struct GNUNET_MessageHeader *message)
395{
396 const struct GNUNET_DHT_ClientPutMessage *dht_msg;
397 uint16_t size;
398
399 size = ntohs (message->size);
400 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
401 {
402 GNUNET_break (0);
403 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
404 }
405 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
406 GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
407 ntohl (dht_msg->options),
408 ntohl (dht_msg->desired_replication_level),
409 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
410 &dht_msg->key,
411 &dht_msg[1],
412 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
413 GNUNET_SERVER_receive_done (client, GNUNET_OK);
414}
415
416
417/**
418 * Handler for any generic DHT messages, calls the appropriate handler
419 * depending on message type, sends confirmation if responses aren't otherwise
420 * expected.
421 *
422 * @param cls closure for the service
423 * @param client the client we received this message from
424 * @param message the actual message received
425 */
426static void
427handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
428 const struct GNUNET_MessageHeader *message)
429{
430 const struct GNUNET_DHT_ClientGetMessage *get;
431 const struct GNUNET_MessageHeader *enc_msg;
432
433 struct ClientQueryRecord *cqr;
434 size_t xquery_size;
435 const char* xquery;
436 uint16_t size;
437
438 size = ntohs (message->size);
439 if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
440 {
441 GNUNET_break (0);
442 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
443 return;
444 }
445 xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
446 get = (const struct GNUNET_DHT_ClientGetMessage *) message;
447 xquery = (const char*) &get[1];
448
449 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
450 cqr->key = get->key;
451 cqr->client = find_active_client (client);
452 cqr->xquery = (void*) &cqr[1];
453 memcpy (&cqr[1], xquery, xquery_size);
454 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
455 cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
456 cqr->retry_time = GNUNET_TIME_absolute_get ();
457 cqr->unique_id = get->unique_id;
458 cqr->xquery_size = xquery_size;
459 cqr->replication = ntohl (get->desired_replication_level);
460 cqr->msg_options = ntohl (get->options);
461 cqr->msg_type = ntohl (get->type);
462 GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
463 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
464 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
465 GNUNET_SCHEDULER_cancel (retry_task);
466 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
467 GNUNET_SERVER_receive_done (client, GNUNET_OK);
468}
469
470
471/**
472 * Closure for 'remove_by_uid'.
473 */
474struct RemoveByUidContext
475{
476 /**
477 * Client that issued the removal request.
478 */
479 struct ClientList *client;
480
481 /**
482 * Unique ID of the request.
483 */
484 uint64_t uid;
485};
486
487
488/**
489 * Iterator over hash map entries that frees all entries
490 * that match the given client and UID.
491 *
492 * @param cls UID and client to search for in source routes
493 * @param key current key code
494 * @param value value in the hash map, a ClientQueryRecord
495 * @return GNUNET_YES (we should continue to iterate)
496 */
497static int
498remove_by_uid (void *cls, const GNUNET_HashCode * key, void *value)
499{
500 const struct RemoveByUidContext *ctx = cls;
501 struct ClientQueryRecord *record = value;
502
503 if (record->uid != ctx->uid)
504 return GNUNET_YES;
505 return remove_client_records (ctx->client, key, record);
506}
507
508
509/**
510 * Handler for any generic DHT stop messages, calls the appropriate handler
511 * depending on message type (if processed locally)
512 *
513 * @param cls closure for the service
514 * @param client the client we received this message from
515 * @param message the actual message received
516 *
517 */
518static void
519handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
520 const struct GNUNET_MessageHeader *message)
521{
522 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
523 (const struct GNUNET_DHT_ClientGetStopMessage *) message;
524
525 ctx.client = find_active_client (client);
526 ctx.uid = &dht_stop_msg.unique_id);
527 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
528 &dht_stop_msg->key,
529 &remove_by_uid,
530 &ctx);
531 GNUNET_SERVER_receive_done (client, GNUNET_OK);
532}
533
534
535/**
536 * Task run to check for messages that need to be sent to a client.
537 *
538 * @param client a ClientList, containing the client and any messages to be sent to it
539 */
540static void
541process_pending_messages (struct ClientList *client);
542
543
544/**
545 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
546 * request. A ClientList is passed as closure, take the head of the list
547 * and copy it into buf, which has the result of sending the message to the
548 * client.
549 *
550 * @param cls closure to this call
551 * @param size maximum number of bytes available to send
552 * @param buf where to copy the actual message to
553 *
554 * @return the number of bytes actually copied, 0 indicates failure
555 */
556static size_t
557send_reply_to_client (void *cls, size_t size, void *buf)
558{
559 struct ClientList *client = cls;
560 char *cbuf = buf;
561 struct PendingMessage *reply;
562 size_t off;
563 size_t msize;
564
565 client->transmit_handle = NULL;
566 if (buf == NULL)
567 {
568 /* client disconnected */
569 return 0;
570 }
571 off = 0;
572 while ((NULL != (reply = client->pending_head)) &&
573 (size >= off + (msize = ntohs (reply->msg->size))))
574 {
575 GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
576 reply);
577 memcpy (&cbuf[off], reply->msg, msize);
578 GNUNET_free (reply);
579 off += msize;
580 }
581 process_pending_messages (client);
582 return off;
583}
584
585
586/**
587 * Task run to check for messages that need to be sent to a client.
588 *
589 * @param client a ClientList, containing the client and any messages to be sent to it
590 */
591static void
592process_pending_messages (struct ClientList *client)
593{
594 if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
595 return;
596 client->transmit_handle =
597 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
598 ntohs (client->pending_head->
599 msg->size),
600 GNUNET_TIME_UNIT_FOREVER_REL,
601 &send_reply_to_client, client);
602}
603
604
605/**
606 * Add a PendingMessage to the clients list of messages to be sent
607 *
608 * @param client the active client to send the message to
609 * @param pending_message the actual message to send
610 */
611static void
612add_pending_message (struct ClientList *client,
613 struct PendingMessage *pending_message)
614{
615 GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
616 pending_message);
617 process_pending_messages (client);
618}
619
620
621/**
622 * Closure for 'forward_reply'
623 */
624struct ForwardReplyContext
625{
626
627 /**
628 * Actual message to send to matching clients.
629 */
630 struct PendingMessage *pm;
631
632 /**
633 * Embedded payload.
634 */
635 const void *data;
636
637 /**
638 * Type of the data.
639 */
640 uint32_t type;
641
642 /**
643 * Number of bytes in data.
644 */
645 size_t data_size;
646
647 /**
648 * Do we need to copy 'pm' because it was already used?
649 */
650 int do_copy;
651
652};
653
654
655/**
656 * Iterator over hash map entries that send a given reply to
657 * each of the matching clients. With some tricky recycling
658 * of the buffer.
659 *
660 * @param cls the 'struct ForwardReplyContext'
661 * @param key current key
662 * @param value value in the hash map, a ClientQueryRecord
663 * @return GNUNET_YES (we should continue to iterate),
664 * if the result is mal-formed, GNUNET_NO
665 */
666static int
667forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
668{
669 struct ForwardReplyContext *frc = cls;
670 struct ClientQueryRecord *record = value;
671 struct PendingMessage *pm;
672 struct ReplyMessage *reply;
673 enum GNUNET_BLOCK_EvaluationResult eval;
674 int do_free;
675 GNUNET_HashCode ch;
676 unsigned int i;
677
678 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
679 (record->type != frc->type) )
680 return GNUNET_YES; /* type mismatch */
681 GNUNET_CRYPTO_hash (frc->data,
682 frc->data_size,
683 &ch);
684 for (i=0;i<record->seen_replies_count;i++)
685 if (0 == memcmp (&record->seen_replies[i],
686 &ch,
687 sizeof (GNUNET_HashCode)))
688 return GNUNET_YES; /* duplicate */
689 eval =
690 GNUNET_BLOCK_evaluate (GDS_block_context,
691 record->type, key,
692 NULL, 0,
693 record->xquery,
694 record->xquery_size,
695 frc->data,
696 frc->data_size);
697 switch (eval)
698 {
699 case GNUNET_BLOCK_EVALUATION_OK_LAST:
700 do_free = GNUNET_YES;
701 break;
702 case GNUNET_BLOCK_EVALUATION_OK_MORE:
703 GNUNET_ARRAY_append (record->seen_replies,
704 record->seen_replies_count,
705 ch);
706 do_free = GNUNET_NO;
707 break;
708 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
709 /* should be impossible to encounter here */
710 GNUNET_break (0);
711 return GNUNET_YES;
712 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
713 GNUNET_break_op (0);
714 return GNUNET_NO;
715 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
716 GNUNET_break (0);
717 return GNUNET_NO;
718 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
719 GNUNET_break (0);
720 return GNUNET_NO;
721 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
722 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
723 "Unsupported block type (%u) in request!\n",
724 record->type);
725 return GNUNET_NO;
726 }
727 if (GNUNET_NO == frc->do_copy)
728 {
729 /* first time, we can use the original data */
730 pm = frc->pm;
731 frc->do_copy = GNUNET_YES;
732 }
733 else
734 {
735 /* two clients waiting for same reply, must copy for queueing */
736 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
737 ntohs (frc->pm->msg->size));
738 memcpy (pm, frc->pm,
739 sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
740 pm->next = pm->prev = NULL;
741 }
742 reply = (struct ReplyMessage*) &pm[1];
743 reply->unique_id = record->unique_id;
744 add_pending_message (record->client, pm);
745 if (GNUNET_YES == do_free)
746 remove_client_records (record->client, key, record);
747 return GNUNET_YES;
748}
749
750
751/**
752 * Handle a reply we've received from another peer. If the reply
753 * matches any of our pending queries, forward it to the respective
754 * client(s).
755 *
756 * @param expiration when will the reply expire
757 * @param key the query this reply is for
758 * @param get_path_length number of peers in 'get_path'
759 * @param get_path path the reply took on get
760 * @param put_path_length number of peers in 'put_path'
761 * @param put_path path the reply took on put
762 * @param type type of the reply
763 * @param data_size number of bytes in 'data'
764 * @param data application payload data
765 */
766void
767GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
768 const GNUNET_HashCode *key,
769 unsigned int get_path_length,
770 const struct GNUNET_PeerIdentity *get_path,
771 unsigned int put_path_length,
772 const struct GNUNET_PeerIdentity *put_path,
773 uint32_t type,
774 size_t data_size,
775 const void *data)
776{
777 struct ForwardReplyContext frc;
778 struct PendingMessage *pm;
779 struct ReplyMessage *reply;
780 struct GNUNET_PeerIdentity *paths;
781 size_t msize;
782
783 if (NULL ==
784 GNUNET_CONTAINER_multihashmap_get (foward_map, key))
785 return; /* no matching request, fast exit! */
786 msize = sizeof(struct ReplyMessage) + data_size +
787 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
788 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
789 {
790 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
791 _("Could not pass reply to client, message too big!\n"));
792 return;
793 }
794 pm = (struct PendingMessage *) GNUNET_malloc (msize + sizeof (struct PendingMessage));
795 reply = (struct ReplyMessage*) &pm[1];
796 pm->msg = &reply->header;
797 reply->header.size = htons ((uint16_t) msize);
798 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
799 reply->type = htonl (type);
800 reply->get_path_length = htonl (get_path_length);
801 reply->put_path_length = htonl (put_path_length);
802 reply->unique_id = 0; /* filled in later */
803 reply->expiration = GNUNET_TIME_absolute_hton (expiration);
804 reply->key = *key;
805 paths = (struct GNUNET_PeerIdentity*) &reply[1];
806 mempcy (paths, get_path,
807 sizeof (struct GNUNET_PeerIdentity) * get_path_length);
808 mempcy (&paths[get_path_length],
809 put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
810 memcpy (&paths[get_path_length + put_path_length],
811 data,
812 data_size);
813 frc.do_copy = GNUNET_NO;
814 frc.pm = pm;
815 frc.data = data;
816 frc.data_size = data_size;
817 frc.type = type;
818 GNUNET_CONTAINER_multihashmap_get_multiple (foward_map, key,
819 &forward_reply,
820 &frc);
821 if (GNUNET_NO == frc.do_copy)
822 {
823 /* did not match any of the requests, free! */
824 GNUNET_free (buf);
825 }
826}
827
828
829/**
830 * Initialize client subsystem.
831 *
832 * @param server the initialized server
833 */
834void
835GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server)
836{
837 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
838 {&handle_dht_local_put, NULL,
839 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
840 {&handle_dht_local_get, NULL,
841 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
842 {&handle_dht_local_get_stop, NULL,
843 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
844 sizeof (struct GNUNET_DHT_StopMessage) },
845 {NULL, NULL, 0, 0}
846 };
847 forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
848 retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
849 GNUNET_SERVER_add_handlers (server, plugin_handlers);
850 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
851}
852
853
854/**
855 * Shutdown client subsystem.
856 */
857void
858GDS_CLIENT_done ()
859{
860 GNUNET_assert (client_head == NULL);
861 GNUNET_assert (client_tail == NULL);
862 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
863 {
864 GNUNET_SCHEDULER_cancel (retry_task);
865 retry_task = GNUNET_SCHEDULER_NO_TASK;
866 }
867 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
868 GNUNET_CONTAINER_heap_destroy (retry_heap);
869 retry_heap = NULL;
870 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
871 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
872 forward_map = NULL;
873}
874
875/* end of gnunet-service-dht_clients.c */
876
diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h
new file mode 100644
index 000000000..931ca1a93
--- /dev/null
+++ b/src/dht/gnunet-service-dht_clients.h
@@ -0,0 +1,72 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_clients.h
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#ifndef GNUNET_SERVICE_DHT_CLIENTS_H
28#define GNUNET_SERVICE_DHT_CLIENTS_H
29
30/**
31 * Handle a reply we've received from another peer. If the reply
32 * matches any of our pending queries, forward it to the respective
33 * client(s).
34 *
35 * @param expiration when will the reply expire
36 * @param key the query this reply is for
37 * @param get_path_length number of peers in 'get_path'
38 * @param get_path path the reply took on get
39 * @param put_path_length number of peers in 'put_path'
40 * @param put_path path the reply took on put
41 * @param type type of the reply
42 * @param data_size number of bytes in 'data'
43 * @param data application payload data
44 */
45void
46GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
47 const GNUNET_HashCode *key,
48 unsigned int get_path_length,
49 const struct GNUNET_PeerIdentity *get_path,
50 unsigned int put_path_length,
51 const struct GNUNET_PeerIdentity *put_path,
52 uint32_t type,
53 size_t data_size,
54 const void *data);
55
56
57/**
58 * Initialize client subsystem.
59 *
60 * @param server the initialized server
61 */
62void
63GDS_CLIENT_init (struct GNUNET_SERVER_Handle *server);
64
65
66/**
67 * Shutdown client subsystem.
68 */
69void
70GDS_CLIENT_done (void);
71
72#endif
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
new file mode 100644
index 000000000..7585b5a47
--- /dev/null
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -0,0 +1,353 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_neighbours.c
23 * @brief GNUnet DHT service's bucket and neighbour management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27
28#include "platform.h"
29#include "gnunet_block_lib.h"
30#include "gnunet_util_lib.h"
31#include "gnunet_protocols.h"
32#include "gnunet_nse_service.h"
33#include "gnunet_core_service.h"
34#include "gnunet_datacache_lib.h"
35#include "gnunet_transport_service.h"
36#include "gnunet_hello_lib.h"
37#include "gnunet_dht_service.h"
38#include "gnunet_statistics_service.h"
39#include "dht.h"
40#include <fenv.h>
41
42/**
43 * How many buckets will we allow total.
44 */
45#define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8
46
47/**
48 * What is the maximum number of peers in a given bucket.
49 */
50#define DEFAULT_BUCKET_SIZE 4
51
52
53/**
54 * Linked list of messages to send to a particular other peer.
55 */
56struct P2PPendingMessage
57{
58 /**
59 * Pointer to next item in the list
60 */
61 struct P2PPendingMessage *next;
62
63 /**
64 * Pointer to previous item in the list
65 */
66 struct P2PPendingMessage *prev;
67
68 /**
69 * Message importance level.
70 */
71 unsigned int importance;
72
73 /**
74 * Time when this request was scheduled to be sent.
75 */
76 struct GNUNET_TIME_Absolute scheduled;
77
78 /**
79 * How long to wait before sending message.
80 */
81 struct GNUNET_TIME_Relative timeout;
82
83 /**
84 * Actual message to be sent, allocated at the end of the struct:
85 * // msg = (cast) &pm[1];
86 * // memcpy (&pm[1], data, len);
87 */
88 const struct GNUNET_MessageHeader *msg;
89
90};
91
92
93/**
94 * Entry for a peer in a bucket.
95 */
96struct PeerInfo
97{
98 /**
99 * Next peer entry (DLL)
100 */
101 struct PeerInfo *next;
102
103 /**
104 * Prev peer entry (DLL)
105 */
106 struct PeerInfo *prev;
107
108 /**
109 * Count of outstanding messages for peer.
110 */
111 unsigned int pending_count;
112
113 /**
114 * Head of pending messages to be sent to this peer.
115 */
116 struct P2PPendingMessage *head;
117
118 /**
119 * Tail of pending messages to be sent to this peer.
120 */
121 struct P2PPendingMessage *tail;
122
123 /**
124 * Core handle for sending messages to this peer.
125 */
126 struct GNUNET_CORE_TransmitHandle *th;
127
128 /**
129 * Preference update context
130 */
131 struct GNUNET_CORE_InformationRequestContext *info_ctx;
132
133 /**
134 * Task for scheduling message sends.
135 */
136 GNUNET_SCHEDULER_TaskIdentifier send_task;
137
138 /**
139 * Task for scheduling preference updates
140 */
141 GNUNET_SCHEDULER_TaskIdentifier preference_task;
142
143 /**
144 * What is the identity of the peer?
145 */
146 struct GNUNET_PeerIdentity id;
147
148#if 0
149 /**
150 * What is the average latency for replies received?
151 */
152 struct GNUNET_TIME_Relative latency;
153
154 /**
155 * Transport level distance to peer.
156 */
157 unsigned int distance;
158#endif
159
160};
161
162
163/**
164 * Peers are grouped into buckets.
165 */
166struct PeerBucket
167{
168 /**
169 * Head of DLL
170 */
171 struct PeerInfo *head;
172
173 /**
174 * Tail of DLL
175 */
176 struct PeerInfo *tail;
177
178 /**
179 * Number of peers in the bucket.
180 */
181 unsigned int peers_size;
182};
183
184
185/**
186 * The lowest currently used bucket.
187 */
188static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */
189
190/**
191 * The buckets (Kademlia routing table, complete with growth).
192 * Array of size MAX_BUCKET_SIZE.
193 */
194static struct PeerBucket k_buckets[MAX_BUCKETS];
195
196/**
197 * Hash map of all known peers, for easy removal from k_buckets on disconnect.
198 */
199static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
200
201/**
202 * Maximum size for each bucket.
203 */
204static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
205
206
207
208/**
209 * Method called whenever a peer connects.
210 *
211 * @param cls closure
212 * @param peer peer identity this notification is about
213 * @param atsi performance data
214 */
215static void
216handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
217 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
218{
219 struct PeerInfo *ret;
220 int peer_bucket;
221
222 /* Check for connect to self message */
223 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
224 return;
225
226#if DEBUG_DHT
227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228 "%s:%s Receives core connect message for peer %s distance %d!\n",
229 my_short_id, "dht", GNUNET_i2s (peer), distance);
230#endif
231
232 if (GNUNET_YES ==
233 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
234 &peer->hashPubKey))
235 {
236#if DEBUG_DHT
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 "%s:%s Received %s message for peer %s, but already have peer in RT!",
239 my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
240#endif
241 GNUNET_break (0);
242 return;
243 }
244
245 peer_bucket = find_current_bucket (&peer->hashPubKey);
246 GNUNET_assert (peer_bucket >= lowest_bucket);
247 GNUNET_assert (peer_bucket < MAX_BUCKETS);
248 ret = GNUNET_malloc (sizeof (struct PeerInfo));
249#if 0
250 ret->latency = latency;
251 ret->distance = distance;
252#endif
253 ret->id = *peer;
254 GNUNET_CONTAINER_DLL_insert_after (k_buckets[peer_bucket].head,
255 k_buckets[peer_bucket].tail,
256 k_buckets[peer_bucket].tail, ret);
257 k_buckets[peer_bucket].peers_size++;
258 if ((GNUNET_CRYPTO_hash_matching_bits
259 (&my_identity.hashPubKey, &peer->hashPubKey) > 0) &&
260 (k_buckets[peer_bucket].peers_size <= bucket_size))
261 ret->preference_task =
262 GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
263 if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
264 enable_next_bucket ();
265 newly_found_peers++;
266 GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret,
267 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
268 increment_stats (STAT_PEERS_KNOWN);
269
270#if DEBUG_DHT
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
272 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
273 ret == NULL ? "NOT ADDED" : "PEER ADDED");
274#endif
275}
276
277
278/**
279 * Method called whenever a peer disconnects.
280 *
281 * @param cls closure
282 * @param peer peer identity this notification is about
283 */
284static void
285handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
286{
287 struct PeerInfo *to_remove;
288 int current_bucket;
289
290 /* Check for disconnect from self message */
291 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
292 return;
293#if DEBUG_DHT
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "%s:%s: Received peer disconnect message for peer `%s' from %s\n",
296 my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
297#endif
298
299 if (GNUNET_YES !=
300 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
301 &peer->hashPubKey))
302 {
303 GNUNET_break (0);
304#if DEBUG_DHT
305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
306 "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
307 my_short_id, "DHT", GNUNET_i2s (peer));
308#endif
309 return;
310 }
311 increment_stats (STAT_DISCONNECTS);
312 GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
313 (all_known_peers, &peer->hashPubKey));
314 to_remove =
315 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
316 GNUNET_assert (to_remove != NULL);
317 if (NULL != to_remove->info_ctx)
318 {
319 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
320 to_remove->info_ctx = NULL;
321 }
322 GNUNET_assert (0 ==
323 memcmp (peer, &to_remove->id,
324 sizeof (struct GNUNET_PeerIdentity)));
325 current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
326 delete_peer (to_remove, current_bucket);
327}
328
329
330
331/**
332 * Initialize neighbours subsystem.
333 */
334void
335GST_NEIGHBOURS_init ()
336{
337}
338
339
340/**
341 * Shutdown neighbours subsystem.
342 */
343void
344GST_NEIGHBOURS_done ()
345{
346}
347
348
349
350
351
352
353/* end of gnunet-service-dht_neighbours.c */
diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h
new file mode 100644
index 000000000..08357d7ff
--- /dev/null
+++ b/src/dht/gnunet-service-dht_neighbours.h
@@ -0,0 +1,122 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_neighbours.h
23 * @brief GNUnet DHT routing code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#ifndef GNUNET_SERVICE_DHT_NEIGHBOURS_H
28#define GNUNET_SERVICE_DHT_NEIGHBOURS_H
29
30
31/**
32 * Perform a PUT operation.
33 *
34 * @param type type of the block
35 * @param options routing options
36 * @param desired_replication_level desired replication count
37 * @param expiration_time when does the content expire
38 * @param key key for the content
39 * @param put_path_length number of entries in put_path
40 * @param put_path peers this request has traversed so far (if tracked)
41 * @param data payload to store
42 * @param data_size number of bytes in data
43 */
44void
45GST_NEIGHBOURS_handle_put (uint32_t type,
46 uint32_t options,
47 uint32_t desired_replication_level,
48 GNUNET_TIME_Absolute expiration_time,
49 const GNUNET_HashCode *key,
50 unsigned int put_path_length,
51 struct GNUNET_PeerIdentity *put_path,
52 const void *data,
53 size_t data_size);
54
55
56/**
57 * Perform a GET operation.
58 *
59 *
60 * @param type type of the block
61 * @param options routing options
62 * @param desired_replication_level desired replication count
63 * @param key key for the content
64 * @param xquery extended query
65 * @param xquery_size number of bytes in xquery
66 * @param reply_bf bloomfilter to filter duplicates
67 * @param reply_bf_mutator mutator for reply_bf
68 * @param peer_bf filter for peers not to select (again)
69 */
70void
71GST_NEIGHBOURS_handle_get (uint32_t type,
72 uint32_t options,
73 uint32_t desired_replication_level,
74 const GNUNET_HashCode *key,
75 const void *xquery,
76 size_t xquery_size,
77 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
78 uint32_t reply_bf_mutator,
79 const struct GNUNET_CONTAINER_BloomFilter *peer_bf);
80
81
82/**
83 * Handle a reply (route to origin).
84 *
85 * @param type type of the block
86 * @param options routing options
87 * @param expiration_time when does the content expire
88 * @param key key for the content
89 * @param put_path_length number of entries in put_path
90 * @param put_path peers the original PUT traversed (if tracked)
91 * @param get_path_length number of entries in put_path
92 * @param get_path peers this reply has traversed so far (if tracked)
93 * @param data payload of the reply
94 * @param data_size number of bytes in data
95 */
96void
97GST_NEIGHBOURS_handle_reply (uint32_t type,
98 uint32_t options,
99 GNUNET_TIME_Absolute expiration_time,
100 const GNUNET_HashCode *key,
101 unsigned int put_path_length,
102 struct GNUNET_PeerIdentity *put_path,
103 unsigned int get_path_length,
104 struct GNUNET_PeerIdentity *get_path,
105 const void *data,
106 size_t data_size);
107
108
109/**
110 * Initialize neighbours subsystem.
111 */
112void
113GST_NEIGHBOURS_init (void);
114
115/**
116 * Shutdown neighbours subsystem.
117 */
118void
119GST_NEIGHBOURS_done (void);
120
121
122#endif
diff --git a/src/dht/gnunet-service-dht_nse.c b/src/dht/gnunet-service-dht_nse.c
new file mode 100644
index 000000000..4711c9c31
--- /dev/null
+++ b/src/dht/gnunet-service-dht_nse.c
@@ -0,0 +1,84 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_nse.c
23 * @brief GNUnet DHT integration with NSE
24 * @author Christian Grothoff
25 */
26#include "gnunet-service-dht.h"
27#include "gnunet-service-dht_nse.h"
28
29/**
30 * log of the current network size estimate, used as the point where
31 * we switch between random and deterministic routing. Default
32 * value of 4.0 is used if NSE module is not available (i.e. not
33 * configured).
34 */
35static double log_of_network_size_estimate = 4.0;
36
37/**
38 * Network size estimation handle.
39 */
40static struct GNUNET_NSE_Handle *nse;
41
42
43/**
44 * Callback that is called when network size estimate is updated.
45 *
46 * @param cls closure
47 * @param timestamp time when the estimate was received from the server (or created by the server)
48 * @param logestimate the log(Base 2) value of the current network size estimate
49 * @param std_dev standard deviation for the estimate
50 *
51 */
52static void
53update_network_size_estimate (void *cls, struct GNUNET_TIME_Absolute timestamp,
54 double logestimate, double std_dev)
55{
56 log_of_network_size_estimate = logestimate;
57}
58
59
60double
61GDS_nse_get ()
62{
63 return log_of_network_size_estimate;
64}
65
66
67void
68GDS_nse_init ()
69{
70 nse = GNUNET_NSE_connect (GDS_cfg, &update_network_size_estimate, NULL);
71}
72
73
74void
75GDS_nse_done ()
76{
77 if (NULL != nse)
78 {
79 GNUNET_NSE_disconnect (nse);
80 nse = NULL;
81 }
82}
83
84/* end of gnunet-service-dht_nse.c */
diff --git a/src/dht/gnunet-service-dht_nse.h b/src/dht/gnunet-service-dht_nse.h
new file mode 100644
index 000000000..4642d4d9c
--- /dev/null
+++ b/src/dht/gnunet-service-dht_nse.h
@@ -0,0 +1,40 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_nse.h
23 * @brief GNUnet DHT integration with NSE
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_DHT_NSE_H
27#define GNUNET_SERVICE_DHT_NSE_H
28
29
30double
31GDS_nse_get (void);
32
33
34void
35GDS_nse_init (void);
36
37void
38GDS_nse_done (void);
39
40#endif