From f723d71ade6929069a73792ebc08ab33166d2633 Mon Sep 17 00:00:00 2001 From: "Nathan S. Evans" Date: Mon, 9 Aug 2010 10:42:26 +0000 Subject: mosgly fixes for memory leaks, misunderstanding of some mysql library stuff --- src/dht/gnunet-service-dht.c | 200 ++++++++++++++++++++++++++++++------------- 1 file changed, 139 insertions(+), 61 deletions(-) (limited to 'src/dht/gnunet-service-dht.c') diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c index 125bd7fff..1201b5a2f 100644 --- a/src/dht/gnunet-service-dht.c +++ b/src/dht/gnunet-service-dht.c @@ -72,6 +72,8 @@ #define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1) +#define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1) + /** * Real maximum number of hops, at which point we refuse * to forward the message. @@ -170,6 +172,11 @@ struct PeerInfo */ unsigned int distance; + /** + * Task for scheduling periodic ping messages for this peer. + */ + GNUNET_SCHEDULER_TaskIdentifier ping_task; + }; /** @@ -963,8 +970,6 @@ static void delete_peer (struct PeerInfo *peer, { struct P2PPendingMessage *pos; struct P2PPendingMessage *next; - //fprintf(stderr, "BEFORE REMOVAL\n"); - //print_routing_table(); #if EXTRA_CHECKS struct PeerInfo *peer_pos; @@ -994,10 +999,8 @@ static void delete_peer (struct PeerInfo *peer, } GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey)); - GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer)); + GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer); GNUNET_free(peer); - //fprintf(stderr, "AFTER REMOVAL\n"); - //print_routing_table(); } @@ -1056,6 +1059,7 @@ static void enable_next_bucket() /* Remove peers from lowest bucket, insert into next lowest bucket */ GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL); + GNUNET_CONTAINER_multihashmap_destroy(to_remove); lowest_bucket = lowest_bucket - 1; #if PRINT_TABLES fprintf(stderr, "Printing RT after new bucket\n"); @@ -1063,6 +1067,111 @@ static void enable_next_bucket() #endif } +/** + * Function called to send a request out to another peer. + * Called both for locally initiated requests and those + * received from other peers. + * + * @param cls DHT service closure argument (unused) + * @param msg the encapsulated message + * @param peer the peer to forward the message to + * @param msg_ctx the context of the message (hop count, bloom, etc.) + */ +static void forward_message (void *cls, + const struct GNUNET_MessageHeader *msg, + struct PeerInfo *peer, + struct DHT_MessageContext *msg_ctx) +{ + struct GNUNET_DHT_P2PRouteMessage *route_message; + struct P2PPendingMessage *pending; + size_t msize; + size_t psize; + + msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size); + GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); + psize = sizeof(struct P2PPendingMessage) + msize; + pending = GNUNET_malloc(psize); + pending->msg = (struct GNUNET_MessageHeader *)&pending[1]; + pending->importance = DHT_SEND_PRIORITY; + pending->timeout = GNUNET_TIME_relative_get_forever(); + route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg; + route_message->header.size = htons(msize); + route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE); + route_message->options = htonl(msg_ctx->msg_options); + route_message->hop_count = htonl(msg_ctx->hop_count + 1); + route_message->network_size = htonl(msg_ctx->network_size); + route_message->desired_replication_level = htonl(msg_ctx->replication); + route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id); + if (msg_ctx->bloom != NULL) + GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE)); + if (msg_ctx->key != NULL) + memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode)); + memcpy(&route_message[1], msg, ntohs(msg->size)); +#if DEBUG_DHT > 1 + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id)); +#endif + GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending); + if (peer->send_task == GNUNET_SCHEDULER_NO_TASK) + peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer); +} + +#if DO_PING +/** + * Task used to send ping messages to peers so that + * they don't get disconnected. + * + * @param cls the peer to send a ping message to + * @param tc context, reason, etc. + */ +static void +periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerInfo *peer = cls; + struct GNUNET_MessageHeader ping_message; + struct DHT_MessageContext message_context; + + if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) + return; + + ping_message.size = htons(sizeof(struct GNUNET_MessageHeader)); + ping_message.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_PING); + + memset(&message_context, 0, sizeof(struct DHT_MessageContext)); +#if DEBUG_PING + GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n", my_short_id, "DHT", GNUNET_i2s(&peer->id)); +#endif + forward_message(NULL, &ping_message, peer, &message_context); + peer->ping_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PING_DELAY, &periodic_ping_task, peer); +} + +/** + * Schedule PING messages for the top X peers in each + * bucket of the routing table (so core won't disconnect them!) + */ +void schedule_ping_messages() +{ + unsigned int bucket; + unsigned int count; + struct PeerInfo *pos; + for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++) + { + pos = k_buckets[bucket].head; + count = 0; + while (pos != NULL) + { + if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK)) + GNUNET_SCHEDULER_add_now(sched, &periodic_ping_task, pos); + else if ((count >= bucket_size) && (pos->ping_task != GNUNET_SCHEDULER_NO_TASK)) + { + GNUNET_SCHEDULER_cancel(sched, pos->ping_task); + pos->ping_task = GNUNET_SCHEDULER_NO_TASK; + } + pos = pos->next; + count++; + } + } +} +#endif /** * Attempt to add a peer to our k-buckets. @@ -1089,7 +1198,9 @@ try_add_peer(const struct GNUNET_PeerIdentity *peer, if ((k_buckets[lowest_bucket].peers_size) >= bucket_size) enable_next_bucket(); - +#if DO_PING + schedule_ping_messages(); +#endif return new_peer; } @@ -1265,11 +1376,6 @@ static int route_result_message(void *cls, /** * If a find peer result message is received and contains a valid * HELLO for another peer, offer it to the transport service. - * - * FIXME: Check whether we need this peer (based on routing table - * fullness) and only try to connect to it conditionally. This should - * reduce trying to connect to say (500) peers when the bucket size will - * discard most of them. */ if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) { @@ -1580,6 +1686,7 @@ handle_dht_find_peer (void *cls, new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); new_msg_ctx->hop_count = 0; route_result_message(cls, find_peer_result, new_msg_ctx); + GNUNET_free(new_msg_ctx); #if DEBUG_DHT_ROUTING if ((debug_routes) && (dhtlog_handle != NULL)) { @@ -1789,9 +1896,10 @@ am_closest_peer (const GNUNET_HashCode * target) return GNUNET_NO; else if (other_bits == bits) /* We match the same number of bits, do distance comparison */ { + return GNUNET_YES; /* FIXME: why not just return GNUNET_YES here? We are certainly close. */ - if (distance(&pos->id.hashPubKey, target) < my_distance) - return GNUNET_NO; + /*if (distance(&pos->id.hashPubKey, target) < my_distance) + return GNUNET_NO;*/ } pos = pos->next; } @@ -1944,51 +2052,6 @@ select_peer (const GNUNET_HashCode * target, #endif } -/** - * Function called to send a request out to another peer. - * Called both for locally initiated requests and those - * received from other peers. - * - * @param cls DHT service closure argument - * @param msg the encapsulated message - * @param peer the peer to forward the message to - * @param msg_ctx the context of the message (hop count, bloom, etc.) - */ -static void forward_message (void *cls, - const struct GNUNET_MessageHeader *msg, - struct PeerInfo *peer, - struct DHT_MessageContext *msg_ctx) -{ - struct GNUNET_DHT_P2PRouteMessage *route_message; - struct P2PPendingMessage *pending; - size_t msize; - size_t psize; - - msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size); - GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE); - psize = sizeof(struct P2PPendingMessage) + msize; - pending = GNUNET_malloc(psize); - pending->msg = (struct GNUNET_MessageHeader *)&pending[1]; - pending->importance = DHT_SEND_PRIORITY; - pending->timeout = GNUNET_TIME_relative_get_forever(); - route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg; - route_message->header.size = htons(msize); - route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE); - route_message->options = htonl(msg_ctx->msg_options); - route_message->hop_count = htonl(msg_ctx->hop_count + 1); - route_message->network_size = htonl(msg_ctx->network_size); - route_message->desired_replication_level = htonl(msg_ctx->replication); - route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id); - GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE)); - memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode)); - memcpy(&route_message[1], msg, ntohs(msg->size)); -#if DEBUG_DHT > 1 - GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id)); -#endif - GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending); - if (peer->send_task == GNUNET_SCHEDULER_NO_TASK) - peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer); -} /** * Task used to remove forwarding entries, either @@ -2288,6 +2351,7 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc struct GNUNET_MessageHeader *find_peer_msg; struct DHT_MessageContext message_context; int ret; + struct GNUNET_TIME_Relative next_send_time; if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return; @@ -2304,12 +2368,14 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc message_context.peer = &my_identity; ret = route_message(NULL, find_peer_msg, &message_context); - + GNUNET_free(find_peer_msg); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT", "FIND PEER", ret); + next_send_time = DHT_DEFAULT_FIND_PEER_INTERVAL; + next_send_time.value = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, next_send_time.value * 3); GNUNET_SCHEDULER_add_delayed (sched, - DHT_DEFAULT_FIND_PEER_INTERVAL, + next_send_time, &send_find_peer_message, NULL); } @@ -2423,6 +2489,14 @@ handle_dht_p2p_route_request (void *cls, struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1]; struct DHT_MessageContext *message_context; + if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/ + { +#if DEBUG_PING + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n", my_short_id, "DHT"); +#endif + return GNUNET_YES; + } + if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break_op(0); @@ -2638,7 +2712,9 @@ void handle_core_connect (void *cls, latency, distance); if (ret != NULL) - GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + { + GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } #if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED"); @@ -2681,6 +2757,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { + int random_seconds; sched = scheduler; cfg = c; datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache"); @@ -2751,6 +2828,7 @@ run (void *cls, } #if DO_FIND_PEER + random_seconds = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 180); GNUNET_SCHEDULER_add_delayed (sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30), &send_find_peer_message, NULL); -- cgit v1.2.3