From 828051543fb820627c41eacb40f3a009e237e029 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 21 Sep 2011 16:47:21 +0000 Subject: stuff --- src/dht/gnunet-service-dht-new.c | 483 +-------------------- src/dht/gnunet-service-dht_neighbours.c | 737 +++++++++++++++++++++++++++++--- src/dht/gnunet-service-dht_neighbours.h | 1 - 3 files changed, 673 insertions(+), 548 deletions(-) (limited to 'src') diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c index 62705bf38..2fb06457e 100644 --- a/src/dht/gnunet-service-dht-new.c +++ b/src/dht/gnunet-service-dht-new.c @@ -776,27 +776,6 @@ inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have) } -/** - * Find the optimal bucket for this key, regardless - * of the current number of buckets in use. - * - * @param hc the hashcode to compare our identity to - * - * @return the proper bucket index, or GNUNET_SYSERR - * on error (same hashcode) - */ -static int -find_bucket (const GNUNET_HashCode * hc) -{ - unsigned int bits; - - bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc); - if (bits == MAX_BUCKETS) - return GNUNET_SYSERR; - return MAX_BUCKETS - bits - 1; -} - - /** * Find which k-bucket this peer should go into, * taking into account the size of the k-bucket @@ -917,127 +896,6 @@ update_core_preference (void *cls, &update_core_preference_finish, peer); } - -/** - * Given a peer and its corresponding bucket, - * remove it from that bucket. Does not free - * the PeerInfo struct, nor cancel messages - * or free messages waiting to be sent to this - * peer! - * - * @param peer the peer to remove - * @param bucket the bucket the peer belongs to - */ -static void -remove_peer (struct PeerInfo *peer, unsigned int bucket) -{ - GNUNET_assert (k_buckets[bucket].peers_size > 0); - GNUNET_CONTAINER_DLL_remove (k_buckets[bucket].head, k_buckets[bucket].tail, - peer); - k_buckets[bucket].peers_size--; - if ((bucket == lowest_bucket) && (k_buckets[lowest_bucket].peers_size == 0) && - (lowest_bucket < MAX_BUCKETS - 1)) - lowest_bucket++; -} - -/** - * Removes peer from a bucket, then frees associated - * resources and frees peer. - * - * @param peer peer to be removed and freed - * @param bucket which bucket this peer belongs to - */ -static void -delete_peer (struct PeerInfo *peer, unsigned int bucket) -{ - struct P2PPendingMessage *pos; - struct P2PPendingMessage *next; - - remove_peer (peer, bucket); /* First remove the peer from its bucket */ - if (peer->send_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (peer->send_task); - if ((peer->th != NULL) && (coreAPI != NULL)) - GNUNET_CORE_notify_transmit_ready_cancel (peer->th); - - pos = peer->head; - while (pos != NULL) /* Remove any pending messages for this peer */ - { - increment_stats - ("# dht pending messages discarded (due to disconnect/shutdown)"); - next = pos->next; - GNUNET_free (pos); - pos = next; - } - - GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains - (all_known_peers, &peer->id.hashPubKey)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (all_known_peers, - &peer->id.hashPubKey, - peer)); - GNUNET_free (peer); - decrement_stats (STAT_PEERS_KNOWN); -} - - -/** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value PeerInfo of the peer to move to new lowest bucket - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -move_lowest_bucket (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct PeerInfo *peer = value; - int new_bucket; - - GNUNET_assert (lowest_bucket > 0); - new_bucket = lowest_bucket - 1; - remove_peer (peer, lowest_bucket); - GNUNET_CONTAINER_DLL_insert_after (k_buckets[new_bucket].head, - k_buckets[new_bucket].tail, - k_buckets[new_bucket].tail, peer); - k_buckets[new_bucket].peers_size++; - return GNUNET_YES; -} - - -/** - * The current lowest bucket is full, so change the lowest - * bucket to the next lower down, and move any appropriate - * entries in the current lowest bucket to the new bucket. - */ -static void -enable_next_bucket () -{ - struct GNUNET_CONTAINER_MultiHashMap *to_remove; - struct PeerInfo *pos; - - GNUNET_assert (lowest_bucket > 0); - to_remove = GNUNET_CONTAINER_multihashmap_create (bucket_size); - pos = k_buckets[lowest_bucket].head; - - /* Populate the array of peers which should be in the next lowest bucket */ - while (pos != NULL) - { - if (find_bucket (&pos->id.hashPubKey) < lowest_bucket) - GNUNET_CONTAINER_multihashmap_put (to_remove, &pos->id.hashPubKey, pos, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - pos = pos->next; - } - - /* Remove peers from lowest bucket, insert into next lowest bucket */ - GNUNET_CONTAINER_multihashmap_iterate (to_remove, &move_lowest_bucket, NULL); - GNUNET_CONTAINER_multihashmap_destroy (to_remove); - lowest_bucket = lowest_bucket - 1; -} - - /** * Find the closest peer in our routing table to the * given hashcode. @@ -2463,286 +2321,6 @@ demultiplex_message (const struct GNUNET_MessageHeader *msg, } -/** - * Iterator over hash map entries. - * - * @param cls closure - * @param key current key code - * @param value value in the hash map - * @return GNUNET_YES if we should continue to - * iterate, - * GNUNET_NO if not. - */ -static int -add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value) -{ - struct GNUNET_CONTAINER_BloomFilter *bloom = cls; - - GNUNET_CONTAINER_bloomfilter_add (bloom, key); - return GNUNET_YES; -} - -/** - * Task to send a find peer message for our own peer identifier - * so that we can find the closest peers in the network to ourselves - * and attempt to connect to them. - * - * @param cls closure for this task - * @param tc the context under which the task is running - */ -static void -send_find_peer_message (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_DHT_FindPeerMessage *find_peer_msg; - struct DHT_MessageContext msg_ctx; - struct GNUNET_TIME_Relative next_send_time; - struct GNUNET_CONTAINER_BloomFilter *temp_bloom; - - if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) - return; - - if (newly_found_peers > bucket_size) /* If we are finding peers already, no need to send out our request right now! */ - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Have %d newly found peers since last find peer message sent!\n", - newly_found_peers); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &send_find_peer_message, NULL); - newly_found_peers = 0; - return; - } - - increment_stats (STAT_FIND_PEER_START); -#if FIND_PEER_WITH_HELLO - find_peer_msg = - GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + - GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) - my_hello)); - find_peer_msg->header.size = - htons (sizeof (struct GNUNET_DHT_FindPeerMessage) + - GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello)); - memcpy (&find_peer_msg[1], my_hello, - GNUNET_HELLO_size ((struct GNUNET_HELLO_Message *) my_hello)); -#else - find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage)); - find_peer_msg->header.size = - htons (sizeof (struct GNUNET_DHT_FindPeerMessage)); -#endif - find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); - temp_bloom = - GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); - GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom, - temp_bloom); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom, - find_peer_msg-> - bloomfilter, - DHT_BLOOM_SIZE)); - GNUNET_CONTAINER_bloomfilter_free (temp_bloom); - memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); - memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode)); - msg_ctx.unique_id = - GNUNET_ntohll (GNUNET_CRYPTO_random_u64 - (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX)); - msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION; - msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE; - msg_ctx.network_size = log_of_network_size_estimate; - msg_ctx.peer = my_identity; - msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE; - msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT; - - demultiplex_message (&find_peer_msg->header, &msg_ctx); - GNUNET_free (find_peer_msg); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Sent `%s' request to some (?) peers\n", my_short_id, - "DHT", "FIND PEER"); - if (newly_found_peers < bucket_size) - { - next_send_time.rel_value = - (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) + - GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, - DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2); - } - else - { - next_send_time.rel_value = - DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value + - GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, - DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value - - DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value); - } - - GNUNET_assert (next_send_time.rel_value != 0); - find_peer_context.count = 0; - newly_found_peers = 0; - find_peer_context.start = GNUNET_TIME_absolute_get (); - GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message, - NULL); -} - - -/** - * Core handler for p2p route requests. - * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * @param atsi performance data - * @return GNUNET_OK to keep the connection open, - * GNUNET_SYSERR to close it (signal serious error) - */ -static int -handle_dht_p2p_route_request (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_TRANSPORT_ATS_Information - *atsi) -{ -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Received P2P request from peer %s\n", my_short_id, - "DHT", GNUNET_i2s (peer)); -#endif - struct GNUNET_DHT_P2PRouteMessage *incoming = - (struct GNUNET_DHT_P2PRouteMessage *) message; - struct GNUNET_MessageHeader *enc_msg = - (struct GNUNET_MessageHeader *) &incoming[1]; - struct DHT_MessageContext *msg_ctx; - char *route_path; - int path_size; - - if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) - { - GNUNET_break_op (0); - return GNUNET_YES; - } - - if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending of previous replies took too long, backing off!\n"); - increment_stats ("# route requests dropped due to high load"); - decrease_max_send_delay (get_max_send_delay ()); - return GNUNET_YES; - } - msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); - msg_ctx->bloom = - GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, - DHT_BLOOM_K); - GNUNET_assert (msg_ctx->bloom != NULL); - msg_ctx->hop_count = ntohl (incoming->hop_count); - memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); - msg_ctx->replication = ntohl (incoming->desired_replication_level); - msg_ctx->msg_options = ntohl (incoming->options); - if (GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) - { - path_size = - ntohl (incoming->outgoing_path_length) * - sizeof (struct GNUNET_PeerIdentity); - if (ntohs (message->size) != - (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) + - path_size)) - { - GNUNET_break_op (0); - GNUNET_free (msg_ctx); - return GNUNET_YES; - } - route_path = (char *) &incoming[1]; - route_path = route_path + ntohs (enc_msg->size); - msg_ctx->path_history = - GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size); - memcpy (msg_ctx->path_history, route_path, path_size); - memcpy (&msg_ctx->path_history[path_size], &my_identity, - sizeof (struct GNUNET_PeerIdentity)); - msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1; - } - msg_ctx->network_size = ntohl (incoming->network_size); - msg_ctx->peer = *peer; - msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE; - msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; - demultiplex_message (enc_msg, msg_ctx); - if (msg_ctx->bloom != NULL) - { - GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); - msg_ctx->bloom = NULL; - } - GNUNET_free (msg_ctx); - return GNUNET_YES; -} - - -/** - * Core handler for p2p route results. - * - * @param cls closure - * @param message message - * @param peer peer identity this notification is about - * @param atsi performance data - * - */ -static int -handle_dht_p2p_route_result (void *cls, const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_TRANSPORT_ATS_Information - *atsi) -{ -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "`%s:%s': Received request from peer %s\n", my_short_id, "DHT", - GNUNET_i2s (peer)); -#endif - const struct GNUNET_DHT_P2PRouteResultMessage *incoming = - (const struct GNUNET_DHT_P2PRouteResultMessage *) message; - struct GNUNET_MessageHeader *enc_msg = - (struct GNUNET_MessageHeader *) &incoming[1]; - struct DHT_MessageContext msg_ctx; - - if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) - { - GNUNET_break_op (0); - return GNUNET_YES; - } - - memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); - memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode)); - msg_ctx.msg_options = ntohl (incoming->options); - msg_ctx.hop_count = ntohl (incoming->hop_count); - msg_ctx.peer = *peer; - msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */ - msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT; - if ((GNUNET_DHT_RO_RECORD_ROUTE == - (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) && - (ntohl (incoming->outgoing_path_length) > 0)) - { - if (ntohs (message->size) - - sizeof (struct GNUNET_DHT_P2PRouteResultMessage) - - ntohs (enc_msg->size) != - ntohl (incoming->outgoing_path_length) * - sizeof (struct GNUNET_PeerIdentity)) - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Return message indicated a path was included, but sizes are wrong: Total size %d, enc size %d, left %d, expected %d\n", - ntohs (message->size), ntohs (enc_msg->size), - ntohs (message->size) - - sizeof (struct GNUNET_DHT_P2PRouteResultMessage) - - ntohs (enc_msg->size), - ntohl (incoming->outgoing_path_length) * - sizeof (struct GNUNET_PeerIdentity)); -#endif - GNUNET_break_op (0); - return GNUNET_NO; - } - msg_ctx.path_history = (char *) &incoming[1]; - msg_ctx.path_history += ntohs (enc_msg->size); - msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length); - } - route_result_message (enc_msg, &msg_ctx); - return GNUNET_YES; -} - - /** * Receive the HELLO from transport service, * free current and replace if necessary. @@ -2757,7 +2335,6 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received our `%s' from transport service\n", "HELLO"); #endif - GNUNET_assert (message != NULL); GNUNET_free_non_null (my_hello); my_hello = GNUNET_malloc (ntohs (message->size)); @@ -2788,16 +2365,8 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_TRANSPORT_disconnect (transport_handle); transport_handle = NULL; } + GDS_NEIGHBOURS_done (); GDS_NSE_done (); - if (coreAPI != NULL) - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Disconnecting core!\n", - my_short_id, "DHT"); -#endif - GNUNET_CORE_disconnect (coreAPI); - coreAPI = NULL; - } for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++) { while (k_buckets[bucket_count].head != NULL) @@ -2830,59 +2399,9 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_BLOCK_context_destroy (block_context); block_context = NULL; } - GNUNET_free_non_null (my_short_id); - my_short_id = NULL; } -/** - * To be called on core init/fail. - * - * @param cls service closure - * @param server handle to the server for this service - * @param identity the public identity of this peer - * @param publicKey the public key of this peer - */ -static void -core_init (void *cls, struct GNUNET_CORE_Handle *server, - const struct GNUNET_PeerIdentity *identity, - const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey) -{ - - if (server == NULL) - { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Connection to core FAILED!\n", - "dht", GNUNET_i2s (identity)); -#endif - GNUNET_SCHEDULER_cancel (cleanup_task); - GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); - return; - } -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s: Core connection initialized, I am peer: %s\n", "dht", - GNUNET_i2s (identity)); -#endif - - /* Copy our identity so we can use it */ - memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity)); - if (my_short_id != NULL) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%s Receive CORE INIT message but have already been initialized! Did CORE fail?\n", - "DHT SERVICE"); - my_short_id = GNUNET_strdup (GNUNET_i2s (&my_identity)); -} - - -static struct GNUNET_CORE_MessageHandler core_handlers[] = { - {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE, 0}, - {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT, 0}, - {NULL, 0, 0} -}; - - - /** * Process dht requests. diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index 7585b5a47..8c87314e5 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -49,6 +49,122 @@ */ #define DEFAULT_BUCKET_SIZE 4 +/** + * Size of the bloom filter the DHT uses to filter peers. + */ +#define DHT_BLOOM_SIZE 128 + + +/** + * P2P PUT message + */ +struct PeerPutMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT + */ + struct GNUNET_MessageHeader header; + + /** + * Processing options + */ + uint32_t options GNUNET_PACKED; + + /** + * Content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count + */ + uint32_t hop_count GNUNET_PACKED; + + /** + * Replication level for this message + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Generic route path length for a message in the + * DHT that arrived at a peer and generated + * a reply. Copied to the end of this message. + */ + uint32_t outgoing_path_length GNUNET_PACKED; + + /** + * Bloomfilter (for peer identities) to stop circular routes + */ + char bloomfilter[DHT_BLOOM_SIZE]; + + /** + * The key we are storing under. + */ + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* Payload */ + +}; + + +/** + * P2P GET message + */ +struct PeerGetMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT + */ + struct GNUNET_MessageHeader header; + + /** + * Processing options + */ + uint32_t options GNUNET_PACKED; + + /** + * Desired content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count + */ + uint32_t hop_count GNUNET_PACKED; + + /** + * Desired replication level for this request. + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Size of the extended query. + */ + uint32_t xquery_size; + + /** + * Bloomfilter mutator. + */ + uint32_t bf_mutator; + + /** + * Bloomfilter (for peer identities) to stop circular routes + */ + char bloomfilter[DHT_BLOOM_SIZE]; + + /** + * The key we are looking for. + */ + GNUNET_HashCode key; + + /* xquery */ + + /* result bloomfilter */ + +}; + /** * Linked list of messages to send to a particular other peer. @@ -183,13 +299,18 @@ struct PeerBucket /** - * The lowest currently used bucket. + * The lowest currently used bucket, initially 0 (for 0-bits matching bucket). */ -static unsigned int lowest_bucket; /* Initially equal to MAX_BUCKETS - 1 */ +static unsigned int closest_bucket; /** - * The buckets (Kademlia routing table, complete with growth). - * Array of size MAX_BUCKET_SIZE. + * How many peers have we added since we sent out our last + * find peer request? + */ +static unsigned int newly_found_peers; + +/** + * The buckets. Array of size MAX_BUCKET_SIZE. Offset 0 means 0 bits matching. */ static struct PeerBucket k_buckets[MAX_BUCKETS]; @@ -203,6 +324,33 @@ static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers; */ static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; +/** + * Task that sends FIND PEER requests. + */ +static GNUNET_SCHEDULER_TaskIdentifier find_peer_task; + + +/** + * Find the optimal bucket for this key. + * + * @param hc the hashcode to compare our identity to + * @return the proper bucket index, or GNUNET_SYSERR + * on error (same hashcode) + */ +static int +find_bucket (const GNUNET_HashCode * hc) +{ + unsigned int bits; + + bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc); + if (bits == MAX_BUCKETS) + { + /* How can all bits match? Got my own ID? */ + GNUNET_break (0); + return GNUNET_SYSERR; + } + return MAX_BUCKETS - bits - 1; +} /** @@ -222,29 +370,15 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer, /* Check for connect to self message */ if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) return; - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Receives core connect message for peer %s distance %d!\n", - my_short_id, "dht", GNUNET_i2s (peer), distance); -#endif - if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (all_known_peers, &peer->hashPubKey)) { -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Received %s message for peer %s, but already have peer in RT!", - my_short_id, "DHT", "CORE CONNECT", GNUNET_i2s (peer)); -#endif GNUNET_break (0); return; } - - peer_bucket = find_current_bucket (&peer->hashPubKey); - GNUNET_assert (peer_bucket >= lowest_bucket); - GNUNET_assert (peer_bucket < MAX_BUCKETS); + peer_bucket = find_bucket (&peer->hashPubKey); + GNUNET_assert ( (peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS) ); ret = GNUNET_malloc (sizeof (struct PeerInfo)); #if 0 ret->latency = latency; @@ -255,23 +389,17 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer, k_buckets[peer_bucket].tail, k_buckets[peer_bucket].tail, ret); k_buckets[peer_bucket].peers_size++; - if ((GNUNET_CRYPTO_hash_matching_bits - (&my_identity.hashPubKey, &peer->hashPubKey) > 0) && - (k_buckets[peer_bucket].peers_size <= bucket_size)) - ret->preference_task = - GNUNET_SCHEDULER_add_now (&update_core_preference, ret); - if ((k_buckets[lowest_bucket].peers_size) >= bucket_size) - enable_next_bucket (); + closest_bucket = GNUNET_MAX (closest_bucket, + peer_bucket); + if ( (peer_bucket > 0) && + (k_buckets[peer_bucket].peers_size <= bucket_size) ) + ret->preference_task = GNUNET_SCHEDULER_add_now (&update_core_preference, ret); newly_found_peers++; - GNUNET_CONTAINER_multihashmap_put (all_known_peers, &peer->hashPubKey, ret, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (all_known_peers, + &peer->hashPubKey, ret, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); increment_stats (STAT_PEERS_KNOWN); - -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", - ret == NULL ? "NOT ADDED" : "PEER ADDED"); -#endif } @@ -286,68 +414,547 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) { struct PeerInfo *to_remove; int current_bucket; + struct P2PPendingMessage *pos; + struct P2PPendingMessage *next; /* Check for disconnect from self message */ if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) return; -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s: Received peer disconnect message for peer `%s' from %s\n", - my_short_id, "DHT", GNUNET_i2s (peer), "CORE"); -#endif - - if (GNUNET_YES != - GNUNET_CONTAINER_multihashmap_contains (all_known_peers, - &peer->hashPubKey)) - { - GNUNET_break (0); -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s:%s: do not have peer `%s' in RT, can't disconnect!\n", - my_short_id, "DHT", GNUNET_i2s (peer)); -#endif - return; - } - increment_stats (STAT_DISCONNECTS); - GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains - (all_known_peers, &peer->hashPubKey)); to_remove = GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey); - GNUNET_assert (to_remove != NULL); + if (NULL == to_remove) + { + GNUNET_break (0); + return; + } + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (all_known_peers, + &peer->hashPubKey, + to_remove)); if (NULL != to_remove->info_ctx) { GNUNET_CORE_peer_change_preference_cancel (to_remove->info_ctx); to_remove->info_ctx = NULL; } - GNUNET_assert (0 == - memcmp (peer, &to_remove->id, - sizeof (struct GNUNET_PeerIdentity))); current_bucket = find_current_bucket (&to_remove->id.hashPubKey); - delete_peer (to_remove, current_bucket); + GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head, + k_buckets[current_bucket].tail, + to_remove); + GNUNET_assert (k_buckets[current_bucket].peers_size > 0); + k_buckets[current_bucket].peers_size--; + while ( (lowest_bucket > 0) && + (k_buckets[lowest_bucket].peers_size == 0) ) + lowest_bucket--; + + if (to_remove->send_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (peer->send_task); + peer->send_task = GNUNET_SCHEDULER_NO_TASK; + } + if (to_remove->th != NULL) + { + GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th); + to_remove->th = NULL; + } + while (NULL != (pos = to_remove->head)) + { + GNUNET_CONTAINER_DLL_remove (to_remove->head, + to_remove->tail, + pos); + GNUNET_free (pos); + } } +/** + * Perform a PUT operation. // FIXME: document if this is only + * routing or also storage and/or even local client notification! + * + * @param type type of the block + * @param options routing options + * @param desired_replication_level desired replication count + * @param expiration_time when does the content expire + * @param key key for the content + * @param put_path_length number of entries in put_path + * @param put_path peers this request has traversed so far (if tracked) + * @param data payload to store + * @param data_size number of bytes in data + */ +void +GST_NEIGHBOURS_handle_put (uint32_t type, + uint32_t options, + uint32_t desired_replication_level, + GNUNET_TIME_Absolute expiration_time, + const GNUNET_HashCode *key, + unsigned int put_path_length, + struct GNUNET_PeerIdentity *put_path, + const void *data, + size_t data_size) +{ + // FIXME +} + /** - * Initialize neighbours subsystem. + * Perform a GET operation. // FIXME: document if this is only + * routing or also state-tracking and/or even local lookup! + * + * @param type type of the block + * @param options routing options + * @param desired_replication_level desired replication count + * @param key key for the content + * @param xquery extended query + * @param xquery_size number of bytes in xquery + * @param reply_bf bloomfilter to filter duplicates + * @param reply_bf_mutator mutator for reply_bf + * @param peer_bf filter for peers not to select (again) */ void -GST_NEIGHBOURS_init () +GST_NEIGHBOURS_handle_get (uint32_t type, + uint32_t options, + uint32_t desired_replication_level, + const GNUNET_HashCode *key, + const void *xquery, + size_t xquery_size, + const struct GNUNET_CONTAINER_BloomFilter *reply_bf, + uint32_t reply_bf_mutator, + const struct GNUNET_CONTAINER_BloomFilter *peer_bf) { + // FIXME } /** - * Shutdown neighbours subsystem. + * Handle a reply (route to origin). FIXME: should this be here? + * (reply-routing table might be better done elsewhere). + * + * @param type type of the block + * @param options routing options + * @param expiration_time when does the content expire + * @param key key for the content + * @param put_path_length number of entries in put_path + * @param put_path peers the original PUT traversed (if tracked) + * @param get_path_length number of entries in put_path + * @param get_path peers this reply has traversed so far (if tracked) + * @param data payload of the reply + * @param data_size number of bytes in data */ void -GST_NEIGHBOURS_done () +GST_NEIGHBOURS_handle_reply (uint32_t type, + uint32_t options, + GNUNET_TIME_Absolute expiration_time, + const GNUNET_HashCode *key, + unsigned int put_path_length, + struct GNUNET_PeerIdentity *put_path, + unsigned int get_path_length, + struct GNUNET_PeerIdentity *get_path, + const void *data, + size_t data_size) +{ + // FIXME +} + + +/** + * Add each of the peers we already know to the bloom filter of + * the request so that we don't get duplicate HELLOs. + * + * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building + * @param key peer identity to add to the bloom filter + * @param value value the peer information (unused) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct GNUNET_CONTAINER_BloomFilter *bloom = cls; + + GNUNET_CONTAINER_bloomfilter_add (bloom, key); + return GNUNET_YES; +} + + +/** + * Task to send a find peer message for our own peer identifier + * so that we can find the closest peers in the network to ourselves + * and attempt to connect to them. + * + * @param cls closure for this task + * @param tc the context under which the task is running + */ +static void +send_find_peer_message (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_DHT_FindPeerMessage *find_peer_msg; + struct DHT_MessageContext msg_ctx; + struct GNUNET_TIME_Relative next_send_time; + struct GNUNET_CONTAINER_BloomFilter *temp_bloom; + + find_peer_task = GNUNET_SCHEDULER_NO_TASK; + if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + return; + if (newly_found_peers > bucket_size) + { + /* If we are finding many peers already, no need to send out our request right now! */ + find_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &send_find_peer_message, NULL); + newly_found_peers = 0; + return; + } + + // FIXME: build message... + find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage)); + find_peer_msg->header.size = + htons (sizeof (struct GNUNET_DHT_FindPeerMessage)); + find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); + temp_bloom = + GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); + GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom, + temp_bloom); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom, + find_peer_msg-> + bloomfilter, + DHT_BLOOM_SIZE)); + GNUNET_CONTAINER_bloomfilter_free (temp_bloom); + + memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); + memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode)); + msg_ctx.unique_id = + GNUNET_ntohll (GNUNET_CRYPTO_random_u64 + (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX)); + msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION; + msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE; + msg_ctx.network_size = log_of_network_size_estimate; + msg_ctx.peer = my_identity; + msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE; + msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT; + // FIXME: transmit message... + demultiplex_message (&find_peer_msg->header, &msg_ctx); + GNUNET_free (find_peer_msg); + + /* schedule next round */ + newly_found_peers = 0; + next_send_time.rel_value = + (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2) + + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, + DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / 2); + find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, + &send_find_peer_message, + NULL); +} + + +/** + * To be called on core init/fail. + * + * @param cls service closure + * @param server handle to the server for this service + * @param identity the public identity of this peer + * @param publicKey the public key of this peer + */ +static void +core_init (void *cls, struct GNUNET_CORE_Handle *server, + const struct GNUNET_PeerIdentity *identity, + const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey) { + GNUNET_assert (server != NULL); + my_identity = *identity; + next_send_time.rel_value = + DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value + + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, + (DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / + 2) - + DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value); + find_peer_task = GNUNET_SCHEDULER_add_delayed (next_send_time, + &send_find_peer_message, + NULL); } +/** + * Core handler for p2p get requests. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_TRANSPORT_ATS_Information + *atsi) +{ + struct GNUNET_DHT_P2PRouteMessage *incoming = + (struct GNUNET_DHT_P2PRouteMessage *) message; + struct GNUNET_MessageHeader *enc_msg = + (struct GNUNET_MessageHeader *) &incoming[1]; + struct DHT_MessageContext *msg_ctx; + char *route_path; + int path_size; + + if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending of previous replies took too long, backing off!\n"); + increment_stats ("# route requests dropped due to high load"); + decrease_max_send_delay (get_max_send_delay ()); + return GNUNET_YES; + } + msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); + msg_ctx->bloom = + GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, + DHT_BLOOM_K); + GNUNET_assert (msg_ctx->bloom != NULL); + msg_ctx->hop_count = ntohl (incoming->hop_count); + memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); + msg_ctx->replication = ntohl (incoming->desired_replication_level); + msg_ctx->msg_options = ntohl (incoming->options); + if (GNUNET_DHT_RO_RECORD_ROUTE == + (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) + { + path_size = + ntohl (incoming->outgoing_path_length) * + sizeof (struct GNUNET_PeerIdentity); + if (ntohs (message->size) != + (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) + + path_size)) + { + GNUNET_break_op (0); + GNUNET_free (msg_ctx); + return GNUNET_YES; + } + route_path = (char *) &incoming[1]; + route_path = route_path + ntohs (enc_msg->size); + msg_ctx->path_history = + GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size); + memcpy (msg_ctx->path_history, route_path, path_size); + memcpy (&msg_ctx->path_history[path_size], &my_identity, + sizeof (struct GNUNET_PeerIdentity)); + msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1; + } + msg_ctx->network_size = ntohl (incoming->network_size); + msg_ctx->peer = *peer; + msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE; + msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; + demultiplex_message (enc_msg, msg_ctx); + if (msg_ctx->bloom != NULL) + { + GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); + msg_ctx->bloom = NULL; + } + GNUNET_free (msg_ctx); + return GNUNET_YES; +} +/** + * Core handler for p2p put requests. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_TRANSPORT_ATS_Information + *atsi) +{ + struct GNUNET_DHT_P2PRouteMessage *incoming = + (struct GNUNET_DHT_P2PRouteMessage *) message; + struct GNUNET_MessageHeader *enc_msg = + (struct GNUNET_MessageHeader *) &incoming[1]; + struct DHT_MessageContext *msg_ctx; + char *route_path; + int path_size; + + if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + + if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending of previous replies took too long, backing off!\n"); + increment_stats ("# route requests dropped due to high load"); + decrease_max_send_delay (get_max_send_delay ()); + return GNUNET_YES; + } + msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); + msg_ctx->bloom = + GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, + DHT_BLOOM_K); + GNUNET_assert (msg_ctx->bloom != NULL); + msg_ctx->hop_count = ntohl (incoming->hop_count); + memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); + msg_ctx->replication = ntohl (incoming->desired_replication_level); + msg_ctx->msg_options = ntohl (incoming->options); + if (GNUNET_DHT_RO_RECORD_ROUTE == + (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) + { + path_size = + ntohl (incoming->outgoing_path_length) * + sizeof (struct GNUNET_PeerIdentity); + if (ntohs (message->size) != + (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) + + path_size)) + { + GNUNET_break_op (0); + GNUNET_free (msg_ctx); + return GNUNET_YES; + } + route_path = (char *) &incoming[1]; + route_path = route_path + ntohs (enc_msg->size); + msg_ctx->path_history = + GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size); + memcpy (msg_ctx->path_history, route_path, path_size); + memcpy (&msg_ctx->path_history[path_size], &my_identity, + sizeof (struct GNUNET_PeerIdentity)); + msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1; + } + msg_ctx->network_size = ntohl (incoming->network_size); + msg_ctx->peer = *peer; + msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE; + msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT; + demultiplex_message (enc_msg, msg_ctx); + if (msg_ctx->bloom != NULL) + { + GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); + msg_ctx->bloom = NULL; + } + GNUNET_free (msg_ctx); + return GNUNET_YES; +} + + +/** + * Core handler for p2p route results. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * + */ +static int +handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_TRANSPORT_ATS_Information + *atsi) +{ + const struct GNUNET_DHT_P2PRouteResultMessage *incoming = + (const struct GNUNET_DHT_P2PRouteResultMessage *) message; + struct GNUNET_MessageHeader *enc_msg = + (struct GNUNET_MessageHeader *) &incoming[1]; + struct DHT_MessageContext msg_ctx; + + if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + + memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); + memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode)); + msg_ctx.msg_options = ntohl (incoming->options); + msg_ctx.hop_count = ntohl (incoming->hop_count); + msg_ctx.peer = *peer; + msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */ + msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT; + if ((GNUNET_DHT_RO_RECORD_ROUTE == + (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) && + (ntohl (incoming->outgoing_path_length) > 0)) + { + if (ntohs (message->size) - + sizeof (struct GNUNET_DHT_P2PRouteResultMessage) - + ntohs (enc_msg->size) != + ntohl (incoming->outgoing_path_length) * + sizeof (struct GNUNET_PeerIdentity)) + { + GNUNET_break_op (0); + return GNUNET_NO; + } + msg_ctx.path_history = (char *) &incoming[1]; + msg_ctx.path_history += ntohs (enc_msg->size); + msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length); + } + route_result_message (enc_msg, &msg_ctx); + return GNUNET_YES; +} + + +/** + * Initialize neighbours subsystem. + */ +int +GST_NEIGHBOURS_init () +{ + static struct GNUNET_CORE_MessageHandler core_handlers[] = { + {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, + {&handle_dht_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0}, + {&handle_dht_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0}, + {NULL, 0, 0} + }; + unsigned long long temp_config_num; + struct GNUNET_TIME_Relative next_send_time; + + if (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size", + &temp_config_num)) + bucket_size = (unsigned int) temp_config_num; + coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */ + DEFAULT_CORE_QUEUE_SIZE, /* queue size */ + NULL, /* Closure passed to DHT functions */ + &core_init, /* Call core_init once connected */ + &handle_core_connect, /* Handle connects */ + &handle_core_disconnect, /* remove peers on disconnects */ + NULL, /* Do we care about "status" updates? */ + NULL, /* Don't want notified about all incoming messages */ + GNUNET_NO, /* For header only inbound notification */ + NULL, /* Don't want notified about all outbound messages */ + GNUNET_NO, /* For header only outbound notification */ + core_handlers); /* Register these handlers */ + if (coreAPI == NULL) + return GNUNET_SYSERR; + all_known_peers = GNUNET_CONTAINER_multihashmap_create (256); + return GNUNET_OK; +} + + +/** + * Shutdown neighbours subsystem. + */ +void +GST_NEIGHBOURS_done () +{ + GNUNET_assert (coreAPI != NULL); + GNUNET_CORE_disconnect (coreAPI); + coreAPI = NULL; + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_get_size (all_known_peers)); + GNUNET_CONTAINER_multihashmap_destroy (all_known_peers); + all_known_peers = NULL; + if (GNUNET_SCHEDULER_NO_TASK != find_peer_task) + { + GNUNET_SCHEDULER_cancel (find_peer_task); + find_peer_task = GNUNET_SCHEDULER_NO_TASK; + } +} + /* end of gnunet-service-dht_neighbours.c */ diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h index 08357d7ff..1f2ae08e6 100644 --- a/src/dht/gnunet-service-dht_neighbours.h +++ b/src/dht/gnunet-service-dht_neighbours.h @@ -56,7 +56,6 @@ GST_NEIGHBOURS_handle_put (uint32_t type, /** * Perform a GET operation. * - * * @param type type of the block * @param options routing options * @param desired_replication_level desired replication count -- cgit v1.2.3