aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_neighbours.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_neighbours.c')
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c737
1 files changed, 672 insertions, 65 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 7585b5a47..8c87314e5 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -49,6 +49,122 @@
49 */ 49 */
50#define DEFAULT_BUCKET_SIZE 4 50#define DEFAULT_BUCKET_SIZE 4
51 51
52/**
53 * Size of the bloom filter the DHT uses to filter peers.
54 */
55#define DHT_BLOOM_SIZE 128
56
57
58/**
59 * P2P PUT message
60 */
61struct PeerPutMessage
62{
63 /**
64 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
65 */
66 struct GNUNET_MessageHeader header;
67
68 /**
69 * Processing options
70 */
71 uint32_t options GNUNET_PACKED;
72
73 /**
74 * Content type.
75 */
76 uint32_t type GNUNET_PACKED;
77
78 /**
79 * Hop count
80 */
81 uint32_t hop_count GNUNET_PACKED;
82
83 /**
84 * Replication level for this message
85 */
86 uint32_t desired_replication_level GNUNET_PACKED;
87
88 /**
89 * Generic route path length for a message in the
90 * DHT that arrived at a peer and generated
91 * a reply. Copied to the end of this message.
92 */
93 uint32_t outgoing_path_length GNUNET_PACKED;
94
95 /**
96 * Bloomfilter (for peer identities) to stop circular routes
97 */
98 char bloomfilter[DHT_BLOOM_SIZE];
99
100 /**
101 * The key we are storing under.
102 */
103 GNUNET_HashCode key;
104
105 /* put path (if tracked) */
106
107 /* Payload */
108
109};
110
111
112/**
113 * P2P GET message
114 */
115struct PeerGetMessage
116{
117 /**
118 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
119 */
120 struct GNUNET_MessageHeader header;
121
122 /**
123 * Processing options
124 */
125 uint32_t options GNUNET_PACKED;
126
127 /**
128 * Desired content type.
129 */
130 uint32_t type GNUNET_PACKED;
131
132 /**
133 * Hop count
134 */
135 uint32_t hop_count GNUNET_PACKED;
136
137 /**
138 * Desired replication level for this request.
139 */
140 uint32_t desired_replication_level GNUNET_PACKED;
141
142 /**
143 * Size of the extended query.
144 */
145 uint32_t xquery_size;
146
147 /**
148 * Bloomfilter mutator.
149 */
150 uint32_t bf_mutator;
151
152 /**
153 * Bloomfilter (for peer identities) to stop circular routes
154 */
155 char bloomfilter[DHT_BLOOM_SIZE];
156
157 /**
158 * The key we are looking for.
159 */
160 GNUNET_HashCode key;
161
162 /* xquery */
163
164 /* result bloomfilter */
165
166};
167
52 168
53/** 169/**
54 * Linked list of messages to send to a particular other peer. 170 * Linked list of messages to send to a particular other peer.
@@ -183,13 +299,18 @@ struct PeerBucket
183 299
184 300
185/** 301/**
186 * The lowest currently used bucket. 302 * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
187 */ 303 */
188static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */ 304static unsigned int closest_bucket;
189 305
190/** 306/**
191 * The buckets (Kademlia routing table, complete with growth). 307 * How many peers have we added since we sent out our last
192 * Array of size MAX_BUCKET_SIZE. 308 * find peer request?
309 */
310static unsigned int newly_found_peers;
311
312/**
313 * The buckets. Array of size MAX_BUCKET_SIZE. Offset 0 means 0 bits matching.
193 */ 314 */
194static struct PeerBucket k_buckets[MAX_BUCKETS]; 315static struct PeerBucket k_buckets[MAX_BUCKETS];
195 316
@@ -203,6 +324,33 @@ static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
203 */ 324 */
204static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; 325static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
205 326
327/**
328 * Task that sends FIND PEER requests.
329 */
330static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
331
332
333/**
334 * Find the optimal bucket for this key.
335 *
336 * @param hc the hashcode to compare our identity to
337 * @return the proper bucket index, or GNUNET_SYSERR
338 * on error (same hashcode)
339 */
340static int
341find_bucket (const GNUNET_HashCode * hc)
342{
343 unsigned int bits;
344
345 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
346 if (bits == MAX_BUCKETS)
347 {
348 /* How can all bits match? Got my own ID? */
349 GNUNET_break (0);
350 return GNUNET_SYSERR;
351 }
352 return MAX_BUCKETS - bits - 1;
353}
206 354
207 355
208/** 356/**
@@ -222,29 +370,15 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
222 /* Check for connect to self message */ 370 /* Check for connect to self message */
223 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 371 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
224 return; 372 return;
225
226#if DEBUG_DHT
227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228 "%s:%s Receives core connect message for peer %s distance %d!\n",
229 my_short_id, "dht", GNUNET_i2s (peer), distance);
230#endif
231
232 if (GNUNET_YES == 373 if (GNUNET_YES ==
233 GNUNET_CONTAINER_multihashmap_contains (all_known_peers, 374 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
234 &peer->hashPubKey)) 375 &peer->hashPubKey))
235 { 376 {
236#if DEBUG_DHT
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 "%s:%s Received %s message for peer %s, but already have peer in RT!",
239 my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
240#endif
241 GNUNET_break (0); 377 GNUNET_break (0);
242 return; 378 return;
243 } 379 }
244 380 peer_bucket = find_bucket (&peer->hashPubKey);
245 peer_bucket = find_current_bucket (&peer->hashPubKey); 381 GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
246 GNUNET_assert (peer_bucket >= lowest_bucket);
247 GNUNET_assert (peer_bucket < MAX_BUCKETS);
248 ret = GNUNET_malloc (sizeof (struct PeerInfo)); 382 ret = GNUNET_malloc (sizeof (struct PeerInfo));
249#if 0 383#if 0
250 ret->latency = latency; 384 ret->latency = latency;
@@ -255,23 +389,17 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
255 k_buckets[peer_bucket].tail, 389 k_buckets[peer_bucket].tail,
256 k_buckets[peer_bucket].tail, ret); 390 k_buckets[peer_bucket].tail, ret);
257 k_buckets[peer_bucket].peers_size++; 391 k_buckets[peer_bucket].peers_size++;
258 if ((GNUNET_CRYPTO_hash_matching_bits 392 closest_bucket = GNUNET_MAX (closest_bucket,
259 (&my_identity.hashPubKey, &peer->hashPubKey) > 0) && 393 peer_bucket);
260 (k_buckets[peer_bucket].peers_size <= bucket_size)) 394 if ( (peer_bucket > 0) &&
261 ret->preference_task = 395 (k_buckets[peer_bucket].peers_size <= bucket_size) )
262 GNUNET_SCHEDULER_add_now (&update_core_preference, ret); 396 ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
263 if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
264 enable_next_bucket ();
265 newly_found_peers++; 397 newly_found_peers++;
266 GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret, 398 GNUNET_assert (GNUNET_OK ==
267 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 399 GNUNET_CONTAINER_multihashmap_put (all_known_peers,
400 &peer->hashPubKey, ret,
401 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
268 increment_stats (STAT_PEERS_KNOWN); 402 increment_stats (STAT_PEERS_KNOWN);
269
270#if DEBUG_DHT
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
272 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
273 ret == NULL ? "NOT ADDED" : "PEER ADDED");
274#endif
275} 403}
276 404
277 405
@@ -286,68 +414,547 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
286{ 414{
287 struct PeerInfo *to_remove; 415 struct PeerInfo *to_remove;
288 int current_bucket; 416 int current_bucket;
417 struct P2PPendingMessage *pos;
418 struct P2PPendingMessage *next;
289 419
290 /* Check for disconnect from self message */ 420 /* Check for disconnect from self message */
291 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 421 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
292 return; 422 return;
293#if DEBUG_DHT
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "%s:%s: Received peer disconnect message for peer `%s' from %s\n",
296 my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
297#endif
298
299 if (GNUNET_YES !=
300 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
301 &peer->hashPubKey))
302 {
303 GNUNET_break (0);
304#if DEBUG_DHT
305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
306 "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
307 my_short_id, "DHT", GNUNET_i2s (peer));
308#endif
309 return;
310 }
311 increment_stats (STAT_DISCONNECTS);
312 GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
313 (all_known_peers, &peer->hashPubKey));
314 to_remove = 423 to_remove =
315 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey); 424 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
316 GNUNET_assert (to_remove != NULL); 425 if (NULL == to_remove)
426 {
427 GNUNET_break (0);
428 return;
429 }
430 GNUNET_assert (GNUNET_YES ==
431 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
432 &peer->hashPubKey,
433 to_remove));
317 if (NULL != to_remove->info_ctx) 434 if (NULL != to_remove->info_ctx)
318 { 435 {
319 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx); 436 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
320 to_remove->info_ctx = NULL; 437 to_remove->info_ctx = NULL;
321 } 438 }
322 GNUNET_assert (0 ==
323 memcmp (peer, &to_remove->id,
324 sizeof (struct GNUNET_PeerIdentity)));
325 current_bucket = find_current_bucket (&to_remove->id.hashPubKey); 439 current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
326 delete_peer (to_remove, current_bucket); 440 GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
441 k_buckets[current_bucket].tail,
442 to_remove);
443 GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
444 k_buckets[current_bucket].peers_size--;
445 while ( (lowest_bucket > 0) &&
446 (k_buckets[lowest_bucket].peers_size == 0) )
447 lowest_bucket--;
448
449 if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
450 {
451 GNUNET_SCHEDULER_cancel (peer->send_task);
452 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
453 }
454 if (to_remove->th != NULL)
455 {
456 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
457 to_remove->th = NULL;
458 }
459 while (NULL != (pos = to_remove->head))
460 {
461 GNUNET_CONTAINER_DLL_remove (to_remove->head,
462 to_remove->tail,
463 pos);
464 GNUNET_free (pos);
465 }
327} 466}
328 467
329 468
469/**
470 * Perform a PUT operation. // FIXME: document if this is only
471 * routing or also storage and/or even local client notification!
472 *
473 * @param type type of the block
474 * @param options routing options
475 * @param desired_replication_level desired replication count
476 * @param expiration_time when does the content expire
477 * @param key key for the content
478 * @param put_path_length number of entries in put_path
479 * @param put_path peers this request has traversed so far (if tracked)
480 * @param data payload to store
481 * @param data_size number of bytes in data
482 */
483void
484GST_NEIGHBOURS_handle_put (uint32_t type,
485 uint32_t options,
486 uint32_t desired_replication_level,
487 GNUNET_TIME_Absolute expiration_time,
488 const GNUNET_HashCode *key,
489 unsigned int put_path_length,
490 struct GNUNET_PeerIdentity *put_path,
491 const void *data,
492 size_t data_size)
493{
494 // FIXME
495}
496
330 497
331/** 498/**
332 * Initialize neighbours subsystem. 499 * Perform a GET operation. // FIXME: document if this is only
500 * routing or also state-tracking and/or even local lookup!
501 *
502 * @param type type of the block
503 * @param options routing options
504 * @param desired_replication_level desired replication count
505 * @param key key for the content
506 * @param xquery extended query
507 * @param xquery_size number of bytes in xquery
508 * @param reply_bf bloomfilter to filter duplicates
509 * @param reply_bf_mutator mutator for reply_bf
510 * @param peer_bf filter for peers not to select (again)
333 */ 511 */
334void 512void
335GST_NEIGHBOURS_init () 513GST_NEIGHBOURS_handle_get (uint32_t type,
514 uint32_t options,
515 uint32_t desired_replication_level,
516 const GNUNET_HashCode *key,
517 const void *xquery,
518 size_t xquery_size,
519 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
520 uint32_t reply_bf_mutator,
521 const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
336{ 522{
523 // FIXME
337} 524}
338 525
339 526
340/** 527/**
341 * Shutdown neighbours subsystem. 528 * Handle a reply (route to origin). FIXME: should this be here?
529 * (reply-routing table might be better done elsewhere).
530 *
531 * @param type type of the block
532 * @param options routing options
533 * @param expiration_time when does the content expire
534 * @param key key for the content
535 * @param put_path_length number of entries in put_path
536 * @param put_path peers the original PUT traversed (if tracked)
537 * @param get_path_length number of entries in put_path
538 * @param get_path peers this reply has traversed so far (if tracked)
539 * @param data payload of the reply
540 * @param data_size number of bytes in data
342 */ 541 */
343void 542void
344GST_NEIGHBOURS_done () 543GST_NEIGHBOURS_handle_reply (uint32_t type,
544 uint32_t options,
545 GNUNET_TIME_Absolute expiration_time,
546 const GNUNET_HashCode *key,
547 unsigned int put_path_length,
548 struct GNUNET_PeerIdentity *put_path,
549 unsigned int get_path_length,
550 struct GNUNET_PeerIdentity *get_path,
551 const void *data,
552 size_t data_size)
553{
554 // FIXME
555}
556
557
558/**
559 * Add each of the peers we already know to the bloom filter of
560 * the request so that we don't get duplicate HELLOs.
561 *
562 * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
563 * @param key peer identity to add to the bloom filter
564 * @param value value the peer information (unused)
565 * @return GNUNET_YES (we should continue to iterate)
566 */
567static int
568add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
569{
570 struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
571
572 GNUNET_CONTAINER_bloomfilter_add (bloom, key);
573 return GNUNET_YES;
574}
575
576
577/**
578 * Task to send a find peer message for our own peer identifier
579 * so that we can find the closest peers in the network to ourselves
580 * and attempt to connect to them.
581 *
582 * @param cls closure for this task
583 * @param tc the context under which the task is running
584 */
585static void
586send_find_peer_message (void *cls,
587 const struct GNUNET_SCHEDULER_TaskContext *tc)
588{
589 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
590 struct DHT_MessageContext msg_ctx;
591 struct GNUNET_TIME_Relative next_send_time;
592 struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
593
594 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
595 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
596 return;
597 if (newly_found_peers > bucket_size)
598 {
599 /* If we are finding many peers already, no need to send out our request right now! */
600 find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
601 &send_find_peer_message, NULL);
602 newly_found_peers = 0;
603 return;
604 }
605
606 // FIXME: build message...
607 find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
608 find_peer_msg->header.size =
609 htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
610 find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
611 temp_bloom =
612 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
613 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
614 temp_bloom);
615 GNUNET_assert (GNUNET_OK ==
616 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
617 find_peer_msg->
618 bloomfilter,
619 DHT_BLOOM_SIZE));
620 GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
621
622 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
623 memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
624 msg_ctx.unique_id =
625 GNUNET_ntohll (GNUNET_CRYPTO_random_u64
626 (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
627 msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
628 msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
629 msg_ctx.network_size = log_of_network_size_estimate;
630 msg_ctx.peer = my_identity;
631 msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
632 msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
633 // FIXME: transmit message...
634 demultiplex_message (&find_peer_msg->header, &msg_ctx);
635 GNUNET_free (find_peer_msg);
636
637 /* schedule next round */
638 newly_found_peers = 0;
639 next_send_time.rel_value =
640 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
641 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
642 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
643 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
644 &send_find_peer_message,
645 NULL);
646}
647
648
649/**
650 * To be called on core init/fail.
651 *
652 * @param cls service closure
653 * @param server handle to the server for this service
654 * @param identity the public identity of this peer
655 * @param publicKey the public key of this peer
656 */
657static void
658core_init (void *cls, struct GNUNET_CORE_Handle *server,
659 const struct GNUNET_PeerIdentity *identity,
660 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
345{ 661{
662 GNUNET_assert (server != NULL);
663 my_identity = *identity;
664 next_send_time.rel_value =
665 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
666 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
667 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
668 2) -
669 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
670 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
671 &send_find_peer_message,
672 NULL);
346} 673}
347 674
348 675
676/**
677 * Core handler for p2p get requests.
678 *
679 * @param cls closure
680 * @param message message
681 * @param peer peer identity this notification is about
682 * @param atsi performance data
683 * @return GNUNET_OK to keep the connection open,
684 * GNUNET_SYSERR to close it (signal serious error)
685 */
686static int
687handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
688 const struct GNUNET_MessageHeader *message,
689 const struct GNUNET_TRANSPORT_ATS_Information
690 *atsi)
691{
692 struct GNUNET_DHT_P2PRouteMessage *incoming =
693 (struct GNUNET_DHT_P2PRouteMessage *) message;
694 struct GNUNET_MessageHeader *enc_msg =
695 (struct GNUNET_MessageHeader *) &incoming[1];
696 struct DHT_MessageContext *msg_ctx;
697 char *route_path;
698 int path_size;
699
700 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
701 {
702 GNUNET_break_op (0);
703 return GNUNET_YES;
704 }
349 705
706 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
707 {
708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709 "Sending of previous replies took too long, backing off!\n");
710 increment_stats ("# route requests dropped due to high load");
711 decrease_max_send_delay (get_max_send_delay ());
712 return GNUNET_YES;
713 }
714 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
715 msg_ctx->bloom =
716 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
717 DHT_BLOOM_K);
718 GNUNET_assert (msg_ctx->bloom != NULL);
719 msg_ctx->hop_count = ntohl (incoming->hop_count);
720 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
721 msg_ctx->replication = ntohl (incoming->desired_replication_level);
722 msg_ctx->msg_options = ntohl (incoming->options);
723 if (GNUNET_DHT_RO_RECORD_ROUTE ==
724 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
725 {
726 path_size =
727 ntohl (incoming->outgoing_path_length) *
728 sizeof (struct GNUNET_PeerIdentity);
729 if (ntohs (message->size) !=
730 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
731 path_size))
732 {
733 GNUNET_break_op (0);
734 GNUNET_free (msg_ctx);
735 return GNUNET_YES;
736 }
737 route_path = (char *) &incoming[1];
738 route_path = route_path + ntohs (enc_msg->size);
739 msg_ctx->path_history =
740 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
741 memcpy (msg_ctx->path_history, route_path, path_size);
742 memcpy (&msg_ctx->path_history[path_size], &my_identity,
743 sizeof (struct GNUNET_PeerIdentity));
744 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
745 }
746 msg_ctx->network_size = ntohl (incoming->network_size);
747 msg_ctx->peer = *peer;
748 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
749 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
750 demultiplex_message (enc_msg, msg_ctx);
751 if (msg_ctx->bloom != NULL)
752 {
753 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
754 msg_ctx->bloom = NULL;
755 }
756 GNUNET_free (msg_ctx);
757 return GNUNET_YES;
758}
350 759
351 760
761/**
762 * Core handler for p2p put requests.
763 *
764 * @param cls closure
765 * @param message message
766 * @param peer peer identity this notification is about
767 * @param atsi performance data
768 * @return GNUNET_OK to keep the connection open,
769 * GNUNET_SYSERR to close it (signal serious error)
770 */
771static int
772handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
773 const struct GNUNET_MessageHeader *message,
774 const struct GNUNET_TRANSPORT_ATS_Information
775 *atsi)
776{
777 struct GNUNET_DHT_P2PRouteMessage *incoming =
778 (struct GNUNET_DHT_P2PRouteMessage *) message;
779 struct GNUNET_MessageHeader *enc_msg =
780 (struct GNUNET_MessageHeader *) &incoming[1];
781 struct DHT_MessageContext *msg_ctx;
782 char *route_path;
783 int path_size;
784
785 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
786 {
787 GNUNET_break_op (0);
788 return GNUNET_YES;
789 }
790
791 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
792 {
793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 "Sending of previous replies took too long, backing off!\n");
795 increment_stats ("# route requests dropped due to high load");
796 decrease_max_send_delay (get_max_send_delay ());
797 return GNUNET_YES;
798 }
799 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
800 msg_ctx->bloom =
801 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
802 DHT_BLOOM_K);
803 GNUNET_assert (msg_ctx->bloom != NULL);
804 msg_ctx->hop_count = ntohl (incoming->hop_count);
805 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
806 msg_ctx->replication = ntohl (incoming->desired_replication_level);
807 msg_ctx->msg_options = ntohl (incoming->options);
808 if (GNUNET_DHT_RO_RECORD_ROUTE ==
809 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
810 {
811 path_size =
812 ntohl (incoming->outgoing_path_length) *
813 sizeof (struct GNUNET_PeerIdentity);
814 if (ntohs (message->size) !=
815 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
816 path_size))
817 {
818 GNUNET_break_op (0);
819 GNUNET_free (msg_ctx);
820 return GNUNET_YES;
821 }
822 route_path = (char *) &incoming[1];
823 route_path = route_path + ntohs (enc_msg->size);
824 msg_ctx->path_history =
825 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
826 memcpy (msg_ctx->path_history, route_path, path_size);
827 memcpy (&msg_ctx->path_history[path_size], &my_identity,
828 sizeof (struct GNUNET_PeerIdentity));
829 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
830 }
831 msg_ctx->network_size = ntohl (incoming->network_size);
832 msg_ctx->peer = *peer;
833 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
834 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
835 demultiplex_message (enc_msg, msg_ctx);
836 if (msg_ctx->bloom != NULL)
837 {
838 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
839 msg_ctx->bloom = NULL;
840 }
841 GNUNET_free (msg_ctx);
842 return GNUNET_YES;
843}
844
845
846/**
847 * Core handler for p2p route results.
848 *
849 * @param cls closure
850 * @param message message
851 * @param peer peer identity this notification is about
852 * @param atsi performance data
853 *
854 */
855static int
856handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
857 const struct GNUNET_MessageHeader *message,
858 const struct GNUNET_TRANSPORT_ATS_Information
859 *atsi)
860{
861 const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
862 (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
863 struct GNUNET_MessageHeader *enc_msg =
864 (struct GNUNET_MessageHeader *) &incoming[1];
865 struct DHT_MessageContext msg_ctx;
866
867 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
868 {
869 GNUNET_break_op (0);
870 return GNUNET_YES;
871 }
872
873 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
874 memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
875 msg_ctx.msg_options = ntohl (incoming->options);
876 msg_ctx.hop_count = ntohl (incoming->hop_count);
877 msg_ctx.peer = *peer;
878 msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
879 msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
880 if ((GNUNET_DHT_RO_RECORD_ROUTE ==
881 (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
882 (ntohl (incoming->outgoing_path_length) > 0))
883 {
884 if (ntohs (message->size) -
885 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
886 ntohs (enc_msg->size) !=
887 ntohl (incoming->outgoing_path_length) *
888 sizeof (struct GNUNET_PeerIdentity))
889 {
890 GNUNET_break_op (0);
891 return GNUNET_NO;
892 }
893 msg_ctx.path_history = (char *) &incoming[1];
894 msg_ctx.path_history += ntohs (enc_msg->size);
895 msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
896 }
897 route_result_message (enc_msg, &msg_ctx);
898 return GNUNET_YES;
899}
900
901
902/**
903 * Initialize neighbours subsystem.
904 */
905int
906GST_NEIGHBOURS_init ()
907{
908 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
909 {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
910 {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
911 {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
912 {NULL, 0, 0}
913 };
914 unsigned long long temp_config_num;
915 struct GNUNET_TIME_Relative next_send_time;
916
917 if (GNUNET_OK ==
918 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
919 &temp_config_num))
920 bucket_size = (unsigned int) temp_config_num;
921 coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */
922 DEFAULT_CORE_QUEUE_SIZE, /* queue size */
923 NULL, /* Closure passed to DHT functions */
924 &core_init, /* Call core_init once connected */
925 &handle_core_connect, /* Handle connects */
926 &handle_core_disconnect, /* remove peers on disconnects */
927 NULL, /* Do we care about "status" updates? */
928 NULL, /* Don't want notified about all incoming messages */
929 GNUNET_NO, /* For header only inbound notification */
930 NULL, /* Don't want notified about all outbound messages */
931 GNUNET_NO, /* For header only outbound notification */
932 core_handlers); /* Register these handlers */
933 if (coreAPI == NULL)
934 return GNUNET_SYSERR;
935 all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
936 return GNUNET_OK;
937}
938
939
940/**
941 * Shutdown neighbours subsystem.
942 */
943void
944GST_NEIGHBOURS_done ()
945{
946 GNUNET_assert (coreAPI != NULL);
947 GNUNET_CORE_disconnect (coreAPI);
948 coreAPI = NULL;
949 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
950 GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
951 all_known_peers = NULL;
952 if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
953 {
954 GNUNET_SCHEDULER_cancel (find_peer_task);
955 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
956 }
957}
958
352 959
353/* end of gnunet-service-dht_neighbours.c */ 960/* end of gnunet-service-dht_neighbours.c */