aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht.c
diff options
context:
space:
mode:
authorNathan S. Evans <evans@in.tum.de>2010-08-09 10:42:26 +0000
committerNathan S. Evans <evans@in.tum.de>2010-08-09 10:42:26 +0000
commitf723d71ade6929069a73792ebc08ab33166d2633 (patch)
treeba0d157beb0360418fa523a750433f0df31d404d /src/dht/gnunet-service-dht.c
parent0977923097dde9ee1fcdf2145226db114f017b8c (diff)
downloadgnunet-f723d71ade6929069a73792ebc08ab33166d2633.tar.gz
gnunet-f723d71ade6929069a73792ebc08ab33166d2633.zip
mosgly fixes for memory leaks, misunderstanding of some mysql library stuff
Diffstat (limited to 'src/dht/gnunet-service-dht.c')
-rw-r--r--src/dht/gnunet-service-dht.c200
1 files changed, 139 insertions, 61 deletions
diff --git a/src/dht/gnunet-service-dht.c b/src/dht/gnunet-service-dht.c
index 125bd7fff..1201b5a2f 100644
--- a/src/dht/gnunet-service-dht.c
+++ b/src/dht/gnunet-service-dht.c
@@ -72,6 +72,8 @@
72 72
73#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1) 73#define DHT_DEFAULT_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
74 74
75#define DHT_DEFAULT_PING_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1)
76
75/** 77/**
76 * Real maximum number of hops, at which point we refuse 78 * Real maximum number of hops, at which point we refuse
77 * to forward the message. 79 * to forward the message.
@@ -170,6 +172,11 @@ struct PeerInfo
170 */ 172 */
171 unsigned int distance; 173 unsigned int distance;
172 174
175 /**
176 * Task for scheduling periodic ping messages for this peer.
177 */
178 GNUNET_SCHEDULER_TaskIdentifier ping_task;
179
173}; 180};
174 181
175/** 182/**
@@ -963,8 +970,6 @@ static void delete_peer (struct PeerInfo *peer,
963{ 970{
964 struct P2PPendingMessage *pos; 971 struct P2PPendingMessage *pos;
965 struct P2PPendingMessage *next; 972 struct P2PPendingMessage *next;
966 //fprintf(stderr, "BEFORE REMOVAL\n");
967 //print_routing_table();
968#if EXTRA_CHECKS 973#if EXTRA_CHECKS
969 struct PeerInfo *peer_pos; 974 struct PeerInfo *peer_pos;
970 975
@@ -994,10 +999,8 @@ static void delete_peer (struct PeerInfo *peer,
994 } 999 }
995 1000
996 GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey)); 1001 GNUNET_assert(GNUNET_CONTAINER_multihashmap_contains(all_known_peers, &peer->id.hashPubKey));
997 GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer)); 1002 GNUNET_CONTAINER_multihashmap_remove (all_known_peers, &peer->id.hashPubKey, peer);
998 GNUNET_free(peer); 1003 GNUNET_free(peer);
999 //fprintf(stderr, "AFTER REMOVAL\n");
1000 //print_routing_table();
1001} 1004}
1002 1005
1003 1006
@@ -1056,6 +1059,7 @@ static void enable_next_bucket()
1056 1059
1057 /* Remove peers from lowest bucket, insert into next lowest bucket */ 1060 /* Remove peers from lowest bucket, insert into next lowest bucket */
1058 GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL); 1061 GNUNET_CONTAINER_multihashmap_iterate(to_remove, &move_lowest_bucket, NULL);
1062 GNUNET_CONTAINER_multihashmap_destroy(to_remove);
1059 lowest_bucket = lowest_bucket - 1; 1063 lowest_bucket = lowest_bucket - 1;
1060#if PRINT_TABLES 1064#if PRINT_TABLES
1061 fprintf(stderr, "Printing RT after new bucket\n"); 1065 fprintf(stderr, "Printing RT after new bucket\n");
@@ -1063,6 +1067,111 @@ static void enable_next_bucket()
1063#endif 1067#endif
1064} 1068}
1065 1069
1070/**
1071 * Function called to send a request out to another peer.
1072 * Called both for locally initiated requests and those
1073 * received from other peers.
1074 *
1075 * @param cls DHT service closure argument (unused)
1076 * @param msg the encapsulated message
1077 * @param peer the peer to forward the message to
1078 * @param msg_ctx the context of the message (hop count, bloom, etc.)
1079 */
1080static void forward_message (void *cls,
1081 const struct GNUNET_MessageHeader *msg,
1082 struct PeerInfo *peer,
1083 struct DHT_MessageContext *msg_ctx)
1084{
1085 struct GNUNET_DHT_P2PRouteMessage *route_message;
1086 struct P2PPendingMessage *pending;
1087 size_t msize;
1088 size_t psize;
1089
1090 msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1091 GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1092 psize = sizeof(struct P2PPendingMessage) + msize;
1093 pending = GNUNET_malloc(psize);
1094 pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1095 pending->importance = DHT_SEND_PRIORITY;
1096 pending->timeout = GNUNET_TIME_relative_get_forever();
1097 route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1098 route_message->header.size = htons(msize);
1099 route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1100 route_message->options = htonl(msg_ctx->msg_options);
1101 route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1102 route_message->network_size = htonl(msg_ctx->network_size);
1103 route_message->desired_replication_level = htonl(msg_ctx->replication);
1104 route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1105 if (msg_ctx->bloom != NULL)
1106 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1107 if (msg_ctx->key != NULL)
1108 memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1109 memcpy(&route_message[1], msg, ntohs(msg->size));
1110#if DEBUG_DHT > 1
1111 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1112#endif
1113 GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1114 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1115 peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1116}
1117
1118#if DO_PING
1119/**
1120 * Task used to send ping messages to peers so that
1121 * they don't get disconnected.
1122 *
1123 * @param cls the peer to send a ping message to
1124 * @param tc context, reason, etc.
1125 */
1126static void
1127periodic_ping_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1128{
1129 struct PeerInfo *peer = cls;
1130 struct GNUNET_MessageHeader ping_message;
1131 struct DHT_MessageContext message_context;
1132
1133 if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
1134 return;
1135
1136 ping_message.size = htons(sizeof(struct GNUNET_MessageHeader));
1137 ping_message.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_PING);
1138
1139 memset(&message_context, 0, sizeof(struct DHT_MessageContext));
1140#if DEBUG_PING
1141 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "%s:%s Sending periodic ping to %s\n", my_short_id, "DHT", GNUNET_i2s(&peer->id));
1142#endif
1143 forward_message(NULL, &ping_message, peer, &message_context);
1144 peer->ping_task = GNUNET_SCHEDULER_add_delayed(sched, DHT_DEFAULT_PING_DELAY, &periodic_ping_task, peer);
1145}
1146
1147/**
1148 * Schedule PING messages for the top X peers in each
1149 * bucket of the routing table (so core won't disconnect them!)
1150 */
1151void schedule_ping_messages()
1152{
1153 unsigned int bucket;
1154 unsigned int count;
1155 struct PeerInfo *pos;
1156 for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
1157 {
1158 pos = k_buckets[bucket].head;
1159 count = 0;
1160 while (pos != NULL)
1161 {
1162 if ((count < bucket_size) && (pos->ping_task == GNUNET_SCHEDULER_NO_TASK))
1163 GNUNET_SCHEDULER_add_now(sched, &periodic_ping_task, pos);
1164 else if ((count >= bucket_size) && (pos->ping_task != GNUNET_SCHEDULER_NO_TASK))
1165 {
1166 GNUNET_SCHEDULER_cancel(sched, pos->ping_task);
1167 pos->ping_task = GNUNET_SCHEDULER_NO_TASK;
1168 }
1169 pos = pos->next;
1170 count++;
1171 }
1172 }
1173}
1174#endif
1066 1175
1067/** 1176/**
1068 * Attempt to add a peer to our k-buckets. 1177 * Attempt to add a peer to our k-buckets.
@@ -1089,7 +1198,9 @@ try_add_peer(const struct GNUNET_PeerIdentity *peer,
1089 1198
1090 if ((k_buckets[lowest_bucket].peers_size) >= bucket_size) 1199 if ((k_buckets[lowest_bucket].peers_size) >= bucket_size)
1091 enable_next_bucket(); 1200 enable_next_bucket();
1092 1201#if DO_PING
1202 schedule_ping_messages();
1203#endif
1093 return new_peer; 1204 return new_peer;
1094} 1205}
1095 1206
@@ -1265,11 +1376,6 @@ static int route_result_message(void *cls,
1265 /** 1376 /**
1266 * If a find peer result message is received and contains a valid 1377 * If a find peer result message is received and contains a valid
1267 * HELLO for another peer, offer it to the transport service. 1378 * HELLO for another peer, offer it to the transport service.
1268 *
1269 * FIXME: Check whether we need this peer (based on routing table
1270 * fullness) and only try to connect to it conditionally. This should
1271 * reduce trying to connect to say (500) peers when the bucket size will
1272 * discard most of them.
1273 */ 1379 */
1274 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT) 1380 if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1275 { 1381 {
@@ -1580,6 +1686,7 @@ handle_dht_find_peer (void *cls,
1580 new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); 1686 new_msg_ctx->bloom = GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
1581 new_msg_ctx->hop_count = 0; 1687 new_msg_ctx->hop_count = 0;
1582 route_result_message(cls, find_peer_result, new_msg_ctx); 1688 route_result_message(cls, find_peer_result, new_msg_ctx);
1689 GNUNET_free(new_msg_ctx);
1583#if DEBUG_DHT_ROUTING 1690#if DEBUG_DHT_ROUTING
1584 if ((debug_routes) && (dhtlog_handle != NULL)) 1691 if ((debug_routes) && (dhtlog_handle != NULL))
1585 { 1692 {
@@ -1789,9 +1896,10 @@ am_closest_peer (const GNUNET_HashCode * target)
1789 return GNUNET_NO; 1896 return GNUNET_NO;
1790 else if (other_bits == bits) /* We match the same number of bits, do distance comparison */ 1897 else if (other_bits == bits) /* We match the same number of bits, do distance comparison */
1791 { 1898 {
1899 return GNUNET_YES;
1792 /* FIXME: why not just return GNUNET_YES here? We are certainly close. */ 1900 /* FIXME: why not just return GNUNET_YES here? We are certainly close. */
1793 if (distance(&pos->id.hashPubKey, target) < my_distance) 1901 /*if (distance(&pos->id.hashPubKey, target) < my_distance)
1794 return GNUNET_NO; 1902 return GNUNET_NO;*/
1795 } 1903 }
1796 pos = pos->next; 1904 pos = pos->next;
1797 } 1905 }
@@ -1944,51 +2052,6 @@ select_peer (const GNUNET_HashCode * target,
1944#endif 2052#endif
1945} 2053}
1946 2054
1947/**
1948 * Function called to send a request out to another peer.
1949 * Called both for locally initiated requests and those
1950 * received from other peers.
1951 *
1952 * @param cls DHT service closure argument
1953 * @param msg the encapsulated message
1954 * @param peer the peer to forward the message to
1955 * @param msg_ctx the context of the message (hop count, bloom, etc.)
1956 */
1957static void forward_message (void *cls,
1958 const struct GNUNET_MessageHeader *msg,
1959 struct PeerInfo *peer,
1960 struct DHT_MessageContext *msg_ctx)
1961{
1962 struct GNUNET_DHT_P2PRouteMessage *route_message;
1963 struct P2PPendingMessage *pending;
1964 size_t msize;
1965 size_t psize;
1966
1967 msize = sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs(msg->size);
1968 GNUNET_assert(msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
1969 psize = sizeof(struct P2PPendingMessage) + msize;
1970 pending = GNUNET_malloc(psize);
1971 pending->msg = (struct GNUNET_MessageHeader *)&pending[1];
1972 pending->importance = DHT_SEND_PRIORITY;
1973 pending->timeout = GNUNET_TIME_relative_get_forever();
1974 route_message = (struct GNUNET_DHT_P2PRouteMessage *)pending->msg;
1975 route_message->header.size = htons(msize);
1976 route_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
1977 route_message->options = htonl(msg_ctx->msg_options);
1978 route_message->hop_count = htonl(msg_ctx->hop_count + 1);
1979 route_message->network_size = htonl(msg_ctx->network_size);
1980 route_message->desired_replication_level = htonl(msg_ctx->replication);
1981 route_message->unique_id = GNUNET_htonll(msg_ctx->unique_id);
1982 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data(msg_ctx->bloom, route_message->bloomfilter, DHT_BLOOM_SIZE));
1983 memcpy(&route_message->key, msg_ctx->key, sizeof(GNUNET_HashCode));
1984 memcpy(&route_message[1], msg, ntohs(msg->size));
1985#if DEBUG_DHT > 1
1986 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Adding pending message size %d for peer %s\n", my_short_id, "DHT", msize, GNUNET_i2s(&peer->id));
1987#endif
1988 GNUNET_CONTAINER_DLL_insert_after(peer->head, peer->tail, peer->tail, pending);
1989 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1990 peer->send_task = GNUNET_SCHEDULER_add_now(sched, &try_core_send, peer);
1991}
1992 2055
1993/** 2056/**
1994 * Task used to remove forwarding entries, either 2057 * Task used to remove forwarding entries, either
@@ -2288,6 +2351,7 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
2288 struct GNUNET_MessageHeader *find_peer_msg; 2351 struct GNUNET_MessageHeader *find_peer_msg;
2289 struct DHT_MessageContext message_context; 2352 struct DHT_MessageContext message_context;
2290 int ret; 2353 int ret;
2354 struct GNUNET_TIME_Relative next_send_time;
2291 2355
2292 if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) 2356 if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN)
2293 return; 2357 return;
@@ -2304,12 +2368,14 @@ send_find_peer_message (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
2304 message_context.peer = &my_identity; 2368 message_context.peer = &my_identity;
2305 2369
2306 ret = route_message(NULL, find_peer_msg, &message_context); 2370 ret = route_message(NULL, find_peer_msg, &message_context);
2307 2371 GNUNET_free(find_peer_msg);
2308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309 "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT", 2373 "`%s:%s': Sent `%s' request to %d peers\n", my_short_id, "DHT",
2310 "FIND PEER", ret); 2374 "FIND PEER", ret);
2375 next_send_time = DHT_DEFAULT_FIND_PEER_INTERVAL;
2376 next_send_time.value = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, next_send_time.value * 3);
2311 GNUNET_SCHEDULER_add_delayed (sched, 2377 GNUNET_SCHEDULER_add_delayed (sched,
2312 DHT_DEFAULT_FIND_PEER_INTERVAL, 2378 next_send_time,
2313 &send_find_peer_message, NULL); 2379 &send_find_peer_message, NULL);
2314} 2380}
2315 2381
@@ -2423,6 +2489,14 @@ handle_dht_p2p_route_request (void *cls,
2423 struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1]; 2489 struct GNUNET_MessageHeader *enc_msg = (struct GNUNET_MessageHeader *)&incoming[1];
2424 struct DHT_MessageContext *message_context; 2490 struct DHT_MessageContext *message_context;
2425 2491
2492 if (ntohs(enc_msg->type) == GNUNET_MESSAGE_TYPE_DHT_P2P_PING) /* Throw these away. FIXME: Don't throw these away? (reply)*/
2493 {
2494#if DEBUG_PING
2495 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "%s:%s Received P2P Ping message.\n", my_short_id, "DHT");
2496#endif
2497 return GNUNET_YES;
2498 }
2499
2426 if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE) 2500 if (ntohs(enc_msg->size) > GNUNET_SERVER_MAX_MESSAGE_SIZE)
2427 { 2501 {
2428 GNUNET_break_op(0); 2502 GNUNET_break_op(0);
@@ -2638,7 +2712,9 @@ void handle_core_connect (void *cls,
2638 latency, 2712 latency,
2639 distance); 2713 distance);
2640 if (ret != NULL) 2714 if (ret != NULL)
2641 GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 2715 {
2716 GNUNET_CONTAINER_multihashmap_put(all_known_peers, &peer->hashPubKey, ret, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2717 }
2642#if DEBUG_DHT 2718#if DEBUG_DHT
2643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2719 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2644 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED"); 2720 "%s:%s Adding peer to routing list: %s\n", my_short_id, "DHT", ret == NULL ? "NOT ADDED" : "PEER ADDED");
@@ -2681,6 +2757,7 @@ run (void *cls,
2681 struct GNUNET_SERVER_Handle *server, 2757 struct GNUNET_SERVER_Handle *server,
2682 const struct GNUNET_CONFIGURATION_Handle *c) 2758 const struct GNUNET_CONFIGURATION_Handle *c)
2683{ 2759{
2760 int random_seconds;
2684 sched = scheduler; 2761 sched = scheduler;
2685 cfg = c; 2762 cfg = c;
2686 datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache"); 2763 datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
@@ -2751,6 +2828,7 @@ run (void *cls,
2751 } 2828 }
2752 2829
2753#if DO_FIND_PEER 2830#if DO_FIND_PEER
2831 random_seconds = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 180);
2754 GNUNET_SCHEDULER_add_delayed (sched, 2832 GNUNET_SCHEDULER_add_delayed (sched,
2755 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30), 2833 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30),
2756 &send_find_peer_message, NULL); 2834 &send_find_peer_message, NULL);