aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_neighbours.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_neighbours.c')
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c2564
1 files changed, 0 insertions, 2564 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
deleted file mode 100644
index 6465d8d57..000000000
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ /dev/null
@@ -1,2564 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2017, 2021 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_neighbours.c
23 * @brief GNUnet DHT service's bucket and neighbour management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_block_lib.h"
30#include "gnunet_hello_lib.h"
31#include "gnunet_constants.h"
32#include "gnunet_protocols.h"
33#include "gnunet_nse_service.h"
34#include "gnunet_ats_service.h"
35#include "gnunet_core_service.h"
36#include "gnunet_datacache_lib.h"
37#include "gnunet_transport_service.h"
38#include "gnunet_hello_lib.h"
39#include "gnunet_dht_service.h"
40#include "gnunet_statistics_service.h"
41#include "gnunet-service-dht.h"
42#include "gnunet-service-dht_datacache.h"
43#include "gnunet-service-dht_hello.h"
44#include "gnunet-service-dht_neighbours.h"
45#include "gnunet-service-dht_nse.h"
46#include "gnunet-service-dht_routing.h"
47#include "dht.h"
48
49#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
50 __VA_ARGS__)
51
52/**
53 * Enable slow sanity checks to debug issues.
54 */
55#define SANITY_CHECKS 1
56
57/**
58 * How many buckets will we allow total.
59 */
60#define MAX_BUCKETS sizeof(struct GNUNET_HashCode) * 8
61
62/**
63 * What is the maximum number of peers in a given bucket.
64 */
65#define DEFAULT_BUCKET_SIZE 8
66
67/**
68 * Desired replication level for FIND PEER requests
69 */
70#define FIND_PEER_REPLICATION_LEVEL 4
71
72/**
73 * Maximum allowed replication level for all requests.
74 */
75#define MAXIMUM_REPLICATION_LEVEL 16
76
77/**
78 * Maximum allowed number of pending messages per peer.
79 */
80#define MAXIMUM_PENDING_PER_PEER 64
81
82/**
83 * How long at least to wait before sending another find peer request.
84 */
85#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply ( \
86 GNUNET_TIME_UNIT_SECONDS, 30)
87
88/**
89 * How long at most to wait before sending another find peer request.
90 */
91#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply ( \
92 GNUNET_TIME_UNIT_MINUTES, 10)
93
94/**
95 * How long at most to wait for transmission of a GET request to another peer?
96 */
97#define GET_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
98
99/**
100 * Hello address expiration
101 */
102extern struct GNUNET_TIME_Relative hello_expiration;
103
104
105GNUNET_NETWORK_STRUCT_BEGIN
106
107/**
108 * P2P PUT message
109 */
110struct PeerPutMessage
111{
112 /**
113 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
114 */
115 struct GNUNET_MessageHeader header;
116
117 /**
118 * Processing options
119 */
120 uint32_t options GNUNET_PACKED;
121
122 /**
123 * Content type.
124 */
125 uint32_t type GNUNET_PACKED;
126
127 /**
128 * Hop count
129 */
130 uint32_t hop_count GNUNET_PACKED;
131
132 /**
133 * Replication level for this message
134 */
135 uint32_t desired_replication_level GNUNET_PACKED;
136
137 /**
138 * Length of the PUT path that follows (if tracked).
139 */
140 uint32_t put_path_length GNUNET_PACKED;
141
142 /**
143 * When does the content expire?
144 */
145 struct GNUNET_TIME_AbsoluteNBO expiration_time;
146
147 /**
148 * Bloomfilter (for peer identities) to stop circular routes
149 */
150 char bloomfilter[DHT_BLOOM_SIZE];
151
152 /**
153 * The key we are storing under.
154 */
155 struct GNUNET_HashCode key;
156
157 /* put path (if tracked) */
158
159 /* Payload */
160};
161
162
163/**
164 * P2P Result message
165 */
166struct PeerResultMessage
167{
168 /**
169 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
170 */
171 struct GNUNET_MessageHeader header;
172
173 /**
174 * Content type.
175 */
176 uint32_t type GNUNET_PACKED;
177
178 /**
179 * Length of the PUT path that follows (if tracked).
180 */
181 uint32_t put_path_length GNUNET_PACKED;
182
183 /**
184 * Length of the GET path that follows (if tracked).
185 */
186 uint32_t get_path_length GNUNET_PACKED;
187
188 /**
189 * When does the content expire?
190 */
191 struct GNUNET_TIME_AbsoluteNBO expiration_time;
192
193 /**
194 * The key of the corresponding GET request.
195 */
196 struct GNUNET_HashCode key;
197
198 /* put path (if tracked) */
199
200 /* get path (if tracked) */
201
202 /* Payload */
203};
204
205
206/**
207 * P2P GET message
208 */
209struct PeerGetMessage
210{
211 /**
212 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
213 */
214 struct GNUNET_MessageHeader header;
215
216 /**
217 * Processing options
218 */
219 uint32_t options GNUNET_PACKED;
220
221 /**
222 * Desired content type.
223 */
224 uint32_t type GNUNET_PACKED;
225
226 /**
227 * Hop count
228 */
229 uint32_t hop_count GNUNET_PACKED;
230
231 /**
232 * Desired replication level for this request.
233 */
234 uint32_t desired_replication_level GNUNET_PACKED;
235
236 /**
237 * Size of the extended query.
238 */
239 uint32_t xquery_size;
240
241 /**
242 * Bloomfilter mutator.
243 */
244 uint32_t bf_mutator;
245
246 /**
247 * Bloomfilter (for peer identities) to stop circular routes
248 */
249 char bloomfilter[DHT_BLOOM_SIZE];
250
251 /**
252 * The key we are looking for.
253 */
254 struct GNUNET_HashCode key;
255
256 /* xquery */
257
258 /* result bloomfilter */
259};
260GNUNET_NETWORK_STRUCT_END
261
262
263/**
264 * Entry for a peer in a bucket.
265 */
266struct PeerInfo
267{
268 /**
269 * Next peer entry (DLL)
270 */
271 struct PeerInfo *next;
272
273 /**
274 * Prev peer entry (DLL)
275 */
276 struct PeerInfo *prev;
277
278 /**
279 * Handle for sending messages to this peer.
280 */
281 struct GNUNET_MQ_Handle *mq;
282
283 /**
284 * What is the identity of the peer?
285 */
286 const struct GNUNET_PeerIdentity *id;
287
288 /**
289 * Hash of @e id.
290 */
291 struct GNUNET_HashCode phash;
292
293 /**
294 * Which bucket is this peer in?
295 */
296 int peer_bucket;
297};
298
299
300/**
301 * Peers are grouped into buckets.
302 */
303struct PeerBucket
304{
305 /**
306 * Head of DLL
307 */
308 struct PeerInfo *head;
309
310 /**
311 * Tail of DLL
312 */
313 struct PeerInfo *tail;
314
315 /**
316 * Number of peers in the bucket.
317 */
318 unsigned int peers_size;
319};
320
321
322/**
323 * Information about a peer that we would like to connect to.
324 */
325struct ConnectInfo
326{
327 /**
328 * Handle to active HELLO offer operation, or NULL.
329 */
330 struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
331
332 /**
333 * Handle to active connectivity suggestion operation, or NULL.
334 */
335 struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
336
337 /**
338 * How much would we like to connect to this peer?
339 */
340 uint32_t strength;
341};
342
343
344/**
345 * Do we cache all results that we are routing in the local datacache?
346 */
347static int cache_results;
348
349/**
350 * Should routing details be logged to stderr (for debugging)?
351 */
352static int log_route_details_stderr;
353
354/**
355 * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
356 */
357static unsigned int closest_bucket;
358
359/**
360 * How many peers have we added since we sent out our last
361 * find peer request?
362 */
363static unsigned int newly_found_peers;
364
365/**
366 * Option for testing that disables the 'connect' function of the DHT.
367 */
368static int disable_try_connect;
369
370/**
371 * The buckets. Array of size #MAX_BUCKETS. Offset 0 means 0 bits matching.
372 */
373static struct PeerBucket k_buckets[MAX_BUCKETS];
374
375/**
376 * Hash map of all CORE-connected peers, for easy removal from
377 * #k_buckets on disconnect. Values are of type `struct PeerInfo`.
378 */
379static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
380
381/**
382 * Hash map of all peers we would like to be connected to.
383 * Values are of type `struct ConnectInfo`.
384 */
385static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
386
387/**
388 * Maximum size for each bucket.
389 */
390static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
391
392/**
393 * Task that sends FIND PEER requests.
394 */
395static struct GNUNET_SCHEDULER_Task *find_peer_task;
396
397/**
398 * Identity of this peer.
399 */
400static struct GNUNET_PeerIdentity my_identity;
401
402/**
403 * Hash of the identity of this peer.
404 */
405struct GNUNET_HashCode my_identity_hash;
406
407/**
408 * Handle to CORE.
409 */
410static struct GNUNET_CORE_Handle *core_api;
411
412/**
413 * Handle to ATS connectivity.
414 */
415static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
416
417
418/**
419 * Find the optimal bucket for this key.
420 *
421 * @param hc the hashcode to compare our identity to
422 * @return the proper bucket index, or #GNUNET_SYSERR
423 * on error (same hashcode)
424 */
425static int
426find_bucket (const struct GNUNET_HashCode *hc)
427{
428 unsigned int bits;
429
430 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
431 if (bits == MAX_BUCKETS)
432 {
433 /* How can all bits match? Got my own ID? */
434 GNUNET_break (0);
435 return GNUNET_SYSERR;
436 }
437 return MAX_BUCKETS - bits - 1;
438}
439
440
441/**
442 * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
443 * Clean up the "oh" field in the @a cls
444 *
445 * @param cls a `struct ConnectInfo`
446 */
447static void
448offer_hello_done (void *cls)
449{
450 struct ConnectInfo *ci = cls;
451
452 ci->oh = NULL;
453}
454
455
456/**
457 * Function called for all entries in #all_desired_peers to clean up.
458 *
459 * @param cls NULL
460 * @param peer peer the entry is for
461 * @param value the value to remove
462 * @return #GNUNET_YES
463 */
464static int
465free_connect_info (void *cls,
466 const struct GNUNET_PeerIdentity *peer,
467 void *value)
468{
469 struct ConnectInfo *ci = value;
470
471 (void) cls;
472 GNUNET_assert (GNUNET_YES ==
473 GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
474 peer,
475 ci));
476 if (NULL != ci->sh)
477 {
478 GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
479 ci->sh = NULL;
480 }
481 if (NULL != ci->oh)
482 {
483 GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
484 ci->oh = NULL;
485 }
486 GNUNET_free (ci);
487 return GNUNET_YES;
488}
489
490
491/**
492 * Consider if we want to connect to a given peer, and if so
493 * let ATS know. If applicable, the HELLO is offered to the
494 * TRANSPORT service.
495 *
496 * @param pid peer to consider connectivity requirements for
497 * @param h a HELLO message, or NULL
498 */
499static void
500try_connect (const struct GNUNET_PeerIdentity *pid,
501 const struct GNUNET_MessageHeader *h)
502{
503 int bucket;
504 struct GNUNET_HashCode pid_hash;
505 struct ConnectInfo *ci;
506 uint32_t strength;
507
508 GNUNET_CRYPTO_hash (pid,
509 sizeof(struct GNUNET_PeerIdentity),
510 &pid_hash);
511 bucket = find_bucket (&pid_hash);
512 if (bucket < 0)
513 return; /* self? */
514 ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
515 pid);
516
517 if (k_buckets[bucket].peers_size < bucket_size)
518 strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
519 else
520 strength = bucket; /* minimum value of connectivity */
521 if (GNUNET_YES ==
522 GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
523 pid))
524 strength *= 2; /* double for connected peers */
525 else if (k_buckets[bucket].peers_size > bucket_size)
526 strength = 0; /* bucket full, we really do not care about more */
527
528 if ((0 == strength) &&
529 (NULL != ci))
530 {
531 /* release request */
532 GNUNET_assert (GNUNET_YES ==
533 free_connect_info (NULL,
534 pid,
535 ci));
536 return;
537 }
538 if (NULL == ci)
539 {
540 ci = GNUNET_new (struct ConnectInfo);
541 GNUNET_assert (GNUNET_OK ==
542 GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
543 pid,
544 ci,
545 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
546 }
547 if ((NULL != ci->oh) &&
548 (NULL != h))
549 GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
550 if (NULL != h)
551 ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
552 h,
553 &offer_hello_done,
554 ci);
555 if ((NULL != ci->sh) &&
556 (ci->strength != strength))
557 GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
558 if (ci->strength != strength)
559 ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
560 pid,
561 strength);
562 ci->strength = strength;
563}
564
565
566/**
567 * Function called for each peer in #all_desired_peers during
568 * #update_connect_preferences() if we have reason to adjust
569 * the strength of our desire to keep connections to certain
570 * peers. Calls #try_connect() to update the calculations for
571 * the given @a pid.
572 *
573 * @param cls NULL
574 * @param pid peer to update
575 * @param value unused
576 * @return #GNUNET_YES (continue to iterate)
577 */
578static int
579update_desire_strength (void *cls,
580 const struct GNUNET_PeerIdentity *pid,
581 void *value)
582{
583 (void) cls;
584 (void) value;
585 try_connect (pid,
586 NULL);
587 return GNUNET_YES;
588}
589
590
591/**
592 * Update our preferences for connectivity as given to ATS.
593 *
594 * @param cls the `struct PeerInfo` of the peer
595 * @param tc scheduler context.
596 */
597static void
598update_connect_preferences ()
599{
600 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
601 &update_desire_strength,
602 NULL);
603}
604
605
606/**
607 * Add each of the peers we already know to the bloom filter of
608 * the request so that we don't get duplicate HELLOs.
609 *
610 * @param cls the `struct GNUNET_BLOCK_Group`
611 * @param key peer identity to add to the bloom filter
612 * @param value value the peer information (unused)
613 * @return #GNUNET_YES (we should continue to iterate)
614 */
615static int
616add_known_to_bloom (void *cls,
617 const struct GNUNET_PeerIdentity *key,
618 void *value)
619{
620 struct GNUNET_BLOCK_Group *bg = cls;
621 struct GNUNET_HashCode key_hash;
622
623 (void) cls;
624 (void) value;
625 GNUNET_CRYPTO_hash (key,
626 sizeof(struct GNUNET_PeerIdentity),
627 &key_hash);
628 GNUNET_BLOCK_group_set_seen (bg,
629 &key_hash,
630 1);
631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632 "Adding known peer (%s) to bloomfilter for FIND PEER\n",
633 GNUNET_i2s (key));
634 return GNUNET_YES;
635}
636
637
638/**
639 * Task to send a find peer message for our own peer identifier
640 * so that we can find the closest peers in the network to ourselves
641 * and attempt to connect to them.
642 *
643 * @param cls closure for this task
644 */
645static void
646send_find_peer_message (void *cls)
647{
648 struct GNUNET_TIME_Relative next_send_time;
649 struct GNUNET_BLOCK_Group *bg;
650 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
651
652 (void) cls;
653 find_peer_task = NULL;
654 if (newly_found_peers > bucket_size)
655 {
656 /* If we are finding many peers already, no need to send out our request right now! */
657 find_peer_task =
658 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
659 &send_find_peer_message,
660 NULL);
661 newly_found_peers = 0;
662 return;
663 }
664 bg = GNUNET_BLOCK_group_create (GDS_block_context,
665 GNUNET_BLOCK_TYPE_DHT_HELLO,
666 GNUNET_CRYPTO_random_u32 (
667 GNUNET_CRYPTO_QUALITY_WEAK,
668 UINT32_MAX),
669 NULL,
670 0,
671 "filter-size",
672 DHT_BLOOM_SIZE,
673 NULL);
674 GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
675 &add_known_to_bloom,
676 bg);
677 GNUNET_STATISTICS_update (GDS_stats,
678 gettext_noop ("# FIND PEER messages initiated"),
679 1,
680 GNUNET_NO);
681 peer_bf
682 = GNUNET_CONTAINER_bloomfilter_init (NULL,
683 DHT_BLOOM_SIZE,
684 GNUNET_CONSTANTS_BLOOMFILTER_K);
685 // FIXME: pass priority!?
686 GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
687 GNUNET_DHT_RO_FIND_PEER
688 | GNUNET_DHT_RO_RECORD_ROUTE,
689 FIND_PEER_REPLICATION_LEVEL,
690 0,
691 &my_identity_hash,
692 NULL,
693 0,
694 bg,
695 peer_bf);
696 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
697 GNUNET_BLOCK_group_destroy (bg);
698 /* schedule next round */
699 next_send_time.rel_value_us =
700 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us
701 + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
702 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us
703 / (newly_found_peers + 1));
704 newly_found_peers = 0;
705 GNUNET_assert (NULL == find_peer_task);
706 find_peer_task =
707 GNUNET_SCHEDULER_add_delayed (next_send_time,
708 &send_find_peer_message,
709 NULL);
710}
711
712
713/**
714 * Method called whenever a peer connects.
715 *
716 * @param cls closure
717 * @param peer peer identity this notification is about
718 * @param mq message queue for sending messages to @a peer
719 * @return our `struct PeerInfo` for @a peer
720 */
721static void *
722handle_core_connect (void *cls,
723 const struct GNUNET_PeerIdentity *peer,
724 struct GNUNET_MQ_Handle *mq)
725{
726 struct PeerInfo *pi;
727
728 (void) cls;
729 /* Check for connect to self message */
730 if (0 == GNUNET_memcmp (&my_identity,
731 peer))
732 return NULL;
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "Connected to %s\n",
735 GNUNET_i2s (peer));
736 GNUNET_assert (GNUNET_NO ==
737 GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
738 peer));
739 GNUNET_STATISTICS_update (GDS_stats,
740 gettext_noop ("# peers connected"),
741 1,
742 GNUNET_NO);
743 pi = GNUNET_new (struct PeerInfo);
744 pi->id = peer;
745 pi->mq = mq;
746 GNUNET_CRYPTO_hash (peer,
747 sizeof(struct GNUNET_PeerIdentity),
748 &pi->phash);
749 pi->peer_bucket = find_bucket (&pi->phash);
750 GNUNET_assert ((pi->peer_bucket >= 0) &&
751 ((unsigned int) pi->peer_bucket < MAX_BUCKETS));
752 GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
753 k_buckets[pi->peer_bucket].tail,
754 pi);
755 k_buckets[pi->peer_bucket].peers_size++;
756 closest_bucket = GNUNET_MAX (closest_bucket,
757 (unsigned int) pi->peer_bucket);
758 GNUNET_assert (GNUNET_OK ==
759 GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
760 pi->id,
761 pi,
762 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
763 if ((pi->peer_bucket > 0) &&
764 (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
765 {
766 update_connect_preferences ();
767 newly_found_peers++;
768 }
769 if ((1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
770 (GNUNET_YES != disable_try_connect))
771 {
772 /* got a first connection, good time to start with FIND PEER requests... */
773 GNUNET_assert (NULL == find_peer_task);
774 find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
775 NULL);
776 }
777 return pi;
778}
779
780
781/**
782 * Method called whenever a peer disconnects.
783 *
784 * @param cls closure
785 * @param peer peer identity this notification is about
786 * @param internal_cls our `struct PeerInfo` for @a peer
787 */
788static void
789handle_core_disconnect (void *cls,
790 const struct GNUNET_PeerIdentity *peer,
791 void *internal_cls)
792{
793 struct PeerInfo *to_remove = internal_cls;
794
795 (void) cls;
796 /* Check for disconnect from self message */
797 if (NULL == to_remove)
798 return;
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800 "Disconnected %s\n",
801 GNUNET_i2s (peer));
802 GNUNET_STATISTICS_update (GDS_stats,
803 gettext_noop ("# peers connected"),
804 -1,
805 GNUNET_NO);
806 GNUNET_assert (GNUNET_YES ==
807 GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
808 peer,
809 to_remove));
810 if ((0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
811 (GNUNET_YES != disable_try_connect))
812 {
813 GNUNET_SCHEDULER_cancel (find_peer_task);
814 find_peer_task = NULL;
815 }
816 GNUNET_assert (to_remove->peer_bucket >= 0);
817 GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
818 k_buckets[to_remove->peer_bucket].tail,
819 to_remove);
820 GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
821 k_buckets[to_remove->peer_bucket].peers_size--;
822 while ((closest_bucket > 0) &&
823 (0 == k_buckets[to_remove->peer_bucket].peers_size))
824 closest_bucket--;
825 if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
826 update_connect_preferences ();
827 GNUNET_free (to_remove);
828}
829
830
831/**
832 * To how many peers should we (on average) forward the request to
833 * obtain the desired target_replication count (on average).
834 *
835 * @param hop_count number of hops the message has traversed
836 * @param target_replication the number of total paths desired
837 * @return Some number of peers to forward the message to
838 */
839static unsigned int
840get_forward_count (uint32_t hop_count,
841 uint32_t target_replication)
842{
843 uint32_t random_value;
844 uint32_t forward_count;
845 float target_value;
846
847 if (0 == target_replication)
848 target_replication = 1; /* 0 is verboten */
849 if (target_replication > MAXIMUM_REPLICATION_LEVEL)
850 target_replication = MAXIMUM_REPLICATION_LEVEL;
851 if (hop_count > GDS_NSE_get () * 4.0)
852 {
853 /* forcefully terminate */
854 GNUNET_STATISTICS_update (GDS_stats,
855 gettext_noop ("# requests TTL-dropped"),
856 1, GNUNET_NO);
857 return 0;
858 }
859 if (hop_count > GDS_NSE_get () * 2.0)
860 {
861 /* Once we have reached our ideal number of hops, only forward to 1 peer */
862 return 1;
863 }
864 /* bound by system-wide maximum */
865 target_replication =
866 GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
867 target_value =
868 1 + (target_replication - 1.0) / (GDS_NSE_get ()
869 + ((float) (target_replication - 1.0)
870 * hop_count));
871
872
873 /* Set forward count to floor of target_value */
874 forward_count = (uint32_t) target_value;
875 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
876 target_value = target_value - forward_count;
877 random_value =
878 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
879 if (random_value < (target_value * UINT32_MAX))
880 forward_count++;
881 return GNUNET_MIN (forward_count,
882 MAXIMUM_REPLICATION_LEVEL);
883}
884
885
886/**
887 * Compute the distance between have and target as a 64-bit value.
888 * Differences in the lower bits must count stronger than differences
889 * in the higher bits.
890 *
891 * @param target
892 * @param have
893 * @param bucket up to which offset are @a target and @a have identical and thus those bits should not be considered
894 * @return 0 if have==target, otherwise a number
895 * that is larger as the distance between
896 * the two hash codes increases
897 */
898static uint64_t
899get_distance (const struct GNUNET_HashCode *target,
900 const struct GNUNET_HashCode *have,
901 unsigned int bucket)
902{
903 uint64_t lsb = 0;
904
905 for (unsigned int i = bucket + 1;
906 (i < sizeof(struct GNUNET_HashCode) * 8) &&
907 (i < bucket + 1 + 64);
908 i++)
909 {
910 if (GNUNET_CRYPTO_hash_get_bit_rtl (target, i) !=
911 GNUNET_CRYPTO_hash_get_bit_rtl (have, i))
912 lsb |= (1LLU << (bucket + 64 - i)); /* first bit set will be 1,
913 * last bit set will be 63 -- if
914 * i does not reach 512 first... */
915 }
916 return lsb;
917}
918
919
920/**
921 * Check whether my identity is closer than any known peers. If a
922 * non-null bloomfilter is given, check if this is the closest peer
923 * that hasn't already been routed to.
924 *
925 * @param key hash code to check closeness to
926 * @param bloom bloomfilter, exclude these entries from the decision
927 * @return #GNUNET_YES if node location is closest,
928 * #GNUNET_NO otherwise.
929 */
930int
931GDS_am_closest_peer (const struct GNUNET_HashCode *key,
932 const struct GNUNET_CONTAINER_BloomFilter *bloom)
933{
934 int bits;
935 int other_bits;
936 int bucket_num;
937 struct PeerInfo *pos;
938
939 if (0 == GNUNET_memcmp (&my_identity_hash,
940 key))
941 return GNUNET_YES;
942 bucket_num = find_bucket (key);
943 GNUNET_assert (bucket_num >= 0);
944 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
945 key);
946 pos = k_buckets[bucket_num].head;
947 while (NULL != pos)
948 {
949 if ((NULL != bloom) &&
950 (GNUNET_YES ==
951 GNUNET_CONTAINER_bloomfilter_test (bloom,
952 &pos->phash)))
953 {
954 pos = pos->next;
955 continue; /* Skip already checked entries */
956 }
957 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
958 key);
959 if (other_bits > bits)
960 return GNUNET_NO;
961 if (other_bits == bits) /* We match the same number of bits */
962 return GNUNET_YES;
963 pos = pos->next;
964 }
965 /* No peers closer, we are the closest! */
966 return GNUNET_YES;
967}
968
969
970/**
971 * Select a peer from the routing table that would be a good routing
972 * destination for sending a message for "key". The resulting peer
973 * must not be in the set of blocked peers.<p>
974 *
975 * Note that we should not ALWAYS select the closest peer to the
976 * target, peers further away from the target should be chosen with
977 * exponentially declining probability.
978 *
979 * FIXME: double-check that this is fine
980 *
981 *
982 * @param key the key we are selecting a peer to route to
983 * @param bloom a bloomfilter containing entries this request has seen already
984 * @param hops how many hops has this message traversed thus far
985 * @return Peer to route to, or NULL on error
986 */
987static struct PeerInfo *
988select_peer (const struct GNUNET_HashCode *key,
989 const struct GNUNET_CONTAINER_BloomFilter *bloom,
990 uint32_t hops)
991{
992 unsigned int bc;
993 unsigned int count;
994 unsigned int selected;
995 struct PeerInfo *pos;
996 struct PeerInfo *chosen;
997
998 if (hops >= GDS_NSE_get ())
999 {
1000 /* greedy selection (closest peer that is not in bloomfilter) */
1001 unsigned int best_bucket = 0;
1002 uint64_t best_in_bucket = UINT64_MAX;
1003
1004 chosen = NULL;
1005 for (bc = 0; bc <= closest_bucket; bc++)
1006 {
1007 count = 0;
1008 for (pos = k_buckets[bc].head;
1009 (pos != NULL) &&
1010 (count < bucket_size);
1011 pos = pos->next)
1012 {
1013 unsigned int bucket;
1014 uint64_t dist;
1015
1016 bucket = GNUNET_CRYPTO_hash_matching_bits (key,
1017 &pos->phash);
1018 dist = get_distance (key,
1019 &pos->phash,
1020 bucket);
1021 if (bucket < best_bucket)
1022 continue;
1023 if (dist > best_in_bucket)
1024 continue;
1025 best_bucket = bucket;
1026 best_in_bucket = dist;
1027 if ( (NULL == bloom) ||
1028 (GNUNET_NO ==
1029 GNUNET_CONTAINER_bloomfilter_test (bloom,
1030 &pos->phash)) )
1031 {
1032 chosen = pos;
1033 }
1034 else
1035 {
1036 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037 "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1038 GNUNET_i2s (pos->id),
1039 GNUNET_h2s (key));
1040 GNUNET_STATISTICS_update (GDS_stats,
1041 gettext_noop (
1042 "# Peers excluded from routing due to Bloomfilter"),
1043 1,
1044 GNUNET_NO);
1045 chosen = NULL;
1046 }
1047 count++;
1048 }
1049 }
1050 if (NULL == chosen)
1051 GNUNET_STATISTICS_update (GDS_stats,
1052 gettext_noop ("# Peer selection failed"),
1053 1,
1054 GNUNET_NO);
1055 else
1056 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1057 "Selected peer `%s' in greedy routing for %s\n",
1058 GNUNET_i2s (chosen->id),
1059 GNUNET_h2s (key));
1060 return chosen;
1061 }
1062
1063 /* select "random" peer */
1064 /* count number of peers that are available and not filtered */
1065 count = 0;
1066 for (bc = 0; bc <= closest_bucket; bc++)
1067 {
1068 pos = k_buckets[bc].head;
1069 while ((NULL != pos) && (count < bucket_size))
1070 {
1071 if ((NULL != bloom) &&
1072 (GNUNET_YES ==
1073 GNUNET_CONTAINER_bloomfilter_test (bloom,
1074 &pos->phash)))
1075 {
1076 GNUNET_STATISTICS_update (GDS_stats,
1077 gettext_noop
1078 (
1079 "# Peers excluded from routing due to Bloomfilter"),
1080 1, GNUNET_NO);
1081 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1082 "Excluded peer `%s' due to BF match in random routing for %s\n",
1083 GNUNET_i2s (pos->id),
1084 GNUNET_h2s (key));
1085 pos = pos->next;
1086 continue; /* Ignore bloomfiltered peers */
1087 }
1088 count++;
1089 pos = pos->next;
1090 }
1091 }
1092 if (0 == count) /* No peers to select from! */
1093 {
1094 GNUNET_STATISTICS_update (GDS_stats,
1095 gettext_noop ("# Peer selection failed"), 1,
1096 GNUNET_NO);
1097 return NULL;
1098 }
1099 /* Now actually choose a peer */
1100 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1101 count);
1102 count = 0;
1103 for (bc = 0; bc <= closest_bucket; bc++)
1104 {
1105 for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size));
1106 pos = pos->next)
1107 {
1108 if ((bloom != NULL) &&
1109 (GNUNET_YES ==
1110 GNUNET_CONTAINER_bloomfilter_test (bloom,
1111 &pos->phash)))
1112 {
1113 continue; /* Ignore bloomfiltered peers */
1114 }
1115 if (0 == selected--)
1116 {
1117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118 "Selected peer `%s' in random routing for %s\n",
1119 GNUNET_i2s (pos->id),
1120 GNUNET_h2s (key));
1121 return pos;
1122 }
1123 }
1124 }
1125 GNUNET_break (0);
1126 return NULL;
1127}
1128
1129
1130/**
1131 * Compute the set of peers that the given request should be
1132 * forwarded to.
1133 *
1134 * @param key routing key
1135 * @param bloom bloom filter excluding peers as targets, all selected
1136 * peers will be added to the bloom filter
1137 * @param hop_count number of hops the request has traversed so far
1138 * @param target_replication desired number of replicas
1139 * @param targets where to store an array of target peers (to be
1140 * free'd by the caller)
1141 * @return number of peers returned in 'targets'.
1142 */
1143static unsigned int
1144get_target_peers (const struct GNUNET_HashCode *key,
1145 struct GNUNET_CONTAINER_BloomFilter *bloom,
1146 uint32_t hop_count,
1147 uint32_t target_replication,
1148 struct PeerInfo ***targets)
1149{
1150 unsigned int ret;
1151 unsigned int off;
1152 struct PeerInfo **rtargets;
1153 struct PeerInfo *nxt;
1154
1155 GNUNET_assert (NULL != bloom);
1156 ret = get_forward_count (hop_count,
1157 target_replication);
1158 if (0 == ret)
1159 {
1160 *targets = NULL;
1161 return 0;
1162 }
1163 rtargets = GNUNET_new_array (ret,
1164 struct PeerInfo *);
1165 for (off = 0; off < ret; off++)
1166 {
1167 nxt = select_peer (key,
1168 bloom,
1169 hop_count);
1170 if (NULL == nxt)
1171 break;
1172 rtargets[off] = nxt;
1173 GNUNET_break (GNUNET_NO ==
1174 GNUNET_CONTAINER_bloomfilter_test (bloom,
1175 &nxt->phash));
1176 GNUNET_CONTAINER_bloomfilter_add (bloom,
1177 &nxt->phash);
1178 }
1179 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180 "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1181 off,
1182 GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1183 (unsigned int) hop_count,
1184 GNUNET_h2s (key),
1185 ret);
1186 if (0 == off)
1187 {
1188 GNUNET_free (rtargets);
1189 *targets = NULL;
1190 return 0;
1191 }
1192 *targets = rtargets;
1193 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1194 "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1195 GNUNET_h2s (key),
1196 off,
1197 ret);
1198 return off;
1199}
1200
1201
1202/**
1203 * Perform a PUT operation. Forwards the given request to other
1204 * peers. Does not store the data locally. Does not give the
1205 * data to local clients. May do nothing if this is the only
1206 * peer in the network (or if we are the closest peer in the
1207 * network).
1208 *
1209 * @param type type of the block
1210 * @param options routing options
1211 * @param desired_replication_level desired replication count
1212 * @param expiration_time when does the content expire
1213 * @param hop_count how many hops has this message traversed so far
1214 * @param bf Bloom filter of peers this PUT has already traversed
1215 * @param key key for the content
1216 * @param put_path_length number of entries in @a put_path
1217 * @param put_path peers this request has traversed so far (if tracked)
1218 * @param data payload to store
1219 * @param data_size number of bytes in @a data
1220 * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1221 */
1222int
1223GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1224 enum GNUNET_DHT_RouteOption options,
1225 uint32_t desired_replication_level,
1226 struct GNUNET_TIME_Absolute expiration_time,
1227 uint32_t hop_count,
1228 struct GNUNET_CONTAINER_BloomFilter *bf,
1229 const struct GNUNET_HashCode *key,
1230 unsigned int put_path_length,
1231 struct GNUNET_PeerIdentity *put_path,
1232 const void *data,
1233 size_t data_size)
1234{
1235 unsigned int target_count;
1236 unsigned int i;
1237 struct PeerInfo **targets;
1238 struct PeerInfo *target;
1239 size_t msize;
1240 struct GNUNET_MQ_Envelope *env;
1241 struct PeerPutMessage *ppm;
1242 struct GNUNET_PeerIdentity *pp;
1243 unsigned int skip_count;
1244
1245 GNUNET_assert (NULL != bf);
1246 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247 "Adding myself (%s) to PUT bloomfilter for %s\n",
1248 GNUNET_i2s (&my_identity),
1249 GNUNET_h2s (key));
1250 GNUNET_CONTAINER_bloomfilter_add (bf,
1251 &my_identity_hash);
1252 GNUNET_STATISTICS_update (GDS_stats,
1253 gettext_noop ("# PUT requests routed"),
1254 1,
1255 GNUNET_NO);
1256 target_count
1257 = get_target_peers (key,
1258 bf,
1259 hop_count,
1260 desired_replication_level,
1261 &targets);
1262 if (0 == target_count)
1263 {
1264 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265 "Routing PUT for %s terminates after %u hops at %s\n",
1266 GNUNET_h2s (key),
1267 (unsigned int) hop_count,
1268 GNUNET_i2s (&my_identity));
1269 return GNUNET_NO;
1270 }
1271 msize = put_path_length * sizeof(struct GNUNET_PeerIdentity) + data_size;
1272 if (msize + sizeof(struct PeerPutMessage)
1273 >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1274 {
1275 put_path_length = 0;
1276 msize = data_size;
1277 }
1278 if (msize + sizeof(struct PeerPutMessage)
1279 >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1280 {
1281 GNUNET_break (0);
1282 GNUNET_free (targets);
1283 return GNUNET_NO;
1284 }
1285 GNUNET_STATISTICS_update (GDS_stats,
1286 gettext_noop (
1287 "# PUT messages queued for transmission"),
1288 target_count,
1289 GNUNET_NO);
1290 skip_count = 0;
1291 for (i = 0; i < target_count; i++)
1292 {
1293 target = targets[i];
1294 if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1295 {
1296 /* skip */
1297 GNUNET_STATISTICS_update (GDS_stats,
1298 gettext_noop (
1299 "# P2P messages dropped due to full queue"),
1300 1,
1301 GNUNET_NO);
1302 skip_count++;
1303 continue;
1304 }
1305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306 "Routing PUT for %s after %u hops to %s\n",
1307 GNUNET_h2s (key),
1308 (unsigned int) hop_count,
1309 GNUNET_i2s (target->id));
1310 env = GNUNET_MQ_msg_extra (ppm,
1311 msize,
1312 GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1313 ppm->options = htonl (options);
1314 ppm->type = htonl (type);
1315 ppm->hop_count = htonl (hop_count + 1);
1316 ppm->desired_replication_level = htonl (desired_replication_level);
1317 ppm->put_path_length = htonl (put_path_length);
1318 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1319 GNUNET_break (GNUNET_YES ==
1320 GNUNET_CONTAINER_bloomfilter_test (bf,
1321 &target->phash));
1322 GNUNET_assert (GNUNET_OK ==
1323 GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1324 ppm->bloomfilter,
1325 DHT_BLOOM_SIZE));
1326 ppm->key = *key;
1327 pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1328 GNUNET_memcpy (pp,
1329 put_path,
1330 sizeof(struct GNUNET_PeerIdentity) * put_path_length);
1331 GNUNET_memcpy (&pp[put_path_length],
1332 data,
1333 data_size);
1334 GNUNET_MQ_send (target->mq,
1335 env);
1336 }
1337 GNUNET_free (targets);
1338 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1339}
1340
1341
1342/**
1343 * Perform a GET operation. Forwards the given request to other
1344 * peers. Does not lookup the key locally. May do nothing if this is
1345 * the only peer in the network (or if we are the closest peer in the
1346 * network).
1347 *
1348 * @param type type of the block
1349 * @param options routing options
1350 * @param desired_replication_level desired replication count
1351 * @param hop_count how many hops did this request traverse so far?
1352 * @param key key for the content
1353 * @param xquery extended query
1354 * @param xquery_size number of bytes in @a xquery
1355 * @param bg group to use for filtering replies
1356 * @param peer_bf filter for peers not to select (again)
1357 * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1358 */
1359int
1360GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1361 enum GNUNET_DHT_RouteOption options,
1362 uint32_t desired_replication_level,
1363 uint32_t hop_count,
1364 const struct GNUNET_HashCode *key,
1365 const void *xquery,
1366 size_t xquery_size,
1367 struct GNUNET_BLOCK_Group *bg,
1368 struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1369{
1370 unsigned int target_count;
1371 struct PeerInfo **targets;
1372 struct PeerInfo *target;
1373 struct GNUNET_MQ_Envelope *env;
1374 size_t msize;
1375 struct PeerGetMessage *pgm;
1376 char *xq;
1377 size_t reply_bf_size;
1378 void *reply_bf;
1379 unsigned int skip_count;
1380 uint32_t bf_nonce;
1381
1382 GNUNET_assert (NULL != peer_bf);
1383 GNUNET_STATISTICS_update (GDS_stats,
1384 gettext_noop ("# GET requests routed"),
1385 1,
1386 GNUNET_NO);
1387 target_count = get_target_peers (key,
1388 peer_bf,
1389 hop_count,
1390 desired_replication_level,
1391 &targets);
1392 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393 "Adding myself (%s) to GET bloomfilter for %s\n",
1394 GNUNET_i2s (&my_identity),
1395 GNUNET_h2s (key));
1396 GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1397 &my_identity_hash);
1398 if (0 == target_count)
1399 {
1400 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1401 "Routing GET for %s terminates after %u hops at %s\n",
1402 GNUNET_h2s (key),
1403 (unsigned int) hop_count,
1404 GNUNET_i2s (&my_identity));
1405 return GNUNET_NO;
1406 }
1407 if (GNUNET_OK !=
1408 GNUNET_BLOCK_group_serialize (bg,
1409 &bf_nonce,
1410 &reply_bf,
1411 &reply_bf_size))
1412 {
1413 reply_bf = NULL;
1414 reply_bf_size = 0;
1415 bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1416 UINT32_MAX);
1417 }
1418 msize = xquery_size + reply_bf_size;
1419 if (msize + sizeof(struct PeerGetMessage) >= GNUNET_MAX_MESSAGE_SIZE)
1420 {
1421 GNUNET_break (0);
1422 GNUNET_free (reply_bf);
1423 GNUNET_free (targets);
1424 return GNUNET_NO;
1425 }
1426 GNUNET_STATISTICS_update (GDS_stats,
1427 gettext_noop (
1428 "# GET messages queued for transmission"),
1429 target_count,
1430 GNUNET_NO);
1431 /* forward request */
1432 skip_count = 0;
1433 for (unsigned int i = 0; i < target_count; i++)
1434 {
1435 target = targets[i];
1436 if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1437 {
1438 /* skip */
1439 GNUNET_STATISTICS_update (GDS_stats,
1440 gettext_noop (
1441 "# P2P messages dropped due to full queue"),
1442 1, GNUNET_NO);
1443 skip_count++;
1444 continue;
1445 }
1446 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1447 "Routing GET for %s after %u hops to %s\n",
1448 GNUNET_h2s (key),
1449 (unsigned int) hop_count,
1450 GNUNET_i2s (target->id));
1451 env = GNUNET_MQ_msg_extra (pgm,
1452 msize,
1453 GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1454 pgm->options = htonl (options);
1455 pgm->type = htonl (type);
1456 pgm->hop_count = htonl (hop_count + 1);
1457 pgm->desired_replication_level = htonl (desired_replication_level);
1458 pgm->xquery_size = htonl (xquery_size);
1459 pgm->bf_mutator = bf_nonce;
1460 GNUNET_break (GNUNET_YES ==
1461 GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1462 &target->phash));
1463 GNUNET_assert (GNUNET_OK ==
1464 GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1465 pgm->bloomfilter,
1466 DHT_BLOOM_SIZE));
1467 pgm->key = *key;
1468 xq = (char *) &pgm[1];
1469 GNUNET_memcpy (xq,
1470 xquery,
1471 xquery_size);
1472 GNUNET_memcpy (&xq[xquery_size],
1473 reply_bf,
1474 reply_bf_size);
1475 GNUNET_MQ_send (target->mq,
1476 env);
1477 }
1478 GNUNET_free (targets);
1479 GNUNET_free (reply_bf);
1480 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1481}
1482
1483
1484/**
1485 * Handle a reply (route to origin). Only forwards the reply back to
1486 * the given peer. Does not do local caching or forwarding to local
1487 * clients.
1488 *
1489 * @param target neighbour that should receive the block (if still connected)
1490 * @param type type of the block
1491 * @param expiration_time when does the content expire
1492 * @param key key for the content
1493 * @param put_path_length number of entries in @a put_path
1494 * @param put_path peers the original PUT traversed (if tracked)
1495 * @param get_path_length number of entries in @a get_path
1496 * @param get_path peers this reply has traversed so far (if tracked)
1497 * @param data payload of the reply
1498 * @param data_size number of bytes in @a data
1499 */
1500void
1501GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1502 enum GNUNET_BLOCK_Type type,
1503 struct GNUNET_TIME_Absolute expiration_time,
1504 const struct GNUNET_HashCode *key,
1505 unsigned int put_path_length,
1506 const struct GNUNET_PeerIdentity *put_path,
1507 unsigned int get_path_length,
1508 const struct GNUNET_PeerIdentity *get_path,
1509 const void *data,
1510 size_t data_size)
1511{
1512 struct PeerInfo *pi;
1513 struct GNUNET_MQ_Envelope *env;
1514 size_t msize;
1515 struct PeerResultMessage *prm;
1516 struct GNUNET_PeerIdentity *paths;
1517
1518 msize = data_size + (get_path_length + put_path_length)
1519 * sizeof(struct GNUNET_PeerIdentity);
1520 if ((msize + sizeof(struct PeerResultMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
1521 (get_path_length >
1522 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
1523 (put_path_length >
1524 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
1525 (data_size > GNUNET_MAX_MESSAGE_SIZE))
1526 {
1527 GNUNET_break (0);
1528 return;
1529 }
1530 pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1531 target);
1532 if (NULL == pi)
1533 {
1534 /* peer disconnected in the meantime, drop reply */
1535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536 "No matching peer for reply for key %s\n",
1537 GNUNET_h2s (key));
1538 return;
1539 }
1540 if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1541 {
1542 /* skip */
1543 GNUNET_STATISTICS_update (GDS_stats,
1544 gettext_noop (
1545 "# P2P messages dropped due to full queue"),
1546 1,
1547 GNUNET_NO);
1548 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1549 "Peer queue full, ignoring reply for key %s\n",
1550 GNUNET_h2s (key));
1551 return;
1552 }
1553
1554 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1555 "Forwarding reply for key %s to peer %s\n",
1556 GNUNET_h2s (key),
1557 GNUNET_i2s (target));
1558 GNUNET_STATISTICS_update (GDS_stats,
1559 gettext_noop
1560 ("# RESULT messages queued for transmission"), 1,
1561 GNUNET_NO);
1562 env = GNUNET_MQ_msg_extra (prm,
1563 msize,
1564 GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1565 prm->type = htonl (type);
1566 prm->put_path_length = htonl (put_path_length);
1567 prm->get_path_length = htonl (get_path_length);
1568 prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1569 prm->key = *key;
1570 paths = (struct GNUNET_PeerIdentity *) &prm[1];
1571 GNUNET_memcpy (paths,
1572 put_path,
1573 put_path_length * sizeof(struct GNUNET_PeerIdentity));
1574 GNUNET_memcpy (&paths[put_path_length],
1575 get_path,
1576 get_path_length * sizeof(struct GNUNET_PeerIdentity));
1577 GNUNET_memcpy (&paths[put_path_length + get_path_length],
1578 data,
1579 data_size);
1580 GNUNET_MQ_send (pi->mq,
1581 env);
1582}
1583
1584
1585/**
1586 * To be called on core init/fail.
1587 *
1588 * @param cls service closure
1589 * @param identity the public identity of this peer
1590 */
1591static void
1592core_init (void *cls,
1593 const struct GNUNET_PeerIdentity *identity)
1594{
1595 (void) cls;
1596 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1597 "CORE called, I am %s\n",
1598 GNUNET_i2s (identity));
1599 my_identity = *identity;
1600 GNUNET_CRYPTO_hash (identity,
1601 sizeof(struct GNUNET_PeerIdentity),
1602 &my_identity_hash);
1603 GNUNET_SERVICE_resume (GDS_service);
1604}
1605
1606
1607/**
1608 * Check validity of a p2p put request.
1609 *
1610 * @param cls closure with the `struct PeerInfo` of the sender
1611 * @param message message
1612 * @return #GNUNET_OK if the message is valid
1613 */
1614static int
1615check_dht_p2p_put (void *cls,
1616 const struct PeerPutMessage *put)
1617{
1618 uint32_t putlen;
1619 uint16_t msize;
1620
1621 (void) cls;
1622 msize = ntohs (put->header.size);
1623 putlen = ntohl (put->put_path_length);
1624 if ((msize <
1625 sizeof(struct PeerPutMessage)
1626 + putlen * sizeof(struct GNUNET_PeerIdentity)) ||
1627 (putlen >
1628 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
1629 {
1630 GNUNET_break_op (0);
1631 return GNUNET_SYSERR;
1632 }
1633 return GNUNET_OK;
1634}
1635
1636
1637/**
1638 * Core handler for p2p put requests.
1639 *
1640 * @param cls closure with the `struct PeerInfo` of the sender
1641 * @param message message
1642 */
1643static void
1644handle_dht_p2p_put (void *cls,
1645 const struct PeerPutMessage *put)
1646{
1647 struct PeerInfo *peer = cls;
1648 const struct GNUNET_PeerIdentity *put_path;
1649 const void *payload;
1650 uint32_t putlen;
1651 uint16_t msize;
1652 size_t payload_size;
1653 enum GNUNET_DHT_RouteOption options;
1654 struct GNUNET_CONTAINER_BloomFilter *bf;
1655 struct GNUNET_HashCode test_key;
1656 int forwarded;
1657 struct GNUNET_TIME_Absolute exp_time;
1658
1659 exp_time = GNUNET_TIME_absolute_ntoh (put->expiration_time);
1660 if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us)
1661 {
1662 GNUNET_STATISTICS_update (GDS_stats,
1663 gettext_noop ("# Expired PUTs discarded"),
1664 1,
1665 GNUNET_NO);
1666 return;
1667 }
1668 msize = ntohs (put->header.size);
1669 putlen = ntohl (put->put_path_length);
1670 GNUNET_STATISTICS_update (GDS_stats,
1671 gettext_noop ("# P2P PUT requests received"),
1672 1,
1673 GNUNET_NO);
1674 GNUNET_STATISTICS_update (GDS_stats,
1675 gettext_noop ("# P2P PUT bytes received"),
1676 msize,
1677 GNUNET_NO);
1678 put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1679 payload = &put_path[putlen];
1680 options = ntohl (put->options);
1681 payload_size = msize - (sizeof(struct PeerPutMessage)
1682 + putlen * sizeof(struct GNUNET_PeerIdentity));
1683
1684 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1685 "PUT for `%s' from %s\n",
1686 GNUNET_h2s (&put->key),
1687 GNUNET_i2s (peer->id));
1688 if (GNUNET_YES == log_route_details_stderr)
1689 {
1690 char *tmp;
1691 char *pp;
1692
1693 pp = GNUNET_STRINGS_pp2s (put_path,
1694 putlen);
1695 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1696 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1697 "R5N PUT %s: %s->%s (%u, %u=>%u, PP: %s)\n",
1698 GNUNET_h2s (&put->key),
1699 GNUNET_i2s (peer->id),
1700 tmp,
1701 ntohl (put->hop_count),
1702 GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1703 &put->key),
1704 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1705 &put->key),
1706 pp);
1707 GNUNET_free (pp);
1708 GNUNET_free (tmp);
1709 }
1710 switch (GNUNET_BLOCK_get_key
1711 (GDS_block_context,
1712 ntohl (put->type),
1713 payload,
1714 payload_size,
1715 &test_key))
1716 {
1717 case GNUNET_YES:
1718 if (0 != memcmp (&test_key,
1719 &put->key,
1720 sizeof(struct GNUNET_HashCode)))
1721 {
1722 char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1723
1724 GNUNET_break_op (0);
1725 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1726 "PUT with key `%s' for block with key %s\n",
1727 put_s,
1728 GNUNET_h2s_full (&test_key));
1729 GNUNET_free (put_s);
1730 return;
1731 }
1732 break;
1733
1734 case GNUNET_NO:
1735 GNUNET_break_op (0);
1736 return;
1737
1738 case GNUNET_SYSERR:
1739 /* cannot verify, good luck */
1740 break;
1741 }
1742 if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1743 {
1744 switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1745 ntohl (put->type),
1746 NULL, /* query group */
1747 GNUNET_BLOCK_EO_NONE,
1748 NULL, /* query */
1749 NULL, 0, /* xquery */
1750 payload,
1751 payload_size))
1752 {
1753 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1754 case GNUNET_BLOCK_EVALUATION_OK_LAST:
1755 break;
1756
1757 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1758 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1759 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1760 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1761 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1762 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1763 default:
1764 GNUNET_break_op (0);
1765 return;
1766 }
1767 }
1768
1769 bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1770 DHT_BLOOM_SIZE,
1771 GNUNET_CONSTANTS_BLOOMFILTER_K);
1772 GNUNET_break_op (GNUNET_YES ==
1773 GNUNET_CONTAINER_bloomfilter_test (bf,
1774 &peer->phash));
1775 {
1776 struct GNUNET_PeerIdentity pp[putlen + 1];
1777
1778 /* extend 'put path' by sender */
1779 if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1780 {
1781#if SANITY_CHECKS
1782 for (unsigned int i = 0; i <= putlen; i++)
1783 {
1784 for (unsigned int j = 0; j < i; j++)
1785 {
1786 GNUNET_break (0 != memcmp (&pp[i],
1787 &pp[j],
1788 sizeof(struct GNUNET_PeerIdentity)));
1789 }
1790 GNUNET_break (0 != memcmp (&pp[i],
1791 peer->id,
1792 sizeof(struct GNUNET_PeerIdentity)));
1793 }
1794#endif
1795 GNUNET_memcpy (pp,
1796 put_path,
1797 putlen * sizeof(struct GNUNET_PeerIdentity));
1798 pp[putlen] = *peer->id;
1799 putlen++;
1800 }
1801 else
1802 putlen = 0;
1803
1804 /* give to local clients */
1805 GDS_CLIENTS_handle_reply (exp_time,
1806 &put->key,
1807 0,
1808 NULL,
1809 putlen,
1810 pp,
1811 ntohl (put->type),
1812 payload_size,
1813 payload);
1814 /* store locally */
1815 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1816 (GDS_am_closest_peer (&put->key, bf)))
1817 GDS_DATACACHE_handle_put (exp_time,
1818 &put->key,
1819 putlen,
1820 pp,
1821 ntohl (put->type),
1822 payload_size,
1823 payload);
1824 /* route to other peers */
1825 forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1826 options,
1827 ntohl (
1828 put->desired_replication_level),
1829 exp_time,
1830 ntohl (put->hop_count),
1831 bf,
1832 &put->key,
1833 putlen,
1834 pp,
1835 payload,
1836 payload_size);
1837 /* notify monitoring clients */
1838 GDS_CLIENTS_process_put (options
1839 | ((GNUNET_OK == forwarded)
1840 ? GNUNET_DHT_RO_LAST_HOP
1841 : 0),
1842 ntohl (put->type),
1843 ntohl (put->hop_count),
1844 ntohl (put->desired_replication_level),
1845 putlen, pp,
1846 exp_time,
1847 &put->key,
1848 payload,
1849 payload_size);
1850 }
1851 GNUNET_CONTAINER_bloomfilter_free (bf);
1852}
1853
1854
1855/**
1856 * We have received a FIND PEER request. Send matching
1857 * HELLOs back.
1858 *
1859 * @param sender sender of the FIND PEER request
1860 * @param key peers close to this key are desired
1861 * @param bg group for filtering peers
1862 */
1863static void
1864handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1865 const struct GNUNET_HashCode *key,
1866 struct GNUNET_BLOCK_Group *bg)
1867{
1868 int bucket_idx;
1869 struct PeerBucket *bucket;
1870 struct PeerInfo *peer;
1871 unsigned int choice;
1872 const struct GNUNET_HELLO_Message *hello;
1873 size_t hello_size;
1874
1875 /* first, check about our own HELLO */
1876 if (NULL != GDS_my_hello)
1877 {
1878 hello_size = GNUNET_HELLO_size ((const struct
1879 GNUNET_HELLO_Message *) GDS_my_hello);
1880 GNUNET_break (hello_size >= sizeof(struct GNUNET_MessageHeader));
1881 if (GNUNET_BLOCK_EVALUATION_OK_MORE ==
1882 GNUNET_BLOCK_evaluate (GDS_block_context,
1883 GNUNET_BLOCK_TYPE_DHT_HELLO,
1884 bg,
1885 GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1886 &my_identity_hash,
1887 NULL, 0,
1888 GDS_my_hello,
1889 hello_size))
1890 {
1891 GDS_NEIGHBOURS_handle_reply (sender,
1892 GNUNET_BLOCK_TYPE_DHT_HELLO,
1893 GNUNET_TIME_relative_to_absolute (
1894 hello_expiration),
1895 key,
1896 0,
1897 NULL,
1898 0,
1899 NULL,
1900 GDS_my_hello,
1901 hello_size);
1902 }
1903 else
1904 {
1905 GNUNET_STATISTICS_update (GDS_stats,
1906 gettext_noop (
1907 "# FIND PEER requests ignored due to Bloomfilter"),
1908 1,
1909 GNUNET_NO);
1910 }
1911 }
1912 else
1913 {
1914 GNUNET_STATISTICS_update (GDS_stats,
1915 gettext_noop (
1916 "# FIND PEER requests ignored due to lack of HELLO"),
1917 1,
1918 GNUNET_NO);
1919 }
1920
1921 /* then, also consider sending a random HELLO from the closest bucket */
1922 if (0 == memcmp (&my_identity_hash,
1923 key,
1924 sizeof(struct GNUNET_HashCode)))
1925 bucket_idx = closest_bucket;
1926 else
1927 bucket_idx = GNUNET_MIN ((int) closest_bucket,
1928 find_bucket (key));
1929 if (bucket_idx < 0)
1930 return;
1931 bucket = &k_buckets[bucket_idx];
1932 if (bucket->peers_size == 0)
1933 return;
1934 choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1935 bucket->peers_size);
1936 peer = bucket->head;
1937 while (choice > 0)
1938 {
1939 GNUNET_assert (NULL != peer);
1940 peer = peer->next;
1941 choice--;
1942 }
1943 choice = bucket->peers_size;
1944 do
1945 {
1946 peer = peer->next;
1947 if (0 == choice--)
1948 return; /* no non-masked peer available */
1949 if (NULL == peer)
1950 peer = bucket->head;
1951 hello = GDS_HELLO_get (peer->id);
1952 }
1953 while ((NULL == hello) ||
1954 (GNUNET_BLOCK_EVALUATION_OK_MORE !=
1955 GNUNET_BLOCK_evaluate (GDS_block_context,
1956 GNUNET_BLOCK_TYPE_DHT_HELLO,
1957 bg,
1958 GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1959 &peer->phash,
1960 NULL, 0,
1961 hello,
1962 (hello_size = GNUNET_HELLO_size (hello)))));
1963 GDS_NEIGHBOURS_handle_reply (sender,
1964 GNUNET_BLOCK_TYPE_DHT_HELLO,
1965 GNUNET_TIME_relative_to_absolute
1966 (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1967 key,
1968 0,
1969 NULL,
1970 0,
1971 NULL,
1972 hello,
1973 hello_size);
1974}
1975
1976
1977/**
1978 * Handle a result from local datacache for a GET operation.
1979 *
1980 * @param cls the `struct PeerInfo` for which this is a reply
1981 * @param type type of the block
1982 * @param expiration_time when does the content expire
1983 * @param key key for the content
1984 * @param put_path_length number of entries in @a put_path
1985 * @param put_path peers the original PUT traversed (if tracked)
1986 * @param get_path_length number of entries in @a get_path
1987 * @param get_path peers this reply has traversed so far (if tracked)
1988 * @param data payload of the reply
1989 * @param data_size number of bytes in @a data
1990 */
1991static void
1992handle_local_result (void *cls,
1993 enum GNUNET_BLOCK_Type type,
1994 struct GNUNET_TIME_Absolute expiration_time,
1995 const struct GNUNET_HashCode *key,
1996 unsigned int put_path_length,
1997 const struct GNUNET_PeerIdentity *put_path,
1998 unsigned int get_path_length,
1999 const struct GNUNET_PeerIdentity *get_path,
2000 const void *data,
2001 size_t data_size)
2002{
2003 struct PeerInfo *peer = cls;
2004 char *pp;
2005
2006 pp = GNUNET_STRINGS_pp2s (put_path,
2007 put_path_length);
2008 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2009 "Found local result for %s (PP: %s)\n",
2010 GNUNET_h2s (key),
2011 pp);
2012 GNUNET_free (pp);
2013 GDS_NEIGHBOURS_handle_reply (peer->id,
2014 type,
2015 expiration_time,
2016 key,
2017 put_path_length, put_path,
2018 get_path_length, get_path,
2019 data, data_size);
2020}
2021
2022
2023/**
2024 * Check validity of p2p get request.
2025 *
2026 * @param cls closure with the `struct PeerInfo` of the sender
2027 * @param get the message
2028 * @return #GNUNET_OK if the message is well-formed
2029 */
2030static int
2031check_dht_p2p_get (void *cls,
2032 const struct PeerGetMessage *get)
2033{
2034 uint32_t xquery_size;
2035 uint16_t msize;
2036
2037 (void) cls;
2038 msize = ntohs (get->header.size);
2039 xquery_size = ntohl (get->xquery_size);
2040 if (msize < sizeof(struct PeerGetMessage) + xquery_size)
2041 {
2042 GNUNET_break_op (0);
2043 return GNUNET_SYSERR;
2044 }
2045 return GNUNET_OK;
2046}
2047
2048
2049/**
2050 * Core handler for p2p get requests.
2051 *
2052 * @param cls closure with the `struct PeerInfo` of the sender
2053 * @param get the message
2054 */
2055static void
2056handle_dht_p2p_get (void *cls,
2057 const struct PeerGetMessage *get)
2058{
2059 struct PeerInfo *peer = cls;
2060 uint32_t xquery_size;
2061 size_t reply_bf_size;
2062 uint16_t msize;
2063 enum GNUNET_BLOCK_Type type;
2064 enum GNUNET_DHT_RouteOption options;
2065 enum GNUNET_BLOCK_EvaluationResult eval;
2066 struct GNUNET_BLOCK_Group *bg;
2067 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2068 const char *xquery;
2069 int forwarded;
2070
2071 /* parse and validate message */
2072 msize = ntohs (get->header.size);
2073 xquery_size = ntohl (get->xquery_size);
2074 reply_bf_size = msize - (sizeof(struct PeerGetMessage) + xquery_size);
2075 type = ntohl (get->type);
2076 options = ntohl (get->options);
2077 xquery = (const char *) &get[1];
2078 GNUNET_STATISTICS_update (GDS_stats,
2079 gettext_noop ("# P2P GET requests received"),
2080 1,
2081 GNUNET_NO);
2082 GNUNET_STATISTICS_update (GDS_stats,
2083 gettext_noop ("# P2P GET bytes received"),
2084 msize,
2085 GNUNET_NO);
2086 if (GNUNET_YES == log_route_details_stderr)
2087 {
2088 char *tmp;
2089
2090 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2091 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2092 "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2093 GNUNET_h2s (&get->key),
2094 GNUNET_i2s (peer->id),
2095 tmp,
2096 ntohl (get->hop_count),
2097 GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2098 &get->key),
2099 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2100 &get->key),
2101 ntohl (get->xquery_size),
2102 xquery);
2103 GNUNET_free (tmp);
2104 }
2105 eval
2106 = GNUNET_BLOCK_evaluate (GDS_block_context,
2107 type,
2108 NULL,
2109 GNUNET_BLOCK_EO_NONE,
2110 &get->key,
2111 xquery,
2112 xquery_size,
2113 NULL,
2114 0);
2115 if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2116 {
2117 /* request invalid or block type not supported */
2118 GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2119 return;
2120 }
2121 peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2122 DHT_BLOOM_SIZE,
2123 GNUNET_CONSTANTS_BLOOMFILTER_K);
2124 GNUNET_break_op (GNUNET_YES ==
2125 GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2126 &peer->phash));
2127 bg = GNUNET_BLOCK_group_create (GDS_block_context,
2128 type,
2129 get->bf_mutator,
2130 &xquery[xquery_size],
2131 reply_bf_size,
2132 "filter-size",
2133 reply_bf_size,
2134 NULL);
2135 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2136 "GET for %s at %s after %u hops\n",
2137 GNUNET_h2s (&get->key),
2138 GNUNET_i2s (&my_identity),
2139 (unsigned int) ntohl (get->hop_count));
2140 /* local lookup (this may update the reply_bf) */
2141 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2142 (GDS_am_closest_peer (&get->key,
2143 peer_bf)))
2144 {
2145 if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2146 {
2147 GNUNET_STATISTICS_update (GDS_stats,
2148 gettext_noop (
2149 "# P2P FIND PEER requests processed"),
2150 1,
2151 GNUNET_NO);
2152 handle_find_peer (peer->id,
2153 &get->key,
2154 bg);
2155 }
2156 else
2157 {
2158 eval = GDS_DATACACHE_handle_get (&get->key,
2159 type,
2160 xquery,
2161 xquery_size,
2162 bg,
2163 &handle_local_result,
2164 peer);
2165 }
2166 }
2167 else
2168 {
2169 GNUNET_STATISTICS_update (GDS_stats,
2170 gettext_noop ("# P2P GET requests ONLY routed"),
2171 1,
2172 GNUNET_NO);
2173 }
2174
2175 /* remember request for routing replies */
2176 GDS_ROUTING_add (peer->id,
2177 type,
2178 bg, /* bg now owned by routing, but valid at least until end of this function! */
2179 options,
2180 &get->key,
2181 xquery,
2182 xquery_size);
2183
2184 /* P2P forwarding */
2185 forwarded = GNUNET_NO;
2186 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2187 forwarded = GDS_NEIGHBOURS_handle_get (type,
2188 options,
2189 ntohl (
2190 get->desired_replication_level),
2191 ntohl (get->hop_count),
2192 &get->key,
2193 xquery,
2194 xquery_size,
2195 bg,
2196 peer_bf);
2197 GDS_CLIENTS_process_get (options
2198 | ((GNUNET_OK == forwarded)
2199 ? GNUNET_DHT_RO_LAST_HOP : 0),
2200 type,
2201 ntohl (get->hop_count),
2202 ntohl (get->desired_replication_level),
2203 0,
2204 NULL,
2205 &get->key);
2206
2207 /* clean up; note that 'bg' is owned by routing now! */
2208 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2209}
2210
2211
2212/**
2213 * Check validity of p2p result message.
2214 *
2215 * @param cls closure
2216 * @param message message
2217 * @return #GNUNET_YES if the message is well-formed
2218 */
2219static int
2220check_dht_p2p_result (void *cls,
2221 const struct PeerResultMessage *prm)
2222{
2223 uint32_t get_path_length;
2224 uint32_t put_path_length;
2225 uint16_t msize;
2226
2227 (void) cls;
2228 msize = ntohs (prm->header.size);
2229 put_path_length = ntohl (prm->put_path_length);
2230 get_path_length = ntohl (prm->get_path_length);
2231 if ((msize <
2232 sizeof(struct PeerResultMessage) + (get_path_length
2233 + put_path_length)
2234 * sizeof(struct GNUNET_PeerIdentity)) ||
2235 (get_path_length >
2236 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
2237 (put_path_length >
2238 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
2239 {
2240 GNUNET_break_op (0);
2241 return GNUNET_SYSERR;
2242 }
2243 return GNUNET_OK;
2244}
2245
2246
2247/**
2248 * Process a reply, after the @a get_path has been updated.
2249 *
2250 * @param expiration_time when does the reply expire
2251 * @param key key matching the query
2252 * @param get_path_length number of entries in @a get_path
2253 * @param get_path path the reply has taken
2254 * @param put_path_length number of entries in @a put_path
2255 * @param put_path path the PUT has taken
2256 * @param type type of the block
2257 * @param data_size number of bytes in @a data
2258 * @param data payload of the reply
2259 */
2260static void
2261process_reply_with_path (struct GNUNET_TIME_Absolute expiration_time,
2262 const struct GNUNET_HashCode *key,
2263 unsigned int get_path_length,
2264 const struct GNUNET_PeerIdentity *get_path,
2265 unsigned int put_path_length,
2266 const struct GNUNET_PeerIdentity *put_path,
2267 enum GNUNET_BLOCK_Type type,
2268 size_t data_size,
2269 const void *data)
2270{
2271 /* forward to local clients */
2272 GDS_CLIENTS_handle_reply (expiration_time,
2273 key,
2274 get_path_length,
2275 get_path,
2276 put_path_length,
2277 put_path,
2278 type,
2279 data_size,
2280 data);
2281 GDS_CLIENTS_process_get_resp (type,
2282 get_path,
2283 get_path_length,
2284 put_path,
2285 put_path_length,
2286 expiration_time,
2287 key,
2288 data,
2289 data_size);
2290 if (GNUNET_YES == cache_results)
2291 {
2292 struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2293
2294 GNUNET_memcpy (xput_path,
2295 put_path,
2296 put_path_length * sizeof(struct GNUNET_PeerIdentity));
2297 GNUNET_memcpy (&xput_path[put_path_length],
2298 get_path,
2299 get_path_length * sizeof(struct GNUNET_PeerIdentity));
2300
2301 GDS_DATACACHE_handle_put (expiration_time,
2302 key,
2303 get_path_length + put_path_length,
2304 xput_path,
2305 type,
2306 data_size,
2307 data);
2308 }
2309 /* forward to other peers */
2310 GDS_ROUTING_process (type,
2311 expiration_time,
2312 key,
2313 put_path_length,
2314 put_path,
2315 get_path_length,
2316 get_path,
2317 data,
2318 data_size);
2319}
2320
2321
2322/**
2323 * Core handler for p2p result messages.
2324 *
2325 * @param cls closure
2326 * @param message message
2327 */
2328static void
2329handle_dht_p2p_result (void *cls,
2330 const struct PeerResultMessage *prm)
2331{
2332 struct PeerInfo *peer = cls;
2333 const struct GNUNET_PeerIdentity *put_path;
2334 const struct GNUNET_PeerIdentity *get_path;
2335 const void *data;
2336 uint32_t get_path_length;
2337 uint32_t put_path_length;
2338 uint16_t msize;
2339 size_t data_size;
2340 enum GNUNET_BLOCK_Type type;
2341 struct GNUNET_TIME_Absolute exp_time;
2342
2343 /* parse and validate message */
2344 exp_time = GNUNET_TIME_absolute_ntoh (prm->expiration_time);
2345 if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us)
2346 {
2347 GNUNET_STATISTICS_update (GDS_stats,
2348 gettext_noop ("# Expired results discarded"),
2349 1,
2350 GNUNET_NO);
2351 return;
2352 }
2353 msize = ntohs (prm->header.size);
2354 put_path_length = ntohl (prm->put_path_length);
2355 get_path_length = ntohl (prm->get_path_length);
2356 put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2357 get_path = &put_path[put_path_length];
2358 type = ntohl (prm->type);
2359 data = (const void *) &get_path[get_path_length];
2360 data_size = msize - (sizeof(struct PeerResultMessage)
2361 + (get_path_length
2362 + put_path_length) * sizeof(struct
2363 GNUNET_PeerIdentity));
2364 GNUNET_STATISTICS_update (GDS_stats,
2365 gettext_noop ("# P2P RESULTS received"),
2366 1,
2367 GNUNET_NO);
2368 GNUNET_STATISTICS_update (GDS_stats,
2369 gettext_noop ("# P2P RESULT bytes received"),
2370 msize,
2371 GNUNET_NO);
2372 if (GNUNET_YES == log_route_details_stderr)
2373 {
2374 char *tmp;
2375 char *pp;
2376 char *gp;
2377
2378 gp = GNUNET_STRINGS_pp2s (get_path,
2379 get_path_length);
2380 pp = GNUNET_STRINGS_pp2s (put_path,
2381 put_path_length);
2382 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2383 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2384 "R5N RESULT %s: %s->%s (GP: %s, PP: %s)\n",
2385 GNUNET_h2s (&prm->key),
2386 GNUNET_i2s (peer->id),
2387 tmp,
2388 gp,
2389 pp);
2390 GNUNET_free (gp);
2391 GNUNET_free (pp);
2392 GNUNET_free (tmp);
2393 }
2394 /* if we got a HELLO, consider it for our own routing table */
2395 if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2396 {
2397 const struct GNUNET_MessageHeader *h;
2398 struct GNUNET_PeerIdentity pid;
2399
2400 /* Should be a HELLO, validate and consider using it! */
2401 if (data_size < sizeof(struct GNUNET_HELLO_Message))
2402 {
2403 GNUNET_break_op (0);
2404 return;
2405 }
2406 h = data;
2407 if (data_size != ntohs (h->size))
2408 {
2409 GNUNET_break_op (0);
2410 return;
2411 }
2412 if (GNUNET_OK !=
2413 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2414 &pid))
2415 {
2416 GNUNET_break_op (0);
2417 return;
2418 }
2419 if ((GNUNET_YES != disable_try_connect) &&
2420 (0 != memcmp (&my_identity,
2421 &pid,
2422 sizeof(struct GNUNET_PeerIdentity))))
2423 try_connect (&pid,
2424 h);
2425 }
2426
2427 /* First, check if 'peer' is already on the path, and if
2428 so, truncate it instead of expanding. */
2429 for (unsigned int i = 0; i <= get_path_length; i++)
2430 if (0 == memcmp (&get_path[i],
2431 peer->id,
2432 sizeof(struct GNUNET_PeerIdentity)))
2433 {
2434 process_reply_with_path (exp_time,
2435 &prm->key,
2436 i,
2437 get_path,
2438 put_path_length,
2439 put_path,
2440 type,
2441 data_size,
2442 data);
2443 return;
2444 }
2445
2446 /* Need to append 'peer' to 'get_path' (normal case) */
2447 {
2448 struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2449
2450 GNUNET_memcpy (xget_path,
2451 get_path,
2452 get_path_length * sizeof(struct GNUNET_PeerIdentity));
2453 xget_path[get_path_length] = *peer->id;
2454
2455 process_reply_with_path (exp_time,
2456 &prm->key,
2457 get_path_length + 1,
2458 xget_path,
2459 put_path_length,
2460 put_path,
2461 type,
2462 data_size,
2463 data);
2464 }
2465}
2466
2467
2468/**
2469 * Initialize neighbours subsystem.
2470 *
2471 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2472 */
2473int
2474GDS_NEIGHBOURS_init ()
2475{
2476 struct GNUNET_MQ_MessageHandler core_handlers[] = {
2477 GNUNET_MQ_hd_var_size (dht_p2p_get,
2478 GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2479 struct PeerGetMessage,
2480 NULL),
2481 GNUNET_MQ_hd_var_size (dht_p2p_put,
2482 GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2483 struct PeerPutMessage,
2484 NULL),
2485 GNUNET_MQ_hd_var_size (dht_p2p_result,
2486 GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2487 struct PeerResultMessage,
2488 NULL),
2489 GNUNET_MQ_handler_end ()
2490 };
2491 unsigned long long temp_config_num;
2492
2493 disable_try_connect
2494 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2495 "DHT",
2496 "DISABLE_TRY_CONNECT");
2497 if (GNUNET_OK ==
2498 GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2499 "DHT",
2500 "bucket_size",
2501 &temp_config_num))
2502 bucket_size = (unsigned int) temp_config_num;
2503 cache_results
2504 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2505 "DHT",
2506 "CACHE_RESULTS");
2507
2508 log_route_details_stderr =
2509 (NULL != getenv ("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2510 ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2511 core_api = GNUNET_CORE_connect (GDS_cfg,
2512 NULL,
2513 &core_init,
2514 &handle_core_connect,
2515 &handle_core_disconnect,
2516 core_handlers);
2517 if (NULL == core_api)
2518 return GNUNET_SYSERR;
2519 all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2520 GNUNET_YES);
2521 all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2522 GNUNET_NO);
2523 return GNUNET_OK;
2524}
2525
2526
2527/**
2528 * Shutdown neighbours subsystem.
2529 */
2530void
2531GDS_NEIGHBOURS_done ()
2532{
2533 if (NULL == core_api)
2534 return;
2535 GNUNET_CORE_disconnect (core_api);
2536 core_api = NULL;
2537 GNUNET_assert (0 ==
2538 GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2539 GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2540 all_connected_peers = NULL;
2541 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2542 &free_connect_info,
2543 NULL);
2544 GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2545 all_desired_peers = NULL;
2546 GNUNET_ATS_connectivity_done (ats_ch);
2547 ats_ch = NULL;
2548 GNUNET_assert (NULL == find_peer_task);
2549}
2550
2551
2552/**
2553 * Get the ID of the local node.
2554 *
2555 * @return identity of the local node
2556 */
2557struct GNUNET_PeerIdentity *
2558GDS_NEIGHBOURS_get_id ()
2559{
2560 return &my_identity;
2561}
2562
2563
2564/* end of gnunet-service-dht_neighbours.c */