summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-09-26 22:02:03 +0000
committerChristian Grothoff <christian@grothoff.org>2011-09-26 22:02:03 +0000
commit1f6511d450641f20c69f616dbdbbbb1badbbbc5a (patch)
treeee31851b7eb5de4e6251c2e4a0d488fb886f8b12
parentb77a243b51a385d52250d31a285669b7ad3aed20 (diff)
downloadgnunet-1f6511d450641f20c69f616dbdbbbb1badbbbc5a.tar.gz
gnunet-1f6511d450641f20c69f616dbdbbbb1badbbbc5a.zip
more wild hxing
-rw-r--r--src/dht/gnunet-service-dht-new.c1609
-rw-r--r--src/dht/gnunet-service-dht_clients.c9
-rw-r--r--src/dht/gnunet-service-dht_clients.h2
-rw-r--r--src/dht/gnunet-service-dht_datacache.c4
-rw-r--r--src/dht/gnunet-service-dht_datacache.h4
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c259
-rw-r--r--src/dht/gnunet-service-dht_neighbours.h12
-rw-r--r--src/dht/gnunet-service-dht_nse.c6
-rw-r--r--src/dht/gnunet-service-dht_nse.h6
-rw-r--r--src/dht/gnunet-service-dht_routing.c211
-rw-r--r--src/dht/gnunet-service-dht_routing.h93
11 files changed, 479 insertions, 1736 deletions
diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c
index 87cf97a40..17330ddb6 100644
--- a/src/dht/gnunet-service-dht-new.c
+++ b/src/dht/gnunet-service-dht-new.c
@@ -67,11 +67,6 @@
67#define MINIMUM_PEER_THRESHOLD 20 67#define MINIMUM_PEER_THRESHOLD 20
68 68
69/** 69/**
70 * Number of requests we track at most (for routing replies).
71 */
72#define DHT_MAX_RECENT (1024 * 16)
73
74/**
75 * How long do we wait at most when queueing messages with core 70 * How long do we wait at most when queueing messages with core
76 * that we are sending on behalf of other peers. 71 * that we are sending on behalf of other peers.
77 */ 72 */
@@ -133,258 +128,6 @@
133 128
134 129
135/** 130/**
136 * Context containing information about a DHT message received.
137 */
138struct DHT_MessageContext
139{
140 /**
141 * The client this request was received from.
142 * (NULL if received from another peer)
143 */
144 struct ClientList *client;
145
146 /**
147 * The peer this request was received from.
148 */
149 struct GNUNET_PeerIdentity peer;
150
151 /**
152 * Bloomfilter for this routing request.
153 */
154 struct GNUNET_CONTAINER_BloomFilter *bloom;
155
156 /**
157 * extended query (see gnunet_block_lib.h).
158 */
159 const void *xquery;
160
161 /**
162 * Bloomfilter to filter out duplicate replies.
163 */
164 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
165
166 /**
167 * The key this request was about
168 */
169 GNUNET_HashCode key;
170
171 /**
172 * How long should we wait to transmit this request?
173 */
174 struct GNUNET_TIME_Relative timeout;
175
176 /**
177 * The unique identifier of this request
178 */
179 uint64_t unique_id;
180
181 /**
182 * Number of bytes in xquery.
183 */
184 size_t xquery_size;
185
186 /**
187 * Mutator value for the reply_bf, see gnunet_block_lib.h
188 */
189 uint32_t reply_bf_mutator;
190
191 /**
192 * Desired replication level
193 */
194 uint32_t replication;
195
196 /**
197 * Network size estimate, either ours or the sum of
198 * those routed to thus far. =~ Log of number of peers
199 * chosen from for this request.
200 */
201 uint32_t network_size;
202
203 /**
204 * Any message options for this request
205 */
206 uint32_t msg_options;
207
208 /**
209 * How many hops has the message already traversed?
210 */
211 uint32_t hop_count;
212
213 /**
214 * How many peer identities are present in the path history?
215 */
216 uint32_t path_history_len;
217
218 /**
219 * Path history.
220 */
221 char *path_history;
222
223 /**
224 * How important is this message?
225 */
226 unsigned int importance;
227
228 /**
229 * Should we (still) forward the request on to other peers?
230 */
231 int do_forward;
232
233 /**
234 * Did we forward this message? (may need to remember it!)
235 */
236 int forwarded;
237
238 /**
239 * Are we the closest known peer to this key (out of our neighbors?)
240 */
241 int closest;
242};
243
244
245/**
246 * Record used for remembering what peers are waiting for what
247 * responses (based on search key).
248 */
249struct DHTRouteSource
250{
251 /**
252 * This is a DLL.
253 */
254 struct DHTRouteSource *next;
255
256 /**
257 * This is a DLL.
258 */
259 struct DHTRouteSource *prev;
260
261 /**
262 * UID of the request, 0 if from another peer.
263 */
264 uint64_t uid;
265
266 /**
267 * Source of the request. Replies should be forwarded to
268 * this peer.
269 */
270 struct GNUNET_PeerIdentity source;
271
272 /**
273 * If this was a local request, remember the client; otherwise NULL.
274 */
275 struct ClientList *client;
276
277 /**
278 * Pointer to this nodes heap location (for removal)
279 */
280 struct GNUNET_CONTAINER_HeapNode *hnode;
281
282 /**
283 * Back pointer to the record storing this information.
284 */
285 struct DHTQueryRecord *record;
286
287 /**
288 * Task to remove this entry on timeout.
289 */
290 GNUNET_SCHEDULER_TaskIdentifier delete_task;
291
292 /**
293 * Bloomfilter of peers we have already sent back as
294 * replies to the initial request. Allows us to not
295 * forward the same peer multiple times for a find peer
296 * request.
297 */
298 struct GNUNET_CONTAINER_BloomFilter *find_peers_responded;
299
300};
301
302
303/**
304 * Entry in the DHT routing table.
305 */
306struct DHTQueryRecord
307{
308 /**
309 * Head of DLL for result forwarding.
310 */
311 struct DHTRouteSource *head;
312
313 /**
314 * Tail of DLL for result forwarding.
315 */
316 struct DHTRouteSource *tail;
317
318 /**
319 * Key that the record concerns.
320 */
321 GNUNET_HashCode key;
322
323};
324
325
326/**
327 * Context used to calculate the number of find peer messages
328 * per X time units since our last scheduled find peer message
329 * was sent. If we have seen too many messages, delay or don't
330 * send our own out.
331 */
332struct FindPeerMessageContext
333{
334 unsigned int count;
335
336 struct GNUNET_TIME_Absolute start;
337
338};
339
340
341struct RecentRequest
342{
343 /**
344 * Position of this node in the min heap.
345 */
346 struct GNUNET_CONTAINER_HeapNode *heap_node;
347
348 /**
349 * Bloomfilter containing entries for peers
350 * we forwarded this request to.
351 */
352 struct GNUNET_CONTAINER_BloomFilter *bloom;
353
354 /**
355 * Timestamp of this request, for ordering
356 * the min heap.
357 */
358 struct GNUNET_TIME_Absolute timestamp;
359
360 /**
361 * Key of this request.
362 */
363 GNUNET_HashCode key;
364
365 /**
366 * Unique identifier for this request, 0 if from another peer.
367 */
368 uint64_t uid;
369
370 /**
371 * Task to remove this entry on timeout.
372 */
373 GNUNET_SCHEDULER_TaskIdentifier remove_task;
374};
375
376
377/**
378 * Recent requests by time inserted.
379 */
380static struct GNUNET_CONTAINER_Heap *recent_heap;
381
382/**
383 * Context to use to calculate find peer rates.
384 */
385static struct FindPeerMessageContext find_peer_context;
386
387/**
388 * How many peers have we added since we sent out our last 131 * How many peers have we added since we sent out our last
389 * find peer request? 132 * find peer request?
390 */ 133 */
@@ -421,11 +164,6 @@ static struct GNUNET_TRANSPORT_Handle *transport_handle;
421static struct GNUNET_PeerIdentity my_identity; 164static struct GNUNET_PeerIdentity my_identity;
422 165
423/** 166/**
424 * Short id of the peer, for printing
425 */
426static char *my_short_id;
427
428/**
429 * Our HELLO 167 * Our HELLO
430 */ 168 */
431static struct GNUNET_MessageHeader *my_hello; 169static struct GNUNET_MessageHeader *my_hello;
@@ -441,28 +179,12 @@ static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
441static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests; 179static struct GNUNET_CONTAINER_MultiHashMap *recent_find_peer_requests;
442 180
443/** 181/**
444 * Reply times for requests, if we are busy, don't send any
445 * more requests!
446 */
447static struct GNUNET_TIME_Relative reply_times[MAX_REPLY_TIMES];
448
449/**
450 * Current counter for replies.
451 */
452static unsigned int reply_counter;
453
454/**
455 * Our handle to the BLOCK library. 182 * Our handle to the BLOCK library.
456 */ 183 */
457static struct GNUNET_BLOCK_Context *block_context; 184static struct GNUNET_BLOCK_Context *block_context;
458 185
459 186
460 187
461/** Declare here so retry_core_send is aware of it */
462static size_t
463core_transmit_notify (void *cls, size_t size, void *buf);
464
465
466 188
467/** 189/**
468 * Given the largest send delay, artificially decrease it 190 * Given the largest send delay, artificially decrease it
@@ -531,170 +253,6 @@ decrement_stats (const char *value)
531} 253}
532 254
533 255
534/**
535 * Try to send another message from our core send list
536 */
537static void
538try_core_send (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
539{
540 struct PeerInfo *peer = cls;
541 struct P2PPendingMessage *pending;
542 size_t ssize;
543
544 peer->send_task = GNUNET_SCHEDULER_NO_TASK;
545
546 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
547 return;
548
549 if (peer->th != NULL)
550 return; /* Message send already in progress */
551
552 pending = peer->head;
553 if (pending != NULL)
554 {
555 ssize = ntohs (pending->msg->size);
556#if DEBUG_DHT > 1
557 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558 "`%s:%s': Calling notify_transmit_ready with size %d for peer %s\n",
559 my_short_id, "DHT", ssize, GNUNET_i2s (&peer->id));
560#endif
561 pending->scheduled = GNUNET_TIME_absolute_get ();
562 reply_counter++;
563 if (reply_counter >= MAX_REPLY_TIMES)
564 reply_counter = 0;
565 peer->th =
566 GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
567 pending->importance,
568 pending->timeout, &peer->id, ssize,
569 &core_transmit_notify, peer);
570 if (peer->th == NULL)
571 increment_stats ("# notify transmit ready failed");
572 }
573}
574
575
576/**
577 * Function called to send a request out to another peer.
578 * Called both for locally initiated requests and those
579 * received from other peers.
580 *
581 * @param msg the encapsulated message
582 * @param peer the peer to forward the message to
583 * @param msg_ctx the context of the message (hop count, bloom, etc.)
584 */
585static void
586forward_result_message (const struct GNUNET_MessageHeader *msg,
587 struct PeerInfo *peer,
588 struct DHT_MessageContext *msg_ctx)
589{
590 struct GNUNET_DHT_P2PRouteResultMessage *result_message;
591 struct P2PPendingMessage *pending;
592 size_t msize;
593 size_t psize;
594 char *path_start;
595 char *path_offset;
596
597 increment_stats (STAT_RESULT_FORWARDS);
598 msize =
599 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) + ntohs (msg->size) +
600 (sizeof (struct GNUNET_PeerIdentity) * msg_ctx->path_history_len);
601 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
602 psize = sizeof (struct P2PPendingMessage) + msize;
603 pending = GNUNET_malloc (psize);
604 pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
605 pending->importance = DHT_SEND_PRIORITY;
606 pending->timeout = GNUNET_TIME_relative_get_forever ();
607 result_message = (struct GNUNET_DHT_P2PRouteResultMessage *) pending->msg;
608 result_message->header.size = htons (msize);
609 result_message->header.type =
610 htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE_RESULT);
611 result_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
612 if (msg_ctx->path_history_len > 0)
613 {
614 /* End of pending is where enc_msg starts */
615 path_start = (char *) &pending[1];
616 /* Offset by the size of the enc_msg */
617 path_start += ntohs (msg->size);
618 memcpy (path_start, msg_ctx->path_history,
619 msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
620 }
621 result_message->options = htonl (msg_ctx->msg_options);
622 result_message->hop_count = htonl (msg_ctx->hop_count + 1);
623 memcpy (&result_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
624 /* Copy the enc_msg, then the path history as well! */
625 memcpy (&result_message[1], msg, ntohs (msg->size));
626 path_offset = (char *) &result_message[1];
627 path_offset += ntohs (msg->size);
628 /* If we have path history, copy it to the end of the whole thing */
629 if (msg_ctx->path_history_len > 0)
630 memcpy (path_offset, msg_ctx->path_history,
631 msg_ctx->path_history_len * (sizeof (struct GNUNET_PeerIdentity)));
632#if DEBUG_DHT > 1
633 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
634 "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
635 "DHT", msize, GNUNET_i2s (&peer->id));
636#endif
637 peer->pending_count++;
638 increment_stats ("# pending messages scheduled");
639 GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
640 pending);
641 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
642 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
643}
644
645
646/**
647 * Called when core is ready to send a message we asked for
648 * out to the destination.
649 *
650 * @param cls closure (NULL)
651 * @param size number of bytes available in buf
652 * @param buf where the callee should write the message
653 * @return number of bytes written to buf
654 */
655static size_t
656core_transmit_notify (void *cls, size_t size, void *buf)
657{
658 struct PeerInfo *peer = cls;
659 char *cbuf = buf;
660 struct P2PPendingMessage *pending;
661
662 size_t off;
663 size_t msize;
664
665 peer->th = NULL;
666 if (buf == NULL)
667 {
668 /* client disconnected */
669#if DEBUG_DHT
670 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': buffer was NULL\n",
671 my_short_id, "DHT");
672#endif
673 return 0;
674 }
675
676 if (peer->head == NULL)
677 return 0;
678
679 off = 0;
680 pending = peer->head;
681 while (NULL != pending &&
682 (size - off >= (msize = ntohs (pending->msg->size))))
683 {
684 memcpy (&cbuf[off], pending->msg, msize);
685 off += msize;
686 peer->pending_count--;
687 increment_stats ("# pending messages sent");
688 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
689 GNUNET_free (pending);
690 pending = peer->head;
691 }
692 if ((peer->head != NULL) && (peer->send_task == GNUNET_SCHEDULER_NO_TASK))
693 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
694
695 return off;
696}
697
698 256
699/** 257/**
700 * Compute the distance between have and target as a 32-bit value. 258 * Compute the distance between have and target as a 32-bit value.
@@ -771,59 +329,6 @@ inverse_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have)
771} 329}
772 330
773 331
774/**
775 * Find which k-bucket this peer should go into,
776 * taking into account the size of the k-bucket
777 * array. This means that if more bits match than
778 * there are currently buckets, lowest_bucket will
779 * be returned.
780 *
781 * @param hc GNUNET_HashCode we are finding the bucket for.
782 *
783 * @return the proper bucket index for this key,
784 * or GNUNET_SYSERR on error (same hashcode)
785 */
786static int
787find_current_bucket (const GNUNET_HashCode * hc)
788{
789 int actual_bucket;
790
791 actual_bucket = find_bucket (hc);
792 if (actual_bucket == GNUNET_SYSERR) /* hc and our peer identity match! */
793 return lowest_bucket;
794 if (actual_bucket < lowest_bucket) /* actual_bucket not yet used */
795 return lowest_bucket;
796 return actual_bucket;
797}
798
799
800/**
801 * Find a routing table entry from a peer identity
802 *
803 * @param peer the peer identity to look up
804 *
805 * @return the routing table entry, or NULL if not found
806 */
807static struct PeerInfo *
808find_peer_by_id (const struct GNUNET_PeerIdentity *peer)
809{
810 int bucket;
811 struct PeerInfo *pos;
812
813 bucket = find_current_bucket (&peer->hashPubKey);
814
815 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
816 return NULL;
817
818 pos = k_buckets[bucket].head;
819 while (pos != NULL)
820 {
821 if (0 == memcmp (&pos->id, peer, sizeof (struct GNUNET_PeerIdentity)))
822 return pos;
823 pos = pos->next;
824 }
825 return NULL; /* No such peer. */
826}
827 332
828/* Forward declaration */ 333/* Forward declaration */
829static void 334static void
@@ -891,489 +396,6 @@ update_core_preference (void *cls,
891 &update_core_preference_finish, peer); 396 &update_core_preference_finish, peer);
892} 397}
893 398
894/**
895 * Find the closest peer in our routing table to the
896 * given hashcode.
897 *
898 * @return The closest peer in our routing table to the
899 * key, or NULL on error.
900 */
901static struct PeerInfo *
902find_closest_peer (const GNUNET_HashCode * hc)
903{
904 struct PeerInfo *pos;
905 struct PeerInfo *current_closest;
906 unsigned int lowest_distance;
907 unsigned int temp_distance;
908 int bucket;
909 int count;
910
911 lowest_distance = -1;
912
913 if (k_buckets[lowest_bucket].peers_size == 0)
914 return NULL;
915
916 current_closest = NULL;
917 for (bucket = lowest_bucket; bucket < MAX_BUCKETS; bucket++)
918 {
919 pos = k_buckets[bucket].head;
920 count = 0;
921 while ((pos != NULL) && (count < bucket_size))
922 {
923 temp_distance = distance (&pos->id.hashPubKey, hc);
924 if (temp_distance <= lowest_distance)
925 {
926 lowest_distance = temp_distance;
927 current_closest = pos;
928 }
929 pos = pos->next;
930 count++;
931 }
932 }
933 GNUNET_assert (current_closest != NULL);
934 return current_closest;
935}
936
937
938/**
939 * Function called to send a request out to another peer.
940 * Called both for locally initiated requests and those
941 * received from other peers.
942 *
943 * @param msg the encapsulated message
944 * @param peer the peer to forward the message to
945 * @param msg_ctx the context of the message (hop count, bloom, etc.)
946 */
947static void
948forward_message (const struct GNUNET_MessageHeader *msg, struct PeerInfo *peer,
949 struct DHT_MessageContext *msg_ctx)
950{
951 struct GNUNET_DHT_P2PRouteMessage *route_message;
952 struct P2PPendingMessage *pending;
953 size_t msize;
954 size_t psize;
955 char *route_path;
956
957 increment_stats (STAT_ROUTE_FORWARDS);
958 GNUNET_assert (peer != NULL);
959 if ((msg_ctx->closest != GNUNET_YES) &&
960 (peer == find_closest_peer (&msg_ctx->key)))
961 increment_stats (STAT_ROUTE_FORWARDS_CLOSEST);
962
963 msize =
964 sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (msg->size) +
965 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
966 GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
967 psize = sizeof (struct P2PPendingMessage) + msize;
968 pending = GNUNET_malloc (psize);
969 pending->msg = (struct GNUNET_MessageHeader *) &pending[1];
970 pending->importance = msg_ctx->importance;
971 pending->timeout = msg_ctx->timeout;
972 route_message = (struct GNUNET_DHT_P2PRouteMessage *) pending->msg;
973 route_message->header.size = htons (msize);
974 route_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_ROUTE);
975 route_message->options = htonl (msg_ctx->msg_options);
976 route_message->hop_count = htonl (msg_ctx->hop_count + 1);
977 route_message->network_size = htonl (msg_ctx->network_size);
978 route_message->desired_replication_level = htonl (msg_ctx->replication);
979 if (msg_ctx->bloom != NULL)
980 GNUNET_assert (GNUNET_OK ==
981 GNUNET_CONTAINER_bloomfilter_get_raw_data (msg_ctx->bloom,
982 route_message->
983 bloomfilter,
984 DHT_BLOOM_SIZE));
985 memcpy (&route_message->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
986 memcpy (&route_message[1], msg, ntohs (msg->size));
987 if (GNUNET_DHT_RO_RECORD_ROUTE ==
988 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
989 {
990 route_message->outgoing_path_length = htonl (msg_ctx->path_history_len);
991 /* Set pointer to start of enc_msg */
992 route_path = (char *) &route_message[1];
993 /* Offset to the end of the enc_msg */
994 route_path += ntohs (msg->size);
995 /* Copy the route_path after enc_msg */
996 memcpy (route_path, msg_ctx->path_history,
997 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
998 }
999#if DEBUG_DHT > 1
1000 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1001 "%s:%s Adding pending message size %d for peer %s\n", my_short_id,
1002 "DHT", msize, GNUNET_i2s (&peer->id));
1003#endif
1004 peer->pending_count++;
1005 increment_stats ("# pending messages scheduled");
1006 GNUNET_CONTAINER_DLL_insert_after (peer->head, peer->tail, peer->tail,
1007 pending);
1008 if (peer->send_task == GNUNET_SCHEDULER_NO_TASK)
1009 peer->send_task = GNUNET_SCHEDULER_add_now (&try_core_send, peer);
1010}
1011
1012
1013
1014
1015/**
1016 * Called when a reply needs to be sent to a client, as
1017 * a result it found to a GET or FIND PEER request.
1018 *
1019 * @param client the client to send the reply to
1020 * @param message the encapsulated message to send
1021 * @param msg_ctx the context of the received message
1022 */
1023static void
1024send_reply_to_client (struct ClientList *client,
1025 const struct GNUNET_MessageHeader *message,
1026 struct DHT_MessageContext *msg_ctx)
1027{
1028 struct GNUNET_DHT_RouteResultMessage *reply;
1029 struct PendingMessage *pending_message;
1030 uint16_t msize;
1031 size_t tsize;
1032 char *reply_offset;
1033
1034#if DEBUG_DHT
1035 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Sending reply to client.\n",
1036 my_short_id, "DHT");
1037#endif
1038 msize = ntohs (message->size);
1039 tsize =
1040 sizeof (struct GNUNET_DHT_RouteResultMessage) + msize +
1041 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1042 if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1043 {
1044 GNUNET_break_op (0);
1045 return;
1046 }
1047 pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
1048 pending_message->msg = (struct GNUNET_MessageHeader *) &pending_message[1];
1049 reply = (struct GNUNET_DHT_RouteResultMessage *) &pending_message[1];
1050 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_LOCAL_ROUTE_RESULT);
1051 reply->header.size = htons (tsize);
1052 reply->outgoing_path_length = htonl (msg_ctx->path_history_len);
1053 reply->unique_id = GNUNET_htonll (msg_ctx->unique_id);
1054 memcpy (&reply->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1055 reply_offset = (char *) &reply[1];
1056 memcpy (&reply[1], message, msize);
1057 if (msg_ctx->path_history_len > 0)
1058 {
1059 reply_offset += msize;
1060 memcpy (reply_offset, msg_ctx->path_history,
1061 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1062 }
1063 add_pending_message (client, pending_message);
1064}
1065
1066/**
1067 * Consider whether or not we would like to have this peer added to
1068 * our routing table. Check whether bucket for this peer is full,
1069 * if so return negative; if not return positive. Since peers are
1070 * only added on CORE level connect, this doesn't actually add the
1071 * peer to the routing table.
1072 *
1073 * @param peer the peer we are considering adding
1074 *
1075 * @return GNUNET_YES if we want this peer, GNUNET_NO if not (bucket
1076 * already full)
1077 */
1078static int
1079consider_peer (struct GNUNET_PeerIdentity *peer)
1080{
1081 int bucket;
1082
1083 if ((GNUNET_YES ==
1084 GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
1085 &peer->hashPubKey)) ||
1086 (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))))
1087 return GNUNET_NO; /* We already know this peer (are connected even!) */
1088 bucket = find_current_bucket (&peer->hashPubKey);
1089
1090 if ((k_buckets[bucket].peers_size < bucket_size) ||
1091 ((bucket == lowest_bucket) && (lowest_bucket > 0)))
1092 return GNUNET_YES;
1093
1094 return GNUNET_NO;
1095}
1096
1097
1098/**
1099 * Task used to remove forwarding entries, either
1100 * after timeout, when full, or on shutdown.
1101 *
1102 * @param cls the entry to remove
1103 * @param tc context, reason, etc.
1104 */
1105static void
1106remove_forward_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1107{
1108 struct DHTRouteSource *source_info = cls;
1109 struct DHTQueryRecord *record;
1110
1111 source_info = GNUNET_CONTAINER_heap_remove_node (source_info->hnode);
1112 record = source_info->record;
1113 GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1114
1115 if (record->head == NULL) /* No more entries in DLL */
1116 {
1117 GNUNET_assert (GNUNET_YES ==
1118 GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1119 &record->key, record));
1120 GNUNET_free (record);
1121 }
1122 if (source_info->find_peers_responded != NULL)
1123 GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1124 GNUNET_free (source_info);
1125}
1126
1127/**
1128 * Main function that handles whether or not to route a result
1129 * message to other peers, or to send to our local client.
1130 *
1131 * @param msg the result message to be routed
1132 * @param msg_ctx context of the message we are routing
1133 *
1134 * @return the number of peers the message was routed to,
1135 * GNUNET_SYSERR on failure
1136 */
1137static int
1138route_result_message (struct GNUNET_MessageHeader *msg,
1139 struct DHT_MessageContext *msg_ctx)
1140{
1141 struct GNUNET_PeerIdentity new_peer;
1142 struct DHTQueryRecord *record;
1143 struct DHTRouteSource *pos;
1144 struct PeerInfo *peer_info;
1145 const struct GNUNET_MessageHeader *hello_msg;
1146
1147#if DEBUG_DHT > 1
1148 unsigned int i;
1149#endif
1150
1151 increment_stats (STAT_RESULTS);
1152 /**
1153 * If a find peer result message is received and contains a valid
1154 * HELLO for another peer, offer it to the transport service.
1155 */
1156 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
1157 {
1158 if (ntohs (msg->size) <= sizeof (struct GNUNET_MessageHeader))
1159 GNUNET_break_op (0);
1160
1161 hello_msg = &msg[1];
1162 if ((ntohs (hello_msg->type) != GNUNET_MESSAGE_TYPE_HELLO) ||
1163 (GNUNET_SYSERR ==
1164 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello_msg,
1165 &new_peer)))
1166 {
1167 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1168 "%s:%s Received non-HELLO message type in find peer result message!\n",
1169 my_short_id, "DHT");
1170 GNUNET_break_op (0);
1171 return GNUNET_NO;
1172 }
1173 else /* We have a valid hello, and peer id stored in new_peer */
1174 {
1175 find_peer_context.count++;
1176 increment_stats (STAT_FIND_PEER_REPLY);
1177 if (GNUNET_YES == consider_peer (&new_peer))
1178 {
1179 increment_stats (STAT_HELLOS_PROVIDED);
1180 GNUNET_TRANSPORT_offer_hello (transport_handle, hello_msg, NULL, NULL);
1181 GNUNET_CORE_peer_request_connect (coreAPI, &new_peer, NULL, NULL);
1182 }
1183 }
1184 }
1185
1186 record =
1187 GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1188
1189 if (record == NULL) /* No record of this message! */
1190 {
1191#if DEBUG_DHT
1192 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193 "`%s:%s': Have no record of response key %s uid %llu\n",
1194 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1195 msg_ctx->unique_id);
1196#endif
1197 return 0;
1198 }
1199
1200 pos = record->head;
1201 while (pos != NULL)
1202 {
1203 if (0 == memcmp (&pos->source, &my_identity, sizeof (struct GNUNET_PeerIdentity))) /* Local client (or DHT) initiated request! */
1204 {
1205#if DEBUG_DHT
1206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1207 "`%s:%s': Sending response key %s uid %llu to client\n",
1208 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1209 msg_ctx->unique_id);
1210#endif
1211 increment_stats (STAT_RESULTS_TO_CLIENT);
1212 if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
1213 increment_stats (STAT_GET_REPLY);
1214#if DEBUG_DHT > 1
1215 for (i = 0; i < msg_ctx->path_history_len; i++)
1216 {
1217 char *path_offset;
1218
1219 path_offset =
1220 &msg_ctx->path_history[i * sizeof (struct GNUNET_PeerIdentity)];
1221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "(before client) Key %s Found peer %d:%s\n",
1223 GNUNET_h2s (&msg_ctx->key), i,
1224 GNUNET_i2s ((struct GNUNET_PeerIdentity *) path_offset));
1225 }
1226#endif
1227 send_reply_to_client (pos->client, msg, msg_ctx);
1228 }
1229 else /* Send to peer */
1230 {
1231 peer_info = find_peer_by_id (&pos->source);
1232 if (peer_info == NULL) /* Didn't find the peer in our routing table, perhaps peer disconnected! */
1233 {
1234 pos = pos->next;
1235 continue;
1236 }
1237#if DEBUG_DHT
1238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1239 "`%s:%s': Forwarding response key %s uid %llu to peer %s\n",
1240 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
1241 msg_ctx->unique_id, GNUNET_i2s (&peer_info->id));
1242#endif
1243 forward_result_message (msg, peer_info, msg_ctx);
1244 /* Try removing forward entries after sending once, only allows ONE response per request */
1245 if (pos->delete_task != GNUNET_SCHEDULER_NO_TASK)
1246 {
1247 GNUNET_SCHEDULER_cancel (pos->delete_task);
1248 pos->delete_task =
1249 GNUNET_SCHEDULER_add_now (&remove_forward_entry, pos);
1250 }
1251 }
1252 pos = pos->next;
1253 }
1254 return 0;
1255}
1256
1257
1258
1259
1260/**
1261 * Main function that handles whether or not to route a message to other
1262 * peers.
1263 *
1264 * @param msg the message to be routed
1265 * @param msg_ctx the context containing all pertinent information about the message
1266 */
1267static void
1268route_message (const struct GNUNET_MessageHeader *msg,
1269 struct DHT_MessageContext *msg_ctx);
1270
1271
1272/**
1273 * Server handler for all dht get requests, look for data,
1274 * if found, send response either to clients or other peers.
1275 *
1276 * @param msg the actual get message
1277 * @param msg_ctx struct containing pertinent information about the get request
1278 *
1279 * @return number of items found for GET request
1280 */
1281static unsigned int
1282handle_dht_get (const struct GNUNET_MessageHeader *msg,
1283 struct DHT_MessageContext *msg_ctx)
1284{
1285 const struct GNUNET_DHT_GetMessage *get_msg;
1286 uint16_t msize;
1287 uint16_t bf_size;
1288 unsigned int results;
1289 const char *end;
1290 enum GNUNET_BLOCK_Type type;
1291
1292 msize = ntohs (msg->size);
1293 if (msize < sizeof (struct GNUNET_DHT_GetMessage))
1294 {
1295 GNUNET_break (0);
1296 return 0;
1297 }
1298 get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
1299 bf_size = ntohs (get_msg->bf_size);
1300 msg_ctx->xquery_size = ntohs (get_msg->xquery_size);
1301 msg_ctx->reply_bf_mutator = get_msg->bf_mutator;
1302 if (msize !=
1303 sizeof (struct GNUNET_DHT_GetMessage) + bf_size + msg_ctx->xquery_size)
1304 {
1305 GNUNET_break_op (0);
1306 return 0;
1307 }
1308 end = (const char *) &get_msg[1];
1309 if (msg_ctx->xquery_size == 0)
1310 {
1311 msg_ctx->xquery = NULL;
1312 }
1313 else
1314 {
1315 msg_ctx->xquery = (const void *) end;
1316 end += msg_ctx->xquery_size;
1317 }
1318 if (bf_size == 0)
1319 {
1320 msg_ctx->reply_bf = NULL;
1321 }
1322 else
1323 {
1324 msg_ctx->reply_bf =
1325 GNUNET_CONTAINER_bloomfilter_init (end, bf_size,
1326 GNUNET_DHT_GET_BLOOMFILTER_K);
1327 }
1328 type = (enum GNUNET_BLOCK_Type) ntohl (get_msg->type);
1329#if DEBUG_DHT
1330 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331 "`%s:%s': Received `%s' request, message type %u, key %s, uid %llu\n",
1332 my_short_id, "DHT", "GET", type, GNUNET_h2s (&msg_ctx->key),
1333 msg_ctx->unique_id);
1334#endif
1335 increment_stats (STAT_GETS);
1336 results = 0;
1337 msg_ctx->do_forward = GNUNET_YES;
1338#if DEBUG_DHT
1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1340 "`%s:%s': Found %d results for `%s' request uid %llu\n",
1341 my_short_id, "DHT", results, "GET", msg_ctx->unique_id);
1342#endif
1343 if (results >= 1)
1344 {
1345 }
1346 else
1347 {
1348 /* check query valid */
1349 if (GNUNET_BLOCK_EVALUATION_REQUEST_INVALID ==
1350 GNUNET_BLOCK_evaluate (block_context, type, &msg_ctx->key,
1351 &msg_ctx->reply_bf, msg_ctx->reply_bf_mutator,
1352 msg_ctx->xquery, msg_ctx->xquery_size, NULL, 0))
1353 {
1354 GNUNET_break_op (0);
1355 msg_ctx->do_forward = GNUNET_NO;
1356 }
1357 }
1358
1359 if (msg_ctx->do_forward == GNUNET_YES)
1360 route_message (msg, msg_ctx);
1361 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->reply_bf);
1362 return results;
1363}
1364
1365
1366static void
1367remove_recent_find_peer (void *cls,
1368 const struct GNUNET_SCHEDULER_TaskContext *tc)
1369{
1370 GNUNET_HashCode *key = cls;
1371
1372 GNUNET_assert (GNUNET_YES ==
1373 GNUNET_CONTAINER_multihashmap_remove
1374 (recent_find_peer_requests, key, NULL));
1375 GNUNET_free (key);
1376}
1377 399
1378 400
1379/** 401/**
@@ -1571,592 +593,6 @@ handle_dht_find_peer (const struct GNUNET_MessageHeader *find_msg,
1571} 593}
1572 594
1573 595
1574/**
1575 * Server handler for initiating local dht put requests
1576 *
1577 * @param msg the actual put message
1578 * @param msg_ctx struct containing pertinent information about the request
1579 */
1580static void
1581handle_dht_put (const struct GNUNET_MessageHeader *msg,
1582 struct DHT_MessageContext *msg_ctx)
1583{
1584 const struct GNUNET_DHT_PutMessage *put_msg;
1585 struct DHTPutEntry *put_entry;
1586 unsigned int put_size;
1587 char *path_offset;
1588 enum GNUNET_BLOCK_Type put_type;
1589 size_t data_size;
1590 int ret;
1591 GNUNET_HashCode key;
1592 struct DHTQueryRecord *record;
1593
1594 GNUNET_assert (ntohs (msg->size) >= sizeof (struct GNUNET_DHT_PutMessage));
1595
1596 put_msg = (const struct GNUNET_DHT_PutMessage *) msg;
1597 put_type = (enum GNUNET_BLOCK_Type) ntohl (put_msg->type);
1598 data_size =
1599 ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
1600 ret =
1601 GNUNET_BLOCK_get_key (block_context, put_type, &put_msg[1], data_size,
1602 &key);
1603 if (GNUNET_NO == ret)
1604 {
1605 /* invalid reply */
1606 GNUNET_break_op (0);
1607 return;
1608 }
1609 if ((GNUNET_YES == ret) &&
1610 (0 != memcmp (&key, &msg_ctx->key, sizeof (GNUNET_HashCode))))
1611 {
1612 /* invalid wrapper: key mismatch! */
1613 GNUNET_break_op (0);
1614 return;
1615 }
1616 /* ret == GNUNET_SYSERR means that there is no known relationship between
1617 * data and the key, so we cannot check it */
1618#if DEBUG_DHT
1619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1620 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1621 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1622 msg_ctx->unique_id);
1623#endif
1624
1625 record = GNUNET_CONTAINER_multihashmap_get(forward_list.hashmap,
1626 &msg_ctx->key);
1627 if (NULL != record)
1628 {
1629 struct DHTRouteSource *pos;
1630 struct GNUNET_DHT_GetResultMessage *get_result;
1631 struct DHT_MessageContext new_msg_ctx;
1632 size_t get_size;
1633
1634 pos = record->head;
1635 while (pos != NULL)
1636 {
1637 /* TODO: do only for local started requests? or also for remote peers? */
1638 /* TODO: include this in statistics? under what? */
1639 /* TODO: reverse order of path_history? */
1640 if (NULL == pos->client)
1641 {
1642 pos = pos->next;
1643 continue;
1644 }
1645
1646 memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1647 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1648 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1649 {
1650 new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1651 }
1652
1653 get_size =
1654 sizeof (struct GNUNET_DHT_GetResultMessage) + data_size +
1655 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1656 get_result = GNUNET_malloc (get_size);
1657 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1658 get_result->header.size = htons (get_size);
1659 get_result->expiration = put_msg->expiration;
1660 get_result->type = put_msg->type;
1661 get_result->put_path_length = htons (msg_ctx->path_history_len);
1662
1663 /* Copy the actual data and the path_history to the end of the get result */
1664 memcpy (&get_result[1], &put_msg[1], data_size);
1665 path_offset = (char *) &get_result[1];
1666 path_offset += data_size;
1667 memcpy (path_offset, msg_ctx->path_history,
1668 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1669 new_msg_ctx.peer = my_identity;
1670 new_msg_ctx.bloom = NULL;
1671 new_msg_ctx.hop_count = 0;
1672 /* Make result routing a higher priority */
1673 new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;
1674 new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1675 new_msg_ctx.unique_id = pos->uid;
1676 send_reply_to_client(pos->client, &get_result->header, &new_msg_ctx);
1677 GNUNET_free (get_result);
1678 pos = pos->next;
1679 }
1680 }
1681
1682 if (msg_ctx->closest != GNUNET_YES)
1683 {
1684 route_message (msg, msg_ctx);
1685 return;
1686 }
1687
1688#if DEBUG_DHT
1689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690 "`%s:%s': Received `%s' request (inserting data!), message type %d, key %s, uid %llu\n",
1691 my_short_id, "DHT", "PUT", put_type, GNUNET_h2s (&msg_ctx->key),
1692 msg_ctx->unique_id);
1693#endif
1694
1695 increment_stats (STAT_PUTS_INSERTED);
1696
1697 route_message (msg, msg_ctx);
1698}
1699
1700
1701/**
1702 * To how many peers should we (on average)
1703 * forward the request to obtain the desired
1704 * target_replication count (on average).
1705 *
1706 * returns: target_replication / (est. hops) + (target_replication * hop_count)
1707 * where est. hops is typically 2 * the routing table depth
1708 *
1709 * @param hop_count number of hops the message has traversed
1710 * @param target_replication the number of total paths desired
1711 *
1712 * @return Some number of peers to forward the message to
1713 */
1714static unsigned int
1715get_forward_count (unsigned int hop_count, size_t target_replication)
1716{
1717 uint32_t random_value;
1718 unsigned int forward_count;
1719 float target_value;
1720
1721 if (hop_count > log_of_network_size_estimate * 4.0)
1722 {
1723 /* forcefully terminate */
1724 return 0;
1725 }
1726
1727 if (hop_count > log_of_network_size_estimate * 2.0)
1728 {
1729 /* keep forwarding, but no more replication */
1730 return 1;
1731 }
1732
1733 target_value =
1734 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
1735 ((float) (target_replication - 1.0) *
1736 hop_count));
1737 /* Set forward count to floor of target_value */
1738 forward_count = (unsigned int) target_value;
1739 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
1740 target_value = target_value - forward_count;
1741 random_value =
1742 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
1743 if (random_value < (target_value * UINT32_MAX))
1744 forward_count++;
1745 return forward_count;
1746}
1747
1748
1749/**
1750 * Check whether my identity is closer than any known peers.
1751 * If a non-null bloomfilter is given, check if this is the closest
1752 * peer that hasn't already been routed to.
1753 *
1754 * @param target hash code to check closeness to
1755 * @param bloom bloomfilter, exclude these entries from the decision
1756 * @return GNUNET_YES if node location is closest,
1757 * GNUNET_NO otherwise.
1758 */
1759static int
1760am_closest_peer (const GNUNET_HashCode * target,
1761 struct GNUNET_CONTAINER_BloomFilter *bloom)
1762{
1763 int bits;
1764 int other_bits;
1765 int bucket_num;
1766 int count;
1767 struct PeerInfo *pos;
1768 unsigned int my_distance;
1769
1770 if (0 == memcmp (&my_identity.hashPubKey, target, sizeof (GNUNET_HashCode)))
1771 return GNUNET_YES;
1772
1773 bucket_num = find_current_bucket (target);
1774
1775 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, target);
1776 my_distance = distance (&my_identity.hashPubKey, target);
1777 pos = k_buckets[bucket_num].head;
1778 count = 0;
1779 while ((pos != NULL) && (count < bucket_size))
1780 {
1781 if ((bloom != NULL) &&
1782 (GNUNET_YES ==
1783 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
1784 {
1785 pos = pos->next;
1786 continue; /* Skip already checked entries */
1787 }
1788
1789 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, target);
1790 if (other_bits > bits)
1791 return GNUNET_NO;
1792 else if (other_bits == bits) /* We match the same number of bits */
1793 {
1794 if (distance (&pos->id.hashPubKey, target) < my_distance) /* Check all known peers, only return if we are the true closest */
1795 return GNUNET_NO;
1796 }
1797 pos = pos->next;
1798 }
1799
1800 /* No peers closer, we are the closest! */
1801 return GNUNET_YES;
1802}
1803
1804
1805/**
1806 * Select a peer from the routing table that would be a good routing
1807 * destination for sending a message for "target". The resulting peer
1808 * must not be in the set of blocked peers.<p>
1809 *
1810 * Note that we should not ALWAYS select the closest peer to the
1811 * target, peers further away from the target should be chosen with
1812 * exponentially declining probability.
1813 *
1814 * @param target the key we are selecting a peer to route to
1815 * @param bloom a bloomfilter containing entries this request has seen already
1816 * @param hops how many hops has this message traversed thus far
1817 *
1818 * @return Peer to route to, or NULL on error
1819 */
1820static struct PeerInfo *
1821select_peer (const GNUNET_HashCode * target,
1822 struct GNUNET_CONTAINER_BloomFilter *bloom, unsigned int hops)
1823{
1824 unsigned int bc;
1825 unsigned int count;
1826 unsigned int selected;
1827 struct PeerInfo *pos;
1828 unsigned int distance;
1829 unsigned int largest_distance;
1830 struct PeerInfo *chosen;
1831
1832 if (hops >= log_of_network_size_estimate)
1833 {
1834 /* greedy selection (closest peer that is not in bloomfilter) */
1835 largest_distance = 0;
1836 chosen = NULL;
1837 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1838 {
1839 pos = k_buckets[bc].head;
1840 count = 0;
1841 while ((pos != NULL) && (count < bucket_size))
1842 {
1843 /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
1844 if (GNUNET_NO ==
1845 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1846 {
1847 distance = inverse_distance (target, &pos->id.hashPubKey);
1848 if (distance > largest_distance)
1849 {
1850 chosen = pos;
1851 largest_distance = distance;
1852 }
1853 }
1854 count++;
1855 pos = pos->next;
1856 }
1857 }
1858 if ((largest_distance > 0) && (chosen != NULL))
1859 {
1860 GNUNET_CONTAINER_bloomfilter_add (bloom, &chosen->id.hashPubKey);
1861 return chosen;
1862 }
1863 return NULL; /* no peer available or we are the closest */
1864 }
1865
1866
1867 /* select "random" peer */
1868 /* count number of peers that are available and not filtered */
1869 count = 0;
1870 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1871 {
1872 pos = k_buckets[bc].head;
1873 while ((pos != NULL) && (count < bucket_size))
1874 {
1875 if (GNUNET_YES ==
1876 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1877 {
1878 pos = pos->next;
1879 increment_stats ("# peer blocked from selection by Bloom filter");
1880 continue; /* Ignore bloomfiltered peers */
1881 }
1882 count++;
1883 pos = pos->next;
1884 }
1885 }
1886 if (count == 0) /* No peers to select from! */
1887 {
1888 increment_stats ("# failed to select peer");
1889 return NULL;
1890 }
1891 /* Now actually choose a peer */
1892 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1893 count = 0;
1894 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
1895 {
1896 pos = k_buckets[bc].head;
1897 while ((pos != NULL) && (count < bucket_size))
1898 {
1899 if (GNUNET_YES ==
1900 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
1901 {
1902 pos = pos->next;
1903 continue; /* Ignore bloomfiltered peers */
1904 }
1905 if (0 == selected--)
1906 return pos;
1907 pos = pos->next;
1908 }
1909 }
1910 GNUNET_break (0);
1911 return NULL;
1912}
1913
1914
1915/**
1916 * Remember this routing request so that if a reply is
1917 * received we can either forward it to the correct peer
1918 * or return the result locally.
1919 *
1920 * @param msg_ctx Context of the route request
1921 *
1922 * @return GNUNET_YES if this response was cached, GNUNET_NO if not
1923 */
1924static int
1925cache_response (struct DHT_MessageContext *msg_ctx)
1926{
1927 struct DHTQueryRecord *record;
1928 struct DHTRouteSource *source_info;
1929 struct DHTRouteSource *pos;
1930 struct GNUNET_TIME_Absolute now;
1931 unsigned int current_size;
1932
1933 current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
1934
1935 while (current_size >= MAX_OUTSTANDING_FORWARDS)
1936 {
1937 source_info = GNUNET_CONTAINER_heap_remove_root (forward_list.minHeap);
1938 GNUNET_assert (source_info != NULL);
1939 record = source_info->record;
1940 GNUNET_CONTAINER_DLL_remove (record->head, record->tail, source_info);
1941 if (record->head == NULL) /* No more entries in DLL */
1942 {
1943 GNUNET_assert (GNUNET_YES ==
1944 GNUNET_CONTAINER_multihashmap_remove (forward_list.hashmap,
1945 &record->key,
1946 record));
1947 GNUNET_free (record);
1948 }
1949 if (source_info->delete_task != GNUNET_SCHEDULER_NO_TASK)
1950 {
1951 GNUNET_SCHEDULER_cancel (source_info->delete_task);
1952 source_info->delete_task = GNUNET_SCHEDULER_NO_TASK;
1953 }
1954 if (source_info->find_peers_responded != NULL)
1955 GNUNET_CONTAINER_bloomfilter_free (source_info->find_peers_responded);
1956 GNUNET_free (source_info);
1957 current_size = GNUNET_CONTAINER_multihashmap_size (forward_list.hashmap);
1958 }
1959
1960 /** Non-local request and have too many outstanding forwards, discard! */
1961 if ((current_size >= MAX_OUTSTANDING_FORWARDS) && (msg_ctx->client == NULL))
1962 return GNUNET_NO;
1963
1964 now = GNUNET_TIME_absolute_get ();
1965 record =
1966 GNUNET_CONTAINER_multihashmap_get (forward_list.hashmap, &msg_ctx->key);
1967 if (record != NULL) /* Already know this request! */
1968 {
1969 pos = record->head;
1970 while (pos != NULL)
1971 {
1972 if (0 ==
1973 memcmp (&msg_ctx->peer, &pos->source,
1974 sizeof (struct GNUNET_PeerIdentity)))
1975 break; /* Already have this peer in reply list! */
1976 pos = pos->next;
1977 }
1978 if ((pos != NULL) && (pos->client == msg_ctx->client)) /* Seen this already */
1979 {
1980 GNUNET_CONTAINER_heap_update_cost (forward_list.minHeap, pos->hnode,
1981 now.abs_value);
1982 return GNUNET_NO;
1983 }
1984 }
1985 else
1986 {
1987 record = GNUNET_malloc (sizeof (struct DHTQueryRecord));
1988 GNUNET_assert (GNUNET_OK ==
1989 GNUNET_CONTAINER_multihashmap_put (forward_list.hashmap,
1990 &msg_ctx->key, record,
1991 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1992 memcpy (&record->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
1993 }
1994
1995 source_info = GNUNET_malloc (sizeof (struct DHTRouteSource));
1996 source_info->record = record;
1997 source_info->delete_task =
1998 GNUNET_SCHEDULER_add_delayed (DHT_FORWARD_TIMEOUT, &remove_forward_entry,
1999 source_info);
2000 source_info->find_peers_responded =
2001 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2002 source_info->source = msg_ctx->peer;
2003 GNUNET_CONTAINER_DLL_insert_after (record->head, record->tail, record->tail,
2004 source_info);
2005 if (msg_ctx->client != NULL) /* For local request, set timeout so high it effectively never gets pushed out */
2006 {
2007 source_info->client = msg_ctx->client;
2008 now = GNUNET_TIME_absolute_get_forever ();
2009 }
2010 source_info->hnode =
2011 GNUNET_CONTAINER_heap_insert (forward_list.minHeap, source_info,
2012 now.abs_value);
2013 source_info->uid = msg_ctx->unique_id;
2014#if DEBUG_DHT > 1
2015 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016 "`%s:%s': Created new forward source info for %s uid %llu\n",
2017 my_short_id, "DHT", GNUNET_h2s (&msg_ctx->key),
2018 msg_ctx->unique_id);
2019#endif
2020 return GNUNET_YES;
2021}
2022
2023
2024/**
2025 * Main function that handles whether or not to route a message to other
2026 * peers.
2027 *
2028 * @param msg the message to be routed
2029 * @param msg_ctx the context containing all pertinent information about the message
2030 */
2031static void
2032route_message (const struct GNUNET_MessageHeader *msg,
2033 struct DHT_MessageContext *msg_ctx)
2034{
2035 int i;
2036 struct PeerInfo *selected;
2037 unsigned int target_forward_count;
2038 unsigned int forward_count;
2039 struct RecentRequest *recent_req;
2040 char *stat_forward_count;
2041 char *temp_stat_str;
2042
2043 increment_stats (STAT_ROUTES);
2044 target_forward_count =
2045 get_forward_count (msg_ctx->hop_count, msg_ctx->replication);
2046 GNUNET_asprintf (&stat_forward_count, "# forward counts of %d",
2047 target_forward_count);
2048 increment_stats (stat_forward_count);
2049 GNUNET_free (stat_forward_count);
2050 if (msg_ctx->bloom == NULL)
2051 msg_ctx->bloom =
2052 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2053
2054 if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT)
2055 {
2056 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2057 GNUNET_assert (recent_req != NULL);
2058 GNUNET_SCHEDULER_cancel (recent_req->remove_task);
2059 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2060 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2061 GNUNET_free (recent_req);
2062 }
2063
2064 recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
2065 recent_req->uid = msg_ctx->unique_id;
2066 memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
2067 recent_req->heap_node =
2068 GNUNET_CONTAINER_heap_insert (recent_heap, recent_req,
2069 GNUNET_TIME_absolute_get ().abs_value);
2070 recent_req->bloom =
2071 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
2072
2073 forward_count = 0;
2074 for (i = 0; i < target_forward_count; i++)
2075 {
2076 selected = select_peer (&msg_ctx->key, msg_ctx->bloom, msg_ctx->hop_count);
2077 if (selected == NULL)
2078 break;
2079 forward_count++;
2080 if (GNUNET_CRYPTO_hash_matching_bits
2081 (&selected->id.hashPubKey,
2082 &msg_ctx->key) >=
2083 GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey,
2084 &msg_ctx->key))
2085 GNUNET_asprintf (&temp_stat_str,
2086 "# requests routed to close(r) peer hop %u",
2087 msg_ctx->hop_count);
2088 else
2089 GNUNET_asprintf (&temp_stat_str,
2090 "# requests routed to less close peer hop %u",
2091 msg_ctx->hop_count);
2092 if (temp_stat_str != NULL)
2093 {
2094 increment_stats (temp_stat_str);
2095 GNUNET_free (temp_stat_str);
2096 }
2097 GNUNET_CONTAINER_bloomfilter_add (msg_ctx->bloom,
2098 &selected->id.hashPubKey);
2099 forward_message (msg, selected, msg_ctx);
2100 }
2101
2102 if (msg_ctx->bloom != NULL)
2103 {
2104 GNUNET_CONTAINER_bloomfilter_or2 (recent_req->bloom, msg_ctx->bloom,
2105 DHT_BLOOM_SIZE);
2106 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
2107 msg_ctx->bloom = NULL;
2108 }
2109}
2110
2111
2112/**
2113 * Main function that handles whether or not to route a message to other
2114 * peers.
2115 *
2116 * @param msg the message to be routed
2117 * @param msg_ctx the context containing all pertinent information about the message
2118 */
2119static void
2120demultiplex_message (const struct GNUNET_MessageHeader *msg,
2121 struct DHT_MessageContext *msg_ctx)
2122{
2123 /* FIXME: Should we use closest excluding those we won't route to (the bloomfilter problem)? */
2124 msg_ctx->closest = am_closest_peer (&msg_ctx->key, msg_ctx->bloom);
2125
2126 switch (ntohs (msg->type))
2127 {
2128 case GNUNET_MESSAGE_TYPE_DHT_GET: /* Add to hashmap of requests seen, search for data (always) */
2129 cache_response (msg_ctx);
2130 handle_dht_get (msg, msg_ctx);
2131 break;
2132 case GNUNET_MESSAGE_TYPE_DHT_PUT: /* Check if closest, if so insert data. */
2133 increment_stats (STAT_PUTS);
2134 handle_dht_put (msg, msg_ctx);
2135 break;
2136 case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER: /* Check if closest and not started by us, check options, add to requests seen */
2137 increment_stats (STAT_FIND_PEER);
2138 if (((msg_ctx->hop_count > 0) &&
2139 (0 !=
2140 memcmp (&msg_ctx->peer, &my_identity,
2141 sizeof (struct GNUNET_PeerIdentity)))) ||
2142 (msg_ctx->client != NULL))
2143 {
2144 cache_response (msg_ctx);
2145 if ((msg_ctx->closest == GNUNET_YES) ||
2146 (msg_ctx->msg_options == GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE))
2147 handle_dht_find_peer (msg, msg_ctx);
2148 }
2149 else
2150 route_message (msg, msg_ctx);
2151 break;
2152 default:
2153 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2154 "`%s': Message type (%d) not handled, forwarding anyway!\n",
2155 "DHT", ntohs (msg->type));
2156 route_message (msg, msg_ctx);
2157 }
2158}
2159
2160 596
2161/** 597/**
2162 * Receive the HELLO from transport service, 598 * Receive the HELLO from transport service,
@@ -2188,9 +624,6 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message)
2188static void 624static void
2189shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 625shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2190{ 626{
2191 int bucket_count;
2192 struct PeerInfo *pos;
2193
2194 if (NULL != ghh) 627 if (NULL != ghh)
2195 { 628 {
2196 GNUNET_TRANSPORT_get_hello_cancel (ghh); 629 GNUNET_TRANSPORT_get_hello_cancel (ghh);
@@ -2204,20 +637,9 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2204 } 637 }
2205 GDS_NEIGHBOURS_done (); 638 GDS_NEIGHBOURS_done ();
2206 GDS_DATACACHE_done (); 639 GDS_DATACACHE_done ();
640 GDS_ROUTING_done ();
641 GDS_CLIENT_done ();
2207 GDS_NSE_done (); 642 GDS_NSE_done ();
2208 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2209 {
2210 while (k_buckets[bucket_count].head != NULL)
2211 {
2212 pos = k_buckets[bucket_count].head;
2213#if DEBUG_DHT
2214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2215 "%s:%s Removing peer %s from bucket %d!\n", my_short_id,
2216 "DHT", GNUNET_i2s (&pos->id), bucket_count);
2217#endif
2218 delete_peer (pos, bucket_count);
2219 }
2220 }
2221 if (stats != NULL) 643 if (stats != NULL)
2222 { 644 {
2223 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 645 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -2310,30 +732,9 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2310int 732int
2311main (int argc, char *const *argv) 733main (int argc, char *const *argv)
2312{ 734{
2313 int ret; 735 return (GNUNET_OK ==
2314 struct RecentRequest *recent_req; 736 GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
2315 737 NULL)) ? 0 : 1;
2316 recent_heap =
2317 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2318 recent_find_peer_requests =
2319 GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
2320 ret =
2321 (GNUNET_OK ==
2322 GNUNET_SERVICE_run (argc, argv, "dht", GNUNET_SERVICE_OPTION_NONE, &run,
2323 NULL)) ? 0 : 1;
2324 while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0)
2325 {
2326 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
2327 GNUNET_assert (recent_req != NULL);
2328 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
2329 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
2330 GNUNET_free (recent_req);
2331 }
2332 GNUNET_CONTAINER_heap_destroy (recent_heap);
2333 recent_heap = NULL;
2334 GNUNET_CONTAINER_multihashmap_destroy (recent_find_peer_requests);
2335 recent_find_peer_requests = NULL;
2336 return ret;
2337} 738}
2338 739
2339/* end of gnunet-service-dht.c */ 740/* end of gnunet-service-dht.c */
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 95a0d68d0..8790d8fbb 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -177,10 +177,9 @@ struct ClientQueryRecord
177 uint32_t msg_options; 177 uint32_t msg_options;
178 178
179 /** 179 /**
180 * The type for the data for the GET request; actually an 'enum 180 * The type for the data for the GET request.
181 * GNUNET_BLOCK_Type'.
182 */ 181 */
183 uint32_t msg_type; 182 enum GNUNET_BLOCK_Type msg_type;
184 183
185}; 184};
186 185
@@ -662,7 +661,7 @@ struct ForwardReplyContext
662 /** 661 /**
663 * Type of the data. 662 * Type of the data.
664 */ 663 */
665 uint32_t type; 664 enum GNUNET_BLOCK_Type type;
666 665
667 /** 666 /**
668 * Number of bytes in data. 667 * Number of bytes in data.
@@ -795,7 +794,7 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
795 const struct GNUNET_PeerIdentity *get_path, 794 const struct GNUNET_PeerIdentity *get_path,
796 unsigned int put_path_length, 795 unsigned int put_path_length,
797 const struct GNUNET_PeerIdentity *put_path, 796 const struct GNUNET_PeerIdentity *put_path,
798 uint32_t type, 797 enum GNUNET_BLOCK_Type type,
799 size_t data_size, 798 size_t data_size,
800 const void *data) 799 const void *data)
801{ 800{
diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h
index db4f0b9fe..0bb548f71 100644
--- a/src/dht/gnunet-service-dht_clients.h
+++ b/src/dht/gnunet-service-dht_clients.h
@@ -50,7 +50,7 @@ GDS_CLIENT_handle_reply (struct GNUNET_TIME_Absolute expiration,
50 const struct GNUNET_PeerIdentity *get_path, 50 const struct GNUNET_PeerIdentity *get_path,
51 unsigned int put_path_length, 51 unsigned int put_path_length,
52 const struct GNUNET_PeerIdentity *put_path, 52 const struct GNUNET_PeerIdentity *put_path,
53 uint32_t type, 53 enum GNUNET_BLOCK_Type type,
54 size_t data_size, 54 size_t data_size,
55 const void *data); 55 const void *data);
56 56
diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c
index b2dd05ac9..2c1a3fe20 100644
--- a/src/dht/gnunet-service-dht_datacache.c
+++ b/src/dht/gnunet-service-dht_datacache.c
@@ -72,7 +72,7 @@ GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
72 const GNUNET_HashCode *key, 72 const GNUNET_HashCode *key,
73 unsigned int put_path_length, 73 unsigned int put_path_length,
74 const struct GNUNET_PeerIdentity *put_path, 74 const struct GNUNET_PeerIdentity *put_path,
75 uint32_t type, 75 enum GNUNET_BLOCK_Type type,
76 size_t data_size, 76 size_t data_size,
77 const void *data) 77 const void *data)
78{ 78{
@@ -268,7 +268,7 @@ struct GetRequestContext
268 */ 268 */
269void 269void
270GDS_DATACACHE_handle_get (const GNUNET_HashCode *key, 270GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
271 uint32_t type, 271 enum GNUNET_BLOCK_Type type,
272 const void *xquery, 272 const void *xquery,
273 size_t xquery_size, 273 size_t xquery_size,
274 struct GNUNET_CONTAINER_BloomFilter **reply_bf, 274 struct GNUNET_CONTAINER_BloomFilter **reply_bf,
diff --git a/src/dht/gnunet-service-dht_datacache.h b/src/dht/gnunet-service-dht_datacache.h
index 0501e9e4c..ecb3a24a1 100644
--- a/src/dht/gnunet-service-dht_datacache.h
+++ b/src/dht/gnunet-service-dht_datacache.h
@@ -44,7 +44,7 @@ GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
44 const GNUNET_HashCode *key, 44 const GNUNET_HashCode *key,
45 unsigned int put_path_length, 45 unsigned int put_path_length,
46 const struct GNUNET_PeerIdentity *put_path, 46 const struct GNUNET_PeerIdentity *put_path,
47 uint32_t type, 47 enum GNUNET_BLOCK_Type type,
48 size_t data_size, 48 size_t data_size,
49 const void *data); 49 const void *data);
50 50
@@ -61,7 +61,7 @@ GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
61 */ 61 */
62void 62void
63GDS_DATACACHE_handle_get (const GNUNET_HashCode *key, 63GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
64 uint32_t type, 64 enum GNUNET_BLOCK_Type type,
65 const void *xquery, 65 const void *xquery,
66 size_t xquery_size, 66 size_t xquery_size,
67 struct GNUNET_CONTAINER_BloomFilter **reply_bf, 67 struct GNUNET_CONTAINER_BloomFilter **reply_bf,
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index b7cc2048e..425411b75 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -38,6 +38,7 @@
38#include "gnunet_statistics_service.h" 38#include "gnunet_statistics_service.h"
39#include "dht.h" 39#include "dht.h"
40#include "gnunet-service-dht_datacache.h" 40#include "gnunet-service-dht_datacache.h"
41#include "gnunet-service-dht_routing.h"
41#include <fenv.h> 42#include <fenv.h>
42 43
43/** 44/**
@@ -139,6 +140,11 @@ struct PeerResultMessage
139 uint32_t get_path_length GNUNET_PACKED; 140 uint32_t get_path_length GNUNET_PACKED;
140 141
141 /** 142 /**
143 * When does the content expire?
144 */
145 struct GNUNET_TIME_AbsoluteNBO expiration_time;
146
147 /**
142 * The key of the corresponding GET request. 148 * The key of the corresponding GET request.
143 */ 149 */
144 GNUNET_HashCode key; 150 GNUNET_HashCode key;
@@ -582,8 +588,6 @@ process_peer_queue (struct PeerInfo *peer)
582 * To how many peers should we (on average) forward the request to 588 * To how many peers should we (on average) forward the request to
583 * obtain the desired target_replication count (on average). 589 * obtain the desired target_replication count (on average).
584 * 590 *
585 * FIXME: double-check that this is fine
586 *
587 * @param hop_count number of hops the message has traversed 591 * @param hop_count number of hops the message has traversed
588 * @param target_replication the number of total paths desired 592 * @param target_replication the number of total paths desired
589 * @return Some number of peers to forward the message to 593 * @return Some number of peers to forward the message to
@@ -596,14 +600,19 @@ get_forward_count (uint32_t hop_count,
596 uint32_t forward_count; 600 uint32_t forward_count;
597 float target_value; 601 float target_value;
598 602
599 /* bound by system-wide maximum */ 603 if (hop_count > log_of_network_size_estimate * 4.0)
600 target_replication = GNUNET_MIN (16 /* FIXME: use named constant */, 604 {
601 target_replication); 605 /* forcefully terminate */
606 return 0;
607 }
602 if (hop_count > log_of_network_size_estimate * 2.0) 608 if (hop_count > log_of_network_size_estimate * 2.0)
603 { 609 {
604 /* Once we have reached our ideal number of hops, only forward to 1 peer */ 610 /* Once we have reached our ideal number of hops, only forward to 1 peer */
605 return 1; 611 return 1;
606 } 612 }
613 /* bound by system-wide maximum */
614 target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
615 target_replication);
607 target_value = 616 target_value =
608 1 + (target_replication - 1.0) / (log_of_network_size_estimate + 617 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
609 ((float) (target_replication - 1.0) * 618 ((float) (target_replication - 1.0) *
@@ -847,8 +856,8 @@ get_target_peers (const GNUNET_HashCode *key,
847 * @param data_size number of bytes in data 856 * @param data_size number of bytes in data
848 */ 857 */
849void 858void
850GDS_NEIGHBOURS_handle_put (uint32_t type, 859GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
851 uint32_t options, 860 enum GNUNET_DHT_RouteOption options,
852 uint32_t desired_replication_level, 861 uint32_t desired_replication_level,
853 GNUNET_TIME_Absolute expiration_time, 862 GNUNET_TIME_Absolute expiration_time,
854 uint32_t hop_count, 863 uint32_t hop_count,
@@ -936,8 +945,8 @@ GDS_NEIGHBOURS_handle_put (uint32_t type,
936 * @param peer_bf filter for peers not to select (again) 945 * @param peer_bf filter for peers not to select (again)
937 */ 946 */
938void 947void
939GDS_NEIGHBOURS_handle_get (uint32_t type, 948GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
940 uint32_t options, 949 enum GNUNET_DHT_RouteOption options,
941 uint32_t desired_replication_level, 950 uint32_t desired_replication_level,
942 uint32_t hop_count, 951 uint32_t hop_count,
943 const GNUNET_HashCode *key, 952 const GNUNET_HashCode *key,
@@ -969,6 +978,7 @@ GDS_NEIGHBOURS_handle_get (uint32_t type,
969 GNUNET_break (0); 978 GNUNET_break (0);
970 return; 979 return;
971 } 980 }
981 /* forward request */
972 for (i=0;i<target_count;i++) 982 for (i=0;i<target_count;i++)
973 { 983 {
974 target = targets[i]; 984 target = targets[i];
@@ -1008,11 +1018,11 @@ GDS_NEIGHBOURS_handle_get (uint32_t type,
1008 1018
1009/** 1019/**
1010 * Handle a reply (route to origin). Only forwards the reply back to 1020 * Handle a reply (route to origin). Only forwards the reply back to
1011 * other peers waiting for it. Does not do local caching or 1021 * the given peer. Does not do local caching or forwarding to local
1012 * forwarding to local clients. 1022 * clients.
1013 * 1023 *
1024 * @param target neighbour that should receive the block (if still connected)
1014 * @param type type of the block 1025 * @param type type of the block
1015 * @param options routing options
1016 * @param expiration_time when does the content expire 1026 * @param expiration_time when does the content expire
1017 * @param key key for the content 1027 * @param key key for the content
1018 * @param put_path_length number of entries in put_path 1028 * @param put_path_length number of entries in put_path
@@ -1023,8 +1033,8 @@ GDS_NEIGHBOURS_handle_get (uint32_t type,
1023 * @param data_size number of bytes in data 1033 * @param data_size number of bytes in data
1024 */ 1034 */
1025void 1035void
1026GDS_NEIGHBOURS_handle_reply (uint32_t type, 1036GDS_NEIGHBOURS_handle_reply (const GNUNET_PeerIdentity *target,
1027 uint32_t options, 1037 enum GNUNET_BLOCK_Type type,
1028 GNUNET_TIME_Absolute expiration_time, 1038 GNUNET_TIME_Absolute expiration_time,
1029 const GNUNET_HashCode *key, 1039 const GNUNET_HashCode *key,
1030 unsigned int put_path_length, 1040 unsigned int put_path_length,
@@ -1034,7 +1044,51 @@ GDS_NEIGHBOURS_handle_reply (uint32_t type,
1034 const void *data, 1044 const void *data,
1035 size_t data_size) 1045 size_t data_size)
1036{ 1046{
1037 // FIXME 1047 struct PeerInfo *pi;
1048 struct P2PPendingMessage *pending;
1049 size_t msize;
1050 struct PeerResultMessage *prm;
1051 struct GNUNET_PeerIdentity *paths;
1052
1053 msize = data_size + sizeof (struct PeerResultMessage) +
1054 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1055 if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1056 (get_path_length + put_path_length > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1057 (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE) )
1058 {
1059 GNUNET_break (0);
1060 return;
1061 }
1062 pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers,
1063 &target->hashPubKey);
1064 if (NULL == pi)
1065 {
1066 /* peer disconnected in the meantime, drop reply */
1067 return;
1068 }
1069 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1070 pending->importance = 0; /* FIXME */
1071 pending->timeout = expiration_time;
1072 prm = (struct PeerResultMessage*) &pending[1];
1073 pending->msg = &prm->header;
1074 prm->header.size = htons (msize);
1075 prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1076 prm->type = htonl (type);
1077 prm->put_path_length = htonl (put_path_length);
1078 prm->get_path_length = htonl (get_path_length);
1079 prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1080 prm->key = *key;
1081 paths = (struct GNUNET_PeerIdentity) &prm[1];
1082 memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
1083 memcpy (&paths[put_path_length],
1084 get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1085 memcpy (&paths[put_path_length + get_path_length],
1086 data, data_size);
1087 GNUNET_CONTAINER_DLL_insert (target->head,
1088 target->tail,
1089 pending);
1090 target->pending_count++;
1091 process_peer_queue (target);
1038} 1092}
1039 1093
1040 1094
@@ -1281,79 +1335,91 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1281 const struct GNUNET_TRANSPORT_ATS_Information 1335 const struct GNUNET_TRANSPORT_ATS_Information
1282 *atsi) 1336 *atsi)
1283{ 1337{
1284 // 1) validate GET 1338 struct PeerGetMessage *get;
1285 // 2) store in routing table 1339 uint32_t xquery_size;
1286 // 3) check options (i.e. FIND PEER) 1340 size_t reply_bf_size;
1287 // 4) local lookup (=> need eval result!) 1341 uint16_t msize;
1288 // 5) p2p forwarding 1342 enum GNUNET_BLOCK_Type type;
1289 1343 enum GNUNET_DHT_RouteOption options;
1290 1344 enum GNUNET_BLOCK_EvaluationResult eval;
1291 struct GNUNET_DHT_P2PRouteMessage *incoming = 1345 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
1292 (struct GNUNET_DHT_P2PRouteMessage *) message; 1346 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
1293 struct GNUNET_MessageHeader *enc_msg = 1347 const char *xquery;
1294 (struct GNUNET_MessageHeader *) &incoming[1]; 1348
1295 struct DHT_MessageContext *msg_ctx; 1349 /* parse and validate message */
1296 char *route_path; 1350 msize = ntohs (message->size);
1297 int path_size; 1351 if (msize < sizeof (struct PeerGetMessage))
1298
1299 // FIXME
1300 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
1301 { 1352 {
1302 GNUNET_break_op (0); 1353 GNUNET_break_op (0);
1303 return GNUNET_YES; 1354 return GNUNET_YES;
1304 } 1355 }
1305 1356 get = (struct PeerGetMessage *) message;
1306 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) 1357 xquery_size = ntohl (get->xquery_size);
1358 if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1307 { 1359 {
1308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1360 GNUNET_break_op (0);
1309 "Sending of previous replies took too long, backing off!\n");
1310 increment_stats ("# route requests dropped due to high load");
1311 decrease_max_send_delay (get_max_send_delay ());
1312 return GNUNET_YES; 1361 return GNUNET_YES;
1313 } 1362 }
1314 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); 1363 reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
1315 msg_ctx->bloom = 1364 type = ntohl (get->type);
1316 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, 1365 options = ntohl (get->options);
1317 DHT_BLOOM_K); 1366 xquery = (const char*) &get[1];
1318 GNUNET_assert (msg_ctx->bloom != NULL); 1367 reply_bf = NULL;
1319 msg_ctx->hop_count = ntohl (incoming->hop_count); 1368 if (reply_bf_size > 0)
1320 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); 1369 reply_bf = GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
1321 msg_ctx->replication = ntohl (incoming->desired_replication_level); 1370 reply_bf_size,
1322 msg_ctx->msg_options = ntohl (incoming->options); 1371 GNUNET_DHT_GET_BLOOMFILTER_K);
1323 if (GNUNET_DHT_RO_RECORD_ROUTE == 1372 eval = GNUNET_BLOCK_evaluate (block_context,
1324 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) 1373 type,
1325 { 1374 &get->key,
1326 path_size = 1375 &reply_bf,
1327 ntohl (incoming->outgoing_path_length) * 1376 get->bf_mutator,
1328 sizeof (struct GNUNET_PeerIdentity); 1377 xquery, xquery_size,
1329 if (ntohs (message->size) != 1378 NULL, 0);
1330 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) + 1379 if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID)
1331 path_size))
1332 {
1333 GNUNET_break_op (0);
1334 GNUNET_free (msg_ctx);
1335 return GNUNET_YES;
1336 }
1337 route_path = (char *) &incoming[1];
1338 route_path = route_path + ntohs (enc_msg->size);
1339 msg_ctx->path_history =
1340 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
1341 memcpy (msg_ctx->path_history, route_path, path_size);
1342 memcpy (&msg_ctx->path_history[path_size], &my_identity,
1343 sizeof (struct GNUNET_PeerIdentity));
1344 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
1345 }
1346 msg_ctx->network_size = ntohl (incoming->network_size);
1347 msg_ctx->peer = *peer;
1348 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
1349 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
1350 demultiplex_message (enc_msg, msg_ctx);
1351 if (msg_ctx->bloom != NULL)
1352 { 1380 {
1353 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); 1381 /* request invalid or block type not supported */
1354 msg_ctx->bloom = NULL; 1382 GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
1383 if (NULL != reply_bf)
1384 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1385 return GNUNET_YES;
1355 } 1386 }
1356 GNUNET_free (msg_ctx); 1387 peer_bf =
1388 GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
1389 DHT_BLOOM_SIZE,
1390 DHT_BLOOM_K);
1391
1392 /* remember request for routing replies */
1393 GDS_ROUTING_add (peer,
1394 type,
1395 &get->key,
1396 xquery, xquery_size,
1397 reply_bf, get->reply_bf_mutator);
1398 /* FIXME: check options (find peer, local-processing-only-if-nearest, etc.!) */
1399
1400 /* local lookup (this may update the reply_bf) */
1401 GDS_DATACACHE_handle_get (&get->key,
1402 type,
1403 xquery, xquery_size,
1404 &reply_bf,
1405 get->reply_bf_mutator);
1406 /* FIXME: should track if the local lookup resulted in a
1407 definitive result and then NOT do P2P forwarding */
1408
1409 /* P2P forwarding */
1410 GDS_NEIGHBOURS_handle_get (type,
1411 options,
1412 ntohl (get->desired_replication_level),
1413 ntohl (get->hop_count) + 1, /* CHECK: where (else) do we do +1? */
1414 &get->key,
1415 xquery, xquery_size,
1416 reply_bf,
1417 get->reply_bf_mutator,
1418 peer_bf);
1419 /* clean up */
1420 if (NULL != reply_bf)
1421 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
1422 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
1357 return GNUNET_YES; 1423 return GNUNET_YES;
1358} 1424}
1359 1425
@@ -1365,7 +1431,7 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1365 * @param message message 1431 * @param message message
1366 * @param peer peer identity this notification is about 1432 * @param peer peer identity this notification is about
1367 * @param atsi performance data 1433 * @param atsi performance data
1368 * 1434 * @return GNUNET_YES (do not cut p2p connection)
1369 */ 1435 */
1370static int 1436static int
1371handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, 1437handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
@@ -1373,6 +1439,7 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1373 const struct GNUNET_TRANSPORT_ATS_Information 1439 const struct GNUNET_TRANSPORT_ATS_Information
1374 *atsi) 1440 *atsi)
1375{ 1441{
1442 // FIXME!
1376 // 1) validate result format 1443 // 1) validate result format
1377 // 2) append 'peer' to put path 1444 // 2) append 'peer' to put path
1378 // 3) forward to local clients 1445 // 3) forward to local clients
@@ -1383,38 +1450,8 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
1383 (struct GNUNET_MessageHeader *) &incoming[1]; 1450 (struct GNUNET_MessageHeader *) &incoming[1];
1384 struct DHT_MessageContext msg_ctx; 1451 struct DHT_MessageContext msg_ctx;
1385 1452
1386 // FIXME
1387 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
1388 {
1389 GNUNET_break_op (0);
1390 return GNUNET_YES;
1391 }
1392 1453
1393 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext)); 1454
1394 memcpy (&msg_ctx.key, &incoming->key, sizeof (GNUNET_HashCode));
1395 msg_ctx.msg_options = ntohl (incoming->options);
1396 msg_ctx.hop_count = ntohl (incoming->hop_count);
1397 msg_ctx.peer = *peer;
1398 msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
1399 msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1400 if ((GNUNET_DHT_RO_RECORD_ROUTE ==
1401 (msg_ctx.msg_options & GNUNET_DHT_RO_RECORD_ROUTE)) &&
1402 (ntohl (incoming->outgoing_path_length) > 0))
1403 {
1404 if (ntohs (message->size) -
1405 sizeof (struct GNUNET_DHT_P2PRouteResultMessage) -
1406 ntohs (enc_msg->size) !=
1407 ntohl (incoming->outgoing_path_length) *
1408 sizeof (struct GNUNET_PeerIdentity))
1409 {
1410 GNUNET_break_op (0);
1411 return GNUNET_NO;
1412 }
1413 msg_ctx.path_history = (char *) &incoming[1];
1414 msg_ctx.path_history += ntohs (enc_msg->size);
1415 msg_ctx.path_history_len = ntohl (incoming->outgoing_path_length);
1416 }
1417 route_result_message (enc_msg, &msg_ctx);
1418 return GNUNET_YES; 1455 return GNUNET_YES;
1419} 1456}
1420 1457
diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h
index 2c20df2c4..70723deac 100644
--- a/src/dht/gnunet-service-dht_neighbours.h
+++ b/src/dht/gnunet-service-dht_neighbours.h
@@ -48,8 +48,8 @@
48 * @param data_size number of bytes in data 48 * @param data_size number of bytes in data
49 */ 49 */
50void 50void
51GDS_NEIGHBOURS_handle_put (uint32_t type, 51GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
52 uint32_t options, 52 enum GNUNET_DHT_RouteOption options,
53 uint32_t desired_replication_level, 53 uint32_t desired_replication_level,
54 GNUNET_TIME_Absolute expiration_time, 54 GNUNET_TIME_Absolute expiration_time,
55 uint32_t hop_count, 55 uint32_t hop_count,
@@ -79,8 +79,8 @@ GDS_NEIGHBOURS_handle_put (uint32_t type,
79 * @param peer_bf filter for peers not to select (again) 79 * @param peer_bf filter for peers not to select (again)
80 */ 80 */
81void 81void
82GDS_NEIGHBOURS_handle_get (uint32_t type, 82GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
83 uint32_t options, 83 enum GNUNET_DHT_RouteOption options,
84 uint32_t desired_replication_level, 84 uint32_t desired_replication_level,
85 uint32_t hop_count, 85 uint32_t hop_count,
86 const GNUNET_HashCode *key, 86 const GNUNET_HashCode *key,
@@ -96,6 +96,7 @@ GDS_NEIGHBOURS_handle_get (uint32_t type,
96 * other peers waiting for it. Does not do local caching or 96 * other peers waiting for it. Does not do local caching or
97 * forwarding to local clients. 97 * forwarding to local clients.
98 * 98 *
99 * @param target neighbour that should receive the block (if still connected)
99 * @param type type of the block 100 * @param type type of the block
100 * @param expiration_time when does the content expire 101 * @param expiration_time when does the content expire
101 * @param key key for the content 102 * @param key key for the content
@@ -107,7 +108,8 @@ GDS_NEIGHBOURS_handle_get (uint32_t type,
107 * @param data_size number of bytes in data 108 * @param data_size number of bytes in data
108 */ 109 */
109void 110void
110GDS_NEIGHBOURS_handle_reply (uint32_t type, 111GDS_NEIGHBOURS_handle_reply (const GNUNET_PeerIdentity *target,
112 enum GNUNET_BLOCK_Type type,
111 GNUNET_TIME_Absolute expiration_time, 113 GNUNET_TIME_Absolute expiration_time,
112 const GNUNET_HashCode *key, 114 const GNUNET_HashCode *key,
113 unsigned int put_path_length, 115 unsigned int put_path_length,
diff --git a/src/dht/gnunet-service-dht_nse.c b/src/dht/gnunet-service-dht_nse.c
index 4711c9c31..0715465f5 100644
--- a/src/dht/gnunet-service-dht_nse.c
+++ b/src/dht/gnunet-service-dht_nse.c
@@ -58,21 +58,21 @@ update_network_size_estimate (void *cls, struct GNUNET_TIME_Absolute timestamp,
58 58
59 59
60double 60double
61GDS_nse_get () 61GDS_NSE_get ()
62{ 62{
63 return log_of_network_size_estimate; 63 return log_of_network_size_estimate;
64} 64}
65 65
66 66
67void 67void
68GDS_nse_init () 68GDS_NSE_init ()
69{ 69{
70 nse = GNUNET_NSE_connect (GDS_cfg, &update_network_size_estimate, NULL); 70 nse = GNUNET_NSE_connect (GDS_cfg, &update_network_size_estimate, NULL);
71} 71}
72 72
73 73
74void 74void
75GDS_nse_done () 75GDS_NSE_done ()
76{ 76{
77 if (NULL != nse) 77 if (NULL != nse)
78 { 78 {
diff --git a/src/dht/gnunet-service-dht_nse.h b/src/dht/gnunet-service-dht_nse.h
index 4642d4d9c..e2f73a9dd 100644
--- a/src/dht/gnunet-service-dht_nse.h
+++ b/src/dht/gnunet-service-dht_nse.h
@@ -28,13 +28,13 @@
28 28
29 29
30double 30double
31GDS_nse_get (void); 31GDS_NSE_get (void);
32 32
33 33
34void 34void
35GDS_nse_init (void); 35GDS_NSE_init (void);
36 36
37void 37void
38GDS_nse_done (void); 38GDS_NSE_done (void);
39 39
40#endif 40#endif
diff --git a/src/dht/gnunet-service-dht_routing.c b/src/dht/gnunet-service-dht_routing.c
new file mode 100644
index 000000000..535a63267
--- /dev/null
+++ b/src/dht/gnunet-service-dht_routing.c
@@ -0,0 +1,211 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_routing.c
23 * @brief GNUnet DHT tracking of requests for routing replies
24 * @author Christian Grothoff
25 */
26
27#include "gnunet-service-dht_routing.h"
28
29
30/**
31 * Number of requests we track at most (for routing replies).
32 */
33#define DHT_MAX_RECENT (1024 * 16)
34
35
36/**
37 * Information we keep about all recent GET requests
38 * so that we can route replies.
39 */
40struct RecentRequest
41{
42
43 /**
44 * The peer this request was received from.
45 */
46 struct GNUNET_PeerIdentity peer;
47
48 /**
49 * Position of this node in the min heap.
50 */
51 struct GNUNET_CONTAINER_HeapNode *heap_node;
52
53 /**
54 * Bloomfilter for replies to drop.
55 */
56 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
57
58 /**
59 * Timestamp of this request, for ordering
60 * the min heap.
61 */
62 struct GNUNET_TIME_Absolute timestamp;
63
64 /**
65 * Type of the requested block.
66 */
67 enum GNUNET_BLOCK_Type type;
68
69 /**
70 * extended query (see gnunet_block_lib.h). Allocated at the
71 * end of this struct.
72 */
73 const void *xquery;
74
75 /**
76 * Number of bytes in xquery.
77 */
78 size_t xquery_size;
79
80 /**
81 * Mutator value for the reply_bf, see gnunet_block_lib.h
82 */
83 uint32_t reply_bf_mutator;
84
85 /**
86 * Key of this request.
87 */
88 GNUNET_HashCode key;
89
90};
91
92
93/**
94 * Recent requests by time inserted.
95 */
96static struct GNUNET_CONTAINER_Heap *recent_heap;
97
98/**
99 * Recently seen requests by key.
100 */
101static struct GNUNET_CONTAINER_MultiHashMap *recent_map;
102
103
104/**
105 * Handle a reply (route to origin). Only forwards the reply back to
106 * other peers waiting for it. Does not do local caching or
107 * forwarding to local clients. Essentially calls
108 * GDS_NEIGHBOURS_handle_reply for all peers that sent us a matching
109 * request recently.
110 *
111 * @param type type of the block
112 * @param expiration_time when does the content expire
113 * @param key key for the content
114 * @param put_path_length number of entries in put_path
115 * @param put_path peers the original PUT traversed (if tracked)
116 * @param get_path_length number of entries in put_path
117 * @param get_path peers this reply has traversed so far (if tracked)
118 * @param data payload of the reply
119 * @param data_size number of bytes in data
120 */
121void
122GDS_ROUTING_process (enum GNUNET_BLOCK_Type type,
123 GNUNET_TIME_Absolute expiration_time,
124 const GNUNET_HashCode *key,
125 unsigned int put_path_length,
126 struct GNUNET_PeerIdentity *put_path,
127 unsigned int get_path_length,
128 struct GNUNET_PeerIdentity *get_path,
129 const void *data,
130 size_t data_size)
131{
132}
133
134
135/**
136 * Add a new entry to our routing table.
137 *
138 * @param sender peer that originated the request
139 * @param type type of the block
140 * @param key key for the content
141 * @param xquery extended query
142 * @param xquery_size number of bytes in xquery
143 * @param reply_bf bloomfilter to filter duplicates
144 * @param reply_bf_mutator mutator for reply_bf
145*/
146void
147GDS_ROUTING_add (const GNUNET_PeerIdentity *sender,
148 enum GNUNET_BLOCK_Type type,
149 const GNUNET_HashCode *key,
150 const void *xquery,
151 size_t xquery_size,
152 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
153 uint32_t reply_bf_mutator)
154{
155 if (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT)
156 {
157 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
158 GNUNET_assert (recent_req != NULL);
159 GNUNET_SCHEDULER_cancel (recent_req->remove_task);
160 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
161 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
162 GNUNET_free (recent_req);
163 }
164
165 recent_req = GNUNET_malloc (sizeof (struct RecentRequest));
166 recent_req->uid = msg_ctx->unique_id;
167 memcpy (&recent_req->key, &msg_ctx->key, sizeof (GNUNET_HashCode));
168 recent_req->heap_node =
169 GNUNET_CONTAINER_heap_insert (recent_heap, recent_req,
170 GNUNET_TIME_absolute_get ().abs_value);
171 recent_req->bloom =
172 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
173
174
175}
176
177
178/**
179 * Initialize routing subsystem.
180 */
181void
182GDS_ROUTING_init ()
183{
184 recent_heap =
185 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
186 recent_map =
187 GNUNET_CONTAINER_multihashmap_create (MAX_BUCKETS / 8);
188}
189
190
191/**
192 * Shutdown routing subsystem.
193 */
194void
195GDS_ROUTING_done ()
196{
197 while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0)
198 {
199 recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
200 GNUNET_assert (recent_req != NULL);
201 GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
202 GNUNET_CONTAINER_bloomfilter_free (recent_req->bloom);
203 GNUNET_free (recent_req);
204 }
205 GNUNET_CONTAINER_heap_destroy (recent_heap);
206 recent_heap = NULL;
207 GNUNET_CONTAINER_multihashmap_destroy (recent_map);
208 recent_map = NULL;
209}
210
211/* end of gnunet-service-dht_routing.c */
diff --git a/src/dht/gnunet-service-dht_routing.h b/src/dht/gnunet-service-dht_routing.h
new file mode 100644
index 000000000..3ddfcc66e
--- /dev/null
+++ b/src/dht/gnunet-service-dht_routing.h
@@ -0,0 +1,93 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_routing.h
23 * @brief GNUnet DHT tracking of requests for routing replies
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_DHT_ROUTING_H
27#define GNUNET_SERVICE_DHT_ROUTING_H
28
29
30/**
31 * Handle a reply (route to origin). Only forwards the reply back to
32 * other peers waiting for it. Does not do local caching or
33 * forwarding to local clients. Essentially calls
34 * GDS_NEIGHBOURS_handle_reply for all peers that sent us a matching
35 * request recently.
36 *
37 * @param type type of the block
38 * @param expiration_time when does the content expire
39 * @param key key for the content
40 * @param put_path_length number of entries in put_path
41 * @param put_path peers the original PUT traversed (if tracked)
42 * @param get_path_length number of entries in put_path
43 * @param get_path peers this reply has traversed so far (if tracked)
44 * @param data payload of the reply
45 * @param data_size number of bytes in data
46 */
47void
48GDS_ROUTING_process (uint32_t type,
49 GNUNET_TIME_Absolute expiration_time,
50 const GNUNET_HashCode *key,
51 unsigned int put_path_length,
52 struct GNUNET_PeerIdentity *put_path,
53 unsigned int get_path_length,
54 struct GNUNET_PeerIdentity *get_path,
55 const void *data,
56 size_t data_size);
57
58
59/**
60 * Add a new entry to our routing table.
61 *
62 * @param sender peer that originated the request
63 * @param type type of the block
64 * @param key key for the content
65 * @param xquery extended query
66 * @param xquery_size number of bytes in xquery
67 * @param reply_bf bloomfilter to filter duplicates
68 * @param reply_bf_mutator mutator for reply_bf
69*/
70void
71GDS_ROUTING_add (const GNUNET_PeerIdentity *sender,
72 uint32_t type,
73 const GNUNET_HashCode *key,
74 const void *xquery,
75 size_t xquery_size,
76 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
77 uint32_t reply_bf_mutator);
78
79
80/**
81 * Initialize routing subsystem.
82 */
83void
84GDS_ROUTING_init (void);
85
86
87/**
88 * Shutdown routing subsystem.
89 */
90void
91GDS_ROUTING_done (void);
92
93#endif