aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_neighbours.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-08-01 12:27:45 +0000
committerChristian Grothoff <christian@grothoff.org>2016-08-01 12:27:45 +0000
commit3d3cedf2e2c41883771cc2170b761840ac26b869 (patch)
tree41a6ce96f61bb74f97cee09251aad290ad1bdb9a /src/dht/gnunet-service-dht_neighbours.c
parenteebd97ba059c8289ee3098f5e66bda8f01a72e2f (diff)
downloadgnunet-3d3cedf2e2c41883771cc2170b761840ac26b869.tar.gz
gnunet-3d3cedf2e2c41883771cc2170b761840ac26b869.zip
-migrate to new core MQ API
Diffstat (limited to 'src/dht/gnunet-service-dht_neighbours.c')
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c958
1 files changed, 413 insertions, 545 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index b7a2f89a2..1a2fa32e4 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2009-2015 GNUnet e.V. 3 Copyright (C) 2009-2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -255,40 +255,6 @@ struct PeerGetMessage
255}; 255};
256GNUNET_NETWORK_STRUCT_END 256GNUNET_NETWORK_STRUCT_END
257 257
258/**
259 * Linked list of messages to send to a particular other peer.
260 */
261struct P2PPendingMessage
262{
263 /**
264 * Pointer to next item in the list
265 */
266 struct P2PPendingMessage *next;
267
268 /**
269 * Pointer to previous item in the list
270 */
271 struct P2PPendingMessage *prev;
272
273 /**
274 * Message importance level. FIXME: used? useful?
275 */
276 unsigned int importance;
277
278 /**
279 * When does this message time out?
280 */
281 struct GNUNET_TIME_Absolute timeout;
282
283 /**
284 * Actual message to be sent, allocated at the end of the struct:
285 * // msg = (cast) &pm[1];
286 * // GNUNET_memcpy (&pm[1], data, len);
287 */
288 const struct GNUNET_MessageHeader *msg;
289
290};
291
292 258
293/** 259/**
294 * Entry for a peer in a bucket. 260 * Entry for a peer in a bucket.
@@ -306,41 +272,14 @@ struct PeerInfo
306 struct PeerInfo *prev; 272 struct PeerInfo *prev;
307 273
308 /** 274 /**
309 * Count of outstanding messages for peer. 275 * Handle for sending messages to this peer.
310 */
311 unsigned int pending_count;
312
313 /**
314 * Head of pending messages to be sent to this peer.
315 */
316 struct P2PPendingMessage *head;
317
318 /**
319 * Tail of pending messages to be sent to this peer.
320 */
321 struct P2PPendingMessage *tail;
322
323 /**
324 * Core handle for sending messages to this peer.
325 */ 276 */
326 struct GNUNET_CORE_TransmitHandle *th; 277 struct GNUNET_MQ_Handle *mq;
327 278
328 /** 279 /**
329 * What is the identity of the peer? 280 * What is the identity of the peer?
330 */ 281 */
331 struct GNUNET_PeerIdentity id; 282 const struct GNUNET_PeerIdentity *id;
332
333#if 0
334 /**
335 * What is the average latency for replies received?
336 */
337 struct GNUNET_TIME_Relative latency;
338
339 /**
340 * Transport level distance to peer.
341 */
342 unsigned int distance;
343#endif
344 283
345}; 284};
346 285
@@ -766,28 +705,29 @@ send_find_peer_message (void *cls)
766 * 705 *
767 * @param cls closure 706 * @param cls closure
768 * @param peer peer identity this notification is about 707 * @param peer peer identity this notification is about
708 * @param mq message queue for sending messages to @a peer
709 * @return our `struct PeerInfo` for @a peer
769 */ 710 */
770static void 711static void *
771handle_core_connect (void *cls, 712handle_core_connect (void *cls,
772 const struct GNUNET_PeerIdentity *peer) 713 const struct GNUNET_PeerIdentity *peer,
714 struct GNUNET_MQ_Handle *mq)
773{ 715{
774 struct PeerInfo *ret; 716 struct PeerInfo *ret;
775 struct GNUNET_HashCode phash; 717 struct GNUNET_HashCode phash;
776 int peer_bucket; 718 int peer_bucket;
777 719
778 /* Check for connect to self message */ 720 /* Check for connect to self message */
779 if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) 721 if (0 == memcmp (&my_identity,
780 return; 722 peer,
723 sizeof (struct GNUNET_PeerIdentity)))
724 return NULL;
781 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 725 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782 "Connected to %s\n", 726 "Connected to %s\n",
783 GNUNET_i2s (peer)); 727 GNUNET_i2s (peer));
784 if (GNUNET_YES == 728 GNUNET_assert (GNUNET_NO ==
785 GNUNET_CONTAINER_multipeermap_contains (all_connected_peers, 729 GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
786 peer)) 730 peer));
787 {
788 GNUNET_break (0);
789 return;
790 }
791 GNUNET_STATISTICS_update (GDS_stats, 731 GNUNET_STATISTICS_update (GDS_stats,
792 gettext_noop ("# peers connected"), 732 gettext_noop ("# peers connected"),
793 1, 733 1,
@@ -798,11 +738,8 @@ handle_core_connect (void *cls,
798 peer_bucket = find_bucket (&phash); 738 peer_bucket = find_bucket (&phash);
799 GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS)); 739 GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
800 ret = GNUNET_new (struct PeerInfo); 740 ret = GNUNET_new (struct PeerInfo);
801#if 0 741 ret->id = peer;
802 ret->latency = latency; 742 ret->mq = mq;
803 ret->distance = distance;
804#endif
805 ret->id = *peer;
806 GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head, 743 GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head,
807 k_buckets[peer_bucket].tail, 744 k_buckets[peer_bucket].tail,
808 ret); 745 ret);
@@ -811,7 +748,7 @@ handle_core_connect (void *cls,
811 peer_bucket); 748 peer_bucket);
812 GNUNET_assert (GNUNET_OK == 749 GNUNET_assert (GNUNET_OK ==
813 GNUNET_CONTAINER_multipeermap_put (all_connected_peers, 750 GNUNET_CONTAINER_multipeermap_put (all_connected_peers,
814 peer, 751 ret->id,
815 ret, 752 ret,
816 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 753 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
817 if ( (peer_bucket > 0) && 754 if ( (peer_bucket > 0) &&
@@ -828,6 +765,7 @@ handle_core_connect (void *cls,
828 find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message, 765 find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message,
829 NULL); 766 NULL);
830 } 767 }
768 return ret;
831} 769}
832 770
833 771
@@ -836,33 +774,23 @@ handle_core_connect (void *cls,
836 * 774 *
837 * @param cls closure 775 * @param cls closure
838 * @param peer peer identity this notification is about 776 * @param peer peer identity this notification is about
777 * @param internal_cls our `struct PeerInfo` for @a peer
839 */ 778 */
840static void 779static void
841handle_core_disconnect (void *cls, 780handle_core_disconnect (void *cls,
842 const struct GNUNET_PeerIdentity *peer) 781 const struct GNUNET_PeerIdentity *peer,
782 void *internal_cls)
843{ 783{
844 struct PeerInfo *to_remove; 784 struct PeerInfo *to_remove = internal_cls;
845 int current_bucket; 785 int current_bucket;
846 struct P2PPendingMessage *pos;
847 unsigned int discarded;
848 struct GNUNET_HashCode phash; 786 struct GNUNET_HashCode phash;
849 787
850 /* Check for disconnect from self message */ 788 /* Check for disconnect from self message */
851 if (0 == memcmp (&my_identity, 789 if (NULL == to_remove)
852 peer,
853 sizeof (struct GNUNET_PeerIdentity)))
854 return; 790 return;
855 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 791 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856 "Disconnected %s\n", 792 "Disconnected %s\n",
857 GNUNET_i2s (peer)); 793 GNUNET_i2s (peer));
858 to_remove =
859 GNUNET_CONTAINER_multipeermap_get (all_connected_peers,
860 peer);
861 if (NULL == to_remove)
862 {
863 GNUNET_break (0);
864 return;
865 }
866 GNUNET_STATISTICS_update (GDS_stats, 794 GNUNET_STATISTICS_update (GDS_stats,
867 gettext_noop ("# peers connected"), 795 gettext_noop ("# peers connected"),
868 -1, 796 -1,
@@ -871,8 +799,8 @@ handle_core_disconnect (void *cls,
871 GNUNET_CONTAINER_multipeermap_remove (all_connected_peers, 799 GNUNET_CONTAINER_multipeermap_remove (all_connected_peers,
872 peer, 800 peer,
873 to_remove)); 801 to_remove));
874 if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers) && 802 if ( (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)) &&
875 (GNUNET_YES != disable_try_connect)) ) 803 (GNUNET_YES != disable_try_connect) )
876 { 804 {
877 GNUNET_SCHEDULER_cancel (find_peer_task); 805 GNUNET_SCHEDULER_cancel (find_peer_task);
878 find_peer_task = NULL; 806 find_peer_task = NULL;
@@ -890,179 +818,13 @@ handle_core_disconnect (void *cls,
890 while ( (closest_bucket > 0) && 818 while ( (closest_bucket > 0) &&
891 (0 == k_buckets[closest_bucket].peers_size) ) 819 (0 == k_buckets[closest_bucket].peers_size) )
892 closest_bucket--; 820 closest_bucket--;
893 if (NULL != to_remove->th)
894 {
895 GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th);
896 to_remove->th = NULL;
897 }
898 discarded = 0;
899 while (NULL != (pos = to_remove->head))
900 {
901 GNUNET_CONTAINER_DLL_remove (to_remove->head,
902 to_remove->tail,
903 pos);
904 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905 "Dropping message of type %u due to disconnect\n",
906 ntohs (pos->msg->type));
907 discarded++;
908 GNUNET_free (pos);
909 }
910 if (k_buckets[current_bucket].peers_size < bucket_size) 821 if (k_buckets[current_bucket].peers_size < bucket_size)
911 update_connect_preferences (); 822 update_connect_preferences ();
912 GNUNET_STATISTICS_update (GDS_stats,
913 gettext_noop ("# Queued messages discarded (peer disconnected)"),
914 discarded,
915 GNUNET_NO);
916 GNUNET_free (to_remove); 823 GNUNET_free (to_remove);
917} 824}
918 825
919 826
920/** 827/**
921 * Called when core is ready to send a message we asked for
922 * out to the destination.
923 *
924 * @param cls the 'struct PeerInfo' of the target peer
925 * @param size number of bytes available in @a buf
926 * @param buf where the callee should write the message
927 * @return number of bytes written to @a buf
928 */
929static size_t
930core_transmit_notify (void *cls,
931 size_t size,
932 void *buf)
933{
934 struct PeerInfo *peer = cls;
935 char *cbuf = buf;
936 struct P2PPendingMessage *pending;
937 size_t off;
938 size_t msize;
939
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "DHT ctn called with buffer of %u bytes\n",
942 (unsigned int) size);
943 peer->th = NULL;
944 while ((NULL != (pending = peer->head)) &&
945 (0 == GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value_us))
946 {
947 GNUNET_STATISTICS_update (GDS_stats,
948 gettext_noop
949 ("# Messages dropped (CORE timeout)"),
950 1,
951 GNUNET_NO);
952 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953 "Dropping message of type %u due to timeout\n",
954 ntohs (pending->msg->type));
955 peer->pending_count--;
956 GNUNET_CONTAINER_DLL_remove (peer->head,
957 peer->tail,
958 pending);
959 GNUNET_free (pending);
960 }
961 if (NULL == pending)
962 {
963 /* no messages pending */
964 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965 "No messages pending\n");
966 return 0;
967 }
968 if (NULL == buf)
969 {
970 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971 "Got NULL buffer, trying again\n");
972 peer->th =
973 GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
974 GNUNET_CORE_PRIO_BEST_EFFORT,
975 GNUNET_TIME_absolute_get_remaining
976 (pending->timeout),
977 &peer->id,
978 ntohs (pending->msg->size),
979 &core_transmit_notify,
980 peer);
981 GNUNET_break (NULL != peer->th);
982 return 0;
983 }
984 off = 0;
985 while ((NULL != (pending = peer->head)) &&
986 (size - off >= (msize = ntohs (pending->msg->size))))
987 {
988 GNUNET_STATISTICS_update (GDS_stats,
989 gettext_noop
990 ("# Bytes transmitted to other peers"), msize,
991 GNUNET_NO);
992 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993 "Transmitting message of type %u to %s\n",
994 ntohs (pending->msg->type),
995 GNUNET_i2s (&peer->id));
996 GNUNET_memcpy (&cbuf[off],
997 pending->msg,
998 msize);
999 off += msize;
1000 peer->pending_count--;
1001 GNUNET_CONTAINER_DLL_remove (peer->head,
1002 peer->tail,
1003 pending);
1004 GNUNET_free (pending);
1005 }
1006 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1007 "%u bytes fit in %u bytes available, next message is %u bytes\n",
1008 (unsigned int) off,
1009 (unsigned int) size,
1010 (NULL != peer->head) ? ntohs (peer->head->msg->size) : 0);
1011 if (NULL != (pending = peer->head))
1012 {
1013 /* technically redundant, but easier to read and
1014 avoids bogus gcc warning... */
1015 msize = ntohs (pending->msg->size);
1016 peer->th =
1017 GNUNET_CORE_notify_transmit_ready (core_api,
1018 GNUNET_NO,
1019 GNUNET_CORE_PRIO_BEST_EFFORT,
1020 GNUNET_TIME_absolute_get_remaining (pending->timeout),
1021 &peer->id,
1022 msize,
1023 &core_transmit_notify,
1024 peer);
1025 GNUNET_break (NULL != peer->th);
1026 }
1027 return off;
1028}
1029
1030
1031/**
1032 * Transmit all messages in the peer's message queue.
1033 *
1034 * @param peer message queue to process
1035 */
1036static void
1037process_peer_queue (struct PeerInfo *peer)
1038{
1039 struct P2PPendingMessage *pending;
1040
1041 if (NULL == (pending = peer->head))
1042 return;
1043 if (NULL != peer->th)
1044 {
1045 GNUNET_CORE_notify_transmit_ready_cancel (peer->th);
1046 peer->th = NULL;
1047 }
1048 GNUNET_STATISTICS_update (GDS_stats,
1049 gettext_noop
1050 ("# Bytes of bandwidth requested from core"),
1051 ntohs (pending->msg->size), GNUNET_NO);
1052 peer->th =
1053 GNUNET_CORE_notify_transmit_ready (core_api, GNUNET_NO,
1054 GNUNET_CORE_PRIO_BEST_EFFORT,
1055 GNUNET_TIME_absolute_get_remaining
1056 (pending->timeout),
1057 &peer->id,
1058 ntohs (pending->msg->size),
1059 &core_transmit_notify,
1060 peer);
1061 GNUNET_break (NULL != peer->th);
1062}
1063
1064
1065/**
1066 * To how many peers should we (on average) forward the request to 828 * To how many peers should we (on average) forward the request to
1067 * obtain the desired target_replication count (on average). 829 * obtain the desired target_replication count (on average).
1068 * 830 *
@@ -1071,7 +833,8 @@ process_peer_queue (struct PeerInfo *peer)
1071 * @return Some number of peers to forward the message to 833 * @return Some number of peers to forward the message to
1072 */ 834 */
1073static unsigned int 835static unsigned int
1074get_forward_count (uint32_t hop_count, uint32_t target_replication) 836get_forward_count (uint32_t hop_count,
837 uint32_t target_replication)
1075{ 838{
1076 uint32_t random_value; 839 uint32_t random_value;
1077 uint32_t forward_count; 840 uint32_t forward_count;
@@ -1201,7 +964,7 @@ am_closest_peer (const struct GNUNET_HashCode *key,
1201 count = 0; 964 count = 0;
1202 while ((NULL != pos) && (count < bucket_size)) 965 while ((NULL != pos) && (count < bucket_size))
1203 { 966 {
1204 GNUNET_CRYPTO_hash (&pos->id, 967 GNUNET_CRYPTO_hash (pos->id,
1205 sizeof (struct GNUNET_PeerIdentity), 968 sizeof (struct GNUNET_PeerIdentity),
1206 &phash); 969 &phash);
1207 if ((NULL != bloom) && 970 if ((NULL != bloom) &&
@@ -1265,7 +1028,7 @@ select_peer (const struct GNUNET_HashCode *key,
1265 count = 0; 1028 count = 0;
1266 while ((pos != NULL) && (count < bucket_size)) 1029 while ((pos != NULL) && (count < bucket_size))
1267 { 1030 {
1268 GNUNET_CRYPTO_hash (&pos->id, 1031 GNUNET_CRYPTO_hash (pos->id,
1269 sizeof (struct GNUNET_PeerIdentity), 1032 sizeof (struct GNUNET_PeerIdentity),
1270 &phash); 1033 &phash);
1271 if ((bloom == NULL) || 1034 if ((bloom == NULL) ||
@@ -1283,7 +1046,8 @@ select_peer (const struct GNUNET_HashCode *key,
1283 { 1046 {
1284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1285 "Excluded peer `%s' due to BF match in greedy routing for %s\n", 1048 "Excluded peer `%s' due to BF match in greedy routing for %s\n",
1286 GNUNET_i2s (&pos->id), GNUNET_h2s (key)); 1049 GNUNET_i2s (pos->id),
1050 GNUNET_h2s (key));
1287 GNUNET_STATISTICS_update (GDS_stats, 1051 GNUNET_STATISTICS_update (GDS_stats,
1288 gettext_noop 1052 gettext_noop
1289 ("# Peers excluded from routing due to Bloomfilter"), 1053 ("# Peers excluded from routing due to Bloomfilter"),
@@ -1314,7 +1078,7 @@ select_peer (const struct GNUNET_HashCode *key,
1314 pos = k_buckets[bc].head; 1078 pos = k_buckets[bc].head;
1315 while ((pos != NULL) && (count < bucket_size)) 1079 while ((pos != NULL) && (count < bucket_size))
1316 { 1080 {
1317 GNUNET_CRYPTO_hash (&pos->id, 1081 GNUNET_CRYPTO_hash (pos->id,
1318 sizeof (struct GNUNET_PeerIdentity), 1082 sizeof (struct GNUNET_PeerIdentity),
1319 &phash); 1083 &phash);
1320 if ((bloom != NULL) && 1084 if ((bloom != NULL) &&
@@ -1327,7 +1091,8 @@ select_peer (const struct GNUNET_HashCode *key,
1327 1, GNUNET_NO); 1091 1, GNUNET_NO);
1328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1092 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1329 "Excluded peer `%s' due to BF match in random routing for %s\n", 1093 "Excluded peer `%s' due to BF match in random routing for %s\n",
1330 GNUNET_i2s (&pos->id), GNUNET_h2s (key)); 1094 GNUNET_i2s (pos->id),
1095 GNUNET_h2s (key));
1331 pos = pos->next; 1096 pos = pos->next;
1332 continue; /* Ignore bloomfiltered peers */ 1097 continue; /* Ignore bloomfiltered peers */
1333 } 1098 }
@@ -1349,7 +1114,7 @@ select_peer (const struct GNUNET_HashCode *key,
1349 { 1114 {
1350 for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next) 1115 for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next)
1351 { 1116 {
1352 GNUNET_CRYPTO_hash (&pos->id, 1117 GNUNET_CRYPTO_hash (pos->id,
1353 sizeof (struct GNUNET_PeerIdentity), 1118 sizeof (struct GNUNET_PeerIdentity),
1354 &phash); 1119 &phash);
1355 if ((bloom != NULL) && 1120 if ((bloom != NULL) &&
@@ -1383,7 +1148,8 @@ select_peer (const struct GNUNET_HashCode *key,
1383static unsigned int 1148static unsigned int
1384get_target_peers (const struct GNUNET_HashCode *key, 1149get_target_peers (const struct GNUNET_HashCode *key,
1385 struct GNUNET_CONTAINER_BloomFilter *bloom, 1150 struct GNUNET_CONTAINER_BloomFilter *bloom,
1386 uint32_t hop_count, uint32_t target_replication, 1151 uint32_t hop_count,
1152 uint32_t target_replication,
1387 struct PeerInfo ***targets) 1153 struct PeerInfo ***targets)
1388{ 1154{
1389 unsigned int ret; 1155 unsigned int ret;
@@ -1393,20 +1159,22 @@ get_target_peers (const struct GNUNET_HashCode *key,
1393 struct GNUNET_HashCode nhash; 1159 struct GNUNET_HashCode nhash;
1394 1160
1395 GNUNET_assert (NULL != bloom); 1161 GNUNET_assert (NULL != bloom);
1396 ret = get_forward_count (hop_count, target_replication); 1162 ret = get_forward_count (hop_count,
1163 target_replication);
1397 if (0 == ret) 1164 if (0 == ret)
1398 { 1165 {
1399 *targets = NULL; 1166 *targets = NULL;
1400 return 0; 1167 return 0;
1401 } 1168 }
1402 rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret); 1169 rtargets = GNUNET_new_array (ret,
1170 struct PeerInfo *);
1403 for (off = 0; off < ret; off++) 1171 for (off = 0; off < ret; off++)
1404 { 1172 {
1405 nxt = select_peer (key, bloom, hop_count); 1173 nxt = select_peer (key, bloom, hop_count);
1406 if (NULL == nxt) 1174 if (NULL == nxt)
1407 break; 1175 break;
1408 rtargets[off] = nxt; 1176 rtargets[off] = nxt;
1409 GNUNET_CRYPTO_hash (&nxt->id, 1177 GNUNET_CRYPTO_hash (nxt->id,
1410 sizeof (struct GNUNET_PeerIdentity), 1178 sizeof (struct GNUNET_PeerIdentity),
1411 &nhash); 1179 &nhash);
1412 GNUNET_break (GNUNET_NO == 1180 GNUNET_break (GNUNET_NO ==
@@ -1467,14 +1235,15 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1467 const struct GNUNET_HashCode *key, 1235 const struct GNUNET_HashCode *key,
1468 unsigned int put_path_length, 1236 unsigned int put_path_length,
1469 struct GNUNET_PeerIdentity *put_path, 1237 struct GNUNET_PeerIdentity *put_path,
1470 const void *data, size_t data_size) 1238 const void *data,
1239 size_t data_size)
1471{ 1240{
1472 unsigned int target_count; 1241 unsigned int target_count;
1473 unsigned int i; 1242 unsigned int i;
1474 struct PeerInfo **targets; 1243 struct PeerInfo **targets;
1475 struct PeerInfo *target; 1244 struct PeerInfo *target;
1476 struct P2PPendingMessage *pending;
1477 size_t msize; 1245 size_t msize;
1246 struct GNUNET_MQ_Envelope *env;
1478 struct PeerPutMessage *ppm; 1247 struct PeerPutMessage *ppm;
1479 struct GNUNET_PeerIdentity *pp; 1248 struct GNUNET_PeerIdentity *pp;
1480 struct GNUNET_HashCode thash; 1249 struct GNUNET_HashCode thash;
@@ -1486,47 +1255,54 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1486 GNUNET_i2s (&my_identity), 1255 GNUNET_i2s (&my_identity),
1487 GNUNET_h2s (key)); 1256 GNUNET_h2s (key));
1488 GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash); 1257 GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash);
1489 GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"), 1258 GNUNET_STATISTICS_update (GDS_stats,
1490 1, GNUNET_NO); 1259 gettext_noop ("# PUT requests routed"),
1260 1,
1261 GNUNET_NO);
1491 target_count = 1262 target_count =
1492 get_target_peers (key, bf, hop_count, desired_replication_level, 1263 get_target_peers (key,
1264 bf,
1265 hop_count,
1266 desired_replication_level,
1493 &targets); 1267 &targets);
1494 if (0 == target_count) 1268 if (0 == target_count)
1495 { 1269 {
1496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1270 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1497 "Routing PUT for %s terminates after %u hops at %s\n", 1271 "Routing PUT for %s terminates after %u hops at %s\n",
1498 GNUNET_h2s (key), (unsigned int) hop_count, 1272 GNUNET_h2s (key),
1273 (unsigned int) hop_count,
1499 GNUNET_i2s (&my_identity)); 1274 GNUNET_i2s (&my_identity));
1500 return GNUNET_NO; 1275 return GNUNET_NO;
1501 } 1276 }
1502 msize = 1277 msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
1503 put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + 1278 if (msize + sizeof (struct PeerPutMessage)
1504 sizeof (struct PeerPutMessage); 1279 >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1505 if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1506 { 1280 {
1507 put_path_length = 0; 1281 put_path_length = 0;
1508 msize = data_size + sizeof (struct PeerPutMessage); 1282 msize = data_size;
1509 } 1283 }
1510 if (msize >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) 1284 if (msize + sizeof (struct PeerPutMessage)
1285 >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE)
1511 { 1286 {
1512 GNUNET_break (0); 1287 GNUNET_break (0);
1513 GNUNET_free (targets); 1288 GNUNET_free (targets);
1514 return GNUNET_NO; 1289 return GNUNET_NO;
1515 } 1290 }
1516 GNUNET_STATISTICS_update (GDS_stats, 1291 GNUNET_STATISTICS_update (GDS_stats,
1517 gettext_noop 1292 gettext_noop ("# PUT messages queued for transmission"),
1518 ("# PUT messages queued for transmission"), 1293 target_count,
1519 target_count, GNUNET_NO); 1294 GNUNET_NO);
1520 skip_count = 0; 1295 skip_count = 0;
1521 for (i = 0; i < target_count; i++) 1296 for (i = 0; i < target_count; i++)
1522 { 1297 {
1523 target = targets[i]; 1298 target = targets[i];
1524 if (target->pending_count >= MAXIMUM_PENDING_PER_PEER) 1299 if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1525 { 1300 {
1526 /* skip */ 1301 /* skip */
1527 GNUNET_STATISTICS_update (GDS_stats, 1302 GNUNET_STATISTICS_update (GDS_stats,
1528 gettext_noop ("# P2P messages dropped due to full queue"), 1303 gettext_noop ("# P2P messages dropped due to full queue"),
1529 1, GNUNET_NO); 1304 1,
1305 GNUNET_NO);
1530 skip_count++; 1306 skip_count++;
1531 continue; 1307 continue;
1532 } 1308 }
@@ -1534,21 +1310,17 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1534 "Routing PUT for %s after %u hops to %s\n", 1310 "Routing PUT for %s after %u hops to %s\n",
1535 GNUNET_h2s (key), 1311 GNUNET_h2s (key),
1536 (unsigned int) hop_count, 1312 (unsigned int) hop_count,
1537 GNUNET_i2s (&target->id)); 1313 GNUNET_i2s (target->id));
1538 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 1314 env = GNUNET_MQ_msg_extra (ppm,
1539 pending->importance = 0; /* FIXME */ 1315 msize,
1540 pending->timeout = expiration_time; 1316 GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1541 ppm = (struct PeerPutMessage *) &pending[1];
1542 pending->msg = &ppm->header;
1543 ppm->header.size = htons (msize);
1544 ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1545 ppm->options = htonl (options); 1317 ppm->options = htonl (options);
1546 ppm->type = htonl (type); 1318 ppm->type = htonl (type);
1547 ppm->hop_count = htonl (hop_count + 1); 1319 ppm->hop_count = htonl (hop_count + 1);
1548 ppm->desired_replication_level = htonl (desired_replication_level); 1320 ppm->desired_replication_level = htonl (desired_replication_level);
1549 ppm->put_path_length = htonl (put_path_length); 1321 ppm->put_path_length = htonl (put_path_length);
1550 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); 1322 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1551 GNUNET_CRYPTO_hash (&target->id, 1323 GNUNET_CRYPTO_hash (target->id,
1552 sizeof (struct GNUNET_PeerIdentity), 1324 sizeof (struct GNUNET_PeerIdentity),
1553 &thash); 1325 &thash);
1554 GNUNET_break (GNUNET_YES == 1326 GNUNET_break (GNUNET_YES ==
@@ -1560,17 +1332,14 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1560 DHT_BLOOM_SIZE)); 1332 DHT_BLOOM_SIZE));
1561 ppm->key = *key; 1333 ppm->key = *key;
1562 pp = (struct GNUNET_PeerIdentity *) &ppm[1]; 1334 pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1563 GNUNET_memcpy (pp, put_path, 1335 GNUNET_memcpy (pp,
1564 sizeof (struct GNUNET_PeerIdentity) * put_path_length); 1336 put_path,
1337 sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1565 GNUNET_memcpy (&pp[put_path_length], 1338 GNUNET_memcpy (&pp[put_path_length],
1566 data, 1339 data,
1567 data_size); 1340 data_size);
1568 GNUNET_CONTAINER_DLL_insert_tail (target->head, 1341 GNUNET_MQ_send (target->mq,
1569 target->tail, 1342 env);
1570 pending);
1571 target->pending_count++;
1572 if (pending == target->head)
1573 process_peer_queue (target);
1574 } 1343 }
1575 GNUNET_free (targets); 1344 GNUNET_free (targets);
1576 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; 1345 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
@@ -1609,7 +1378,7 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1609 unsigned int i; 1378 unsigned int i;
1610 struct PeerInfo **targets; 1379 struct PeerInfo **targets;
1611 struct PeerInfo *target; 1380 struct PeerInfo *target;
1612 struct P2PPendingMessage *pending; 1381 struct GNUNET_MQ_Envelope *env;
1613 size_t msize; 1382 size_t msize;
1614 struct PeerGetMessage *pgm; 1383 struct PeerGetMessage *pgm;
1615 char *xq; 1384 char *xq;
@@ -1618,11 +1387,15 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1618 unsigned int skip_count; 1387 unsigned int skip_count;
1619 1388
1620 GNUNET_assert (NULL != peer_bf); 1389 GNUNET_assert (NULL != peer_bf);
1621 GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"), 1390 GNUNET_STATISTICS_update (GDS_stats,
1622 1, GNUNET_NO); 1391 gettext_noop ("# GET requests routed"),
1623 target_count = 1392 1,
1624 get_target_peers (key, peer_bf, hop_count, desired_replication_level, 1393 GNUNET_NO);
1625 &targets); 1394 target_count = get_target_peers (key,
1395 peer_bf,
1396 hop_count,
1397 desired_replication_level,
1398 &targets);
1626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1399 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1627 "Adding myself (%s) to GET bloomfilter for %s\n", 1400 "Adding myself (%s) to GET bloomfilter for %s\n",
1628 GNUNET_i2s (&my_identity), 1401 GNUNET_i2s (&my_identity),
@@ -1633,28 +1406,29 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1633 { 1406 {
1634 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635 "Routing GET for %s terminates after %u hops at %s\n", 1408 "Routing GET for %s terminates after %u hops at %s\n",
1636 GNUNET_h2s (key), (unsigned int) hop_count, 1409 GNUNET_h2s (key),
1410 (unsigned int) hop_count,
1637 GNUNET_i2s (&my_identity)); 1411 GNUNET_i2s (&my_identity));
1638 return GNUNET_NO; 1412 return GNUNET_NO;
1639 } 1413 }
1640 reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf); 1414 reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
1641 msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size; 1415 msize = xquery_size + reply_bf_size;
1642 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) 1416 if (msize + sizeof (struct PeerGetMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1643 { 1417 {
1644 GNUNET_break (0); 1418 GNUNET_break (0);
1645 GNUNET_free (targets); 1419 GNUNET_free (targets);
1646 return GNUNET_NO; 1420 return GNUNET_NO;
1647 } 1421 }
1648 GNUNET_STATISTICS_update (GDS_stats, 1422 GNUNET_STATISTICS_update (GDS_stats,
1649 gettext_noop 1423 gettext_noop ("# GET messages queued for transmission"),
1650 ("# GET messages queued for transmission"), 1424 target_count,
1651 target_count, GNUNET_NO); 1425 GNUNET_NO);
1652 /* forward request */ 1426 /* forward request */
1653 skip_count = 0; 1427 skip_count = 0;
1654 for (i = 0; i < target_count; i++) 1428 for (i = 0; i < target_count; i++)
1655 { 1429 {
1656 target = targets[i]; 1430 target = targets[i];
1657 if (target->pending_count >= MAXIMUM_PENDING_PER_PEER) 1431 if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER)
1658 { 1432 {
1659 /* skip */ 1433 /* skip */
1660 GNUNET_STATISTICS_update (GDS_stats, 1434 GNUNET_STATISTICS_update (GDS_stats,
@@ -1667,21 +1441,17 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1667 "Routing GET for %s after %u hops to %s\n", 1441 "Routing GET for %s after %u hops to %s\n",
1668 GNUNET_h2s (key), 1442 GNUNET_h2s (key),
1669 (unsigned int) hop_count, 1443 (unsigned int) hop_count,
1670 GNUNET_i2s (&target->id)); 1444 GNUNET_i2s (target->id));
1671 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 1445 env = GNUNET_MQ_msg_extra (pgm,
1672 pending->importance = 0; /* FIXME */ 1446 msize,
1673 pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT); 1447 GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1674 pgm = (struct PeerGetMessage *) &pending[1];
1675 pending->msg = &pgm->header;
1676 pgm->header.size = htons (msize);
1677 pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1678 pgm->options = htonl (options); 1448 pgm->options = htonl (options);
1679 pgm->type = htonl (type); 1449 pgm->type = htonl (type);
1680 pgm->hop_count = htonl (hop_count + 1); 1450 pgm->hop_count = htonl (hop_count + 1);
1681 pgm->desired_replication_level = htonl (desired_replication_level); 1451 pgm->desired_replication_level = htonl (desired_replication_level);
1682 pgm->xquery_size = htonl (xquery_size); 1452 pgm->xquery_size = htonl (xquery_size);
1683 pgm->bf_mutator = reply_bf_mutator; 1453 pgm->bf_mutator = reply_bf_mutator;
1684 GNUNET_CRYPTO_hash (&target->id, 1454 GNUNET_CRYPTO_hash (target->id,
1685 sizeof (struct GNUNET_PeerIdentity), 1455 sizeof (struct GNUNET_PeerIdentity),
1686 &thash); 1456 &thash);
1687 GNUNET_break (GNUNET_YES == 1457 GNUNET_break (GNUNET_YES ==
@@ -1693,19 +1463,17 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1693 DHT_BLOOM_SIZE)); 1463 DHT_BLOOM_SIZE));
1694 pgm->key = *key; 1464 pgm->key = *key;
1695 xq = (char *) &pgm[1]; 1465 xq = (char *) &pgm[1];
1696 GNUNET_memcpy (xq, xquery, xquery_size); 1466 GNUNET_memcpy (xq,
1467 xquery,
1468 xquery_size);
1697 if (NULL != reply_bf) 1469 if (NULL != reply_bf)
1698 GNUNET_assert (GNUNET_OK == 1470 GNUNET_assert (GNUNET_OK ==
1699 GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf, 1471 GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
1700 &xq 1472 &xq
1701 [xquery_size], 1473 [xquery_size],
1702 reply_bf_size)); 1474 reply_bf_size));
1703 GNUNET_CONTAINER_DLL_insert_tail (target->head, 1475 GNUNET_MQ_send (target->mq,
1704 target->tail, 1476 env);
1705 pending);
1706 target->pending_count++;
1707 if (pending == target->head)
1708 process_peer_queue (target);
1709 } 1477 }
1710 GNUNET_free (targets); 1478 GNUNET_free (targets);
1711 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; 1479 return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO;
@@ -1741,16 +1509,14 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1741 size_t data_size) 1509 size_t data_size)
1742{ 1510{
1743 struct PeerInfo *pi; 1511 struct PeerInfo *pi;
1744 struct P2PPendingMessage *pending; 1512 struct GNUNET_MQ_Envelope *env;
1745 size_t msize; 1513 size_t msize;
1746 struct PeerResultMessage *prm; 1514 struct PeerResultMessage *prm;
1747 struct GNUNET_PeerIdentity *paths; 1515 struct GNUNET_PeerIdentity *paths;
1748 1516
1749 msize = 1517 msize = data_size + (get_path_length + put_path_length) *
1750 data_size + sizeof (struct PeerResultMessage) + (get_path_length +
1751 put_path_length) *
1752 sizeof (struct GNUNET_PeerIdentity); 1518 sizeof (struct GNUNET_PeerIdentity);
1753 if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || 1519 if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
1754 (get_path_length > 1520 (get_path_length >
1755 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || 1521 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
1756 (put_path_length > 1522 (put_path_length >
@@ -1770,7 +1536,7 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1770 GNUNET_h2s (key)); 1536 GNUNET_h2s (key));
1771 return; 1537 return;
1772 } 1538 }
1773 if (pi->pending_count >= MAXIMUM_PENDING_PER_PEER) 1539 if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER)
1774 { 1540 {
1775 /* skip */ 1541 /* skip */
1776 GNUNET_STATISTICS_update (GDS_stats, 1542 GNUNET_STATISTICS_update (GDS_stats,
@@ -1791,13 +1557,9 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1791 gettext_noop 1557 gettext_noop
1792 ("# RESULT messages queued for transmission"), 1, 1558 ("# RESULT messages queued for transmission"), 1,
1793 GNUNET_NO); 1559 GNUNET_NO);
1794 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); 1560 env = GNUNET_MQ_msg_extra (prm,
1795 pending->importance = 0; /* FIXME */ 1561 msize,
1796 pending->timeout = expiration_time; 1562 GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1797 prm = (struct PeerResultMessage *) &pending[1];
1798 pending->msg = &prm->header;
1799 prm->header.size = htons (msize);
1800 prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT);
1801 prm->type = htonl (type); 1563 prm->type = htonl (type);
1802 prm->put_path_length = htonl (put_path_length); 1564 prm->put_path_length = htonl (put_path_length);
1803 prm->get_path_length = htonl (get_path_length); 1565 prm->get_path_length = htonl (get_path_length);
@@ -1805,19 +1567,16 @@ GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target,
1805 prm->key = *key; 1567 prm->key = *key;
1806 paths = (struct GNUNET_PeerIdentity *) &prm[1]; 1568 paths = (struct GNUNET_PeerIdentity *) &prm[1];
1807 GNUNET_memcpy (paths, 1569 GNUNET_memcpy (paths,
1808 put_path, 1570 put_path,
1809 put_path_length * sizeof (struct GNUNET_PeerIdentity)); 1571 put_path_length * sizeof (struct GNUNET_PeerIdentity));
1810 GNUNET_memcpy (&paths[put_path_length], 1572 GNUNET_memcpy (&paths[put_path_length],
1811 get_path, 1573 get_path,
1812 get_path_length * sizeof (struct GNUNET_PeerIdentity)); 1574 get_path_length * sizeof (struct GNUNET_PeerIdentity));
1813 GNUNET_memcpy (&paths[put_path_length + get_path_length], 1575 GNUNET_memcpy (&paths[put_path_length + get_path_length],
1814 data, 1576 data,
1815 data_size); 1577 data_size);
1816 GNUNET_CONTAINER_DLL_insert (pi->head, 1578 GNUNET_MQ_send (pi->mq,
1817 pi->tail, 1579 env);
1818 pending);
1819 pi->pending_count++;
1820 process_peer_queue (pi);
1821} 1580}
1822 1581
1823 1582
@@ -1843,21 +1602,45 @@ core_init (void *cls,
1843 1602
1844 1603
1845/** 1604/**
1846 * Core handler for p2p put requests. 1605 * Check validity of a p2p put request.
1847 * 1606 *
1848 * @param cls closure 1607 * @param cls closure with the `struct PeerInfo` of the sender
1849 * @param peer sender of the request
1850 * @param message message 1608 * @param message message
1851 * @param peer peer identity this notification is about 1609 * @return #GNUNET_OK if the message is valid
1852 * @return #GNUNET_OK to keep the connection open,
1853 * #GNUNET_SYSERR to close it (signal serious error)
1854 */ 1610 */
1855static int 1611static int
1612check_dht_p2p_put (void *cls,
1613 const struct PeerPutMessage *put)
1614{
1615 uint32_t putlen;
1616 uint16_t msize;
1617
1618 msize = ntohs (put->header.size);
1619 putlen = ntohl (put->put_path_length);
1620 if ((msize <
1621 sizeof (struct PeerPutMessage) +
1622 putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1623 (putlen >
1624 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1625 {
1626 GNUNET_break_op (0);
1627 return GNUNET_SYSERR;
1628 }
1629 return GNUNET_OK;
1630}
1631
1632
1633/**
1634 * Core handler for p2p put requests.
1635 *
1636 * @param cls closure with the `struct PeerInfo` of the sender
1637 * @param message message
1638 */
1639static void
1856handle_dht_p2p_put (void *cls, 1640handle_dht_p2p_put (void *cls,
1857 const struct GNUNET_PeerIdentity *peer, 1641 const struct PeerPutMessage *put)
1858 const struct GNUNET_MessageHeader *message)
1859{ 1642{
1860 const struct PeerPutMessage *put; 1643 struct PeerInfo *peer = cls;
1861 const struct GNUNET_PeerIdentity *put_path; 1644 const struct GNUNET_PeerIdentity *put_path;
1862 const void *payload; 1645 const void *payload;
1863 uint32_t putlen; 1646 uint32_t putlen;
@@ -1869,23 +1652,8 @@ handle_dht_p2p_put (void *cls,
1869 struct GNUNET_HashCode phash; 1652 struct GNUNET_HashCode phash;
1870 int forwarded; 1653 int forwarded;
1871 1654
1872 msize = ntohs (message->size); 1655 msize = ntohs (put->header.size);
1873 if (msize < sizeof (struct PeerPutMessage))
1874 {
1875 GNUNET_break_op (0);
1876 return GNUNET_YES;
1877 }
1878 put = (const struct PeerPutMessage *) message;
1879 putlen = ntohl (put->put_path_length); 1656 putlen = ntohl (put->put_path_length);
1880 if ((msize <
1881 sizeof (struct PeerPutMessage) +
1882 putlen * sizeof (struct GNUNET_PeerIdentity)) ||
1883 (putlen >
1884 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
1885 {
1886 GNUNET_break_op (0);
1887 return GNUNET_YES;
1888 }
1889 GNUNET_STATISTICS_update (GDS_stats, 1657 GNUNET_STATISTICS_update (GDS_stats,
1890 gettext_noop ("# P2P PUT requests received"), 1658 gettext_noop ("# P2P PUT requests received"),
1891 1, 1659 1,
@@ -1897,13 +1665,16 @@ handle_dht_p2p_put (void *cls,
1897 put_path = (const struct GNUNET_PeerIdentity *) &put[1]; 1665 put_path = (const struct GNUNET_PeerIdentity *) &put[1];
1898 payload = &put_path[putlen]; 1666 payload = &put_path[putlen];
1899 options = ntohl (put->options); 1667 options = ntohl (put->options);
1900 payload_size = 1668 payload_size = msize - (sizeof (struct PeerPutMessage) +
1901 msize - (sizeof (struct PeerPutMessage) + 1669 putlen * sizeof (struct GNUNET_PeerIdentity));
1902 putlen * sizeof (struct GNUNET_PeerIdentity));
1903 1670
1904 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n", 1671 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1905 GNUNET_h2s (&put->key), GNUNET_i2s (peer)); 1672 "PUT for `%s' from %s\n",
1906 GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash); 1673 GNUNET_h2s (&put->key),
1674 GNUNET_i2s (peer->id));
1675 GNUNET_CRYPTO_hash (peer->id,
1676 sizeof (struct GNUNET_PeerIdentity),
1677 &phash);
1907 if (GNUNET_YES == log_route_details_stderr) 1678 if (GNUNET_YES == log_route_details_stderr)
1908 { 1679 {
1909 char *tmp; 1680 char *tmp;
@@ -1912,7 +1683,7 @@ handle_dht_p2p_put (void *cls,
1912 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 1683 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1913 "R5N PUT %s: %s->%s (%u, %u=>%u)\n", 1684 "R5N PUT %s: %s->%s (%u, %u=>%u)\n",
1914 GNUNET_h2s (&put->key), 1685 GNUNET_h2s (&put->key),
1915 GNUNET_i2s (peer), 1686 GNUNET_i2s (peer->id),
1916 tmp, 1687 tmp,
1917 ntohl(put->hop_count), 1688 ntohl(put->hop_count),
1918 GNUNET_CRYPTO_hash_matching_bits (&phash, 1689 GNUNET_CRYPTO_hash_matching_bits (&phash,
@@ -1923,25 +1694,31 @@ handle_dht_p2p_put (void *cls,
1923 GNUNET_free (tmp); 1694 GNUNET_free (tmp);
1924 } 1695 }
1925 switch (GNUNET_BLOCK_get_key 1696 switch (GNUNET_BLOCK_get_key
1926 (GDS_block_context, ntohl (put->type), payload, payload_size, 1697 (GDS_block_context,
1698 ntohl (put->type),
1699 payload,
1700 payload_size,
1927 &test_key)) 1701 &test_key))
1928 { 1702 {
1929 case GNUNET_YES: 1703 case GNUNET_YES:
1930 if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode))) 1704 if (0 != memcmp (&test_key,
1705 &put->key,
1706 sizeof (struct GNUNET_HashCode)))
1931 { 1707 {
1932 char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key)); 1708 char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
1933 1709
1934 GNUNET_break_op (0); 1710 GNUNET_break_op (0);
1935 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1711 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1936 "PUT with key `%s' for block with key %s\n", 1712 "PUT with key `%s' for block with key %s\n",
1937 put_s, GNUNET_h2s_full (&test_key)); 1713 put_s,
1714 GNUNET_h2s_full (&test_key));
1938 GNUNET_free (put_s); 1715 GNUNET_free (put_s);
1939 return GNUNET_YES; 1716 return;
1940 } 1717 }
1941 break; 1718 break;
1942 case GNUNET_NO: 1719 case GNUNET_NO:
1943 GNUNET_break_op (0); 1720 GNUNET_break_op (0);
1944 return GNUNET_YES; 1721 return;
1945 case GNUNET_SYSERR: 1722 case GNUNET_SYSERR:
1946 /* cannot verify, good luck */ 1723 /* cannot verify, good luck */
1947 break; 1724 break;
@@ -1968,7 +1745,7 @@ handle_dht_p2p_put (void *cls,
1968 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: 1745 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1969 default: 1746 default:
1970 GNUNET_break_op (0); 1747 GNUNET_break_op (0);
1971 return GNUNET_OK; 1748 return;
1972 } 1749 }
1973 } 1750 }
1974 1751
@@ -1976,15 +1753,18 @@ handle_dht_p2p_put (void *cls,
1976 DHT_BLOOM_SIZE, 1753 DHT_BLOOM_SIZE,
1977 GNUNET_CONSTANTS_BLOOMFILTER_K); 1754 GNUNET_CONSTANTS_BLOOMFILTER_K);
1978 GNUNET_break_op (GNUNET_YES == 1755 GNUNET_break_op (GNUNET_YES ==
1979 GNUNET_CONTAINER_bloomfilter_test (bf, &phash)); 1756 GNUNET_CONTAINER_bloomfilter_test (bf,
1757 &phash));
1980 { 1758 {
1981 struct GNUNET_PeerIdentity pp[putlen + 1]; 1759 struct GNUNET_PeerIdentity pp[putlen + 1];
1982 1760
1983 /* extend 'put path' by sender */ 1761 /* extend 'put path' by sender */
1984 if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE)) 1762 if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
1985 { 1763 {
1986 GNUNET_memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity)); 1764 GNUNET_memcpy (pp,
1987 pp[putlen] = *peer; 1765 put_path,
1766 putlen * sizeof (struct GNUNET_PeerIdentity));
1767 pp[putlen] = *peer->id;
1988 putlen++; 1768 putlen++;
1989 } 1769 }
1990 else 1770 else
@@ -1992,8 +1772,14 @@ handle_dht_p2p_put (void *cls,
1992 1772
1993 /* give to local clients */ 1773 /* give to local clients */
1994 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time), 1774 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1995 &put->key, 0, NULL, putlen, pp, ntohl (put->type), 1775 &put->key,
1996 payload_size, payload); 1776 0,
1777 NULL,
1778 putlen,
1779 pp,
1780 ntohl (put->type),
1781 payload_size,
1782 payload);
1997 /* store locally */ 1783 /* store locally */
1998 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || 1784 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
1999 (am_closest_peer (&put->key, bf))) 1785 (am_closest_peer (&put->key, bf)))
@@ -2001,10 +1787,12 @@ handle_dht_p2p_put (void *cls,
2001 (put->expiration_time), &put->key, putlen, pp, 1787 (put->expiration_time), &put->key, putlen, pp,
2002 ntohl (put->type), payload_size, payload); 1788 ntohl (put->type), payload_size, payload);
2003 /* route to other peers */ 1789 /* route to other peers */
2004 forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options, 1790 forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1791 options,
2005 ntohl (put->desired_replication_level), 1792 ntohl (put->desired_replication_level),
2006 GNUNET_TIME_absolute_ntoh (put->expiration_time), 1793 GNUNET_TIME_absolute_ntoh (put->expiration_time),
2007 ntohl (put->hop_count), bf, 1794 ntohl (put->hop_count),
1795 bf,
2008 &put->key, 1796 &put->key,
2009 putlen, 1797 putlen,
2010 pp, 1798 pp,
@@ -2025,7 +1813,6 @@ handle_dht_p2p_put (void *cls,
2025 payload_size); 1813 payload_size);
2026 } 1814 }
2027 GNUNET_CONTAINER_bloomfilter_free (bf); 1815 GNUNET_CONTAINER_bloomfilter_free (bf);
2028 return GNUNET_YES;
2029} 1816}
2030 1817
2031 1818
@@ -2040,8 +1827,9 @@ handle_dht_p2p_put (void *cls,
2040 */ 1827 */
2041static void 1828static void
2042handle_find_peer (const struct GNUNET_PeerIdentity *sender, 1829handle_find_peer (const struct GNUNET_PeerIdentity *sender,
2043 const struct GNUNET_HashCode * key, 1830 const struct GNUNET_HashCode *key,
2044 struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator) 1831 struct GNUNET_CONTAINER_BloomFilter *bf,
1832 uint32_t bf_mutator)
2045{ 1833{
2046 int bucket_idx; 1834 int bucket_idx;
2047 struct PeerBucket *bucket; 1835 struct PeerBucket *bucket;
@@ -2058,10 +1846,16 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender,
2058 if ((NULL == bf) || 1846 if ((NULL == bf) ||
2059 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))) 1847 (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash)))
2060 { 1848 {
2061 GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, 1849 GDS_NEIGHBOURS_handle_reply (sender,
1850 GNUNET_BLOCK_TYPE_DHT_HELLO,
2062 GNUNET_TIME_relative_to_absolute 1851 GNUNET_TIME_relative_to_absolute
2063 (hello_expiration), 1852 (hello_expiration),
2064 key, 0, NULL, 0, NULL, GDS_my_hello, 1853 key,
1854 0,
1855 NULL,
1856 0,
1857 NULL,
1858 GDS_my_hello,
2065 GNUNET_HELLO_size ((const struct 1859 GNUNET_HELLO_size ((const struct
2066 GNUNET_HELLO_Message *) 1860 GNUNET_HELLO_Message *)
2067 GDS_my_hello)); 1861 GDS_my_hello));
@@ -2069,31 +1863,34 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender,
2069 else 1863 else
2070 { 1864 {
2071 GNUNET_STATISTICS_update (GDS_stats, 1865 GNUNET_STATISTICS_update (GDS_stats,
2072 gettext_noop 1866 gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"),
2073 ("# FIND PEER requests ignored due to Bloomfilter"), 1867 1,
2074 1, GNUNET_NO); 1868 GNUNET_NO);
2075 } 1869 }
2076 } 1870 }
2077 else 1871 else
2078 { 1872 {
2079 GNUNET_STATISTICS_update (GDS_stats, 1873 GNUNET_STATISTICS_update (GDS_stats,
2080 gettext_noop 1874 gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"),
2081 ("# FIND PEER requests ignored due to lack of HELLO"), 1875 1,
2082 1, GNUNET_NO); 1876 GNUNET_NO);
2083 } 1877 }
2084 1878
2085 /* then, also consider sending a random HELLO from the closest bucket */ 1879 /* then, also consider sending a random HELLO from the closest bucket */
2086 if (0 == memcmp (&my_identity_hash, key, sizeof (struct GNUNET_HashCode))) 1880 if (0 == memcmp (&my_identity_hash,
1881 key,
1882 sizeof (struct GNUNET_HashCode)))
2087 bucket_idx = closest_bucket; 1883 bucket_idx = closest_bucket;
2088 else 1884 else
2089 bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key)); 1885 bucket_idx = GNUNET_MIN (closest_bucket,
1886 find_bucket (key));
2090 if (bucket_idx == GNUNET_SYSERR) 1887 if (bucket_idx == GNUNET_SYSERR)
2091 return; 1888 return;
2092 bucket = &k_buckets[bucket_idx]; 1889 bucket = &k_buckets[bucket_idx];
2093 if (bucket->peers_size == 0) 1890 if (bucket->peers_size == 0)
2094 return; 1891 return;
2095 choice = 1892 choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2096 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size); 1893 bucket->peers_size);
2097 peer = bucket->head; 1894 peer = bucket->head;
2098 while (choice > 0) 1895 while (choice > 0)
2099 { 1896 {
@@ -2107,41 +1904,69 @@ handle_find_peer (const struct GNUNET_PeerIdentity *sender,
2107 peer = peer->next; 1904 peer = peer->next;
2108 if (choice-- == 0) 1905 if (choice-- == 0)
2109 return; /* no non-masked peer available */ 1906 return; /* no non-masked peer available */
2110 if (peer == NULL) 1907 if (NULL == peer)
2111 peer = bucket->head; 1908 peer = bucket->head;
2112 GNUNET_CRYPTO_hash (&peer->id, 1909 GNUNET_CRYPTO_hash (peer->id,
2113 sizeof (struct GNUNET_PeerIdentity), 1910 sizeof (struct GNUNET_PeerIdentity),
2114 &phash); 1911 &phash);
2115 GNUNET_BLOCK_mingle_hash (&phash, 1912 GNUNET_BLOCK_mingle_hash (&phash,
2116 bf_mutator, 1913 bf_mutator,
2117 &mhash); 1914 &mhash);
2118 hello = GDS_HELLO_get (&peer->id); 1915 hello = GDS_HELLO_get (peer->id);
2119 } 1916 } while ( (hello == NULL) ||
2120 while ((hello == NULL) || 1917 (GNUNET_YES ==
2121 (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))); 1918 GNUNET_CONTAINER_bloomfilter_test (bf,
2122 GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, 1919 &mhash)) );
1920 GDS_NEIGHBOURS_handle_reply (sender,
1921 GNUNET_BLOCK_TYPE_DHT_HELLO,
2123 GNUNET_TIME_relative_to_absolute 1922 GNUNET_TIME_relative_to_absolute
2124 (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key, 1923 (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION),
2125 0, NULL, 0, NULL, hello, 1924 key,
1925 0,
1926 NULL,
1927 0,
1928 NULL,
1929 hello,
2126 GNUNET_HELLO_size (hello)); 1930 GNUNET_HELLO_size (hello));
2127} 1931}
2128 1932
2129 1933
2130/** 1934/**
2131 * Core handler for p2p get requests. 1935 * Check validity of p2p get request.
2132 * 1936 *
2133 * @param cls closure 1937 * @param cls closure with the `struct PeerInfo` of the sender
2134 * @param peer sender of the request 1938 * @param get the message
2135 * @param message message 1939 * @return #GNUNET_OK if the message is well-formed
2136 * @return #GNUNET_OK to keep the connection open,
2137 * #GNUNET_SYSERR to close it (signal serious error)
2138 */ 1940 */
2139static int 1941static int
1942check_dht_p2p_get (void *cls,
1943 const struct PeerGetMessage *get)
1944{
1945 uint32_t xquery_size;
1946 uint16_t msize;
1947
1948 msize = ntohs (get->header.size);
1949 xquery_size = ntohl (get->xquery_size);
1950 if (msize < sizeof (struct PeerGetMessage) + xquery_size)
1951 {
1952 GNUNET_break_op (0);
1953 return GNUNET_SYSERR;
1954 }
1955 return GNUNET_OK;
1956}
1957
1958
1959/**
1960 * Core handler for p2p get requests.
1961 *
1962 * @param cls closure with the `struct PeerInfo` of the sender
1963 * @param get the message
1964 */
1965static void
2140handle_dht_p2p_get (void *cls, 1966handle_dht_p2p_get (void *cls,
2141 const struct GNUNET_PeerIdentity *peer, 1967 const struct PeerGetMessage *get)
2142 const struct GNUNET_MessageHeader *message)
2143{ 1968{
2144 struct PeerGetMessage *get; 1969 struct PeerInfo *peer = cls;
2145 uint32_t xquery_size; 1970 uint32_t xquery_size;
2146 size_t reply_bf_size; 1971 size_t reply_bf_size;
2147 uint16_t msize; 1972 uint16_t msize;
@@ -2154,36 +1979,28 @@ handle_dht_p2p_get (void *cls,
2154 struct GNUNET_HashCode phash; 1979 struct GNUNET_HashCode phash;
2155 int forwarded; 1980 int forwarded;
2156 1981
2157 GNUNET_break (0 != 1982 if (NULL == peer)
2158 memcmp (peer,
2159 &my_identity,
2160 sizeof (struct GNUNET_PeerIdentity)));
2161 /* parse and validate message */
2162 msize = ntohs (message->size);
2163 if (msize < sizeof (struct PeerGetMessage))
2164 { 1983 {
2165 GNUNET_break_op (0); 1984 GNUNET_break (0);
2166 return GNUNET_YES; 1985 return;
2167 } 1986 }
2168 get = (struct PeerGetMessage *) message; 1987 /* parse and validate message */
1988 msize = ntohs (get->header.size);
2169 xquery_size = ntohl (get->xquery_size); 1989 xquery_size = ntohl (get->xquery_size);
2170 if (msize < sizeof (struct PeerGetMessage) + xquery_size)
2171 {
2172 GNUNET_break_op (0);
2173 return GNUNET_YES;
2174 }
2175 reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size); 1990 reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size);
2176 type = ntohl (get->type); 1991 type = ntohl (get->type);
2177 options = ntohl (get->options); 1992 options = ntohl (get->options);
2178 xquery = (const char *) &get[1]; 1993 xquery = (const char *) &get[1];
2179 reply_bf = NULL; 1994 reply_bf = NULL;
2180 GNUNET_STATISTICS_update (GDS_stats, 1995 GNUNET_STATISTICS_update (GDS_stats,
2181 gettext_noop ("# P2P GET requests received"), 1, 1996 gettext_noop ("# P2P GET requests received"),
1997 1,
2182 GNUNET_NO); 1998 GNUNET_NO);
2183 GNUNET_STATISTICS_update (GDS_stats, 1999 GNUNET_STATISTICS_update (GDS_stats,
2184 gettext_noop ("# P2P GET bytes received"), msize, 2000 gettext_noop ("# P2P GET bytes received"),
2001 msize,
2185 GNUNET_NO); 2002 GNUNET_NO);
2186 GNUNET_CRYPTO_hash (peer, 2003 GNUNET_CRYPTO_hash (peer->id,
2187 sizeof (struct GNUNET_PeerIdentity), 2004 sizeof (struct GNUNET_PeerIdentity),
2188 &phash); 2005 &phash);
2189 if (GNUNET_YES == log_route_details_stderr) 2006 if (GNUNET_YES == log_route_details_stderr)
@@ -2193,17 +2010,23 @@ handle_dht_p2p_get (void *cls,
2193 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); 2010 tmp = GNUNET_strdup (GNUNET_i2s (&my_identity));
2194 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 2011 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2195 "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n", 2012 "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n",
2196 GNUNET_h2s (&get->key), GNUNET_i2s (peer), tmp, 2013 GNUNET_h2s (&get->key),
2014 GNUNET_i2s (peer->id),
2015 tmp,
2197 ntohl(get->hop_count), 2016 ntohl(get->hop_count),
2198 GNUNET_CRYPTO_hash_matching_bits (&phash, &get->key), 2017 GNUNET_CRYPTO_hash_matching_bits (&phash,
2199 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key), 2018 &get->key),
2200 ntohl(get->xquery_size), xquery); 2019 GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash,
2020 &get->key),
2021 ntohl(get->xquery_size),
2022 xquery);
2201 GNUNET_free (tmp); 2023 GNUNET_free (tmp);
2202 } 2024 }
2203 2025
2204 if (reply_bf_size > 0) 2026 if (reply_bf_size > 0)
2205 reply_bf = 2027 reply_bf =
2206 GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size, 2028 GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size],
2029 reply_bf_size,
2207 GNUNET_CONSTANTS_BLOOMFILTER_K); 2030 GNUNET_CONSTANTS_BLOOMFILTER_K);
2208 eval = 2031 eval =
2209 GNUNET_BLOCK_evaluate (GDS_block_context, 2032 GNUNET_BLOCK_evaluate (GDS_block_context,
@@ -2222,17 +2045,23 @@ handle_dht_p2p_get (void *cls,
2222 GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED); 2045 GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED);
2223 if (NULL != reply_bf) 2046 if (NULL != reply_bf)
2224 GNUNET_CONTAINER_bloomfilter_free (reply_bf); 2047 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2225 return GNUNET_YES; 2048 return;
2226 } 2049 }
2227 peer_bf = 2050 peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter,
2228 GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE, 2051 DHT_BLOOM_SIZE,
2229 GNUNET_CONSTANTS_BLOOMFILTER_K); 2052 GNUNET_CONSTANTS_BLOOMFILTER_K);
2230 GNUNET_break_op (GNUNET_YES == 2053 GNUNET_break_op (GNUNET_YES ==
2231 GNUNET_CONTAINER_bloomfilter_test (peer_bf, 2054 GNUNET_CONTAINER_bloomfilter_test (peer_bf,
2232 &phash)); 2055 &phash));
2233 /* remember request for routing replies */ 2056 /* remember request for routing replies */
2234 GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size, 2057 GDS_ROUTING_add (peer->id,
2235 reply_bf, get->bf_mutator); 2058 type,
2059 options,
2060 &get->key,
2061 xquery,
2062 xquery_size,
2063 reply_bf,
2064 get->bf_mutator);
2236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2065 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237 "GET for %s at %s after %u hops\n", 2066 "GET for %s at %s after %u hops\n",
2238 GNUNET_h2s (&get->key), 2067 GNUNET_h2s (&get->key),
@@ -2240,48 +2069,59 @@ handle_dht_p2p_get (void *cls,
2240 (unsigned int) ntohl (get->hop_count)); 2069 (unsigned int) ntohl (get->hop_count));
2241 /* local lookup (this may update the reply_bf) */ 2070 /* local lookup (this may update the reply_bf) */
2242 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || 2071 if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
2243 (am_closest_peer (&get->key, peer_bf))) 2072 (am_closest_peer (&get->key,
2073 peer_bf)))
2244 { 2074 {
2245 if ((0 != (options & GNUNET_DHT_RO_FIND_PEER))) 2075 if ((0 != (options & GNUNET_DHT_RO_FIND_PEER)))
2246 { 2076 {
2247 GNUNET_STATISTICS_update (GDS_stats, 2077 GNUNET_STATISTICS_update (GDS_stats,
2248 gettext_noop 2078 gettext_noop ("# P2P FIND PEER requests processed"),
2249 ("# P2P FIND PEER requests processed"), 1, 2079 1,
2250 GNUNET_NO); 2080 GNUNET_NO);
2251 handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator); 2081 handle_find_peer (peer->id,
2082 &get->key,
2083 reply_bf,
2084 get->bf_mutator);
2252 } 2085 }
2253 else 2086 else
2254 { 2087 {
2255 eval = 2088 eval = GDS_DATACACHE_handle_get (&get->key,
2256 GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size, 2089 type,
2257 &reply_bf, get->bf_mutator); 2090 xquery,
2091 xquery_size,
2092 &reply_bf,
2093 get->bf_mutator);
2258 } 2094 }
2259 } 2095 }
2260 else 2096 else
2261 { 2097 {
2262 GNUNET_STATISTICS_update (GDS_stats, 2098 GNUNET_STATISTICS_update (GDS_stats,
2263 gettext_noop ("# P2P GET requests ONLY routed"), 2099 gettext_noop ("# P2P GET requests ONLY routed"),
2264 1, GNUNET_NO); 2100 1,
2101 GNUNET_NO);
2265 } 2102 }
2266 2103
2267 /* P2P forwarding */ 2104 /* P2P forwarding */
2268 forwarded = GNUNET_NO; 2105 forwarded = GNUNET_NO;
2269 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) 2106 if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
2270 forwarded = GDS_NEIGHBOURS_handle_get (type, options, 2107 forwarded = GDS_NEIGHBOURS_handle_get (type,
2108 options,
2271 ntohl (get->desired_replication_level), 2109 ntohl (get->desired_replication_level),
2272 ntohl (get->hop_count), 2110 ntohl (get->hop_count),
2273 &get->key, 2111 &get->key,
2274 xquery, 2112 xquery,
2275 xquery_size, 2113 xquery_size,
2276 reply_bf, 2114 reply_bf,
2277 get->bf_mutator, peer_bf); 2115 get->bf_mutator,
2116 peer_bf);
2278 GDS_CLIENTS_process_get (options 2117 GDS_CLIENTS_process_get (options
2279 | (GNUNET_OK == forwarded) 2118 | (GNUNET_OK == forwarded)
2280 ? GNUNET_DHT_RO_LAST_HOP : 0, 2119 ? GNUNET_DHT_RO_LAST_HOP : 0,
2281 type, 2120 type,
2282 ntohl (get->hop_count), 2121 ntohl (get->hop_count),
2283 ntohl (get->desired_replication_level), 2122 ntohl (get->desired_replication_level),
2284 0, NULL, 2123 0,
2124 NULL,
2285 &get->key); 2125 &get->key);
2286 2126
2287 2127
@@ -2289,41 +2129,25 @@ handle_dht_p2p_get (void *cls,
2289 if (NULL != reply_bf) 2129 if (NULL != reply_bf)
2290 GNUNET_CONTAINER_bloomfilter_free (reply_bf); 2130 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
2291 GNUNET_CONTAINER_bloomfilter_free (peer_bf); 2131 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
2292 return GNUNET_YES;
2293} 2132}
2294 2133
2295 2134
2296/** 2135/**
2297 * Core handler for p2p result messages. 2136 * Check validity of p2p result message.
2298 * 2137 *
2299 * @param cls closure 2138 * @param cls closure
2300 * @param message message 2139 * @param message message
2301 * @param peer peer identity this notification is about 2140 * @return #GNUNET_YES if the message is well-formed
2302 * @return #GNUNET_YES (do not cut p2p connection)
2303 */ 2141 */
2304static int 2142static int
2305handle_dht_p2p_result (void *cls, 2143check_dht_p2p_result (void *cls,
2306 const struct GNUNET_PeerIdentity *peer, 2144 const struct PeerResultMessage *prm)
2307 const struct GNUNET_MessageHeader *message)
2308{ 2145{
2309 const struct PeerResultMessage *prm;
2310 const struct GNUNET_PeerIdentity *put_path;
2311 const struct GNUNET_PeerIdentity *get_path;
2312 const void *data;
2313 uint32_t get_path_length; 2146 uint32_t get_path_length;
2314 uint32_t put_path_length; 2147 uint32_t put_path_length;
2315 uint16_t msize; 2148 uint16_t msize;
2316 size_t data_size;
2317 enum GNUNET_BLOCK_Type type;
2318 2149
2319 /* parse and validate message */ 2150 msize = ntohs (prm->header.size);
2320 msize = ntohs (message->size);
2321 if (msize < sizeof (struct PeerResultMessage))
2322 {
2323 GNUNET_break_op (0);
2324 return GNUNET_YES;
2325 }
2326 prm = (struct PeerResultMessage *) message;
2327 put_path_length = ntohl (prm->put_path_length); 2151 put_path_length = ntohl (prm->put_path_length);
2328 get_path_length = ntohl (prm->get_path_length); 2152 get_path_length = ntohl (prm->get_path_length);
2329 if ((msize < 2153 if ((msize <
@@ -2336,21 +2160,51 @@ handle_dht_p2p_result (void *cls,
2336 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) 2160 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2337 { 2161 {
2338 GNUNET_break_op (0); 2162 GNUNET_break_op (0);
2339 return GNUNET_YES; 2163 return GNUNET_SYSERR;
2340 } 2164 }
2165 return GNUNET_OK;
2166}
2167
2168
2169/**
2170 * Core handler for p2p result messages.
2171 *
2172 * @param cls closure
2173 * @param message message
2174 */
2175static void
2176handle_dht_p2p_result (void *cls,
2177 const struct PeerResultMessage *prm)
2178{
2179 struct PeerInfo *peer = cls;
2180 const struct GNUNET_PeerIdentity *put_path;
2181 const struct GNUNET_PeerIdentity *get_path;
2182 const void *data;
2183 uint32_t get_path_length;
2184 uint32_t put_path_length;
2185 uint16_t msize;
2186 size_t data_size;
2187 enum GNUNET_BLOCK_Type type;
2188
2189 /* parse and validate message */
2190 msize = ntohs (prm->header.size);
2191 put_path_length = ntohl (prm->put_path_length);
2192 get_path_length = ntohl (prm->get_path_length);
2341 put_path = (const struct GNUNET_PeerIdentity *) &prm[1]; 2193 put_path = (const struct GNUNET_PeerIdentity *) &prm[1];
2342 get_path = &put_path[put_path_length]; 2194 get_path = &put_path[put_path_length];
2343 type = ntohl (prm->type); 2195 type = ntohl (prm->type);
2344 data = (const void *) &get_path[get_path_length]; 2196 data = (const void *) &get_path[get_path_length];
2345 data_size = 2197 data_size = msize - (sizeof (struct PeerResultMessage) +
2346 msize - (sizeof (struct PeerResultMessage) + 2198 (get_path_length +
2347 (get_path_length + 2199 put_path_length) * sizeof (struct GNUNET_PeerIdentity));
2348 put_path_length) * sizeof (struct GNUNET_PeerIdentity)); 2200 GNUNET_STATISTICS_update (GDS_stats,
2349 GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"), 2201 gettext_noop ("# P2P RESULTS received"),
2350 1, GNUNET_NO); 2202 1,
2203 GNUNET_NO);
2351 GNUNET_STATISTICS_update (GDS_stats, 2204 GNUNET_STATISTICS_update (GDS_stats,
2352 gettext_noop ("# P2P RESULT bytes received"), 2205 gettext_noop ("# P2P RESULT bytes received"),
2353 msize, GNUNET_NO); 2206 msize,
2207 GNUNET_NO);
2354 if (GNUNET_YES == log_route_details_stderr) 2208 if (GNUNET_YES == log_route_details_stderr)
2355 { 2209 {
2356 char *tmp; 2210 char *tmp;
@@ -2359,7 +2213,7 @@ handle_dht_p2p_result (void *cls,
2359 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, 2213 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
2360 "R5N RESULT %s: %s->%s (%u)\n", 2214 "R5N RESULT %s: %s->%s (%u)\n",
2361 GNUNET_h2s (&prm->key), 2215 GNUNET_h2s (&prm->key),
2362 GNUNET_i2s (peer), 2216 GNUNET_i2s (peer->id),
2363 tmp, 2217 tmp,
2364 get_path_length + 1); 2218 get_path_length + 1);
2365 GNUNET_free (tmp); 2219 GNUNET_free (tmp);
@@ -2371,23 +2225,23 @@ handle_dht_p2p_result (void *cls,
2371 struct GNUNET_PeerIdentity pid; 2225 struct GNUNET_PeerIdentity pid;
2372 2226
2373 /* Should be a HELLO, validate and consider using it! */ 2227 /* Should be a HELLO, validate and consider using it! */
2374 if (data_size < sizeof (struct GNUNET_MessageHeader)) 2228 if (data_size < sizeof (struct GNUNET_HELLO_Message))
2375 { 2229 {
2376 GNUNET_break_op (0); 2230 GNUNET_break_op (0);
2377 return GNUNET_YES; 2231 return;
2378 } 2232 }
2379 h = data; 2233 h = data;
2380 if (data_size != ntohs (h->size)) 2234 if (data_size != ntohs (h->size))
2381 { 2235 {
2382 GNUNET_break_op (0); 2236 GNUNET_break_op (0);
2383 return GNUNET_YES; 2237 return;
2384 } 2238 }
2385 if (GNUNET_OK != 2239 if (GNUNET_OK !=
2386 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, 2240 GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h,
2387 &pid)) 2241 &pid))
2388 { 2242 {
2389 GNUNET_break_op (0); 2243 GNUNET_break_op (0);
2390 return GNUNET_YES; 2244 return;
2391 } 2245 }
2392 if ( (GNUNET_YES != disable_try_connect) && 2246 if ( (GNUNET_YES != disable_try_connect) &&
2393 (0 != memcmp (&my_identity, 2247 (0 != memcmp (&my_identity,
@@ -2402,9 +2256,9 @@ handle_dht_p2p_result (void *cls,
2402 struct GNUNET_PeerIdentity xget_path[get_path_length + 1]; 2256 struct GNUNET_PeerIdentity xget_path[get_path_length + 1];
2403 2257
2404 GNUNET_memcpy (xget_path, 2258 GNUNET_memcpy (xget_path,
2405 get_path, 2259 get_path,
2406 get_path_length * sizeof (struct GNUNET_PeerIdentity)); 2260 get_path_length * sizeof (struct GNUNET_PeerIdentity));
2407 xget_path[get_path_length] = *peer; 2261 xget_path[get_path_length] = *peer->id;
2408 get_path_length++; 2262 get_path_length++;
2409 2263
2410 /* forward to local clients */ 2264 /* forward to local clients */
@@ -2429,10 +2283,12 @@ handle_dht_p2p_result (void *cls,
2429 { 2283 {
2430 struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length]; 2284 struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length];
2431 2285
2432 GNUNET_memcpy (xput_path, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity)); 2286 GNUNET_memcpy (xput_path,
2287 put_path,
2288 put_path_length * sizeof (struct GNUNET_PeerIdentity));
2433 GNUNET_memcpy (&xput_path[put_path_length], 2289 GNUNET_memcpy (&xput_path[put_path_length],
2434 xget_path, 2290 xget_path,
2435 get_path_length * sizeof (struct GNUNET_PeerIdentity)); 2291 get_path_length * sizeof (struct GNUNET_PeerIdentity));
2436 2292
2437 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time), 2293 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (prm->expiration_time),
2438 &prm->key, 2294 &prm->key,
@@ -2453,8 +2309,6 @@ handle_dht_p2p_result (void *cls,
2453 data, 2309 data,
2454 data_size); 2310 data_size);
2455 } 2311 }
2456
2457 return GNUNET_YES;
2458} 2312}
2459 2313
2460 2314
@@ -2466,38 +2320,51 @@ handle_dht_p2p_result (void *cls,
2466int 2320int
2467GDS_NEIGHBOURS_init () 2321GDS_NEIGHBOURS_init ()
2468{ 2322{
2469 static struct GNUNET_CORE_MessageHandler core_handlers[] = { 2323 GNUNET_MQ_hd_var_size (dht_p2p_get,
2470 {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, 2324 GNUNET_MESSAGE_TYPE_DHT_P2P_GET,
2471 {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0}, 2325 struct PeerGetMessage);
2472 {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0}, 2326 GNUNET_MQ_hd_var_size (dht_p2p_put,
2473 {NULL, 0, 0} 2327 GNUNET_MESSAGE_TYPE_DHT_P2P_PUT,
2328 struct PeerPutMessage);
2329 GNUNET_MQ_hd_var_size (dht_p2p_result,
2330 GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT,
2331 struct PeerResultMessage);
2332 struct GNUNET_MQ_MessageHandler core_handlers[] = {
2333 make_dht_p2p_get_handler (NULL),
2334 make_dht_p2p_put_handler (NULL),
2335 make_dht_p2p_result_handler (NULL),
2336 GNUNET_MQ_handler_end ()
2474 }; 2337 };
2475 unsigned long long temp_config_num; 2338 unsigned long long temp_config_num;
2476 2339
2477 disable_try_connect 2340 disable_try_connect
2478 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT"); 2341 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2342 "DHT",
2343 "DISABLE_TRY_CONNECT");
2479 if (GNUNET_OK == 2344 if (GNUNET_OK ==
2480 GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size", 2345 GNUNET_CONFIGURATION_get_value_number (GDS_cfg,
2346 "DHT",
2347 "bucket_size",
2481 &temp_config_num)) 2348 &temp_config_num))
2482 bucket_size = (unsigned int) temp_config_num; 2349 bucket_size = (unsigned int) temp_config_num;
2483 cache_results 2350 cache_results
2484 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS"); 2351 = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg,
2352 "DHT",
2353 "CACHE_RESULTS");
2485 2354
2486 log_route_details_stderr = 2355 log_route_details_stderr =
2487 (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO; 2356 (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO;
2488 ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg); 2357 ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg);
2489 core_api = 2358 core_api = GNUNET_CORE_connecT (GDS_cfg,
2490 GNUNET_CORE_connect (GDS_cfg, NULL, 2359 NULL,
2491 &core_init, 2360 &core_init,
2492 &handle_core_connect, 2361 &handle_core_connect,
2493 &handle_core_disconnect, 2362 &handle_core_disconnect,
2494 NULL, GNUNET_NO, 2363 core_handlers);
2495 NULL, GNUNET_NO, 2364 if (NULL == core_api)
2496 core_handlers);
2497 if (core_api == NULL)
2498 return GNUNET_SYSERR; 2365 return GNUNET_SYSERR;
2499 all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256, 2366 all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256,
2500 GNUNET_NO); 2367 GNUNET_YES);
2501 all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256, 2368 all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256,
2502 GNUNET_NO); 2369 GNUNET_NO);
2503 return GNUNET_OK; 2370 return GNUNET_OK;
@@ -2512,9 +2379,10 @@ GDS_NEIGHBOURS_done ()
2512{ 2379{
2513 if (NULL == core_api) 2380 if (NULL == core_api)
2514 return; 2381 return;
2515 GNUNET_CORE_disconnect (core_api); 2382 GNUNET_CORE_disconnecT (core_api);
2516 core_api = NULL; 2383 core_api = NULL;
2517 GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)); 2384 GNUNET_assert (0 ==
2385 GNUNET_CONTAINER_multipeermap_size (all_connected_peers));
2518 GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers); 2386 GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers);
2519 all_connected_peers = NULL; 2387 all_connected_peers = NULL;
2520 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers, 2388 GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers,