aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_neighbours.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_neighbours.c')
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c2557
1 files changed, 0 insertions, 2557 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
deleted file mode 100644
index ca255310c..000000000
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ /dev/null
@@ -1,2557 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2017, 2021 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_neighbours.c
23 * @brief GNUnet DHT service's bucket and neighbour management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
29#include "gnunet_block_lib.h"
30#include "gnunet_hello_lib.h"
31#include "gnunet_constants.h"
32#include "gnunet_protocols.h"
33#include "gnunet_nse_service.h"
34#include "gnunet_ats_service.h"
35#include "gnunet_core_service.h"
36#include "gnunet_datacache_lib.h"
37#include "gnunet_transport_service.h"
38#include "gnunet_hello_lib.h"
39#include "gnunet_dht_service.h"
40#include "gnunet_statistics_service.h"
41#include "gnunet-service-dht.h"
42#include "gnunet-service-dht_datacache.h"
43#include "gnunet-service-dht_hello.h"
44#include "gnunet-service-dht_neighbours.h"
45#include "gnunet-service-dht_nse.h"
46#include "gnunet-service-dht_routing.h"
47#include "dht.h"
48
49#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
50 __VA_ARGS__)
51
52/**
53 * Enable slow sanity checks to debug issues.
54 */
55#define SANITY_CHECKS 1
56
57/**
58 * How many buckets will we allow total.
59 */
60#define MAX_BUCKETS sizeof(struct GNUNET_HashCode) * 8
61
62/**
63 * What is the maximum number of peers in a given bucket.
64 */
65#define DEFAULT_BUCKET_SIZE 8
66
67/**
68 * Desired replication level for FIND PEER requests
69 */
70#define FIND_PEER_REPLICATION_LEVEL 4
71
72/**
73 * Maximum allowed replication level for all requests.
74 */
75#define MAXIMUM_REPLICATION_LEVEL 16
76
77/**
78 * Maximum allowed number of pending messages per peer.
79 */
80#define MAXIMUM_PENDING_PER_PEER 64
81
82/**
83 * How long at least to wait before sending another find peer request.
84 */
85#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply ( \
86 GNUNET_TIME_UNIT_SECONDS, 30)
87
88/**
89 * How long at most to wait before sending another find peer request.
90 */
91#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply ( \
92 GNUNET_TIME_UNIT_MINUTES, 10)
93
94/**
95 * How long at most to wait for transmission of a GET request to another peer?
96 */
97#define GET_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
98
99/**
100 * Hello address expiration
101 */
102extern struct GNUNET_TIME_Relative hello_expiration;
103
104
105GNUNET_NETWORK_STRUCT_BEGIN
106
107/**
108 * P2P PUT message
109 */
110struct PeerPutMessage
111{
112 /**
113 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
114 */
115 struct GNUNET_MessageHeader header;
116
117 /**
118 * Processing options
119 */
120 uint32_t options GNUNET_PACKED;
121
122 /**
123 * Content type.
124 */
125 uint32_t type GNUNET_PACKED;
126
127 /**
128 * Hop count
129 */
130 uint32_t hop_count GNUNET_PACKED;
131
132 /**
133 * Replication level for this message
134 */
135 uint32_t desired_replication_level GNUNET_PACKED;
136
137 /**
138 * Length of the PUT path that follows (if tracked).
139 */
140 uint32_t put_path_length GNUNET_PACKED;
141
142 /**
143 * When does the content expire?
144 */
145 struct GNUNET_TIME_AbsoluteNBO expiration_time;
146
147 /**
148 * Bloomfilter (for peer identities) to stop circular routes
149 */
150 char bloomfilter[DHT_BLOOM_SIZE];
151
152 /**
153 * The key we are storing under.
154 */
155 struct GNUNET_HashCode key;
156
157 /* put path (if tracked) */
158
159 /* Payload */
160};
161
162
163/**
164 * P2P Result message
165 */
166struct PeerResultMessage
167{
168 /**
169 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
170 */
171 struct GNUNET_MessageHeader header;
172
173 /**
174 * Content type.
175 */
176 uint32_t type GNUNET_PACKED;
177
178 /**
179 * Length of the PUT path that follows (if tracked).
180 */
181 uint32_t put_path_length GNUNET_PACKED;
182
183 /**
184 * Length of the GET path that follows (if tracked).
185 */
186 uint32_t get_path_length GNUNET_PACKED;
187
188 /**
189 * When does the content expire?
190 */
191 struct GNUNET_TIME_AbsoluteNBO expiration_time;
192
193 /**
194 * The key of the corresponding GET request.
195 */
196 struct GNUNET_HashCode key;
197
198 /* put path (if tracked) */
199
200 /* get path (if tracked) */
201
202 /* Payload */
203};
204
205
206/**
207 * P2P GET message
208 */
209struct PeerGetMessage
210{
211 /**
212 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
213 */
214 struct GNUNET_MessageHeader header;
215
216 /**
217 * Processing options
218 */
219 uint32_t options GNUNET_PACKED;
220
221 /**
222 * Desired content type.
223 */
224 uint32_t type GNUNET_PACKED;
225
226 /**
227 * Hop count
228 */
229 uint32_t hop_count GNUNET_PACKED;
230
231 /**
232 * Desired replication level for this request.
233 */
234 uint32_t desired_replication_level GNUNET_PACKED;
235
236 /**
237 * Size of the extended query.
238 */
239 uint32_t xquery_size;
240
241 /**
242 * Bloomfilter mutator.
243 */
244 uint32_t bf_mutator;
245
246 /**
247 * Bloomfilter (for peer identities) to stop circular routes
248 */
249 char bloomfilter[DHT_BLOOM_SIZE];
250
251 /**
252 * The key we are looking for.
253 */
254 struct GNUNET_HashCode key;
255
256 /* xquery */
257
258 /* result bloomfilter */
259};
260GNUNET_NETWORK_STRUCT_END
261
262
263/**
264 * Entry for a peer in a bucket.
265 */
266struct PeerInfo
267{
268 /**
269 * Next peer entry (DLL)
270 */
271 struct PeerInfo *next;
272
273 /**
274 * Prev peer entry (DLL)
275 */
276 struct PeerInfo *prev;
277
278 /**
279 * Handle for sending messages to this peer.
280 */
281 struct GNUNET_MQ_Handle *mq;
282
283 /**
284 * What is the identity of the peer?
285 */
286 const struct GNUNET_PeerIdentity *id;
287
288 /**
289 * Hash of @e id.
290 */
291 struct GNUNET_HashCode phash;
292
293 /**
294 * Which bucket is this peer in?
295 */
296 int peer_bucket;
297};
298
299
300/**
301 * Peers are grouped into buckets.
302 */
303struct PeerBucket
304{
305 /**
306 * Head of DLL
307 */
308 struct PeerInfo *head;
309
310 /**
311 * Tail of DLL
312 */
313 struct PeerInfo *tail;
314
315 /**
316 * Number of peers in the bucket.
317 */
318 unsigned int peers_size;
319};
320
321
322/**
323 * Information about a peer that we would like to connect to.
324 */
325struct ConnectInfo
326{
327 /**
328 * Handle to active HELLO offer operation, or NULL.
329 */
330 struct GNUNET_TRANSPORT_OfferHelloHandle *oh;
331
332 /**
333 * Handle to active connectivity suggestion operation, or NULL.
334 */
335 struct GNUNET_ATS_ConnectivitySuggestHandle *sh;
336
337 /**
338 * How much would we like to connect to this peer?
339 */
340 uint32_t strength;
341};
342
343
344/**
345 * Do we cache all results that we are routing in the local datacache?
346 */
347static int cache_results;
348
349/**
350 * Should routing details be logged to stderr (for debugging)?
351 */
352static int log_route_details_stderr;
353
354/**
355 * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
356 */
357static unsigned int closest_bucket;
358
359/**
360 * How many peers have we added since we sent out our last
361 * find peer request?
362 */
363static unsigned int newly_found_peers;
364
365/**
366 * Option for testing that disables the 'connect' function of the DHT.
367 */
368static int disable_try_connect;
369
370/**
371 * The buckets. Array of size #MAX_BUCKETS. Offset 0 means 0 bits matching.
372 */
373static struct PeerBucket k_buckets[MAX_BUCKETS];
374
375/**
376 * Hash map of all CORE-connected peers, for easy removal from
377 * #k_buckets on disconnect. Values are of type `struct PeerInfo`.
378 */
379static struct GNUNET_CONTAINER_MultiPeerMap *all_connected_peers;
380
381/**
382 * Hash map of all peers we would like to be connected to.
383 * Values are of type `struct ConnectInfo`.
384 */
385static struct GNUNET_CONTAINER_MultiPeerMap *all_desired_peers;
386
387/**
388 * Maximum size for each bucket.
389 */
390static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
391
392/**
393 * Task that sends FIND PEER requests.
394 */
395static struct GNUNET_SCHEDULER_Task *find_peer_task;
396
397/**
398 * Identity of this peer.
399 */
400static struct GNUNET_PeerIdentity my_identity;
401
402/**
403 * Hash of the identity of this peer.
404 */
405struct GNUNET_HashCode my_identity_hash;
406
407/**
408 * Handle to CORE.
409 */
410static struct GNUNET_CORE_Handle *core_api;
411
412/**
413 * Handle to ATS connectivity.
414 */
415static struct GNUNET_ATS_ConnectivityHandle *ats_ch;
416
417
418/**
419 * Find the optimal bucket for this key.
420 *
421 * @param hc the hashcode to compare our identity to
422 * @return the proper bucket index, or #GNUNET_SYSERR
423 * on error (same hashcode)
424 */
425static int
426find_bucket (const struct GNUNET_HashCode *hc)
427{
428 unsigned int bits;
429
430 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, hc);
431 if (bits == MAX_BUCKETS)
432 {
433 /* How can all bits match? Got my own ID? */
434 GNUNET_break (0);
435 return GNUNET_SYSERR;
436 }
437 return MAX_BUCKETS - bits - 1;
438}
439
440
441/**
442 * Function called when #GNUNET_TRANSPORT_offer_hello() is done.
443 * Clean up the "oh" field in the @a cls
444 *
445 * @param cls a `struct ConnectInfo`
446 */
447static void
448offer_hello_done (void *cls)
449{
450 struct ConnectInfo *ci = cls;
451
452 ci->oh = NULL;
453}
454
455
456/**
457 * Function called for all entries in #all_desired_peers to clean up.
458 *
459 * @param cls NULL
460 * @param peer peer the entry is for
461 * @param value the value to remove
462 * @return #GNUNET_YES
463 */
464static int
465free_connect_info (void *cls,
466 const struct GNUNET_PeerIdentity *peer,
467 void *value)
468{
469 struct ConnectInfo *ci = value;
470
471 (void) cls;
472 GNUNET_assert (GNUNET_YES ==
473 GNUNET_CONTAINER_multipeermap_remove (all_desired_peers,
474 peer,
475 ci));
476 if (NULL != ci->sh)
477 {
478 GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
479 ci->sh = NULL;
480 }
481 if (NULL != ci->oh)
482 {
483 GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
484 ci->oh = NULL;
485 }
486 GNUNET_free (ci);
487 return GNUNET_YES;
488}
489
490
491/**
492 * Consider if we want to connect to a given peer, and if so
493 * let ATS know. If applicable, the HELLO is offered to the
494 * TRANSPORT service.
495 *
496 * @param pid peer to consider connectivity requirements for
497 * @param h a HELLO message, or NULL
498 */
499static void
500try_connect (const struct GNUNET_PeerIdentity *pid,
501 const struct GNUNET_MessageHeader *h)
502{
503 int bucket;
504 struct GNUNET_HashCode pid_hash;
505 struct ConnectInfo *ci;
506 uint32_t strength;
507
508 GNUNET_CRYPTO_hash (pid,
509 sizeof(struct GNUNET_PeerIdentity),
510 &pid_hash);
511 bucket = find_bucket (&pid_hash);
512 if (bucket < 0)
513 return; /* self? */
514 ci = GNUNET_CONTAINER_multipeermap_get (all_desired_peers,
515 pid);
516
517 if (k_buckets[bucket].peers_size < bucket_size)
518 strength = (bucket_size - k_buckets[bucket].peers_size) * bucket;
519 else
520 strength = bucket; /* minimum value of connectivity */
521 if (GNUNET_YES ==
522 GNUNET_CONTAINER_multipeermap_contains (all_connected_peers,
523 pid))
524 strength *= 2; /* double for connected peers */
525 else if (k_buckets[bucket].peers_size > bucket_size)
526 strength = 0; /* bucket full, we really do not care about more */
527
528 if ((0 == strength) &&
529 (NULL != ci))
530 {
531 /* release request */
532 GNUNET_assert (GNUNET_YES ==
533 free_connect_info (NULL,
534 pid,
535 ci));
536 return;
537 }
538 if (NULL == ci)
539 {
540 ci = GNUNET_new (struct ConnectInfo);
541 GNUNET_assert (GNUNET_OK ==
542 GNUNET_CONTAINER_multipeermap_put (all_desired_peers,
543 pid,
544 ci,
545 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
546 }
547 if ((NULL != ci->oh) &&
548 (NULL != h))
549 GNUNET_TRANSPORT_offer_hello_cancel (ci->oh);
550 if (NULL != h)
551 ci->oh = GNUNET_TRANSPORT_offer_hello (GDS_cfg,
552 h,
553 &offer_hello_done,
554 ci);
555 if ((NULL != ci->sh) &&
556 (ci->strength != strength))
557 GNUNET_ATS_connectivity_suggest_cancel (ci->sh);
558 if (ci->strength != strength)
559 ci->sh = GNUNET_ATS_connectivity_suggest (ats_ch,
560 pid,
561 strength);
562 ci->strength = strength;
563}
564
565
566/**
567 * Function called for each peer in #all_desired_peers during
568 * #update_connect_preferences() if we have reason to adjust
569 * the strength of our desire to keep connections to certain
570 * peers. Calls #try_connect() to update the calculations for
571 * the given @a pid.
572 *
573 * @param cls NULL
574 * @param pid peer to update
575 * @param value unused
576 * @return #GNUNET_YES (continue to iterate)
577 */
578static int
579update_desire_strength (void *cls,
580 const struct GNUNET_PeerIdentity *pid,
581 void *value)
582{
583 (void) cls;
584 (void) value;
585 try_connect (pid,
586 NULL);
587 return GNUNET_YES;
588}
589
590
591/**
592 * Update our preferences for connectivity as given to ATS.
593 *
594 * @param cls the `struct PeerInfo` of the peer
595 * @param tc scheduler context.
596 */
597static void
598update_connect_preferences ()
599{
600 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
601 &update_desire_strength,
602 NULL);
603}
604
605
606/**
607 * Add each of the peers we already know to the bloom filter of
608 * the request so that we don't get duplicate HELLOs.
609 *
610 * @param cls the `struct GNUNET_BLOCK_Group`
611 * @param key peer identity to add to the bloom filter
612 * @param value value the peer information (unused)
613 * @return #GNUNET_YES (we should continue to iterate)
614 */
615static int
616add_known_to_bloom (void *cls,
617 const struct GNUNET_PeerIdentity *key,
618 void *value)
619{
620 struct GNUNET_BLOCK_Group *bg = cls;
621 struct GNUNET_HashCode key_hash;
622
623 (void) cls;
624 (void) value;
625 GNUNET_CRYPTO_hash (key,
626 sizeof(struct GNUNET_PeerIdentity),
627 &key_hash);
628 GNUNET_BLOCK_group_set_seen (bg,
629 &key_hash,
630 1);
631 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
632 "Adding known peer (%s) to bloomfilter for FIND PEER\n",
633 GNUNET_i2s (key));
634 return GNUNET_YES;
635}
636
637
638/**
639 * Task to send a find peer message for our own peer identifier
640 * so that we can find the closest peers in the network to ourselves
641 * and attempt to connect to them.
642 *
643 * @param cls closure for this task
644 */
645static void
646send_find_peer_message (void *cls)
647{
648 struct GNUNET_TIME_Relative next_send_time;
649 struct GNUNET_BLOCK_Group *bg;
650 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
651
652 (void) cls;
653 find_peer_task = NULL;
654 if (newly_found_peers > bucket_size)
655 {
656 /* If we are finding many peers already, no need to send out our request right now! */
657 find_peer_task =
658 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
659 &send_find_peer_message,
660 NULL);
661 newly_found_peers = 0;
662 return;
663 }
664 bg = GNUNET_BLOCK_group_create (GDS_block_context,
665 GNUNET_BLOCK_TYPE_DHT_HELLO,
666 GNUNET_CRYPTO_random_u32 (
667 GNUNET_CRYPTO_QUALITY_WEAK,
668 UINT32_MAX),
669 NULL,
670 0,
671 "filter-size",
672 DHT_BLOOM_SIZE,
673 NULL);
674 GNUNET_CONTAINER_multipeermap_iterate (all_connected_peers,
675 &add_known_to_bloom,
676 bg);
677 GNUNET_STATISTICS_update (GDS_stats,
678 gettext_noop ("# FIND PEER messages initiated"),
679 1,
680 GNUNET_NO);
681 peer_bf
682 = GNUNET_CONTAINER_bloomfilter_init (NULL,
683 DHT_BLOOM_SIZE,
684 GNUNET_CONSTANTS_BLOOMFILTER_K);
685 // FIXME: pass priority!?
686 GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
687 GNUNET_DHT_RO_FIND_PEER
688 | GNUNET_DHT_RO_RECORD_ROUTE,
689 FIND_PEER_REPLICATION_LEVEL,
690 0,
691 &my_identity_hash,
692 NULL,
693 0,
694 bg,
695 peer_bf);
696 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
697 GNUNET_BLOCK_group_destroy (bg);
698 /* schedule next round */
699 next_send_time.rel_value_us =
700 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value_us
701 + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
702 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value_us
703 / (newly_found_peers + 1));
704 newly_found_peers = 0;
705 GNUNET_assert (NULL == find_peer_task);
706 find_peer_task =
707 GNUNET_SCHEDULER_add_delayed (next_send_time,
708 &send_find_peer_message,
709 NULL);
710}
711
712
713/**
714 * Method called whenever a peer connects.
715 *
716 * @param cls closure
717 * @param peer peer identity this notification is about
718 * @param mq message queue for sending messages to @a peer
719 * @return our `struct PeerInfo` for @a peer
720 */
721static void *
722handle_core_connect (void *cls,
723 const struct GNUNET_PeerIdentity *peer,
724 struct GNUNET_MQ_Handle *mq)
725{
726 struct PeerInfo *pi;
727
728 (void) cls;
729 /* Check for connect to self message */
730 if (0 == GNUNET_memcmp (&my_identity,
731 peer))
732 return NULL;
733 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
734 "Connected to %s\n",
735 GNUNET_i2s (peer));
736 GNUNET_assert (GNUNET_NO ==
737 GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
738 peer));
739 GNUNET_STATISTICS_update (GDS_stats,
740 gettext_noop ("# peers connected"),
741 1,
742 GNUNET_NO);
743 pi = GNUNET_new (struct PeerInfo);
744 pi->id = peer;
745 pi->mq = mq;
746 GNUNET_CRYPTO_hash (peer,
747 sizeof(struct GNUNET_PeerIdentity),
748 &pi->phash);
749 pi->peer_bucket = find_bucket (&pi->phash);
750 GNUNET_assert ((pi->peer_bucket >= 0) &&
751 ((unsigned int) pi->peer_bucket < MAX_BUCKETS));
752 GNUNET_CONTAINER_DLL_insert_tail (k_buckets[pi->peer_bucket].head,
753 k_buckets[pi->peer_bucket].tail,
754 pi);
755 k_buckets[pi->peer_bucket].peers_size++;
756 closest_bucket = GNUNET_MAX (closest_bucket,
757 (unsigned int) pi->peer_bucket);
758 GNUNET_assert (GNUNET_OK ==
759 GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
760 pi->id,
761 pi,
762 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
763 if ((pi->peer_bucket > 0) &&
764 (k_buckets[pi->peer_bucket].peers_size <= bucket_size))
765 {
766 update_connect_preferences ();
767 newly_found_peers++;
768 }
769 if ((1 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
770 (GNUNET_YES != disable_try_connect))
771 {
772 /* got a first connection, good time to start with FIND PEER requests... */
773 GNUNET_assert (NULL == find_peer_task);
774 find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
775 NULL);
776 }
777 return pi;
778}
779
780
781/**
782 * Method called whenever a peer disconnects.
783 *
784 * @param cls closure
785 * @param peer peer identity this notification is about
786 * @param internal_cls our `struct PeerInfo` for @a peer
787 */
788static void
789handle_core_disconnect (void *cls,
790 const struct GNUNET_PeerIdentity *peer,
791 void *internal_cls)
792{
793 struct PeerInfo *to_remove = internal_cls;
794
795 (void) cls;
796 /* Check for disconnect from self message */
797 if (NULL == to_remove)
798 return;
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800 "Disconnected %s\n",
801 GNUNET_i2s (peer));
802 GNUNET_STATISTICS_update (GDS_stats,
803 gettext_noop ("# peers connected"),
804 -1,
805 GNUNET_NO);
806 GNUNET_assert (GNUNET_YES ==
807 GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
808 peer,
809 to_remove));
810 if ((0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
811 (GNUNET_YES != disable_try_connect))
812 {
813 GNUNET_SCHEDULER_cancel (find_peer_task);
814 find_peer_task = NULL;
815 }
816 GNUNET_assert (to_remove->peer_bucket >= 0);
817 GNUNET_CONTAINER_DLL_remove (k_buckets[to_remove->peer_bucket].head,
818 k_buckets[to_remove->peer_bucket].tail,
819 to_remove);
820 GNUNET_assert (k_buckets[to_remove->peer_bucket].peers_size > 0);
821 k_buckets[to_remove->peer_bucket].peers_size--;
822 while ((closest_bucket > 0) &&
823 (0 == k_buckets[to_remove->peer_bucket].peers_size))
824 closest_bucket--;
825 if (k_buckets[to_remove->peer_bucket].peers_size < bucket_size)
826 update_connect_preferences ();
827 GNUNET_free (to_remove);
828}
829
830
831/**
832 * To how many peers should we (on average) forward the request to
833 * obtain the desired target_replication count (on average).
834 *
835 * @param hop_count number of hops the message has traversed
836 * @param target_replication the number of total paths desired
837 * @return Some number of peers to forward the message to
838 */
839static unsigned int
840get_forward_count (uint32_t hop_count,
841 uint32_t target_replication)
842{
843 uint32_t random_value;
844 uint32_t forward_count;
845 float target_value;
846
847 if (hop_count > GDS_NSE_get () * 4.0)
848 {
849 /* forcefully terminate */
850 GNUNET_STATISTICS_update (GDS_stats,
851 gettext_noop ("# requests TTL-dropped"),
852 1, GNUNET_NO);
853 return 0;
854 }
855 if (hop_count > GDS_NSE_get () * 2.0)
856 {
857 /* Once we have reached our ideal number of hops, only forward to 1 peer */
858 return 1;
859 }
860 /* bound by system-wide maximum */
861 target_replication =
862 GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication);
863 target_value =
864 1 + (target_replication - 1.0) / (GDS_NSE_get ()
865 + ((float) (target_replication - 1.0)
866 * hop_count));
867 /* Set forward count to floor of target_value */
868 forward_count = (uint32_t) target_value;
869 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
870 target_value = target_value - forward_count;
871 random_value =
872 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
873 if (random_value < (target_value * UINT32_MAX))
874 forward_count++;
875 return forward_count;
876}
877
878
879/**
880 * Compute the distance between have and target as a 64-bit value.
881 * Differences in the lower bits must count stronger than differences
882 * in the higher bits.
883 *
884 * @param target
885 * @param have
886 * @param bucket up to which offset are @a target and @a have identical and thus those bits should not be considered
887 * @return 0 if have==target, otherwise a number
888 * that is larger as the distance between
889 * the two hash codes increases
890 */
891static uint64_t
892get_distance (const struct GNUNET_HashCode *target,
893 const struct GNUNET_HashCode *have,
894 unsigned int bucket)
895{
896 uint64_t lsb = 0;
897
898 for (unsigned int i = bucket + 1;
899 (i < sizeof(struct GNUNET_HashCode) * 8) &&
900 (i < bucket + 1 + 64);
901 i++)
902 {
903 if (GNUNET_CRYPTO_hash_get_bit_rtl (target, i) !=
904 GNUNET_CRYPTO_hash_get_bit_rtl (have, i))
905 lsb |= (1LLU << (bucket + 64 - i)); /* first bit set will be 1,
906 * last bit set will be 63 -- if
907 * i does not reach 512 first... */
908 }
909 return lsb;
910}
911
912
913/**
914 * Check whether my identity is closer than any known peers. If a
915 * non-null bloomfilter is given, check if this is the closest peer
916 * that hasn't already been routed to.
917 *
918 * @param key hash code to check closeness to
919 * @param bloom bloomfilter, exclude these entries from the decision
920 * @return #GNUNET_YES if node location is closest,
921 * #GNUNET_NO otherwise.
922 */
923int
924GDS_am_closest_peer (const struct GNUNET_HashCode *key,
925 const struct GNUNET_CONTAINER_BloomFilter *bloom)
926{
927 int bits;
928 int other_bits;
929 int bucket_num;
930 struct PeerInfo *pos;
931
932 if (0 == GNUNET_memcmp (&my_identity_hash,
933 key))
934 return GNUNET_YES;
935 bucket_num = find_bucket (key);
936 GNUNET_assert (bucket_num >= 0);
937 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
938 key);
939 pos = k_buckets[bucket_num].head;
940 while (NULL != pos)
941 {
942 if ((NULL != bloom) &&
943 (GNUNET_YES ==
944 GNUNET_CONTAINER_bloomfilter_test (bloom,
945 &pos->phash)))
946 {
947 pos = pos->next;
948 continue; /* Skip already checked entries */
949 }
950 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->phash,
951 key);
952 if (other_bits > bits)
953 return GNUNET_NO;
954 if (other_bits == bits) /* We match the same number of bits */
955 return GNUNET_YES;
956 pos = pos->next;
957 }
958 /* No peers closer, we are the closest! */
959 return GNUNET_YES;
960}
961
962
963/**
964 * Select a peer from the routing table that would be a good routing
965 * destination for sending a message for "key". The resulting peer
966 * must not be in the set of blocked peers.<p>
967 *
968 * Note that we should not ALWAYS select the closest peer to the
969 * target, peers further away from the target should be chosen with
970 * exponentially declining probability.
971 *
972 * FIXME: double-check that this is fine
973 *
974 *
975 * @param key the key we are selecting a peer to route to
976 * @param bloom a bloomfilter containing entries this request has seen already
977 * @param hops how many hops has this message traversed thus far
978 * @return Peer to route to, or NULL on error
979 */
980static struct PeerInfo *
981select_peer (const struct GNUNET_HashCode *key,
982 const struct GNUNET_CONTAINER_BloomFilter *bloom,
983 uint32_t hops)
984{
985 unsigned int bc;
986 unsigned int count;
987 unsigned int selected;
988 struct PeerInfo *pos;
989 struct PeerInfo *chosen;
990
991 if (hops >= GDS_NSE_get ())
992 {
993 /* greedy selection (closest peer that is not in bloomfilter) */
994 unsigned int best_bucket = 0;
995 uint64_t best_in_bucket = UINT64_MAX;
996
997 chosen = NULL;
998 for (bc = 0; bc <= closest_bucket; bc++)
999 {
1000 count = 0;
1001 for (pos = k_buckets[bc].head;
1002 (pos != NULL) &&
1003 (count < bucket_size);
1004 pos = pos->next)
1005 {
1006 unsigned int bucket;
1007 uint64_t dist;
1008
1009 bucket = GNUNET_CRYPTO_hash_matching_bits (key,
1010 &pos->phash);
1011 dist = get_distance (key,
1012 &pos->phash,
1013 bucket);
1014 if (bucket < best_bucket)
1015 continue;
1016 if (dist > best_in_bucket)
1017 continue;
1018 best_bucket = bucket;
1019 best_in_bucket = dist;
1020 if ( (NULL == bloom) ||
1021 (GNUNET_NO ==
1022 GNUNET_CONTAINER_bloomfilter_test (bloom,
1023 &pos->phash)) )
1024 {
1025 chosen = pos;
1026 }
1027 else
1028 {
1029 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030 "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1031 GNUNET_i2s (pos->id),
1032 GNUNET_h2s (key));
1033 GNUNET_STATISTICS_update (GDS_stats,
1034 gettext_noop (
1035 "# Peers excluded from routing due to Bloomfilter"),
1036 1,
1037 GNUNET_NO);
1038 chosen = NULL;
1039 }
1040 count++;
1041 }
1042 }
1043 if (NULL == chosen)
1044 GNUNET_STATISTICS_update (GDS_stats,
1045 gettext_noop ("# Peer selection failed"),
1046 1,
1047 GNUNET_NO);
1048 else
1049 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1050 "Selected peer `%s' in greedy routing for %s\n",
1051 GNUNET_i2s (chosen->id),
1052 GNUNET_h2s (key));
1053 return chosen;
1054 }
1055
1056 /* select "random" peer */
1057 /* count number of peers that are available and not filtered */
1058 count = 0;
1059 for (bc = 0; bc <= closest_bucket; bc++)
1060 {
1061 pos = k_buckets[bc].head;
1062 while ((NULL != pos) && (count < bucket_size))
1063 {
1064 if ((NULL != bloom) &&
1065 (GNUNET_YES ==
1066 GNUNET_CONTAINER_bloomfilter_test (bloom,
1067 &pos->phash)))
1068 {
1069 GNUNET_STATISTICS_update (GDS_stats,
1070 gettext_noop
1071 (
1072 "# Peers excluded from routing due to Bloomfilter"),
1073 1, GNUNET_NO);
1074 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075 "Excluded peer `%s' due to BF match in random routing for %s\n",
1076 GNUNET_i2s (pos->id),
1077 GNUNET_h2s (key));
1078 pos = pos->next;
1079 continue; /* Ignore bloomfiltered peers */
1080 }
1081 count++;
1082 pos = pos->next;
1083 }
1084 }
1085 if (0 == count) /* No peers to select from! */
1086 {
1087 GNUNET_STATISTICS_update (GDS_stats,
1088 gettext_noop ("# Peer selection failed"), 1,
1089 GNUNET_NO);
1090 return NULL;
1091 }
1092 /* Now actually choose a peer */
1093 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1094 count);
1095 count = 0;
1096 for (bc = 0; bc <= closest_bucket; bc++)
1097 {
1098 for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size));
1099 pos = pos->next)
1100 {
1101 if ((bloom != NULL) &&
1102 (GNUNET_YES ==
1103 GNUNET_CONTAINER_bloomfilter_test (bloom,
1104 &pos->phash)))
1105 {
1106 continue; /* Ignore bloomfiltered peers */
1107 }
1108 if (0 == selected--)
1109 {
1110 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1111 "Selected peer `%s' in random routing for %s\n",
1112 GNUNET_i2s (pos->id),
1113 GNUNET_h2s (key));
1114 return pos;
1115 }
1116 }
1117 }
1118 GNUNET_break (0);
1119 return NULL;
1120}
1121
1122
1123/**
1124 * Compute the set of peers that the given request should be
1125 * forwarded to.
1126 *
1127 * @param key routing key
1128 * @param bloom bloom filter excluding peers as targets, all selected
1129 * peers will be added to the bloom filter
1130 * @param hop_count number of hops the request has traversed so far
1131 * @param target_replication desired number of replicas
1132 * @param targets where to store an array of target peers (to be
1133 * free'd by the caller)
1134 * @return number of peers returned in 'targets'.
1135 */
1136static unsigned int
1137get_target_peers (const struct GNUNET_HashCode *key,
1138 struct GNUNET_CONTAINER_BloomFilter *bloom,
1139 uint32_t hop_count,
1140 uint32_t target_replication,
1141 struct PeerInfo ***targets)
1142{
1143 unsigned int ret;
1144 unsigned int off;
1145 struct PeerInfo **rtargets;
1146 struct PeerInfo *nxt;
1147
1148 GNUNET_assert (NULL != bloom);
1149 ret = get_forward_count (hop_count,
1150 target_replication);
1151 if (0 == ret)
1152 {
1153 *targets = NULL;
1154 return 0;
1155 }
1156 rtargets = GNUNET_new_array (ret,
1157 struct PeerInfo *);
1158 for (off = 0; off < ret; off++)
1159 {
1160 nxt = select_peer (key,
1161 bloom,
1162 hop_count);
1163 if (NULL == nxt)
1164 break;
1165 rtargets[off] = nxt;
1166 GNUNET_break (GNUNET_NO ==
1167 GNUNET_CONTAINER_bloomfilter_test (bloom,
1168 &nxt->phash));
1169 GNUNET_CONTAINER_bloomfilter_add (bloom,
1170 &nxt->phash);
1171 }
1172 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1173 "Selected %u/%u peers at hop %u for %s (target was %u)\n",
1174 off,
1175 GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
1176 (unsigned int) hop_count,
1177 GNUNET_h2s (key),
1178 ret);
1179 if (0 == off)
1180 {
1181 GNUNET_free (rtargets);
1182 *targets = NULL;
1183 return 0;
1184 }
1185 *targets = rtargets;
1186 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187 "Forwarding query `%s' to %u peers (goal was %u peers)\n",
1188 GNUNET_h2s (key),
1189 off,
1190 ret);
1191 return off;
1192}
1193
1194
1195/**
1196 * Perform a PUT operation. Forwards the given request to other
1197 * peers. Does not store the data locally. Does not give the
1198 * data to local clients. May do nothing if this is the only
1199 * peer in the network (or if we are the closest peer in the
1200 * network).
1201 *
1202 * @param type type of the block
1203 * @param options routing options
1204 * @param desired_replication_level desired replication count
1205 * @param expiration_time when does the content expire
1206 * @param hop_count how many hops has this message traversed so far
1207 * @param bf Bloom filter of peers this PUT has already traversed
1208 * @param key key for the content
1209 * @param put_path_length number of entries in @a put_path
1210 * @param put_path peers this request has traversed so far (if tracked)
1211 * @param data payload to store
1212 * @param data_size number of bytes in @a data
1213 * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1214 */
1215int
1216GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1217 enum GNUNET_DHT_RouteOption options,
1218 uint32_t desired_replication_level,
1219 struct GNUNET_TIME_Absolute expiration_time,
1220 uint32_t hop_count,
1221 struct GNUNET_CONTAINER_BloomFilter *bf,
1222 const struct GNUNET_HashCode *key,
1223 unsigned int put_path_length,
1224 struct GNUNET_PeerIdentity *put_path,
1225 const void *data,
1226 size_t data_size)
1227{
1228 unsigned int target_count;
1229 unsigned int i;
1230 struct PeerInfo **targets;
1231 struct PeerInfo *target;
1232 size_t msize;
1233 struct GNUNET_MQ_Envelope *env;
1234 struct PeerPutMessage *ppm;
1235 struct GNUNET_PeerIdentity *pp;
1236 unsigned int skip_count;
1237
1238 GNUNET_assert (NULL != bf);
1239 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240 "Adding myself (%s) to PUT bloomfilter for %s\n",
1241 GNUNET_i2s (&my_identity),
1242 GNUNET_h2s (key));
1243 GNUNET_CONTAINER_bloomfilter_add (bf,
1244 &my_identity_hash);
1245 GNUNET_STATISTICS_update (GDS_stats,
1246 gettext_noop ("# PUT requests routed"),
1247 1,
1248 GNUNET_NO);
1249 target_count
1250 = get_target_peers (key,
1251 bf,
1252 hop_count,
1253 desired_replication_level,
1254 &targets);
1255 if (0 == target_count)
1256 {
1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258 "Routing PUT for %s terminates after %u hops at %s\n",
1259 GNUNET_h2s (key),
1260 (unsigned int) hop_count,
1261 GNUNET_i2s (&my_identity));
1262 return GNUNET_NO;
1263 }
1264 msize = put_path_length * sizeof(struct GNUNET_PeerIdentity) + data_size;
1265 if (msize + sizeof(struct PeerPutMessage)
1266 >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1267 {
1268 put_path_length = 0;
1269 msize = data_size;
1270 }
1271 if (msize + sizeof(struct PeerPutMessage)
1272 >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1273 {
1274 GNUNET_break (0);
1275 GNUNET_free (targets);
1276 return GNUNET_NO;
1277 }
1278 GNUNET_STATISTICS_update (GDS_stats,
1279 gettext_noop (
1280 "# PUT messages queued for transmission"),
1281 target_count,
1282 GNUNET_NO);
1283 skip_count = 0;
1284 for (i = 0; i < target_count; i++)
1285 {
1286 target = targets[i];
1287 if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1288 {
1289 /* skip */
1290 GNUNET_STATISTICS_update (GDS_stats,
1291 gettext_noop (
1292 "# P2P messages dropped due to full queue"),
1293 1,
1294 GNUNET_NO);
1295 skip_count++;
1296 continue;
1297 }
1298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1299 "Routing PUT for %s after %u hops to %s\n",
1300 GNUNET_h2s (key),
1301 (unsigned int) hop_count,
1302 GNUNET_i2s (target->id));
1303 env = GNUNET_MQ_msg_extra (ppm,
1304 msize,
1305 GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1306 ppm->options = htonl (options);
1307 ppm->type = htonl (type);
1308 ppm->hop_count = htonl (hop_count + 1);
1309 ppm->desired_replication_level = htonl (desired_replication_level);
1310 ppm->put_path_length = htonl (put_path_length);
1311 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1312 GNUNET_break (GNUNET_YES ==
1313 GNUNET_CONTAINER_bloomfilter_test (bf,
1314 &target->phash));
1315 GNUNET_assert (GNUNET_OK ==
1316 GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
1317 ppm->bloomfilter,
1318 DHT_BLOOM_SIZE));
1319 ppm->key = *key;
1320 pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1321 GNUNET_memcpy (pp,
1322 put_path,
1323 sizeof(struct GNUNET_PeerIdentity) * put_path_length);
1324 GNUNET_memcpy (&pp[put_path_length],
1325 data,
1326 data_size);
1327 GNUNET_MQ_send (target->mq,
1328 env);
1329 }
1330 GNUNET_free (targets);
1331 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1332}
1333
1334
1335/**
1336 * Perform a GET operation. Forwards the given request to other
1337 * peers. Does not lookup the key locally. May do nothing if this is
1338 * the only peer in the network (or if we are the closest peer in the
1339 * network).
1340 *
1341 * @param type type of the block
1342 * @param options routing options
1343 * @param desired_replication_level desired replication count
1344 * @param hop_count how many hops did this request traverse so far?
1345 * @param key key for the content
1346 * @param xquery extended query
1347 * @param xquery_size number of bytes in @a xquery
1348 * @param bg group to use for filtering replies
1349 * @param peer_bf filter for peers not to select (again)
1350 * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not
1351 */
1352int
1353GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1354 enum GNUNET_DHT_RouteOption options,
1355 uint32_t desired_replication_level,
1356 uint32_t hop_count,
1357 const struct GNUNET_HashCode *key,
1358 const void *xquery,
1359 size_t xquery_size,
1360 struct GNUNET_BLOCK_Group *bg,
1361 struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1362{
1363 unsigned int target_count;
1364 struct PeerInfo **targets;
1365 struct PeerInfo *target;
1366 struct GNUNET_MQ_Envelope *env;
1367 size_t msize;
1368 struct PeerGetMessage *pgm;
1369 char *xq;
1370 size_t reply_bf_size;
1371 void *reply_bf;
1372 unsigned int skip_count;
1373 uint32_t bf_nonce;
1374
1375 GNUNET_assert (NULL != peer_bf);
1376 GNUNET_STATISTICS_update (GDS_stats,
1377 gettext_noop ("# GET requests routed"),
1378 1,
1379 GNUNET_NO);
1380 target_count = get_target_peers (key,
1381 peer_bf,
1382 hop_count,
1383 desired_replication_level,
1384 &targets);
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386 "Adding myself (%s) to GET bloomfilter for %s\n",
1387 GNUNET_i2s (&my_identity),
1388 GNUNET_h2s (key));
1389 GNUNET_CONTAINER_bloomfilter_add (peer_bf,
1390 &my_identity_hash);
1391 if (0 == target_count)
1392 {
1393 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394 "Routing GET for %s terminates after %u hops at %s\n",
1395 GNUNET_h2s (key),
1396 (unsigned int) hop_count,
1397 GNUNET_i2s (&my_identity));
1398 return GNUNET_NO;
1399 }
1400 if (GNUNET_OK !=
1401 GNUNET_BLOCK_group_serialize (bg,
1402 &bf_nonce,
1403 &reply_bf,
1404 &reply_bf_size))
1405 {
1406 reply_bf = NULL;
1407 reply_bf_size = 0;
1408 bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1409 UINT32_MAX);
1410 }
1411 msize = xquery_size + reply_bf_size;
1412 if (msize + sizeof(struct PeerGetMessage) >= GNUNET_MAX_MESSAGE_SIZE)
1413 {
1414 GNUNET_break (0);
1415 GNUNET_free (reply_bf);
1416 GNUNET_free (targets);
1417 return GNUNET_NO;
1418 }
1419 GNUNET_STATISTICS_update (GDS_stats,
1420 gettext_noop (
1421 "# GET messages queued for transmission"),
1422 target_count,
1423 GNUNET_NO);
1424 /* forward request */
1425 skip_count = 0;
1426 for (unsigned int i = 0; i < target_count; i++)
1427 {
1428 target = targets[i];
1429 if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1430 {
1431 /* skip */
1432 GNUNET_STATISTICS_update (GDS_stats,
1433 gettext_noop (
1434 "# P2P messages dropped due to full queue"),
1435 1, GNUNET_NO);
1436 skip_count++;
1437 continue;
1438 }
1439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440 "Routing GET for %s after %u hops to %s\n",
1441 GNUNET_h2s (key),
1442 (unsigned int) hop_count,
1443 GNUNET_i2s (target->id));
1444 env = GNUNET_MQ_msg_extra (pgm,
1445 msize,
1446 GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1447 pgm->options = htonl (options);
1448 pgm->type = htonl (type);
1449 pgm->hop_count = htonl (hop_count + 1);
1450 pgm->desired_replication_level = htonl (desired_replication_level);
1451 pgm->xquery_size = htonl (xquery_size);
1452 pgm->bf_mutator = bf_nonce;
1453 GNUNET_break (GNUNET_YES ==
1454 GNUNET_CONTAINER_bloomfilter_test (peer_bf,
1455 &target->phash));
1456 GNUNET_assert (GNUNET_OK ==
1457 GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
1458 pgm->bloomfilter,
1459 DHT_BLOOM_SIZE));
1460 pgm->key = *key;
1461 xq = (char *) &pgm[1];
1462 GNUNET_memcpy (xq,
1463 xquery,
1464 xquery_size);
1465 GNUNET_memcpy (&xq[xquery_size],
1466 reply_bf,
1467 reply_bf_size);
1468 GNUNET_MQ_send (target->mq,
1469 env);
1470 }
1471 GNUNET_free (targets);
1472 GNUNET_free (reply_bf);
1473 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
1474}
1475
1476
1477/**
1478 * Handle a reply (route to origin). Only forwards the reply back to
1479 * the given peer. Does not do local caching or forwarding to local
1480 * clients.
1481 *
1482 * @param target neighbour that should receive the block (if still connected)
1483 * @param type type of the block
1484 * @param expiration_time when does the content expire
1485 * @param key key for the content
1486 * @param put_path_length number of entries in @a put_path
1487 * @param put_path peers the original PUT traversed (if tracked)
1488 * @param get_path_length number of entries in @a get_path
1489 * @param get_path peers this reply has traversed so far (if tracked)
1490 * @param data payload of the reply
1491 * @param data_size number of bytes in @a data
1492 */
1493void
1494GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1495 enum GNUNET_BLOCK_Type type,
1496 struct GNUNET_TIME_Absolute expiration_time,
1497 const struct GNUNET_HashCode *key,
1498 unsigned int put_path_length,
1499 const struct GNUNET_PeerIdentity *put_path,
1500 unsigned int get_path_length,
1501 const struct GNUNET_PeerIdentity *get_path,
1502 const void *data,
1503 size_t data_size)
1504{
1505 struct PeerInfo *pi;
1506 struct GNUNET_MQ_Envelope *env;
1507 size_t msize;
1508 struct PeerResultMessage *prm;
1509 struct GNUNET_PeerIdentity *paths;
1510
1511 msize = data_size + (get_path_length + put_path_length)
1512 * sizeof(struct GNUNET_PeerIdentity);
1513 if ((msize + sizeof(struct PeerResultMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
1514 (get_path_length >
1515 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
1516 (put_path_length >
1517 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
1518 (data_size > GNUNET_MAX_MESSAGE_SIZE))
1519 {
1520 GNUNET_break (0);
1521 return;
1522 }
1523 pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
1524 target);
1525 if (NULL == pi)
1526 {
1527 /* peer disconnected in the meantime, drop reply */
1528 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1529 "No matching peer for reply for key %s\n",
1530 GNUNET_h2s (key));
1531 return;
1532 }
1533 if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1534 {
1535 /* skip */
1536 GNUNET_STATISTICS_update (GDS_stats,
1537 gettext_noop (
1538 "# P2P messages dropped due to full queue"),
1539 1,
1540 GNUNET_NO);
1541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1542 "Peer queue full, ignoring reply for key %s\n",
1543 GNUNET_h2s (key));
1544 return;
1545 }
1546
1547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1548 "Forwarding reply for key %s to peer %s\n",
1549 GNUNET_h2s (key),
1550 GNUNET_i2s (target));
1551 GNUNET_STATISTICS_update (GDS_stats,
1552 gettext_noop
1553 ("# RESULT messages queued for transmission"), 1,
1554 GNUNET_NO);
1555 env = GNUNET_MQ_msg_extra (prm,
1556 msize,
1557 GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1558 prm->type = htonl (type);
1559 prm->put_path_length = htonl (put_path_length);
1560 prm->get_path_length = htonl (get_path_length);
1561 prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1562 prm->key = *key;
1563 paths = (struct GNUNET_PeerIdentity *) &prm[1];
1564 GNUNET_memcpy (paths,
1565 put_path,
1566 put_path_length * sizeof(struct GNUNET_PeerIdentity));
1567 GNUNET_memcpy (&paths[put_path_length],
1568 get_path,
1569 get_path_length * sizeof(struct GNUNET_PeerIdentity));
1570 GNUNET_memcpy (&paths[put_path_length + get_path_length],
1571 data,
1572 data_size);
1573 GNUNET_MQ_send (pi->mq,
1574 env);
1575}
1576
1577
1578/**
1579 * To be called on core init/fail.
1580 *
1581 * @param cls service closure
1582 * @param identity the public identity of this peer
1583 */
1584static void
1585core_init (void *cls,
1586 const struct GNUNET_PeerIdentity *identity)
1587{
1588 (void) cls;
1589 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1590 "CORE called, I am %s\n",
1591 GNUNET_i2s (identity));
1592 my_identity = *identity;
1593 GNUNET_CRYPTO_hash (identity,
1594 sizeof(struct GNUNET_PeerIdentity),
1595 &my_identity_hash);
1596 GNUNET_SERVICE_resume (GDS_service);
1597}
1598
1599
1600/**
1601 * Check validity of a p2p put request.
1602 *
1603 * @param cls closure with the `struct PeerInfo` of the sender
1604 * @param message message
1605 * @return #GNUNET_OK if the message is valid
1606 */
1607static int
1608check_dht_p2p_put (void *cls,
1609 const struct PeerPutMessage *put)
1610{
1611 uint32_t putlen;
1612 uint16_t msize;
1613
1614 (void) cls;
1615 msize = ntohs (put->header.size);
1616 putlen = ntohl (put->put_path_length);
1617 if ((msize <
1618 sizeof(struct PeerPutMessage)
1619 + putlen * sizeof(struct GNUNET_PeerIdentity)) ||
1620 (putlen >
1621 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
1622 {
1623 GNUNET_break_op (0);
1624 return GNUNET_SYSERR;
1625 }
1626 return GNUNET_OK;
1627}
1628
1629
1630/**
1631 * Core handler for p2p put requests.
1632 *
1633 * @param cls closure with the `struct PeerInfo` of the sender
1634 * @param message message
1635 */
1636static void
1637handle_dht_p2p_put (void *cls,
1638 const struct PeerPutMessage *put)
1639{
1640 struct PeerInfo *peer = cls;
1641 const struct GNUNET_PeerIdentity *put_path;
1642 const void *payload;
1643 uint32_t putlen;
1644 uint16_t msize;
1645 size_t payload_size;
1646 enum GNUNET_DHT_RouteOption options;
1647 struct GNUNET_CONTAINER_BloomFilter *bf;
1648 struct GNUNET_HashCode test_key;
1649 int forwarded;
1650 struct GNUNET_TIME_Absolute exp_time;
1651
1652 exp_time = GNUNET_TIME_absolute_ntoh (put->expiration_time);
1653 if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us)
1654 {
1655 GNUNET_STATISTICS_update (GDS_stats,
1656 gettext_noop ("# Expired PUTs discarded"),
1657 1,
1658 GNUNET_NO);
1659 return;
1660 }
1661 msize = ntohs (put->header.size);
1662 putlen = ntohl (put->put_path_length);
1663 GNUNET_STATISTICS_update (GDS_stats,
1664 gettext_noop ("# P2P PUT requests received"),
1665 1,
1666 GNUNET_NO);
1667 GNUNET_STATISTICS_update (GDS_stats,
1668 gettext_noop ("# P2P PUT bytes received"),
1669 msize,
1670 GNUNET_NO);
1671 put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1672 payload = &put_path[putlen];
1673 options = ntohl (put->options);
1674 payload_size = msize - (sizeof(struct PeerPutMessage)
1675 + putlen * sizeof(struct GNUNET_PeerIdentity));
1676
1677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1678 "PUT for `%s' from %s\n",
1679 GNUNET_h2s (&put->key),
1680 GNUNET_i2s (peer->id));
1681 if (GNUNET_YES == log_route_details_stderr)
1682 {
1683 char *tmp;
1684 char *pp;
1685
1686 pp = GNUNET_STRINGS_pp2s (put_path,
1687 putlen);
1688 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
1689 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1690 "R5N PUT %s: %s->%s (%u, %u=>%u, PP: %s)\n",
1691 GNUNET_h2s (&put->key),
1692 GNUNET_i2s (peer->id),
1693 tmp,
1694 ntohl (put->hop_count),
1695 GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
1696 &put->key),
1697 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
1698 &put->key),
1699 pp);
1700 GNUNET_free (pp);
1701 GNUNET_free (tmp);
1702 }
1703 switch (GNUNET_BLOCK_get_key
1704 (GDS_block_context,
1705 ntohl (put->type),
1706 payload,
1707 payload_size,
1708 &test_key))
1709 {
1710 case GNUNET_YES:
1711 if (0 != memcmp (&test_key,
1712 &put->key,
1713 sizeof(struct GNUNET_HashCode)))
1714 {
1715 char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1716
1717 GNUNET_break_op (0);
1718 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1719 "PUT with key `%s' for block with key %s\n",
1720 put_s,
1721 GNUNET_h2s_full (&test_key));
1722 GNUNET_free (put_s);
1723 return;
1724 }
1725 break;
1726
1727 case GNUNET_NO:
1728 GNUNET_break_op (0);
1729 return;
1730
1731 case GNUNET_SYSERR:
1732 /* cannot verify, good luck */
1733 break;
1734 }
1735 if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
1736 {
1737 switch (GNUNET_BLOCK_evaluate (GDS_block_context,
1738 ntohl (put->type),
1739 NULL, /* query group */
1740 GNUNET_BLOCK_EO_NONE,
1741 NULL, /* query */
1742 NULL, 0, /* xquery */
1743 payload,
1744 payload_size))
1745 {
1746 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1747 case GNUNET_BLOCK_EVALUATION_OK_LAST:
1748 break;
1749
1750 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1751 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1752 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1753 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1754 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1755 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1756 default:
1757 GNUNET_break_op (0);
1758 return;
1759 }
1760 }
1761
1762 bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
1763 DHT_BLOOM_SIZE,
1764 GNUNET_CONSTANTS_BLOOMFILTER_K);
1765 GNUNET_break_op (GNUNET_YES ==
1766 GNUNET_CONTAINER_bloomfilter_test (bf,
1767 &peer->phash));
1768 {
1769 struct GNUNET_PeerIdentity pp[putlen + 1];
1770
1771 /* extend 'put path' by sender */
1772 if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1773 {
1774#if SANITY_CHECKS
1775 for (unsigned int i = 0; i <= putlen; i++)
1776 {
1777 for (unsigned int j = 0; j < i; j++)
1778 {
1779 GNUNET_break (0 != memcmp (&pp[i],
1780 &pp[j],
1781 sizeof(struct GNUNET_PeerIdentity)));
1782 }
1783 GNUNET_break (0 != memcmp (&pp[i],
1784 peer->id,
1785 sizeof(struct GNUNET_PeerIdentity)));
1786 }
1787#endif
1788 GNUNET_memcpy (pp,
1789 put_path,
1790 putlen * sizeof(struct GNUNET_PeerIdentity));
1791 pp[putlen] = *peer->id;
1792 putlen++;
1793 }
1794 else
1795 putlen = 0;
1796
1797 /* give to local clients */
1798 GDS_CLIENTS_handle_reply (exp_time,
1799 &put->key,
1800 0,
1801 NULL,
1802 putlen,
1803 pp,
1804 ntohl (put->type),
1805 payload_size,
1806 payload);
1807 /* store locally */
1808 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1809 (GDS_am_closest_peer (&put->key, bf)))
1810 GDS_DATACACHE_handle_put (exp_time,
1811 &put->key,
1812 putlen,
1813 pp,
1814 ntohl (put->type),
1815 payload_size,
1816 payload);
1817 /* route to other peers */
1818 forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1819 options,
1820 ntohl (
1821 put->desired_replication_level),
1822 exp_time,
1823 ntohl (put->hop_count),
1824 bf,
1825 &put->key,
1826 putlen,
1827 pp,
1828 payload,
1829 payload_size);
1830 /* notify monitoring clients */
1831 GDS_CLIENTS_process_put (options
1832 | ((GNUNET_OK == forwarded)
1833 ? GNUNET_DHT_RO_LAST_HOP
1834 : 0),
1835 ntohl (put->type),
1836 ntohl (put->hop_count),
1837 ntohl (put->desired_replication_level),
1838 putlen, pp,
1839 exp_time,
1840 &put->key,
1841 payload,
1842 payload_size);
1843 }
1844 GNUNET_CONTAINER_bloomfilter_free (bf);
1845}
1846
1847
1848/**
1849 * We have received a FIND PEER request. Send matching
1850 * HELLOs back.
1851 *
1852 * @param sender sender of the FIND PEER request
1853 * @param key peers close to this key are desired
1854 * @param bg group for filtering peers
1855 */
1856static void
1857handle_find_peer (const struct GNUNET_PeerIdentity *sender,
1858 const struct GNUNET_HashCode *key,
1859 struct GNUNET_BLOCK_Group *bg)
1860{
1861 int bucket_idx;
1862 struct PeerBucket *bucket;
1863 struct PeerInfo *peer;
1864 unsigned int choice;
1865 const struct GNUNET_HELLO_Message *hello;
1866 size_t hello_size;
1867
1868 /* first, check about our own HELLO */
1869 if (NULL != GDS_my_hello)
1870 {
1871 hello_size = GNUNET_HELLO_size ((const struct
1872 GNUNET_HELLO_Message *) GDS_my_hello);
1873 GNUNET_break (hello_size >= sizeof(struct GNUNET_MessageHeader));
1874 if (GNUNET_BLOCK_EVALUATION_OK_MORE ==
1875 GNUNET_BLOCK_evaluate (GDS_block_context,
1876 GNUNET_BLOCK_TYPE_DHT_HELLO,
1877 bg,
1878 GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1879 &my_identity_hash,
1880 NULL, 0,
1881 GDS_my_hello,
1882 hello_size))
1883 {
1884 GDS_NEIGHBOURS_handle_reply (sender,
1885 GNUNET_BLOCK_TYPE_DHT_HELLO,
1886 GNUNET_TIME_relative_to_absolute (
1887 hello_expiration),
1888 key,
1889 0,
1890 NULL,
1891 0,
1892 NULL,
1893 GDS_my_hello,
1894 hello_size);
1895 }
1896 else
1897 {
1898 GNUNET_STATISTICS_update (GDS_stats,
1899 gettext_noop (
1900 "# FIND PEER requests ignored due to Bloomfilter"),
1901 1,
1902 GNUNET_NO);
1903 }
1904 }
1905 else
1906 {
1907 GNUNET_STATISTICS_update (GDS_stats,
1908 gettext_noop (
1909 "# FIND PEER requests ignored due to lack of HELLO"),
1910 1,
1911 GNUNET_NO);
1912 }
1913
1914 /* then, also consider sending a random HELLO from the closest bucket */
1915 if (0 == memcmp (&my_identity_hash,
1916 key,
1917 sizeof(struct GNUNET_HashCode)))
1918 bucket_idx = closest_bucket;
1919 else
1920 bucket_idx = GNUNET_MIN ((int) closest_bucket,
1921 find_bucket (key));
1922 if (bucket_idx < 0)
1923 return;
1924 bucket = &k_buckets[bucket_idx];
1925 if (bucket->peers_size == 0)
1926 return;
1927 choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1928 bucket->peers_size);
1929 peer = bucket->head;
1930 while (choice > 0)
1931 {
1932 GNUNET_assert (NULL != peer);
1933 peer = peer->next;
1934 choice--;
1935 }
1936 choice = bucket->peers_size;
1937 do
1938 {
1939 peer = peer->next;
1940 if (0 == choice--)
1941 return; /* no non-masked peer available */
1942 if (NULL == peer)
1943 peer = bucket->head;
1944 hello = GDS_HELLO_get (peer->id);
1945 }
1946 while ((NULL == hello) ||
1947 (GNUNET_BLOCK_EVALUATION_OK_MORE !=
1948 GNUNET_BLOCK_evaluate (GDS_block_context,
1949 GNUNET_BLOCK_TYPE_DHT_HELLO,
1950 bg,
1951 GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO,
1952 &peer->phash,
1953 NULL, 0,
1954 hello,
1955 (hello_size = GNUNET_HELLO_size (hello)))));
1956 GDS_NEIGHBOURS_handle_reply (sender,
1957 GNUNET_BLOCK_TYPE_DHT_HELLO,
1958 GNUNET_TIME_relative_to_absolute
1959 (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
1960 key,
1961 0,
1962 NULL,
1963 0,
1964 NULL,
1965 hello,
1966 hello_size);
1967}
1968
1969
1970/**
1971 * Handle a result from local datacache for a GET operation.
1972 *
1973 * @param cls the `struct PeerInfo` for which this is a reply
1974 * @param type type of the block
1975 * @param expiration_time when does the content expire
1976 * @param key key for the content
1977 * @param put_path_length number of entries in @a put_path
1978 * @param put_path peers the original PUT traversed (if tracked)
1979 * @param get_path_length number of entries in @a get_path
1980 * @param get_path peers this reply has traversed so far (if tracked)
1981 * @param data payload of the reply
1982 * @param data_size number of bytes in @a data
1983 */
1984static void
1985handle_local_result (void *cls,
1986 enum GNUNET_BLOCK_Type type,
1987 struct GNUNET_TIME_Absolute expiration_time,
1988 const struct GNUNET_HashCode *key,
1989 unsigned int put_path_length,
1990 const struct GNUNET_PeerIdentity *put_path,
1991 unsigned int get_path_length,
1992 const struct GNUNET_PeerIdentity *get_path,
1993 const void *data,
1994 size_t data_size)
1995{
1996 struct PeerInfo *peer = cls;
1997 char *pp;
1998
1999 pp = GNUNET_STRINGS_pp2s (put_path,
2000 put_path_length);
2001 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002 "Found local result for %s (PP: %s)\n",
2003 GNUNET_h2s (key),
2004 pp);
2005 GNUNET_free (pp);
2006 GDS_NEIGHBOURS_handle_reply (peer->id,
2007 type,
2008 expiration_time,
2009 key,
2010 put_path_length, put_path,
2011 get_path_length, get_path,
2012 data, data_size);
2013}
2014
2015
2016/**
2017 * Check validity of p2p get request.
2018 *
2019 * @param cls closure with the `struct PeerInfo` of the sender
2020 * @param get the message
2021 * @return #GNUNET_OK if the message is well-formed
2022 */
2023static int
2024check_dht_p2p_get (void *cls,
2025 const struct PeerGetMessage *get)
2026{
2027 uint32_t xquery_size;
2028 uint16_t msize;
2029
2030 (void) cls;
2031 msize = ntohs (get->header.size);
2032 xquery_size = ntohl (get->xquery_size);
2033 if (msize < sizeof(struct PeerGetMessage) + xquery_size)
2034 {
2035 GNUNET_break_op (0);
2036 return GNUNET_SYSERR;
2037 }
2038 return GNUNET_OK;
2039}
2040
2041
2042/**
2043 * Core handler for p2p get requests.
2044 *
2045 * @param cls closure with the `struct PeerInfo` of the sender
2046 * @param get the message
2047 */
2048static void
2049handle_dht_p2p_get (void *cls,
2050 const struct PeerGetMessage *get)
2051{
2052 struct PeerInfo *peer = cls;
2053 uint32_t xquery_size;
2054 size_t reply_bf_size;
2055 uint16_t msize;
2056 enum GNUNET_BLOCK_Type type;
2057 enum GNUNET_DHT_RouteOption options;
2058 enum GNUNET_BLOCK_EvaluationResult eval;
2059 struct GNUNET_BLOCK_Group *bg;
2060 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
2061 const char *xquery;
2062 int forwarded;
2063
2064 /* parse and validate message */
2065 msize = ntohs (get->header.size);
2066 xquery_size = ntohl (get->xquery_size);
2067 reply_bf_size = msize - (sizeof(struct PeerGetMessage) + xquery_size);
2068 type = ntohl (get->type);
2069 options = ntohl (get->options);
2070 xquery = (const char *) &get[1];
2071 GNUNET_STATISTICS_update (GDS_stats,
2072 gettext_noop ("# P2P GET requests received"),
2073 1,
2074 GNUNET_NO);
2075 GNUNET_STATISTICS_update (GDS_stats,
2076 gettext_noop ("# P2P GET bytes received"),
2077 msize,
2078 GNUNET_NO);
2079 if (GNUNET_YES == log_route_details_stderr)
2080 {
2081 char *tmp;
2082
2083 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2084 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2085 "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2086 GNUNET_h2s (&get->key),
2087 GNUNET_i2s (peer->id),
2088 tmp,
2089 ntohl (get->hop_count),
2090 GNUNET_CRYPTO_hash_matching_bits (&peer->phash,
2091 &get->key),
2092 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2093 &get->key),
2094 ntohl (get->xquery_size),
2095 xquery);
2096 GNUNET_free (tmp);
2097 }
2098 eval
2099 = GNUNET_BLOCK_evaluate (GDS_block_context,
2100 type,
2101 NULL,
2102 GNUNET_BLOCK_EO_NONE,
2103 &get->key,
2104 xquery,
2105 xquery_size,
2106 NULL,
2107 0);
2108 if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
2109 {
2110 /* request invalid or block type not supported */
2111 GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2112 return;
2113 }
2114 peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2115 DHT_BLOOM_SIZE,
2116 GNUNET_CONSTANTS_BLOOMFILTER_K);
2117 GNUNET_break_op (GNUNET_YES ==
2118 GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2119 &peer->phash));
2120 bg = GNUNET_BLOCK_group_create (GDS_block_context,
2121 type,
2122 get->bf_mutator,
2123 &xquery[xquery_size],
2124 reply_bf_size,
2125 "filter-size",
2126 reply_bf_size,
2127 NULL);
2128 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2129 "GET for %s at %s after %u hops\n",
2130 GNUNET_h2s (&get->key),
2131 GNUNET_i2s (&my_identity),
2132 (unsigned int) ntohl (get->hop_count));
2133 /* local lookup (this may update the reply_bf) */
2134 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2135 (GDS_am_closest_peer (&get->key,
2136 peer_bf)))
2137 {
2138 if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2139 {
2140 GNUNET_STATISTICS_update (GDS_stats,
2141 gettext_noop (
2142 "# P2P FIND PEER requests processed"),
2143 1,
2144 GNUNET_NO);
2145 handle_find_peer (peer->id,
2146 &get->key,
2147 bg);
2148 }
2149 else
2150 {
2151 eval = GDS_DATACACHE_handle_get (&get->key,
2152 type,
2153 xquery,
2154 xquery_size,
2155 bg,
2156 &handle_local_result,
2157 peer);
2158 }
2159 }
2160 else
2161 {
2162 GNUNET_STATISTICS_update (GDS_stats,
2163 gettext_noop ("# P2P GET requests ONLY routed"),
2164 1,
2165 GNUNET_NO);
2166 }
2167
2168 /* remember request for routing replies */
2169 GDS_ROUTING_add (peer->id,
2170 type,
2171 bg, /* bg now owned by routing, but valid at least until end of this function! */
2172 options,
2173 &get->key,
2174 xquery,
2175 xquery_size);
2176
2177 /* P2P forwarding */
2178 forwarded = GNUNET_NO;
2179 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2180 forwarded = GDS_NEIGHBOURS_handle_get (type,
2181 options,
2182 ntohl (
2183 get->desired_replication_level),
2184 ntohl (get->hop_count),
2185 &get->key,
2186 xquery,
2187 xquery_size,
2188 bg,
2189 peer_bf);
2190 GDS_CLIENTS_process_get (options
2191 | ((GNUNET_OK == forwarded)
2192 ? GNUNET_DHT_RO_LAST_HOP : 0),
2193 type,
2194 ntohl (get->hop_count),
2195 ntohl (get->desired_replication_level),
2196 0,
2197 NULL,
2198 &get->key);
2199
2200 /* clean up; note that 'bg' is owned by routing now! */
2201 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2202}
2203
2204
2205/**
2206 * Check validity of p2p result message.
2207 *
2208 * @param cls closure
2209 * @param message message
2210 * @return #GNUNET_YES if the message is well-formed
2211 */
2212static int
2213check_dht_p2p_result (void *cls,
2214 const struct PeerResultMessage *prm)
2215{
2216 uint32_t get_path_length;
2217 uint32_t put_path_length;
2218 uint16_t msize;
2219
2220 (void) cls;
2221 msize = ntohs (prm->header.size);
2222 put_path_length = ntohl (prm->put_path_length);
2223 get_path_length = ntohl (prm->get_path_length);
2224 if ((msize <
2225 sizeof(struct PeerResultMessage) + (get_path_length
2226 + put_path_length)
2227 * sizeof(struct GNUNET_PeerIdentity)) ||
2228 (get_path_length >
2229 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
2230 (put_path_length >
2231 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
2232 {
2233 GNUNET_break_op (0);
2234 return GNUNET_SYSERR;
2235 }
2236 return GNUNET_OK;
2237}
2238
2239
2240/**
2241 * Process a reply, after the @a get_path has been updated.
2242 *
2243 * @param expiration_time when does the reply expire
2244 * @param key key matching the query
2245 * @param get_path_length number of entries in @a get_path
2246 * @param get_path path the reply has taken
2247 * @param put_path_length number of entries in @a put_path
2248 * @param put_path path the PUT has taken
2249 * @param type type of the block
2250 * @param data_size number of bytes in @a data
2251 * @param data payload of the reply
2252 */
2253static void
2254process_reply_with_path (struct GNUNET_TIME_Absolute expiration_time,
2255 const struct GNUNET_HashCode *key,
2256 unsigned int get_path_length,
2257 const struct GNUNET_PeerIdentity *get_path,
2258 unsigned int put_path_length,
2259 const struct GNUNET_PeerIdentity *put_path,
2260 enum GNUNET_BLOCK_Type type,
2261 size_t data_size,
2262 const void *data)
2263{
2264 /* forward to local clients */
2265 GDS_CLIENTS_handle_reply (expiration_time,
2266 key,
2267 get_path_length,
2268 get_path,
2269 put_path_length,
2270 put_path,
2271 type,
2272 data_size,
2273 data);
2274 GDS_CLIENTS_process_get_resp (type,
2275 get_path,
2276 get_path_length,
2277 put_path,
2278 put_path_length,
2279 expiration_time,
2280 key,
2281 data,
2282 data_size);
2283 if (GNUNET_YES == cache_results)
2284 {
2285 struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2286
2287 GNUNET_memcpy (xput_path,
2288 put_path,
2289 put_path_length * sizeof(struct GNUNET_PeerIdentity));
2290 GNUNET_memcpy (&xput_path[put_path_length],
2291 get_path,
2292 get_path_length * sizeof(struct GNUNET_PeerIdentity));
2293
2294 GDS_DATACACHE_handle_put (expiration_time,
2295 key,
2296 get_path_length + put_path_length,
2297 xput_path,
2298 type,
2299 data_size,
2300 data);
2301 }
2302 /* forward to other peers */
2303 GDS_ROUTING_process (type,
2304 expiration_time,
2305 key,
2306 put_path_length,
2307 put_path,
2308 get_path_length,
2309 get_path,
2310 data,
2311 data_size);
2312}
2313
2314
2315/**
2316 * Core handler for p2p result messages.
2317 *
2318 * @param cls closure
2319 * @param message message
2320 */
2321static void
2322handle_dht_p2p_result (void *cls,
2323 const struct PeerResultMessage *prm)
2324{
2325 struct PeerInfo *peer = cls;
2326 const struct GNUNET_PeerIdentity *put_path;
2327 const struct GNUNET_PeerIdentity *get_path;
2328 const void *data;
2329 uint32_t get_path_length;
2330 uint32_t put_path_length;
2331 uint16_t msize;
2332 size_t data_size;
2333 enum GNUNET_BLOCK_Type type;
2334 struct GNUNET_TIME_Absolute exp_time;
2335
2336 /* parse and validate message */
2337 exp_time = GNUNET_TIME_absolute_ntoh (prm->expiration_time);
2338 if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us)
2339 {
2340 GNUNET_STATISTICS_update (GDS_stats,
2341 gettext_noop ("# Expired results discarded"),
2342 1,
2343 GNUNET_NO);
2344 return;
2345 }
2346 msize = ntohs (prm->header.size);
2347 put_path_length = ntohl (prm->put_path_length);
2348 get_path_length = ntohl (prm->get_path_length);
2349 put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2350 get_path = &put_path[put_path_length];
2351 type = ntohl (prm->type);
2352 data = (const void *) &get_path[get_path_length];
2353 data_size = msize - (sizeof(struct PeerResultMessage)
2354 + (get_path_length
2355 + put_path_length) * sizeof(struct
2356 GNUNET_PeerIdentity));
2357 GNUNET_STATISTICS_update (GDS_stats,
2358 gettext_noop ("# P2P RESULTS received"),
2359 1,
2360 GNUNET_NO);
2361 GNUNET_STATISTICS_update (GDS_stats,
2362 gettext_noop ("# P2P RESULT bytes received"),
2363 msize,
2364 GNUNET_NO);
2365 if (GNUNET_YES == log_route_details_stderr)
2366 {
2367 char *tmp;
2368 char *pp;
2369 char *gp;
2370
2371 gp = GNUNET_STRINGS_pp2s (get_path,
2372 get_path_length);
2373 pp = GNUNET_STRINGS_pp2s (put_path,
2374 put_path_length);
2375 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2376 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2377 "R5N RESULT %s: %s->%s (GP: %s, PP: %s)\n",
2378 GNUNET_h2s (&prm->key),
2379 GNUNET_i2s (peer->id),
2380 tmp,
2381 gp,
2382 pp);
2383 GNUNET_free (gp);
2384 GNUNET_free (pp);
2385 GNUNET_free (tmp);
2386 }
2387 /* if we got a HELLO, consider it for our own routing table */
2388 if (GNUNET_BLOCK_TYPE_DHT_HELLO == type)
2389 {
2390 const struct GNUNET_MessageHeader *h;
2391 struct GNUNET_PeerIdentity pid;
2392
2393 /* Should be a HELLO, validate and consider using it! */
2394 if (data_size < sizeof(struct GNUNET_HELLO_Message))
2395 {
2396 GNUNET_break_op (0);
2397 return;
2398 }
2399 h = data;
2400 if (data_size != ntohs (h->size))
2401 {
2402 GNUNET_break_op (0);
2403 return;
2404 }
2405 if (GNUNET_OK !=
2406 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2407 &pid))
2408 {
2409 GNUNET_break_op (0);
2410 return;
2411 }
2412 if ((GNUNET_YES != disable_try_connect) &&
2413 (0 != memcmp (&my_identity,
2414 &pid,
2415 sizeof(struct GNUNET_PeerIdentity))))
2416 try_connect (&pid,
2417 h);
2418 }
2419
2420 /* First, check if 'peer' is already on the path, and if
2421 so, truncate it instead of expanding. */
2422 for (unsigned int i = 0; i <= get_path_length; i++)
2423 if (0 == memcmp (&get_path[i],
2424 peer->id,
2425 sizeof(struct GNUNET_PeerIdentity)))
2426 {
2427 process_reply_with_path (exp_time,
2428 &prm->key,
2429 i,
2430 get_path,
2431 put_path_length,
2432 put_path,
2433 type,
2434 data_size,
2435 data);
2436 return;
2437 }
2438
2439 /* Need to append 'peer' to 'get_path' (normal case) */
2440 {
2441 struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2442
2443 GNUNET_memcpy (xget_path,
2444 get_path,
2445 get_path_length * sizeof(struct GNUNET_PeerIdentity));
2446 xget_path[get_path_length] = *peer->id;
2447
2448 process_reply_with_path (exp_time,
2449 &prm->key,
2450 get_path_length + 1,
2451 xget_path,
2452 put_path_length,
2453 put_path,
2454 type,
2455 data_size,
2456 data);
2457 }
2458}
2459
2460
2461/**
2462 * Initialize neighbours subsystem.
2463 *
2464 * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
2465 */
2466int
2467GDS_NEIGHBOURS_init ()
2468{
2469 struct GNUNET_MQ_MessageHandler core_handlers[] = {
2470 GNUNET_MQ_hd_var_size (dht_p2p_get,
2471 GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2472 struct PeerGetMessage,
2473 NULL),
2474 GNUNET_MQ_hd_var_size (dht_p2p_put,
2475 GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2476 struct PeerPutMessage,
2477 NULL),
2478 GNUNET_MQ_hd_var_size (dht_p2p_result,
2479 GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2480 struct PeerResultMessage,
2481 NULL),
2482 GNUNET_MQ_handler_end ()
2483 };
2484 unsigned long long temp_config_num;
2485
2486 disable_try_connect
2487 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2488 "DHT",
2489 "DISABLE_TRY_CONNECT");
2490 if (GNUNET_OK ==
2491 GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2492 "DHT",
2493 "bucket_size",
2494 &temp_config_num))
2495 bucket_size = (unsigned int) temp_config_num;
2496 cache_results
2497 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2498 "DHT",
2499 "CACHE_RESULTS");
2500
2501 log_route_details_stderr =
2502 (NULL != getenv ("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2503 ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2504 core_api = GNUNET_CORE_connect (GDS_cfg,
2505 NULL,
2506 &core_init,
2507 &handle_core_connect,
2508 &handle_core_disconnect,
2509 core_handlers);
2510 if (NULL == core_api)
2511 return GNUNET_SYSERR;
2512 all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2513 GNUNET_YES);
2514 all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2515 GNUNET_NO);
2516 return GNUNET_OK;
2517}
2518
2519
2520/**
2521 * Shutdown neighbours subsystem.
2522 */
2523void
2524GDS_NEIGHBOURS_done ()
2525{
2526 if (NULL == core_api)
2527 return;
2528 GNUNET_CORE_disconnect (core_api);
2529 core_api = NULL;
2530 GNUNET_assert (0 ==
2531 GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2532 GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2533 all_connected_peers = NULL;
2534 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,
2535 &free_connect_info,
2536 NULL);
2537 GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers);
2538 all_desired_peers = NULL;
2539 GNUNET_ATS_connectivity_done (ats_ch);
2540 ats_ch = NULL;
2541 GNUNET_assert (NULL == find_peer_task);
2542}
2543
2544
2545/**
2546 * Get the ID of the local node.
2547 *
2548 * @return identity of the local node
2549 */
2550struct GNUNET_PeerIdentity *
2551GDS_NEIGHBOURS_get_id ()
2552{
2553 return &my_identity;
2554}
2555
2556
2557/* end of gnunet-service-dht_neighbours.c */