aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-09-21 16:47:21 +0000
committerChristian Grothoff <christian@grothoff.org>2011-09-21 16:47:21 +0000
commit828051543fb820627c41eacb40f3a009e237e029 (patch)
treec6bfce886209cd5575f3ff141001c8f2b27ea192 /src
parentc49bdc2e3f448a02d1031ab296ecc0805f4d8c61 (diff)
downloadgnunet-828051543fb820627c41eacb40f3a009e237e029.tar.gz
gnunet-828051543fb820627c41eacb40f3a009e237e029.zip
stuff
Diffstat (limited to 'src')
-rw-r--r--src/dht/gnunet-service-dht-new.c483
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c737
-rw-r--r--src/dht/gnunet-service-dht_neighbours.h1
3 files changed, 673 insertions, 548 deletions
diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c
index 62705bf38..2fb06457e 100644
--- a/src/dht/gnunet-service-dht-new.c
+++ b/src/dht/gnunet-service-dht-new.c
@@ -777,27 +777,6 @@ inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
777 777
778 778
779/** 779/**
780 * Find the optimal bucket for this key, regardless
781 * of the current number of buckets in use.
782 *
783 * @param hc the hashcode to compare our identity to
784 *
785 * @return the proper bucket index, or GNUNET_SYSERR
786 * on error (same hashcode)
787 */
788static int
789find_bucket (const GNUNET_HashCode * hc)
790{
791 unsigned int bits;
792
793 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
794 if (bits == MAX_BUCKETS)
795 return GNUNET_SYSERR;
796 return MAX_BUCKETS - bits - 1;
797}
798
799
800/**
801 * Find which k-bucket this peer should go into, 780 * Find which k-bucket this peer should go into,
802 * taking into account the size of the k-bucket 781 * taking into account the size of the k-bucket
803 * array. This means that if more bits match than 782 * array. This means that if more bits match than
@@ -917,127 +896,6 @@ update_core_preference (void *cls,
917 &update_core_preference_finish, peer); 896 &update_core_preference_finish, peer);
918} 897}
919 898
920
921/**
922 * Given a peer and its corresponding bucket,
923 * remove it from that bucket. Does not free
924 * the PeerInfo struct, nor cancel messages
925 * or free messages waiting to be sent to this
926 * peer!
927 *
928 * @param peer the peer to remove
929 * @param bucket the bucket the peer belongs to
930 */
931static void
932remove_peer (struct PeerInfo *peer, unsigned int bucket)
933{
934 GNUNET_assert (k_buckets[bucket].peers_size > 0);
935 GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail,
936 peer);
937 k_buckets[bucket].peers_size--;
938 if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) &&
939 (lowest_bucket < MAX_BUCKETS - 1))
940 lowest_bucket++;
941}
942
943/**
944 * Removes peer from a bucket, then frees associated
945 * resources and frees peer.
946 *
947 * @param peer peer to be removed and freed
948 * @param bucket which bucket this peer belongs to
949 */
950static void
951delete_peer (struct PeerInfo *peer, unsigned int bucket)
952{
953 struct P2PPendingMessage *pos;
954 struct P2PPendingMessage *next;
955
956 remove_peer (peer, bucket); /* First remove the peer from its bucket */
957 if (peer->send_task != GNUNET_SCHEDULER_NO_TASK)
958 GNUNET_SCHEDULER_cancel (peer->send_task);
959 if ((peer->th != NULL) && (coreAPI != NULL))
960 GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
961
962 pos = peer->head;
963 while (pos != NULL) /* Remove any pending messages for this peer */
964 {
965 increment_stats
966 ("# dht pending messages discarded (due to disconnect/shutdown)");
967 next = pos->next;
968 GNUNET_free (pos);
969 pos = next;
970 }
971
972 GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
973 (all_known_peers, &peer->id.hashPubKey));
974 GNUNET_assert (GNUNET_YES ==
975 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
976 &peer->id.hashPubKey,
977 peer));
978 GNUNET_free (peer);
979 decrement_stats (STAT_PEERS_KNOWN);
980}
981
982
983/**
984 * Iterator over hash map entries.
985 *
986 * @param cls closure
987 * @param key current key code
988 * @param value PeerInfo of the peer to move to new lowest bucket
989 * @return GNUNET_YES if we should continue to
990 * iterate,
991 * GNUNET_NO if not.
992 */
993static int
994move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value)
995{
996 struct PeerInfo *peer = value;
997 int new_bucket;
998
999 GNUNET_assert (lowest_bucket > 0);
1000 new_bucket = lowest_bucket - 1;
1001 remove_peer (peer, lowest_bucket);
1002 GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head,
1003 k_buckets[new_bucket].tail,
1004 k_buckets[new_bucket].tail, peer);
1005 k_buckets[new_bucket].peers_size++;
1006 return GNUNET_YES;
1007}
1008
1009
1010/**
1011 * The current lowest bucket is full, so change the lowest
1012 * bucket to the next lower down, and move any appropriate
1013 * entries in the current lowest bucket to the new bucket.
1014 */
1015static void
1016enable_next_bucket ()
1017{
1018 struct GNUNET_CONTAINER_MultiHashMap *to_remove;
1019 struct PeerInfo *pos;
1020
1021 GNUNET_assert (lowest_bucket > 0);
1022 to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size);
1023 pos = k_buckets[lowest_bucket].head;
1024
1025 /* Populate the array of peers which should be in the next lowest bucket */
1026 while (pos != NULL)
1027 {
1028 if (find_bucket (&pos->id.hashPubKey) < lowest_bucket)
1029 GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos,
1030 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1031 pos = pos->next;
1032 }
1033
1034 /* Remove peers from lowest bucket, insert into next lowest bucket */
1035 GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL);
1036 GNUNET_CONTAINER_multihashmap_destroy (to_remove);
1037 lowest_bucket = lowest_bucket - 1;
1038}
1039
1040
1041/** 899/**
1042 * Find the closest peer in our routing table to the 900 * Find the closest peer in our routing table to the
1043 * given hashcode. 901 * given hashcode.
@@ -2464,286 +2322,6 @@ demultiplex_message (const struct GNUNET_MessageHeader *msg,
2464 2322
2465 2323
2466/** 2324/**
2467 * Iterator over hash map entries.
2468 *
2469 * @param cls closure
2470 * @param key current key code
2471 * @param value value in the hash map
2472 * @return GNUNET_YES if we should continue to
2473 * iterate,
2474 * GNUNET_NO if not.
2475 */
2476static int
2477add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
2478{
2479 struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
2480
2481 GNUNET_CONTAINER_bloomfilter_add (bloom, key);
2482 return GNUNET_YES;
2483}
2484
2485/**
2486 * Task to send a find peer message for our own peer identifier
2487 * so that we can find the closest peers in the network to ourselves
2488 * and attempt to connect to them.
2489 *
2490 * @param cls closure for this task
2491 * @param tc the context under which the task is running
2492 */
2493static void
2494send_find_peer_message (void *cls,
2495 const struct GNUNET_SCHEDULER_TaskContext *tc)
2496{
2497 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
2498 struct DHT_MessageContext msg_ctx;
2499 struct GNUNET_TIME_Relative next_send_time;
2500 struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
2501
2502 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
2503 return;
2504
2505 if (newly_found_peers > bucket_size) /* If we are finding peers already, no need to send out our request right now! */
2506 {
2507 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2508 "Have %d newly found peers since last find peer message sent!\n",
2509 newly_found_peers);
2510 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
2511 &send_find_peer_message, NULL);
2512 newly_found_peers = 0;
2513 return;
2514 }
2515
2516 increment_stats (STAT_FIND_PEER_START);
2517#if FIND_PEER_WITH_HELLO
2518 find_peer_msg =
2519 GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) +
2520 GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *)
2521 my_hello));
2522 find_peer_msg->header.size =
2523 htons (sizeof (struct GNUNET_DHT_FindPeerMessage) +
2524 GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
2525 memcpy (&find_peer_msg[1], my_hello,
2526 GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello));
2527#else
2528 find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
2529 find_peer_msg->header.size =
2530 htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
2531#endif
2532 find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
2533 temp_bloom =
2534 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2535 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
2536 temp_bloom);
2537 GNUNET_assert (GNUNET_OK ==
2538 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
2539 find_peer_msg->
2540 bloomfilter,
2541 DHT_BLOOM_SIZE));
2542 GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
2543 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
2544 memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
2545 msg_ctx.unique_id =
2546 GNUNET_ntohll (GNUNET_CRYPTO_random_u64
2547 (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
2548 msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
2549 msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
2550 msg_ctx.network_size = log_of_network_size_estimate;
2551 msg_ctx.peer = my_identity;
2552 msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
2553 msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
2554
2555 demultiplex_message (&find_peer_msg->header, &msg_ctx);
2556 GNUNET_free (find_peer_msg);
2557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2558 "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id,
2559 "DHT", "FIND PEER");
2560 if (newly_found_peers < bucket_size)
2561 {
2562 next_send_time.rel_value =
2563 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
2564 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2565 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
2566 }
2567 else
2568 {
2569 next_send_time.rel_value =
2570 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
2571 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
2572 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value -
2573 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
2574 }
2575
2576 GNUNET_assert (next_send_time.rel_value != 0);
2577 find_peer_context.count = 0;
2578 newly_found_peers = 0;
2579 find_peer_context.start = GNUNET_TIME_absolute_get ();
2580 GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message,
2581 NULL);
2582}
2583
2584
2585/**
2586 * Core handler for p2p route requests.
2587 *
2588 * @param cls closure
2589 * @param message message
2590 * @param peer peer identity this notification is about
2591 * @param atsi performance data
2592 * @return GNUNET_OK to keep the connection open,
2593 * GNUNET_SYSERR to close it (signal serious error)
2594 */
2595static int
2596handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer,
2597 const struct GNUNET_MessageHeader *message,
2598 const struct GNUNET_TRANSPORT_ATS_Information
2599 *atsi)
2600{
2601#if DEBUG_DHT
2602 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2603 "`%s:%s': Received P2P request from peer %s\n", my_short_id,
2604 "DHT", GNUNET_i2s (peer));
2605#endif
2606 struct GNUNET_DHT_P2PRouteMessage *incoming =
2607 (struct GNUNET_DHT_P2PRouteMessage *) message;
2608 struct GNUNET_MessageHeader *enc_msg =
2609 (struct GNUNET_MessageHeader *) &incoming[1];
2610 struct DHT_MessageContext *msg_ctx;
2611 char *route_path;
2612 int path_size;
2613
2614 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
2615 {
2616 GNUNET_break_op (0);
2617 return GNUNET_YES;
2618 }
2619
2620 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
2621 {
2622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2623 "Sending of previous replies took too long, backing off!\n");
2624 increment_stats ("# route requests dropped due to high load");
2625 decrease_max_send_delay (get_max_send_delay ());
2626 return GNUNET_YES;
2627 }
2628 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
2629 msg_ctx->bloom =
2630 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
2631 DHT_BLOOM_K);
2632 GNUNET_assert (msg_ctx->bloom != NULL);
2633 msg_ctx->hop_count = ntohl (incoming->hop_count);
2634 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
2635 msg_ctx->replication = ntohl (incoming->desired_replication_level);
2636 msg_ctx->msg_options = ntohl (incoming->options);
2637 if (GNUNET_DHT_RO_RECORD_ROUTE ==
2638 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
2639 {
2640 path_size =
2641 ntohl (incoming->outgoing_path_length) *
2642 sizeof (struct GNUNET_PeerIdentity);
2643 if (ntohs (message->size) !=
2644 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
2645 path_size))
2646 {
2647 GNUNET_break_op (0);
2648 GNUNET_free (msg_ctx);
2649 return GNUNET_YES;
2650 }
2651 route_path = (char *) &incoming[1];
2652 route_path = route_path + ntohs (enc_msg->size);
2653 msg_ctx->path_history =
2654 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
2655 memcpy (msg_ctx->path_history, route_path, path_size);
2656 memcpy (&msg_ctx->path_history[path_size], &my_identity,
2657 sizeof (struct GNUNET_PeerIdentity));
2658 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
2659 }
2660 msg_ctx->network_size = ntohl (incoming->network_size);
2661 msg_ctx->peer = *peer;
2662 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
2663 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
2664 demultiplex_message (enc_msg, msg_ctx);
2665 if (msg_ctx->bloom != NULL)
2666 {
2667 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2668 msg_ctx->bloom = NULL;
2669 }
2670 GNUNET_free (msg_ctx);
2671 return GNUNET_YES;
2672}
2673
2674
2675/**
2676 * Core handler for p2p route results.
2677 *
2678 * @param cls closure
2679 * @param message message
2680 * @param peer peer identity this notification is about
2681 * @param atsi performance data
2682 *
2683 */
2684static int
2685handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer,
2686 const struct GNUNET_MessageHeader *message,
2687 const struct GNUNET_TRANSPORT_ATS_Information
2688 *atsi)
2689{
2690#if DEBUG_DHT
2691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2692 "`%s:%s': Received request from peer %s\n", my_short_id, "DHT",
2693 GNUNET_i2s (peer));
2694#endif
2695 const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
2696 (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
2697 struct GNUNET_MessageHeader *enc_msg =
2698 (struct GNUNET_MessageHeader *) &incoming[1];
2699 struct DHT_MessageContext msg_ctx;
2700
2701 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
2702 {
2703 GNUNET_break_op (0);
2704 return GNUNET_YES;
2705 }
2706
2707 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
2708 memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
2709 msg_ctx.msg_options = ntohl (incoming->options);
2710 msg_ctx.hop_count = ntohl (incoming->hop_count);
2711 msg_ctx.peer = *peer;
2712 msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
2713 msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
2714 if ((GNUNET_DHT_RO_RECORD_ROUTE ==
2715 (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
2716 (ntohl (incoming->outgoing_path_length) > 0))
2717 {
2718 if (ntohs (message->size) -
2719 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
2720 ntohs (enc_msg->size) !=
2721 ntohl (incoming->outgoing_path_length) *
2722 sizeof (struct GNUNET_PeerIdentity))
2723 {
2724#if DEBUG_DHT
2725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2726 "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n",
2727 ntohs (message->size), ntohs (enc_msg->size),
2728 ntohs (message->size) -
2729 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
2730 ntohs (enc_msg->size),
2731 ntohl (incoming->outgoing_path_length) *
2732 sizeof (struct GNUNET_PeerIdentity));
2733#endif
2734 GNUNET_break_op (0);
2735 return GNUNET_NO;
2736 }
2737 msg_ctx.path_history = (char *) &incoming[1];
2738 msg_ctx.path_history += ntohs (enc_msg->size);
2739 msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
2740 }
2741 route_result_message (enc_msg, &msg_ctx);
2742 return GNUNET_YES;
2743}
2744
2745
2746/**
2747 * Receive the HELLO from transport service, 2325 * Receive the HELLO from transport service,
2748 * free current and replace if necessary. 2326 * free current and replace if necessary.
2749 * 2327 *
@@ -2757,7 +2335,6 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2757 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2758 "Received our `%s' from transport service\n", "HELLO"); 2336 "Received our `%s' from transport service\n", "HELLO");
2759#endif 2337#endif
2760
2761 GNUNET_assert (message != NULL); 2338 GNUNET_assert (message != NULL);
2762 GNUNET_free_non_null (my_hello); 2339 GNUNET_free_non_null (my_hello);
2763 my_hello = GNUNET_malloc (ntohs (message->size)); 2340 my_hello = GNUNET_malloc (ntohs (message->size));
@@ -2788,16 +2365,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2788 GNUNET_TRANSPORT_disconnect (transport_handle); 2365 GNUNET_TRANSPORT_disconnect (transport_handle);
2789 transport_handle = NULL; 2366 transport_handle = NULL;
2790 } 2367 }
2368 GDS_NEIGHBOURS_done ();
2791 GDS_NSE_done (); 2369 GDS_NSE_done ();
2792 if (coreAPI != NULL)
2793 {
2794#if DEBUG_DHT
2795 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n",
2796 my_short_id, "DHT");
2797#endif
2798 GNUNET_CORE_disconnect (coreAPI);
2799 coreAPI = NULL;
2800 }
2801 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++) 2370 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2802 { 2371 {
2803 while (k_buckets[bucket_count].head != NULL) 2372 while (k_buckets[bucket_count].head != NULL)
@@ -2830,59 +2399,9 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2830 GNUNET_BLOCK_context_destroy (block_context); 2399 GNUNET_BLOCK_context_destroy (block_context);
2831 block_context = NULL; 2400 block_context = NULL;
2832 } 2401 }
2833 GNUNET_free_non_null (my_short_id);
2834 my_short_id = NULL;
2835} 2402}
2836 2403
2837 2404
2838/**
2839 * To be called on core init/fail.
2840 *
2841 * @param cls service closure
2842 * @param server handle to the server for this service
2843 * @param identity the public identity of this peer
2844 * @param publicKey the public key of this peer
2845 */
2846static void
2847core_init (void *cls, struct GNUNET_CORE_Handle *server,
2848 const struct GNUNET_PeerIdentity *identity,
2849 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
2850{
2851
2852 if (server == NULL)
2853 {
2854#if DEBUG_DHT
2855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n",
2856 "dht", GNUNET_i2s (identity));
2857#endif
2858 GNUNET_SCHEDULER_cancel (cleanup_task);
2859 GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
2860 return;
2861 }
2862#if DEBUG_DHT
2863 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2864 "%s: Core connection initialized, I am peer: %s\n", "dht",
2865 GNUNET_i2s (identity));
2866#endif
2867
2868 /* Copy our identity so we can use it */
2869 memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
2870 if (my_short_id != NULL)
2871 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2872 "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n",
2873 "DHT SERVICE");
2874 my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity));
2875}
2876
2877
2878static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2879 {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0},
2880 {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0},
2881 {NULL, 0, 0}
2882};
2883
2884
2885
2886 2405
2887/** 2406/**
2888 * Process dht requests. 2407 * Process dht requests.
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 7585b5a47..8c87314e5 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -49,6 +49,122 @@
49 */ 49 */
50#define DEFAULT_BUCKET_SIZE 4 50#define DEFAULT_BUCKET_SIZE 4
51 51
52/**
53 * Size of the bloom filter the DHT uses to filter peers.
54 */
55#define DHT_BLOOM_SIZE 128
56
57
58/**
59 * P2P PUT message
60 */
61struct PeerPutMessage
62{
63 /**
64 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
65 */
66 struct GNUNET_MessageHeader header;
67
68 /**
69 * Processing options
70 */
71 uint32_t options GNUNET_PACKED;
72
73 /**
74 * Content type.
75 */
76 uint32_t type GNUNET_PACKED;
77
78 /**
79 * Hop count
80 */
81 uint32_t hop_count GNUNET_PACKED;
82
83 /**
84 * Replication level for this message
85 */
86 uint32_t desired_replication_level GNUNET_PACKED;
87
88 /**
89 * Generic route path length for a message in the
90 * DHT that arrived at a peer and generated
91 * a reply. Copied to the end of this message.
92 */
93 uint32_t outgoing_path_length GNUNET_PACKED;
94
95 /**
96 * Bloomfilter (for peer identities) to stop circular routes
97 */
98 char bloomfilter[DHT_BLOOM_SIZE];
99
100 /**
101 * The key we are storing under.
102 */
103 GNUNET_HashCode key;
104
105 /* put path (if tracked) */
106
107 /* Payload */
108
109};
110
111
112/**
113 * P2P GET message
114 */
115struct PeerGetMessage
116{
117 /**
118 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
119 */
120 struct GNUNET_MessageHeader header;
121
122 /**
123 * Processing options
124 */
125 uint32_t options GNUNET_PACKED;
126
127 /**
128 * Desired content type.
129 */
130 uint32_t type GNUNET_PACKED;
131
132 /**
133 * Hop count
134 */
135 uint32_t hop_count GNUNET_PACKED;
136
137 /**
138 * Desired replication level for this request.
139 */
140 uint32_t desired_replication_level GNUNET_PACKED;
141
142 /**
143 * Size of the extended query.
144 */
145 uint32_t xquery_size;
146
147 /**
148 * Bloomfilter mutator.
149 */
150 uint32_t bf_mutator;
151
152 /**
153 * Bloomfilter (for peer identities) to stop circular routes
154 */
155 char bloomfilter[DHT_BLOOM_SIZE];
156
157 /**
158 * The key we are looking for.
159 */
160 GNUNET_HashCode key;
161
162 /* xquery */
163
164 /* result bloomfilter */
165
166};
167
52 168
53/** 169/**
54 * Linked list of messages to send to a particular other peer. 170 * Linked list of messages to send to a particular other peer.
@@ -183,13 +299,18 @@ struct PeerBucket
183 299
184 300
185/** 301/**
186 * The lowest currently used bucket. 302 * The lowest currently used bucket, initially 0 (for 0-bits matching bucket).
187 */ 303 */
188static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */ 304static unsigned int closest_bucket;
189 305
190/** 306/**
191 * The buckets (Kademlia routing table, complete with growth). 307 * How many peers have we added since we sent out our last
192 * Array of size MAX_BUCKET_SIZE. 308 * find peer request?
309 */
310static unsigned int newly_found_peers;
311
312/**
313 * The buckets. Array of size MAX_BUCKET_SIZE. Offset 0 means 0 bits matching.
193 */ 314 */
194static struct PeerBucket k_buckets[MAX_BUCKETS]; 315static struct PeerBucket k_buckets[MAX_BUCKETS];
195 316
@@ -203,6 +324,33 @@ static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers;
203 */ 324 */
204static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; 325static unsigned int bucket_size = DEFAULT_BUCKET_SIZE;
205 326
327/**
328 * Task that sends FIND PEER requests.
329 */
330static GNUNET_SCHEDULER_TaskIdentifier find_peer_task;
331
332
333/**
334 * Find the optimal bucket for this key.
335 *
336 * @param hc the hashcode to compare our identity to
337 * @return the proper bucket index, or GNUNET_SYSERR
338 * on error (same hashcode)
339 */
340static int
341find_bucket (const GNUNET_HashCode * hc)
342{
343 unsigned int bits;
344
345 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc);
346 if (bits == MAX_BUCKETS)
347 {
348 /* How can all bits match? Got my own ID? */
349 GNUNET_break (0);
350 return GNUNET_SYSERR;
351 }
352 return MAX_BUCKETS - bits - 1;
353}
206 354
207 355
208/** 356/**
@@ -222,29 +370,15 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
222 /* Check for connect to self message */ 370 /* Check for connect to self message */
223 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 371 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
224 return; 372 return;
225
226#if DEBUG_DHT
227 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
228 "%s:%s Receives core connect message for peer %s distance %d!\n",
229 my_short_id, "dht", GNUNET_i2s (peer), distance);
230#endif
231
232 if (GNUNET_YES == 373 if (GNUNET_YES ==
233 GNUNET_CONTAINER_multihashmap_contains (all_known_peers, 374 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
234 &peer->hashPubKey)) 375 &peer->hashPubKey))
235 { 376 {
236#if DEBUG_DHT
237 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
238 "%s:%s Received %s message for peer %s, but already have peer in RT!",
239 my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer));
240#endif
241 GNUNET_break (0); 377 GNUNET_break (0);
242 return; 378 return;
243 } 379 }
244 380 peer_bucket = find_bucket (&peer->hashPubKey);
245 peer_bucket = find_current_bucket (&peer->hashPubKey); 381 GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) );
246 GNUNET_assert (peer_bucket >= lowest_bucket);
247 GNUNET_assert (peer_bucket < MAX_BUCKETS);
248 ret = GNUNET_malloc (sizeof (struct PeerInfo)); 382 ret = GNUNET_malloc (sizeof (struct PeerInfo));
249#if 0 383#if 0
250 ret->latency = latency; 384 ret->latency = latency;
@@ -255,23 +389,17 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
255 k_buckets[peer_bucket].tail, 389 k_buckets[peer_bucket].tail,
256 k_buckets[peer_bucket].tail, ret); 390 k_buckets[peer_bucket].tail, ret);
257 k_buckets[peer_bucket].peers_size++; 391 k_buckets[peer_bucket].peers_size++;
258 if ((GNUNET_CRYPTO_hash_matching_bits 392 closest_bucket = GNUNET_MAX (closest_bucket,
259 (&my_identity.hashPubKey, &peer->hashPubKey) > 0) && 393 peer_bucket);
260 (k_buckets[peer_bucket].peers_size <= bucket_size)) 394 if ( (peer_bucket > 0) &&
261 ret->preference_task = 395 (k_buckets[peer_bucket].peers_size <= bucket_size) )
262 GNUNET_SCHEDULER_add_now (&update_core_preference, ret); 396 ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret);
263 if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
264 enable_next_bucket ();
265 newly_found_peers++; 397 newly_found_peers++;
266 GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret, 398 GNUNET_assert (GNUNET_OK ==
267 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 399 GNUNET_CONTAINER_multihashmap_put (all_known_peers,
400 &peer->hashPubKey, ret,
401 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
268 increment_stats (STAT_PEERS_KNOWN); 402 increment_stats (STAT_PEERS_KNOWN);
269
270#if DEBUG_DHT
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
272 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT",
273 ret == NULL ? "NOT ADDED" : "PEER ADDED");
274#endif
275} 403}
276 404
277 405
@@ -286,68 +414,547 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
286{ 414{
287 struct PeerInfo *to_remove; 415 struct PeerInfo *to_remove;
288 int current_bucket; 416 int current_bucket;
417 struct P2PPendingMessage *pos;
418 struct P2PPendingMessage *next;
289 419
290 /* Check for disconnect from self message */ 420 /* Check for disconnect from self message */
291 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 421 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
292 return; 422 return;
293#if DEBUG_DHT
294 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
295 "%s:%s: Received peer disconnect message for peer `%s' from %s\n",
296 my_short_id, "DHT", GNUNET_i2s (peer), "CORE");
297#endif
298
299 if (GNUNET_YES !=
300 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
301 &peer->hashPubKey))
302 {
303 GNUNET_break (0);
304#if DEBUG_DHT
305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
306 "%s:%s: do not have peer `%s' in RT, can't disconnect!\n",
307 my_short_id, "DHT", GNUNET_i2s (peer));
308#endif
309 return;
310 }
311 increment_stats (STAT_DISCONNECTS);
312 GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains
313 (all_known_peers, &peer->hashPubKey));
314 to_remove = 423 to_remove =
315 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey); 424 GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
316 GNUNET_assert (to_remove != NULL); 425 if (NULL == to_remove)
426 {
427 GNUNET_break (0);
428 return;
429 }
430 GNUNET_assert (GNUNET_YES ==
431 GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
432 &peer->hashPubKey,
433 to_remove));
317 if (NULL != to_remove->info_ctx) 434 if (NULL != to_remove->info_ctx)
318 { 435 {
319 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx); 436 GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx);
320 to_remove->info_ctx = NULL; 437 to_remove->info_ctx = NULL;
321 } 438 }
322 GNUNET_assert (0 ==
323 memcmp (peer, &to_remove->id,
324 sizeof (struct GNUNET_PeerIdentity)));
325 current_bucket = find_current_bucket (&to_remove->id.hashPubKey); 439 current_bucket = find_current_bucket (&to_remove->id.hashPubKey);
326 delete_peer (to_remove, current_bucket); 440 GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head,
441 k_buckets[current_bucket].tail,
442 to_remove);
443 GNUNET_assert (k_buckets[current_bucket].peers_size > 0);
444 k_buckets[current_bucket].peers_size--;
445 while ( (lowest_bucket > 0) &&
446 (k_buckets[lowest_bucket].peers_size == 0) )
447 lowest_bucket--;
448
449 if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK)
450 {
451 GNUNET_SCHEDULER_cancel (peer->send_task);
452 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
453 }
454 if (to_remove->th != NULL)
455 {
456 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
457 to_remove->th = NULL;
458 }
459 while (NULL != (pos = to_remove->head))
460 {
461 GNUNET_CONTAINER_DLL_remove (to_remove->head,
462 to_remove->tail,
463 pos);
464 GNUNET_free (pos);
465 }
327} 466}
328 467
329 468
469/**
470 * Perform a PUT operation. // FIXME: document if this is only
471 * routing or also storage and/or even local client notification!
472 *
473 * @param type type of the block
474 * @param options routing options
475 * @param desired_replication_level desired replication count
476 * @param expiration_time when does the content expire
477 * @param key key for the content
478 * @param put_path_length number of entries in put_path
479 * @param put_path peers this request has traversed so far (if tracked)
480 * @param data payload to store
481 * @param data_size number of bytes in data
482 */
483void
484GST_NEIGHBOURS_handle_put (uint32_t type,
485 uint32_t options,
486 uint32_t desired_replication_level,
487 GNUNET_TIME_Absolute expiration_time,
488 const GNUNET_HashCode *key,
489 unsigned int put_path_length,
490 struct GNUNET_PeerIdentity *put_path,
491 const void *data,
492 size_t data_size)
493{
494 // FIXME
495}
496
330 497
331/** 498/**
332 * Initialize neighbours subsystem. 499 * Perform a GET operation. // FIXME: document if this is only
500 * routing or also state-tracking and/or even local lookup!
501 *
502 * @param type type of the block
503 * @param options routing options
504 * @param desired_replication_level desired replication count
505 * @param key key for the content
506 * @param xquery extended query
507 * @param xquery_size number of bytes in xquery
508 * @param reply_bf bloomfilter to filter duplicates
509 * @param reply_bf_mutator mutator for reply_bf
510 * @param peer_bf filter for peers not to select (again)
333 */ 511 */
334void 512void
335GST_NEIGHBOURS_init () 513GST_NEIGHBOURS_handle_get (uint32_t type,
514 uint32_t options,
515 uint32_t desired_replication_level,
516 const GNUNET_HashCode *key,
517 const void *xquery,
518 size_t xquery_size,
519 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
520 uint32_t reply_bf_mutator,
521 const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
336{ 522{
523 // FIXME
337} 524}
338 525
339 526
340/** 527/**
341 * Shutdown neighbours subsystem. 528 * Handle a reply (route to origin). FIXME: should this be here?
529 * (reply-routing table might be better done elsewhere).
530 *
531 * @param type type of the block
532 * @param options routing options
533 * @param expiration_time when does the content expire
534 * @param key key for the content
535 * @param put_path_length number of entries in put_path
536 * @param put_path peers the original PUT traversed (if tracked)
537 * @param get_path_length number of entries in put_path
538 * @param get_path peers this reply has traversed so far (if tracked)
539 * @param data payload of the reply
540 * @param data_size number of bytes in data
342 */ 541 */
343void 542void
344GST_NEIGHBOURS_done () 543GST_NEIGHBOURS_handle_reply (uint32_t type,
544 uint32_t options,
545 GNUNET_TIME_Absolute expiration_time,
546 const GNUNET_HashCode *key,
547 unsigned int put_path_length,
548 struct GNUNET_PeerIdentity *put_path,
549 unsigned int get_path_length,
550 struct GNUNET_PeerIdentity *get_path,
551 const void *data,
552 size_t data_size)
553{
554 // FIXME
555}
556
557
558/**
559 * Add each of the peers we already know to the bloom filter of
560 * the request so that we don't get duplicate HELLOs.
561 *
562 * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
563 * @param key peer identity to add to the bloom filter
564 * @param value value the peer information (unused)
565 * @return GNUNET_YES (we should continue to iterate)
566 */
567static int
568add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
569{
570 struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
571
572 GNUNET_CONTAINER_bloomfilter_add (bloom, key);
573 return GNUNET_YES;
574}
575
576
577/**
578 * Task to send a find peer message for our own peer identifier
579 * so that we can find the closest peers in the network to ourselves
580 * and attempt to connect to them.
581 *
582 * @param cls closure for this task
583 * @param tc the context under which the task is running
584 */
585static void
586send_find_peer_message (void *cls,
587 const struct GNUNET_SCHEDULER_TaskContext *tc)
588{
589 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
590 struct DHT_MessageContext msg_ctx;
591 struct GNUNET_TIME_Relative next_send_time;
592 struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
593
594 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
595 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
596 return;
597 if (newly_found_peers > bucket_size)
598 {
599 /* If we are finding many peers already, no need to send out our request right now! */
600 find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
601 &send_find_peer_message, NULL);
602 newly_found_peers = 0;
603 return;
604 }
605
606 // FIXME: build message...
607 find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
608 find_peer_msg->header.size =
609 htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
610 find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
611 temp_bloom =
612 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
613 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
614 temp_bloom);
615 GNUNET_assert (GNUNET_OK ==
616 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
617 find_peer_msg->
618 bloomfilter,
619 DHT_BLOOM_SIZE));
620 GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
621
622 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
623 memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
624 msg_ctx.unique_id =
625 GNUNET_ntohll (GNUNET_CRYPTO_random_u64
626 (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
627 msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
628 msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
629 msg_ctx.network_size = log_of_network_size_estimate;
630 msg_ctx.peer = my_identity;
631 msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
632 msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
633 // FIXME: transmit message...
634 demultiplex_message (&find_peer_msg->header, &msg_ctx);
635 GNUNET_free (find_peer_msg);
636
637 /* schedule next round */
638 newly_found_peers = 0;
639 next_send_time.rel_value =
640 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) +
641 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
642 DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2);
643 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
644 &send_find_peer_message,
645 NULL);
646}
647
648
649/**
650 * To be called on core init/fail.
651 *
652 * @param cls service closure
653 * @param server handle to the server for this service
654 * @param identity the public identity of this peer
655 * @param publicKey the public key of this peer
656 */
657static void
658core_init (void *cls, struct GNUNET_CORE_Handle *server,
659 const struct GNUNET_PeerIdentity *identity,
660 const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
345{ 661{
662 GNUNET_assert (server != NULL);
663 my_identity = *identity;
664 next_send_time.rel_value =
665 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value +
666 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
667 (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value /
668 2) -
669 DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value);
670 find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time,
671 &send_find_peer_message,
672 NULL);
346} 673}
347 674
348 675
676/**
677 * Core handler for p2p get requests.
678 *
679 * @param cls closure
680 * @param message message
681 * @param peer peer identity this notification is about
682 * @param atsi performance data
683 * @return GNUNET_OK to keep the connection open,
684 * GNUNET_SYSERR to close it (signal serious error)
685 */
686static int
687handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
688 const struct GNUNET_MessageHeader *message,
689 const struct GNUNET_TRANSPORT_ATS_Information
690 *atsi)
691{
692 struct GNUNET_DHT_P2PRouteMessage *incoming =
693 (struct GNUNET_DHT_P2PRouteMessage *) message;
694 struct GNUNET_MessageHeader *enc_msg =
695 (struct GNUNET_MessageHeader *) &incoming[1];
696 struct DHT_MessageContext *msg_ctx;
697 char *route_path;
698 int path_size;
699
700 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
701 {
702 GNUNET_break_op (0);
703 return GNUNET_YES;
704 }
349 705
706 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
707 {
708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709 "Sending of previous replies took too long, backing off!\n");
710 increment_stats ("# route requests dropped due to high load");
711 decrease_max_send_delay (get_max_send_delay ());
712 return GNUNET_YES;
713 }
714 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
715 msg_ctx->bloom =
716 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
717 DHT_BLOOM_K);
718 GNUNET_assert (msg_ctx->bloom != NULL);
719 msg_ctx->hop_count = ntohl (incoming->hop_count);
720 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
721 msg_ctx->replication = ntohl (incoming->desired_replication_level);
722 msg_ctx->msg_options = ntohl (incoming->options);
723 if (GNUNET_DHT_RO_RECORD_ROUTE ==
724 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
725 {
726 path_size =
727 ntohl (incoming->outgoing_path_length) *
728 sizeof (struct GNUNET_PeerIdentity);
729 if (ntohs (message->size) !=
730 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
731 path_size))
732 {
733 GNUNET_break_op (0);
734 GNUNET_free (msg_ctx);
735 return GNUNET_YES;
736 }
737 route_path = (char *) &incoming[1];
738 route_path = route_path + ntohs (enc_msg->size);
739 msg_ctx->path_history =
740 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
741 memcpy (msg_ctx->path_history, route_path, path_size);
742 memcpy (&msg_ctx->path_history[path_size], &my_identity,
743 sizeof (struct GNUNET_PeerIdentity));
744 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
745 }
746 msg_ctx->network_size = ntohl (incoming->network_size);
747 msg_ctx->peer = *peer;
748 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
749 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
750 demultiplex_message (enc_msg, msg_ctx);
751 if (msg_ctx->bloom != NULL)
752 {
753 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
754 msg_ctx->bloom = NULL;
755 }
756 GNUNET_free (msg_ctx);
757 return GNUNET_YES;
758}
350 759
351 760
761/**
762 * Core handler for p2p put requests.
763 *
764 * @param cls closure
765 * @param message message
766 * @param peer peer identity this notification is about
767 * @param atsi performance data
768 * @return GNUNET_OK to keep the connection open,
769 * GNUNET_SYSERR to close it (signal serious error)
770 */
771static int
772handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
773 const struct GNUNET_MessageHeader *message,
774 const struct GNUNET_TRANSPORT_ATS_Information
775 *atsi)
776{
777 struct GNUNET_DHT_P2PRouteMessage *incoming =
778 (struct GNUNET_DHT_P2PRouteMessage *) message;
779 struct GNUNET_MessageHeader *enc_msg =
780 (struct GNUNET_MessageHeader *) &incoming[1];
781 struct DHT_MessageContext *msg_ctx;
782 char *route_path;
783 int path_size;
784
785 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
786 {
787 GNUNET_break_op (0);
788 return GNUNET_YES;
789 }
790
791 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
792 {
793 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794 "Sending of previous replies took too long, backing off!\n");
795 increment_stats ("# route requests dropped due to high load");
796 decrease_max_send_delay (get_max_send_delay ());
797 return GNUNET_YES;
798 }
799 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
800 msg_ctx->bloom =
801 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
802 DHT_BLOOM_K);
803 GNUNET_assert (msg_ctx->bloom != NULL);
804 msg_ctx->hop_count = ntohl (incoming->hop_count);
805 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
806 msg_ctx->replication = ntohl (incoming->desired_replication_level);
807 msg_ctx->msg_options = ntohl (incoming->options);
808 if (GNUNET_DHT_RO_RECORD_ROUTE ==
809 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
810 {
811 path_size =
812 ntohl (incoming->outgoing_path_length) *
813 sizeof (struct GNUNET_PeerIdentity);
814 if (ntohs (message->size) !=
815 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
816 path_size))
817 {
818 GNUNET_break_op (0);
819 GNUNET_free (msg_ctx);
820 return GNUNET_YES;
821 }
822 route_path = (char *) &incoming[1];
823 route_path = route_path + ntohs (enc_msg->size);
824 msg_ctx->path_history =
825 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
826 memcpy (msg_ctx->path_history, route_path, path_size);
827 memcpy (&msg_ctx->path_history[path_size], &my_identity,
828 sizeof (struct GNUNET_PeerIdentity));
829 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
830 }
831 msg_ctx->network_size = ntohl (incoming->network_size);
832 msg_ctx->peer = *peer;
833 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
834 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
835 demultiplex_message (enc_msg, msg_ctx);
836 if (msg_ctx->bloom != NULL)
837 {
838 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
839 msg_ctx->bloom = NULL;
840 }
841 GNUNET_free (msg_ctx);
842 return GNUNET_YES;
843}
844
845
846/**
847 * Core handler for p2p route results.
848 *
849 * @param cls closure
850 * @param message message
851 * @param peer peer identity this notification is about
852 * @param atsi performance data
853 *
854 */
855static int
856handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
857 const struct GNUNET_MessageHeader *message,
858 const struct GNUNET_TRANSPORT_ATS_Information
859 *atsi)
860{
861 const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
862 (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
863 struct GNUNET_MessageHeader *enc_msg =
864 (struct GNUNET_MessageHeader *) &incoming[1];
865 struct DHT_MessageContext msg_ctx;
866
867 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
868 {
869 GNUNET_break_op (0);
870 return GNUNET_YES;
871 }
872
873 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
874 memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
875 msg_ctx.msg_options = ntohl (incoming->options);
876 msg_ctx.hop_count = ntohl (incoming->hop_count);
877 msg_ctx.peer = *peer;
878 msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
879 msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
880 if ((GNUNET_DHT_RO_RECORD_ROUTE ==
881 (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
882 (ntohl (incoming->outgoing_path_length) > 0))
883 {
884 if (ntohs (message->size) -
885 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
886 ntohs (enc_msg->size) !=
887 ntohl (incoming->outgoing_path_length) *
888 sizeof (struct GNUNET_PeerIdentity))
889 {
890 GNUNET_break_op (0);
891 return GNUNET_NO;
892 }
893 msg_ctx.path_history = (char *) &incoming[1];
894 msg_ctx.path_history += ntohs (enc_msg->size);
895 msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
896 }
897 route_result_message (enc_msg, &msg_ctx);
898 return GNUNET_YES;
899}
900
901
902/**
903 * Initialize neighbours subsystem.
904 */
905int
906GST_NEIGHBOURS_init ()
907{
908 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
909 {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
910 {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
911 {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0},
912 {NULL, 0, 0}
913 };
914 unsigned long long temp_config_num;
915 struct GNUNET_TIME_Relative next_send_time;
916
917 if (GNUNET_OK ==
918 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
919 &temp_config_num))
920 bucket_size = (unsigned int) temp_config_num;
921 coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */
922 DEFAULT_CORE_QUEUE_SIZE, /* queue size */
923 NULL, /* Closure passed to DHT functions */
924 &core_init, /* Call core_init once connected */
925 &handle_core_connect, /* Handle connects */
926 &handle_core_disconnect, /* remove peers on disconnects */
927 NULL, /* Do we care about "status" updates? */
928 NULL, /* Don't want notified about all incoming messages */
929 GNUNET_NO, /* For header only inbound notification */
930 NULL, /* Don't want notified about all outbound messages */
931 GNUNET_NO, /* For header only outbound notification */
932 core_handlers); /* Register these handlers */
933 if (coreAPI == NULL)
934 return GNUNET_SYSERR;
935 all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
936 return GNUNET_OK;
937}
938
939
940/**
941 * Shutdown neighbours subsystem.
942 */
943void
944GST_NEIGHBOURS_done ()
945{
946 GNUNET_assert (coreAPI != NULL);
947 GNUNET_CORE_disconnect (coreAPI);
948 coreAPI = NULL;
949 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers));
950 GNUNET_CONTAINER_multihashmap_destroy (all_known_peers);
951 all_known_peers = NULL;
952 if (GNUNET_SCHEDULER_NO_TASK != find_peer_task)
953 {
954 GNUNET_SCHEDULER_cancel (find_peer_task);
955 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
956 }
957}
958
352 959
353/* end of gnunet-service-dht_neighbours.c */ 960/* end of gnunet-service-dht_neighbours.c */
diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h
index 08357d7ff..1f2ae08e6 100644
--- a/src/dht/gnunet-service-dht_neighbours.h
+++ b/src/dht/gnunet-service-dht_neighbours.h
@@ -56,7 +56,6 @@ GST_NEIGHBOURS_handle_put (uint32_t type,
56/** 56/**
57 * Perform a GET operation. 57 * Perform a GET operation.
58 * 58 *
59 *
60 * @param type type of the block 59 * @param type type of the block
61 * @param options routing options 60 * @param options routing options
62 * @param desired_replication_level desired replication count 61 * @param desired_replication_level desired replication count