aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/dht/gnunet-service-dht-new.c175
-rw-r--r--src/dht/gnunet-service-dht.h43
-rw-r--r--src/dht/gnunet-service-dht_clients.c27
-rw-r--r--src/dht/gnunet-service-dht_clients.h1
-rw-r--r--src/dht/gnunet-service-dht_datacache.c315
-rw-r--r--src/dht/gnunet-service-dht_datacache.h84
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c782
-rw-r--r--src/dht/gnunet-service-dht_neighbours.h35
8 files changed, 1145 insertions, 317 deletions
diff --git a/src/dht/gnunet-service-dht-new.c b/src/dht/gnunet-service-dht-new.c
index 2fb06457e..87cf97a40 100644
--- a/src/dht/gnunet-service-dht-new.c
+++ b/src/dht/gnunet-service-dht-new.c
@@ -391,11 +391,6 @@ static struct FindPeerMessageContext find_peer_context;
391static unsigned int newly_found_peers; 391static unsigned int newly_found_peers;
392 392
393/** 393/**
394 * Handle to the datacache service (for inserting/retrieving data)
395 */
396static struct GNUNET_DATACACHE_Handle *datacache;
397
398/**
399 * Handle for the statistics service. 394 * Handle for the statistics service.
400 */ 395 */
401struct GNUNET_STATISTICS_Handle *stats; 396struct GNUNET_STATISTICS_Handle *stats;
@@ -1260,130 +1255,6 @@ route_result_message (struct GNUNET_MessageHeader *msg,
1260} 1255}
1261 1256
1262 1257
1263/**
1264 * Iterator for local get request results,
1265 *
1266 * @param cls closure for iterator, a DatacacheGetContext
1267 * @param exp when does this value expire?
1268 * @param key the key this data is stored under
1269 * @param size the size of the data identified by key
1270 * @param data the actual data
1271 * @param type the type of the data
1272 *
1273 * @return GNUNET_OK to continue iteration, anything else
1274 * to stop iteration.
1275 */
1276static int
1277datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
1278 const GNUNET_HashCode * key, size_t size,
1279 const char *data, enum GNUNET_BLOCK_Type type)
1280{
1281 struct DHT_MessageContext *msg_ctx = cls;
1282 struct DHT_MessageContext new_msg_ctx;
1283 struct GNUNET_DHT_GetResultMessage *get_result;
1284 enum GNUNET_BLOCK_EvaluationResult eval;
1285 const struct DHTPutEntry *put_entry;
1286 int get_size;
1287 char *path_offset;
1288
1289#if DEBUG_DHT
1290 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291 "`%s:%s': Received `%s' response from datacache\n", my_short_id,
1292 "DHT", "GET");
1293#endif
1294
1295 put_entry = (const struct DHTPutEntry *) data;
1296
1297 if (size !=
1298 sizeof (struct DHTPutEntry) + put_entry->data_size +
1299 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)))
1300 {
1301 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1302 "Path + data size doesn't add up for data inserted into datacache!\nData size %d, path length %d, expected %d, got %d\n",
1303 put_entry->data_size, put_entry->path_length,
1304 sizeof (struct DHTPutEntry) + put_entry->data_size +
1305 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)),
1306 size);
1307 msg_ctx->do_forward = GNUNET_NO;
1308 return GNUNET_OK;
1309 }
1310
1311 eval =
1312 GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf,
1313 msg_ctx->reply_bf_mutator, msg_ctx->xquery,
1314 msg_ctx->xquery_size, &put_entry[1],
1315 put_entry->data_size);
1316
1317 switch (eval)
1318 {
1319 case GNUNET_BLOCK_EVALUATION_OK_LAST:
1320 msg_ctx->do_forward = GNUNET_NO;
1321 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1322 memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
1323 if (GNUNET_DHT_RO_RECORD_ROUTE ==
1324 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
1325 {
1326 new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
1327 }
1328
1329 get_size =
1330 sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size +
1331 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity));
1332 get_result = GNUNET_malloc (get_size);
1333 get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
1334 get_result->header.size = htons (get_size);
1335 get_result->expiration = GNUNET_TIME_absolute_hton (exp);
1336 get_result->type = htons (type);
1337 get_result->put_path_length = htons (put_entry->path_length);
1338 path_offset = (char *) &put_entry[1];
1339 path_offset += put_entry->data_size;
1340 /* Copy the actual data and the path_history to the end of the get result */
1341 memcpy (&get_result[1], &put_entry[1],
1342 put_entry->data_size +
1343 (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)));
1344 new_msg_ctx.peer = my_identity;
1345 new_msg_ctx.bloom = NULL;
1346 new_msg_ctx.hop_count = 0;
1347 new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2; /* Make result routing a higher priority */
1348 new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
1349 increment_stats (STAT_GET_RESPONSE_START);
1350 route_result_message (&get_result->header, &new_msg_ctx);
1351 GNUNET_free (get_result);
1352 break;
1353 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1354#if DEBUG_DHT
1355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n",
1356 my_short_id, "DHT");
1357#endif
1358 break;
1359 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1360#if DEBUG_DHT
1361 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n",
1362 my_short_id, "DHT");
1363#endif
1364 break;
1365 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1366#if DEBUG_DHT
1367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1368 "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
1369#endif
1370 GNUNET_break (0);
1371 break;
1372 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1373 GNUNET_break_op (0);
1374 msg_ctx->do_forward = GNUNET_NO;
1375 break;
1376 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1377#if DEBUG_DHT
1378 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1379 "`%s:%s': Unsupported block type (%u) in response!\n",
1380 my_short_id, "DHT", type);
1381#endif
1382 /* msg_ctx->do_forward = GNUNET_NO; // not sure... */
1383 break;
1384 }
1385 return GNUNET_OK;
1386}
1387 1258
1388 1259
1389/** 1260/**
@@ -1464,10 +1335,6 @@ handle_dht_get (const struct GNUNET_MessageHeader *msg,
1464 increment_stats (STAT_GETS); 1335 increment_stats (STAT_GETS);
1465 results = 0; 1336 results = 0;
1466 msg_ctx->do_forward = GNUNET_YES; 1337 msg_ctx->do_forward = GNUNET_YES;
1467 if (datacache != NULL)
1468 results =
1469 GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
1470 &datacache_get_iterator, msg_ctx);
1471#if DEBUG_DHT 1338#if DEBUG_DHT
1472 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1473 "`%s:%s': Found %d results for `%s' request uid %llu\n", 1340 "`%s:%s': Found %d results for `%s' request uid %llu\n",
@@ -1826,36 +1693,6 @@ handle_dht_put (const struct GNUNET_MessageHeader *msg,
1826#endif 1693#endif
1827 1694
1828 increment_stats (STAT_PUTS_INSERTED); 1695 increment_stats (STAT_PUTS_INSERTED);
1829 if (datacache != NULL)
1830 {
1831 /* Put size is actual data size plus struct overhead plus path length (if any) */
1832 put_size =
1833 data_size + sizeof (struct DHTPutEntry) +
1834 (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1835 put_entry = GNUNET_malloc (put_size);
1836 put_entry->data_size = data_size;
1837 put_entry->path_length = msg_ctx->path_history_len;
1838 /* Copy data to end of put entry */
1839 memcpy (&put_entry[1], &put_msg[1], data_size);
1840 if (msg_ctx->path_history_len > 0)
1841 {
1842 /* Copy path after data */
1843 path_offset = (char *) &put_entry[1];
1844 path_offset += data_size;
1845 memcpy (path_offset, msg_ctx->path_history,
1846 msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
1847 }
1848
1849 ret =
1850 GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size,
1851 (const char *) put_entry, put_type,
1852 GNUNET_TIME_absolute_ntoh (put_msg->expiration));
1853 GNUNET_free (put_entry);
1854 }
1855 else
1856 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1857 "`%s:%s': %s request received, but have no datacache!\n",
1858 my_short_id, "DHT", "PUT");
1859 1696
1860 route_message (msg, msg_ctx); 1697 route_message (msg, msg_ctx);
1861} 1698}
@@ -2366,6 +2203,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2366 transport_handle = NULL; 2203 transport_handle = NULL;
2367 } 2204 }
2368 GDS_NEIGHBOURS_done (); 2205 GDS_NEIGHBOURS_done ();
2206 GDS_DATACACHE_done ();
2369 GDS_NSE_done (); 2207 GDS_NSE_done ();
2370 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++) 2208 for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; bucket_count++)
2371 { 2209 {
@@ -2380,15 +2218,6 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2380 delete_peer (pos, bucket_count); 2218 delete_peer (pos, bucket_count);
2381 } 2219 }
2382 } 2220 }
2383 if (datacache != NULL)
2384 {
2385#if DEBUG_DHT
2386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n",
2387 my_short_id, "DHT");
2388#endif
2389 GNUNET_DATACACHE_destroy (datacache);
2390 datacache = NULL;
2391 }
2392 if (stats != NULL) 2221 if (stats != NULL)
2393 { 2222 {
2394 GNUNET_STATISTICS_destroy (stats, GNUNET_YES); 2223 GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -2418,7 +2247,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
2418 unsigned long long temp_config_num; 2247 unsigned long long temp_config_num;
2419 2248
2420 cfg = c; 2249 cfg = c;
2421 datacache = GNUNET_DATACACHE_create (cfg, "dhtcache"); 2250 GDS_DATACACHE_init ();
2422 coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */ 2251 coreAPI = GNUNET_CORE_connect (cfg, /* Main configuration */
2423 DEFAULT_CORE_QUEUE_SIZE, /* queue size */ 2252 DEFAULT_CORE_QUEUE_SIZE, /* queue size */
2424 NULL, /* Closure passed to DHT functions */ 2253 NULL, /* Closure passed to DHT functions */
diff --git a/src/dht/gnunet-service-dht.h b/src/dht/gnunet-service-dht.h
new file mode 100644
index 000000000..c2e16151d
--- /dev/null
+++ b/src/dht/gnunet-service-dht.h
@@ -0,0 +1,43 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht.h
23 * @brief GNUnet DHT globals
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_DHT_H
27#define GNUNET_SERVICE_DHT_H
28
29#include "gnunet_util_lib.h"
30
31/**
32 * Configuration we use.
33 */
34extern struct GNUNET_ConfigurationHandle *GDS_cfg;
35
36
37/**
38 * Our handle to the BLOCK library.
39 */
40extern struct GNUNET_BLOCK_Context *GDS_block_context;
41
42
43#endif
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 75506534b..95a0d68d0 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -39,6 +39,7 @@
39#include "dht_new.h" 39#include "dht_new.h"
40#include <fenv.h> 40#include <fenv.h>
41#include "gnunet-service-dht_clients.h" 41#include "gnunet-service-dht_clients.h"
42#include "gnunet-service-dht_datacache.h"
42#include "gnunet-service-dht_neighbours.h" 43#include "gnunet-service-dht_neighbours.h"
43 44
44 45
@@ -403,6 +404,22 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
403 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 404 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
404 } 405 }
405 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; 406 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
407 /* give to local clients */
408 GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
409 &dht_msg->key,
410 0, NULL,
411 0, NULL,
412 ntohl (dht_msg->type),
413 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
414 &dht_msg[1]);
415 /* store locally */
416 GST_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
417 &dht_msg->key,
418 0, NULL,
419 ntohl (dht_msg->type),
420 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
421 &dht_msg[1]);
422 /* route to other peers */
406 GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type), 423 GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
407 ntohl (dht_msg->options), 424 ntohl (dht_msg->options),
408 ntohl (dht_msg->desired_replication_level), 425 ntohl (dht_msg->desired_replication_level),
@@ -446,6 +463,7 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
446 get = (const struct GNUNET_DHT_ClientGetMessage *) message; 463 get = (const struct GNUNET_DHT_ClientGetMessage *) message;
447 xquery = (const char*) &get[1]; 464 xquery = (const char*) &get[1];
448 465
466
449 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); 467 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
450 cqr->key = get->key; 468 cqr->key = get->key;
451 cqr->client = find_active_client (client); 469 cqr->client = find_active_client (client);
@@ -458,12 +476,19 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
458 cqr->xquery_size = xquery_size; 476 cqr->xquery_size = xquery_size;
459 cqr->replication = ntohl (get->desired_replication_level); 477 cqr->replication = ntohl (get->desired_replication_level);
460 cqr->msg_options = ntohl (get->options); 478 cqr->msg_options = ntohl (get->options);
461 cqr->msg_type = ntohl (get->type); 479 cqr->msg_type = ntohl (get->type);
462 GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr, 480 GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
463 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 481 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
482 /* start remote requests */
464 if (GNUNET_SCHEDULER_NO_TASK != retry_task) 483 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
465 GNUNET_SCHEDULER_cancel (retry_task); 484 GNUNET_SCHEDULER_cancel (retry_task);
466 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL); 485 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
486 /* perform local lookup */
487 GDS_DATACACHE_handle_get (&get->key,
488 cqr->msg_type,
489 cqr->xquery,
490 xquery_size,
491 NULL, 0);
467 GNUNET_SERVER_receive_done (client, GNUNET_OK); 492 GNUNET_SERVER_receive_done (client, GNUNET_OK);
468} 493}
469 494
diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h
index 931ca1a93..db4f0b9fe 100644
--- a/src/dht/gnunet-service-dht_clients.h
+++ b/src/dht/gnunet-service-dht_clients.h
@@ -27,6 +27,7 @@
27#ifndef GNUNET_SERVICE_DHT_CLIENTS_H 27#ifndef GNUNET_SERVICE_DHT_CLIENTS_H
28#define GNUNET_SERVICE_DHT_CLIENTS_H 28#define GNUNET_SERVICE_DHT_CLIENTS_H
29 29
30
30/** 31/**
31 * Handle a reply we've received from another peer. If the reply 32 * Handle a reply we've received from another peer. If the reply
32 * matches any of our pending queries, forward it to the respective 33 * matches any of our pending queries, forward it to the respective
diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c
new file mode 100644
index 000000000..b2dd05ac9
--- /dev/null
+++ b/src/dht/gnunet-service-dht_datacache.c
@@ -0,0 +1,315 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_datacache.c
23 * @brief GNUnet DHT service's datacache integration
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "gnunet-service-dht_datacache.h"
28
29
30/**
31 * Handle to the datacache service (for inserting/retrieving data)
32 */
33static struct GNUNET_DATACACHE_Handle *datacache;
34
35
36/**
37 * Entry for inserting data into datacache from the DHT.
38 */
39struct DHTPutEntry
40{
41 /**
42 * Size of data.
43 */
44 uint16_t data_size;
45
46 /**
47 * Length of recorded path.
48 */
49 uint16_t path_length;
50
51 /* PATH ENTRIES */
52
53 /* PUT DATA */
54
55};
56
57
58/**
59 * Handle a datum we've received from another peer. Cache if
60 * possible.
61 *
62 * @param expiration when will the reply expire
63 * @param key the query this reply is for
64 * @param put_path_length number of peers in 'put_path'
65 * @param put_path path the reply took on put
66 * @param type type of the reply
67 * @param data_size number of bytes in 'data'
68 * @param data application payload data
69 */
70void
71GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
72 const GNUNET_HashCode *key,
73 unsigned int put_path_length,
74 const struct GNUNET_PeerIdentity *put_path,
75 uint32_t type,
76 size_t data_size,
77 const void *data)
78{
79 size_t plen = data_size + put_path_length * sizeof(struct GNUNET_PeerIdentity) + sizeof(struct DHTPutEntry);
80 char buf[plen];
81 struct DHTPutEntry *pe;
82 struct GNUNET_PeerIdentity *pp;
83 char *path_offset;
84
85 if (datacache == NULL)
86 {
87 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
88 "%s request received, but have no datacache!\n",
89 "PUT");
90 return;
91 }
92 if (data_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
93 {
94 GNUNET_break (0);
95 return;
96 }
97 /* Put size is actual data size plus struct overhead plus path length (if any) */
98 pe = (struct DHTPutEntry *) buf;
99 pe->data_size = htons (data_size);
100 pe->path_length = htons ((uint16_t) put_path_length);
101 pp = (struct GNUNET_PeerIdentity *) &pe[1];
102 memcpy (pp, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity));
103 memcpy (&pp[put_path_length],
104 data, data_size);
105 (void) GNUNET_DATACACHE_put (datacache, key,
106 plen, (const char *) pe, type,
107 expiration);
108}
109
110
111/**
112 * Context containing information about a GET request.
113 */
114struct GetRequestContext
115{
116 /**
117 * extended query (see gnunet_block_lib.h).
118 */
119 const void *xquery;
120
121 /**
122 * Bloomfilter to filter out duplicate replies (updated)
123 */
124 struct GNUNET_CONTAINER_BloomFilter **reply_bf;
125
126 /**
127 * The key this request was about
128 */
129 GNUNET_HashCode key;
130
131 /**
132 * Number of bytes in xquery.
133 */
134 size_t xquery_size;
135
136 /**
137 * Mutator value for the reply_bf, see gnunet_block_lib.h
138 */
139 uint32_t reply_bf_mutator;
140
141};
142
143
144/**
145 * Iterator for local get request results,
146 *
147 * @param cls closure for iterator, a DatacacheGetContext
148 * @param exp when does this value expire?
149 * @param key the key this data is stored under
150 * @param size the size of the data identified by key
151 * @param data the actual data
152 * @param type the type of the data
153 *
154 * @return GNUNET_OK to continue iteration, anything else
155 * to stop iteration.
156 */
157static int
158datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
159 const GNUNET_HashCode * key, size_t size,
160 const char *data, enum GNUNET_BLOCK_Type type)
161{
162 struct GetRequestContext *ctx = cls;
163 const struct DHTPutEntry *pe;
164 const struct GNUNET_PeerIdentity *pp;
165 const char *data;
166 size_t data_size;
167 uint16_t put_path_length;
168 enum GNUNET_BLOCK_EvaluationResult eval;
169
170 pe = (const struct DHTPutEntry *) data;
171 put_path_length = ntohs (pe->path_length);
172 data_size = ntohs (pe->data_size);
173
174 if (size !=
175 sizeof (struct DHTPutEntry) + data_size +
176 (put_path_length * sizeof (struct GNUNET_PeerIdentity)))
177 {
178 GNUNET_break (0);
179 return GNUNET_OK;
180 }
181 pp = (const struct GNUNET_PeerIdentity *) &pe[1];
182 data = (const char*) &pp[put_path_length];
183 eval =
184 GNUNET_BLOCK_evaluate (block_context, type, key,
185 ctx->reply_bf,
186 ctx->reply_bf_mutator,
187 ctx->xquery,
188 ctx->xquery_size,
189 data,
190 data_size);
191 switch (eval)
192 {
193 case GNUNET_BLOCK_EVALUATION_OK_LAST:
194 case GNUNET_BLOCK_EVALUATION_OK_MORE:
195 /* forward to local clients */
196 GDS_CLIENT_handle_reply (exp,
197 key,
198 0, NULL,
199 put_path_length, pp,
200 type, data_size, data);
201 /* forward to other peers */
202 GDS_NEIGHBOURS_handle_reply (type, exp,
203 key, put_path_length, pp,
204 0, NULL, data, data_size);
205 break;
206 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
207 break;
208 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
209 break;
210 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
211 GNUNET_break (0);
212 break;
213 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
214 GNUNET_break_op (0);
215 return GNUNET_SYSERR;
216 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
217 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
218 "Unsupported block type (%u) in local response!\n",
219 type);
220 break;
221 }
222 return GNUNET_OK;
223}
224
225
226/**
227 * Context containing information about a GET request.
228 */
229struct GetRequestContext
230{
231 /**
232 * extended query (see gnunet_block_lib.h).
233 */
234 const void *xquery;
235
236 /**
237 * Bloomfilter to filter out duplicate replies (updated)
238 */
239 struct GNUNET_CONTAINER_BloomFilter **reply_bf;
240
241 /**
242 * The key this request was about
243 */
244 GNUNET_HashCode key;
245
246 /**
247 * Number of bytes in xquery.
248 */
249 size_t xquery_size;
250
251 /**
252 * Mutator value for the reply_bf, see gnunet_block_lib.h
253 */
254 uint32_t reply_bf_mutator;
255
256};
257
258
259/**
260 * Handle a GET request we've received from another peer.
261 *
262 * @param key the query
263 * @param type requested data type
264 * @param xquery extended query
265 * @param xquery_size number of bytes in xquery
266 * @param reply_bf where the reply bf is (to be) stored, possibly updated, can be NULL
267 * @param reply_bf_mutator mutation value for reply_bf
268 */
269void
270GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
271 uint32_t type,
272 const void *xquery,
273 size_t xquery_size,
274 struct GNUNET_CONTAINER_BloomFilter **reply_bf,
275 uint32_t reply_bf_mutator)
276{
277 struct GetRequestContext ctx;
278
279 if (datacache == NULL)
280 return;
281 ctx.key = *key;
282 ctx.xquery = xquery;
283 ctx.xquery_size = xquery_size;
284 ctx.reply_bf = reply_bf;
285 ctx.reply_bf_mutator = reply_bf_mutator;
286 (void) GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
287 &datacache_get_iterator, &ctx);
288}
289
290
291/**
292 * Initialize datacache subsystem.
293 */
294void
295GDS_DATACACHE_init ()
296{
297 datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
298}
299
300
301/**
302 * Shutdown datacache subsystem.
303 */
304void
305GDS_DATACACHE_done ()
306{
307 if (datacache != NULL)
308 {
309 GNUNET_DATACACHE_destroy (datacache);
310 datacache = NULL;
311 }
312}
313
314
315/* end of gnunet-service-dht_datacache.c */
diff --git a/src/dht/gnunet-service-dht_datacache.h b/src/dht/gnunet-service-dht_datacache.h
new file mode 100644
index 000000000..0501e9e4c
--- /dev/null
+++ b/src/dht/gnunet-service-dht_datacache.h
@@ -0,0 +1,84 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file dht/gnunet-service-dht_datacache.h
23 * @brief GNUnet DHT service's datacache integration
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#ifndef GNUNET_SERVICE_DHT_DATACACHE_H
28#define GNUNET_SERVICE_DHT_DATACACHE_H
29
30/**
31 * Handle a datum we've received from another peer. Cache if
32 * possible.
33 *
34 * @param expiration when will the reply expire
35 * @param key the query this reply is for
36 * @param put_path_length number of peers in 'put_path'
37 * @param put_path path the reply took on put
38 * @param type type of the reply
39 * @param data_size number of bytes in 'data'
40 * @param data application payload data
41 */
42void
43GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
44 const GNUNET_HashCode *key,
45 unsigned int put_path_length,
46 const struct GNUNET_PeerIdentity *put_path,
47 uint32_t type,
48 size_t data_size,
49 const void *data);
50
51
52/**
53 * Handle a GET request we've received from another peer.
54 *
55 * @param key the query
56 * @param type requested data type
57 * @param xquery extended query
58 * @param xquery_size number of bytes in xquery
59 * @param reply_bf where the reply bf is (to be) stored, possibly updated!, can be NULL
60 * @param reply_bf_mutator mutation value for reply_bf
61 */
62void
63GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
64 uint32_t type,
65 const void *xquery,
66 size_t xquery_size,
67 struct GNUNET_CONTAINER_BloomFilter **reply_bf,
68 uint32_t reply_bf_mutator);
69
70
71/**
72 * Initialize datacache subsystem.
73 */
74void
75GDS_DATACACHE_init (void);
76
77
78/**
79 * Shutdown datacache subsystem.
80 */
81void
82GDS_DATACACHE_done (void);
83
84#endif
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 8c87314e5..b7cc2048e 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -37,6 +37,7 @@
37#include "gnunet_dht_service.h" 37#include "gnunet_dht_service.h"
38#include "gnunet_statistics_service.h" 38#include "gnunet_statistics_service.h"
39#include "dht.h" 39#include "dht.h"
40#include "gnunet-service-dht_datacache.h"
40#include <fenv.h> 41#include <fenv.h>
41 42
42/** 43/**
@@ -86,11 +87,14 @@ struct PeerPutMessage
86 uint32_t desired_replication_level GNUNET_PACKED; 87 uint32_t desired_replication_level GNUNET_PACKED;
87 88
88 /** 89 /**
89 * Generic route path length for a message in the 90 * Length of the PUT path that follows (if tracked).
90 * DHT that arrived at a peer and generated
91 * a reply. Copied to the end of this message.
92 */ 91 */
93 uint32_t outgoing_path_length GNUNET_PACKED; 92 uint32_t put_path_length GNUNET_PACKED;
93
94 /**
95 * When does the content expire?
96 */
97 struct GNUNET_TIME_AbsoluteNBO expiration_time;
94 98
95 /** 99 /**
96 * Bloomfilter (for peer identities) to stop circular routes 100 * Bloomfilter (for peer identities) to stop circular routes
@@ -110,12 +114,51 @@ struct PeerPutMessage
110 114
111 115
112/** 116/**
117 * P2P Result message
118 */
119struct PeerResultMessage
120{
121 /**
122 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
123 */
124 struct GNUNET_MessageHeader header;
125
126 /**
127 * Content type.
128 */
129 uint32_t type GNUNET_PACKED;
130
131 /**
132 * Length of the PUT path that follows (if tracked).
133 */
134 uint32_t put_path_length GNUNET_PACKED;
135
136 /**
137 * Length of the GET path that follows (if tracked).
138 */
139 uint32_t get_path_length GNUNET_PACKED;
140
141 /**
142 * The key of the corresponding GET request.
143 */
144 GNUNET_HashCode key;
145
146 /* put path (if tracked) */
147
148 /* get path (if tracked) */
149
150 /* Payload */
151
152};
153
154
155/**
113 * P2P GET message 156 * P2P GET message
114 */ 157 */
115struct PeerGetMessage 158struct PeerGetMessage
116{ 159{
117 /** 160 /**
118 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT 161 * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
119 */ 162 */
120 struct GNUNET_MessageHeader header; 163 struct GNUNET_MessageHeader header;
121 164
@@ -182,19 +225,14 @@ struct P2PPendingMessage
182 struct P2PPendingMessage *prev; 225 struct P2PPendingMessage *prev;
183 226
184 /** 227 /**
185 * Message importance level. 228 * Message importance level. FIXME: used? useful?
186 */ 229 */
187 unsigned int importance; 230 unsigned int importance;
188 231
189 /** 232 /**
190 * Time when this request was scheduled to be sent. 233 * When does this message time out?
191 */
192 struct GNUNET_TIME_Absolute scheduled;
193
194 /**
195 * How long to wait before sending message.
196 */ 234 */
197 struct GNUNET_TIME_Relative timeout; 235 struct GNUNET_TIME_Absolute timeout;
198 236
199 /** 237 /**
200 * Actual message to be sent, allocated at the end of the struct: 238 * Actual message to be sent, allocated at the end of the struct:
@@ -222,7 +260,8 @@ struct PeerInfo
222 struct PeerInfo *prev; 260 struct PeerInfo *prev;
223 261
224 /** 262 /**
225 * Count of outstanding messages for peer. 263 * Count of outstanding messages for peer. FIXME: NEEDED?
264 * FIXME: bound queue size!?
226 */ 265 */
227 unsigned int pending_count; 266 unsigned int pending_count;
228 267
@@ -467,13 +506,340 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
467 506
468 507
469/** 508/**
470 * Perform a PUT operation. // FIXME: document if this is only 509 * Called when core is ready to send a message we asked for
471 * routing or also storage and/or even local client notification! 510 * out to the destination.
511 *
512 * @param cls the 'struct PeerInfo' of the target peer
513 * @param size number of bytes available in buf
514 * @param buf where the callee should write the message
515 * @return number of bytes written to buf
516 */
517static size_t
518core_transmit_notify (void *cls, size_t size, void *buf)
519{
520 struct PeerInfo *peer = cls;
521 char *cbuf = buf;
522 struct P2PPendingMessage *pending;
523 size_t off;
524 size_t msize;
525
526 peer->th = NULL;
527 if (buf == NULL)
528 {
529 /* client disconnected */
530 return 0;
531 }
532 if (peer->head == NULL)
533 {
534 /* no messages pending */
535 return 0;
536 }
537 off = 0;
538 while ( (NULL != (pending = peer->head)) &&
539 (size - off >= (msize = ntohs (pending->msg->size))) )
540 {
541 memcpy (&cbuf[off], pending->msg, msize);
542 off += msize;
543 peer->pending_count--;
544 GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
545 GNUNET_free (pending);
546 }
547 if (peer->head != NULL)
548 peer->th
549 = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
550 pending->importance,
551 pending->timeout, &peer->id, msize,
552 &core_transmit_notify, peer);
553
554 return off;
555}
556
557
558/**
559 * Transmit all messages in the peer's message queue.
560 *
561 * @param peer message queue to process
562 */
563static void
564process_peer_queue (struct PeerInfo *peer)
565{
566 struct P2PPendingMessage *pending;
567
568 if (NULL != (pending = peer->head))
569 return;
570 if (NULL != peer->th)
571 return;
572 peer->th
573 = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
574 pending->importance,
575 pending->timeout, &peer->id,
576 ntohs (pending->msg->size),
577 &core_transmit_notify, peer);
578}
579
580
581/**
582 * To how many peers should we (on average) forward the request to
583 * obtain the desired target_replication count (on average).
584 *
585 * FIXME: double-check that this is fine
586 *
587 * @param hop_count number of hops the message has traversed
588 * @param target_replication the number of total paths desired
589 * @return Some number of peers to forward the message to
590 */
591static unsigned int
592get_forward_count (uint32_t hop_count,
593 uint32_t target_replication)
594{
595 uint32_t random_value;
596 uint32_t forward_count;
597 float target_value;
598
599 /* bound by system-wide maximum */
600 target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
601 target_replication);
602 if (hop_count > log_of_network_size_estimate * 2.0)
603 {
604 /* Once we have reached our ideal number of hops, only forward to 1 peer */
605 return 1;
606 }
607 target_value =
608 1 + (target_replication - 1.0) / (log_of_network_size_estimate +
609 ((float) (target_replication - 1.0) *
610 hop_count));
611 /* Set forward count to floor of target_value */
612 forward_count = (uint32_t) target_value;
613 /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
614 target_value = target_value - forward_count;
615 random_value =
616 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
617 if (random_value < (target_value * UINT32_MAX))
618 forward_count++;
619 return forward_count;
620}
621
622
623/**
624 * Check whether my identity is closer than any known peers. If a
625 * non-null bloomfilter is given, check if this is the closest peer
626 * that hasn't already been routed to.
627 * FIXME: needed?
628 *
629 * @param key hash code to check closeness to
630 * @param bloom bloomfilter, exclude these entries from the decision
631 * @return GNUNET_YES if node location is closest,
632 * GNUNET_NO otherwise.
633 */
634static int
635am_closest_peer (const GNUNET_HashCode *key,
636 const struct GNUNET_CONTAINER_BloomFilter *bloom)
637{
638 int bits;
639 int other_bits;
640 int bucket_num;
641 int count;
642 struct PeerInfo *pos;
643 unsigned int my_distance;
644
645 if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
646 return GNUNET_YES;
647 bucket_num = find_current_bucket (key);
648 bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
649 my_distance = distance (&my_identity.hashPubKey, key);
650 pos = k_buckets[bucket_num].head;
651 count = 0;
652 while ((pos != NULL) && (count < bucket_size))
653 {
654 if ((bloom != NULL) &&
655 (GNUNET_YES ==
656 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
657 {
658 pos = pos->next;
659 continue; /* Skip already checked entries */
660 }
661 other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
662 if (other_bits > bits)
663 return GNUNET_NO;
664 if (other_bits == bits) /* We match the same number of bits */
665 return GNUNET_YES;
666 pos = pos->next;
667 }
668 /* No peers closer, we are the closest! */
669 return GNUNET_YES;
670}
671
672
673/**
674 * Select a peer from the routing table that would be a good routing
675 * destination for sending a message for "key". The resulting peer
676 * must not be in the set of blocked peers.<p>
677 *
678 * Note that we should not ALWAYS select the closest peer to the
679 * target, peers further away from the target should be chosen with
680 * exponentially declining probability.
681 *
682 * FIXME: double-check that this is fine
683 *
684 *
685 * @param key the key we are selecting a peer to route to
686 * @param bloom a bloomfilter containing entries this request has seen already
687 * @param hops how many hops has this message traversed thus far
688 * @return Peer to route to, or NULL on error
689 */
690static struct PeerInfo *
691select_peer (const GNUNET_HashCode *key,
692 const struct GNUNET_CONTAINER_BloomFilter *bloom,
693 uint32_t hops)
694{
695 unsigned int bc;
696 unsigned int count;
697 unsigned int selected;
698 struct PeerInfo *pos;
699 unsigned int distance;
700 unsigned int largest_distance;
701 struct PeerInfo *chosen;
702
703 if (hops >= log_of_network_size_estimate)
704 {
705 /* greedy selection (closest peer that is not in bloomfilter) */
706 largest_distance = 0;
707 chosen = NULL;
708 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
709 {
710 pos = k_buckets[bc].head;
711 count = 0;
712 while ((pos != NULL) && (count < bucket_size))
713 {
714 /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
715 if (GNUNET_NO ==
716 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
717 {
718 distance = inverse_distance (key, &pos->id.hashPubKey);
719 if (distance > largest_distance)
720 {
721 chosen = pos;
722 largest_distance = distance;
723 }
724 }
725 count++;
726 pos = pos->next;
727 }
728 }
729 return chosen;
730 }
731
732 /* select "random" peer */
733 /* count number of peers that are available and not filtered */
734 count = 0;
735 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
736 {
737 pos = k_buckets[bc].head;
738 while ((pos != NULL) && (count < bucket_size))
739 {
740 if (GNUNET_YES ==
741 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
742 {
743 pos = pos->next;
744 continue; /* Ignore bloomfiltered peers */
745 }
746 count++;
747 pos = pos->next;
748 }
749 }
750 if (count == 0) /* No peers to select from! */
751 {
752 return NULL;
753 }
754 /* Now actually choose a peer */
755 selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
756 count = 0;
757 for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
758 {
759 pos = k_buckets[bc].head;
760 while ((pos != NULL) && (count < bucket_size))
761 {
762 if (GNUNET_YES ==
763 GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
764 {
765 pos = pos->next;
766 continue; /* Ignore bloomfiltered peers */
767 }
768 if (0 == selected--)
769 return pos;
770 pos = pos->next;
771 }
772 }
773 GNUNET_break (0);
774 return NULL;
775}
776
777
778/**
779 * Compute the set of peers that the given request should be
780 * forwarded to.
781 *
782 * @param key routing key
783 * @param bloom bloom filter excluding peers as targets, all selected
784 * peers will be added to the bloom filter
785 * @param hop_count number of hops the request has traversed so far
786 * @param target_replication desired number of replicas
787 * @param targets where to store an array of target peers (to be
788 * free'd by the caller)
789 * @return number of peers returned in 'targets'.
790 */
791static unsigned int
792get_target_peers (const GNUNET_HashCode *key,
793 struct GNUNET_CONTAINER_BloomFilter *bloom,
794 uint32_t hop_count,
795 uint32_t target_replication,
796 struct PeerInfo ***targets)
797{
798 unsigned int ret;
799 unsigned int off;
800 struct PeerInfo **rtargets;
801 struct PeerInfo *nxt;
802
803 ret = get_forward_count (hop_count, target_replication);
804 if (ret == 0)
805 {
806 *targets = NULL;
807 return 0;
808 }
809 rtargets = GNUNET_malloc (sizeof (struct PeerInfo*) * ret);
810 off = 0;
811 while (ret-- > 0)
812 {
813 nxt = select_peer (key, bloom, hop_count);
814 if (nxt == NULL)
815 break;
816 rtargets[off++] = nxt;
817 GNUNET_CONTAINER_bloomfilter_add (bloom, &nxt->id.hashPubKey);
818 }
819 if (0 == off)
820 {
821 GNUNET_free (rtargets);
822 *targets = NULL;
823 return 0;
824 }
825 *targets = rtargets;
826 return off;
827}
828
829
830/**
831 * Perform a PUT operation. Forwards the given request to other
832 * peers. Does not store the data locally. Does not give the
833 * data to local clients. May do nothing if this is the only
834 * peer in the network (or if we are the closest peer in the
835 * network).
472 * 836 *
473 * @param type type of the block 837 * @param type type of the block
474 * @param options routing options 838 * @param options routing options
475 * @param desired_replication_level desired replication count 839 * @param desired_replication_level desired replication count
476 * @param expiration_time when does the content expire 840 * @param expiration_time when does the content expire
841 * @param hop_count how many hops has this message traversed so far
842 * @param bf Bloom filter of peers this PUT has already traversed
477 * @param key key for the content 843 * @param key key for the content
478 * @param put_path_length number of entries in put_path 844 * @param put_path_length number of entries in put_path
479 * @param put_path peers this request has traversed so far (if tracked) 845 * @param put_path peers this request has traversed so far (if tracked)
@@ -481,27 +847,87 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
481 * @param data_size number of bytes in data 847 * @param data_size number of bytes in data
482 */ 848 */
483void 849void
484GST_NEIGHBOURS_handle_put (uint32_t type, 850GDS_NEIGHBOURS_handle_put (uint32_t type,
485 uint32_t options, 851 uint32_t options,
486 uint32_t desired_replication_level, 852 uint32_t desired_replication_level,
487 GNUNET_TIME_Absolute expiration_time, 853 GNUNET_TIME_Absolute expiration_time,
854 uint32_t hop_count,
855 struct GNUNET_CONTAINER_BloomFilter *bf,
488 const GNUNET_HashCode *key, 856 const GNUNET_HashCode *key,
489 unsigned int put_path_length, 857 unsigned int put_path_length,
490 struct GNUNET_PeerIdentity *put_path, 858 struct GNUNET_PeerIdentity *put_path,
491 const void *data, 859 const void *data,
492 size_t data_size) 860 size_t data_size)
493{ 861{
494 // FIXME 862 unsigned int target_count;
863 unsigned int i;
864 struct PeerInfo **targets;
865 struct PeerInfo *target;
866 struct P2PPendingMessage *pending;
867 size_t msize;
868 struct PeerPutMessage *ppm;
869 struct GNUNET_PeerIdentity *pp;
870
871 target_count = get_target_peers (key, bf, hop_count,
872 desired_replication_level,
873 &targets);
874 if (0 == target_count)
875 return;
876 msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
877 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
878 {
879 put_path_length = 0;
880 msize = data_size + sizeof (struct PeerPutMessage);
881 }
882 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
883 {
884 GNUNET_break (0);
885 return;
886 }
887 for (i=0;i<target_count;i++)
888 {
889 target = targets[i];
890 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
891 pending->importance = 0; /* FIXME */
892 pending->timeout = expiration_time;
893 ppm = (struct PeerPutMessage*) &pending[1];
894 pending->msg = &ppm->header;
895 ppm->header.size = htons (msize);
896 ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
897 ppm->options = htonl (options);
898 ppm->type = htonl (type);
899 ppm->hop_count = htonl (hop_count + 1);
900 ppm->desired_replication_level = htonl (desired_replication_level);
901 ppm->put_path_length = htonl (put_path_length);
902 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
903 GNUNET_assert (GNUNET_OK ==
904 GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
905 ppm->bloomfilter,
906 DHT_BLOOM_SIZE));
907 ppm->key = *key;
908 pp = (const struct GNUNET_PeerIdentity*) &ppm[1];
909 memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
910 memcpy (&pp[put_path_length], data, data_size);
911 GNUNET_CONTAINER_DLL_insert (target->head,
912 target->tail,
913 pending);
914 target->pending_count++;
915 process_peer_queue (target);
916 }
917 GNUNET_free (targets);
495} 918}
496 919
497 920
498/** 921/**
499 * Perform a GET operation. // FIXME: document if this is only 922 * Perform a GET operation. Forwards the given request to other
500 * routing or also state-tracking and/or even local lookup! 923 * peers. Does not lookup the key locally. May do nothing if this is
924 * the only peer in the network (or if we are the closest peer in the
925 * network).
501 * 926 *
502 * @param type type of the block 927 * @param type type of the block
503 * @param options routing options 928 * @param options routing options
504 * @param desired_replication_level desired replication count 929 * @param desired_replication_level desired replication count
930 * @param hop_count how many hops did this request traverse so far?
505 * @param key key for the content 931 * @param key key for the content
506 * @param xquery extended query 932 * @param xquery extended query
507 * @param xquery_size number of bytes in xquery 933 * @param xquery_size number of bytes in xquery
@@ -510,9 +936,10 @@ GST_NEIGHBOURS_handle_put (uint32_t type,
510 * @param peer_bf filter for peers not to select (again) 936 * @param peer_bf filter for peers not to select (again)
511 */ 937 */
512void 938void
513GST_NEIGHBOURS_handle_get (uint32_t type, 939GDS_NEIGHBOURS_handle_get (uint32_t type,
514 uint32_t options, 940 uint32_t options,
515 uint32_t desired_replication_level, 941 uint32_t desired_replication_level,
942 uint32_t hop_count,
516 const GNUNET_HashCode *key, 943 const GNUNET_HashCode *key,
517 const void *xquery, 944 const void *xquery,
518 size_t xquery_size, 945 size_t xquery_size,
@@ -520,13 +947,69 @@ GST_NEIGHBOURS_handle_get (uint32_t type,
520 uint32_t reply_bf_mutator, 947 uint32_t reply_bf_mutator,
521 const struct GNUNET_CONTAINER_BloomFilter *peer_bf) 948 const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
522{ 949{
523 // FIXME 950 unsigned int target_count;
951 unsigned int i;
952 struct PeerInfo **targets;
953 struct PeerInfo *target;
954 struct P2PPendingMessage *pending;
955 size_t msize;
956 struct PeerGetMessage *pgm;
957 char *xq;
958 size_t reply_bf_size;
959
960 target_count = get_target_peers (key, peer_bf, hop_count,
961 desired_replication_level,
962 &targets);
963 if (0 == target_count)
964 return;
965 reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
966 msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
967 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
968 {
969 GNUNET_break (0);
970 return;
971 }
972 for (i=0;i<target_count;i++)
973 {
974 target = targets[i];
975 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
976 pending->importance = 0; /* FIXME */
977 pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
978 pgm = (struct PeerGetMessage*) &pending[1];
979 pending->msg = &pgm->header;
980 pgm->header.size = htons (msize);
981 pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
982 pgm->options = htonl (options);
983 pgm->type = htonl (type);
984 pgm->hop_count = htonl (hop_count + 1);
985 pgm->desired_replication_level = htonl (desired_replication_level);
986 pgm->xquery_size = htonl (xquery_size);
987 pgm->bf_mutator = reply_bf_mutator;
988 GNUNET_assert (GNUNET_OK ==
989 GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
990 pgm->bloomfilter,
991 DHT_BLOOM_SIZE));
992 pgm->key = *key;
993 xq = (const struct GNUNET_PeerIdentity*) &ppm[1];
994 memcpy (xq, xquery, xquery_size);
995 GNUNET_assert (GNUNET_OK ==
996 GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
997 &xq[xquery_size],
998 reply_bf_size));
999 GNUNET_CONTAINER_DLL_insert (target->head,
1000 target->tail,
1001 pending);
1002 target->pending_count++;
1003 process_peer_queue (target);
1004 }
1005 GNUNET_free (targets);
524} 1006}
525 1007
526 1008
527/** 1009/**
528 * Handle a reply (route to origin). FIXME: should this be here? 1010 * Handle a reply (route to origin). Only forwards the reply back to
529 * (reply-routing table might be better done elsewhere). 1011 * other peers waiting for it. Does not do local caching or
1012 * forwarding to local clients.
530 * 1013 *
531 * @param type type of the block 1014 * @param type type of the block
532 * @param options routing options 1015 * @param options routing options
@@ -540,7 +1023,7 @@ GST_NEIGHBOURS_handle_get (uint32_t type,
540 * @param data_size number of bytes in data 1023 * @param data_size number of bytes in data
541 */ 1024 */
542void 1025void
543GST_NEIGHBOURS_handle_reply (uint32_t type, 1026GDS_NEIGHBOURS_handle_reply (uint32_t type,
544 uint32_t options, 1027 uint32_t options,
545 GNUNET_TIME_Absolute expiration_time, 1028 GNUNET_TIME_Absolute expiration_time,
546 const GNUNET_HashCode *key, 1029 const GNUNET_HashCode *key,
@@ -556,10 +1039,27 @@ GST_NEIGHBOURS_handle_reply (uint32_t type,
556 1039
557 1040
558/** 1041/**
1042 * Closure for 'add_known_to_bloom'.
1043 */
1044struct BloomConstructorContext
1045{
1046 /**
1047 * Bloom filter under construction.
1048 */
1049 struct GNUNET_CONTAINER_BloomFilter *bloom;
1050
1051 /**
1052 * Mutator to use.
1053 */
1054 uint32_t bf_mutator;
1055};
1056
1057
1058/**
559 * Add each of the peers we already know to the bloom filter of 1059 * Add each of the peers we already know to the bloom filter of
560 * the request so that we don't get duplicate HELLOs. 1060 * the request so that we don't get duplicate HELLOs.
561 * 1061 *
562 * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building 1062 * @param cls the 'struct BloomConstructorContext'.
563 * @param key peer identity to add to the bloom filter 1063 * @param key peer identity to add to the bloom filter
564 * @param value value the peer information (unused) 1064 * @param value value the peer information (unused)
565 * @return GNUNET_YES (we should continue to iterate) 1065 * @return GNUNET_YES (we should continue to iterate)
@@ -567,9 +1067,11 @@ GST_NEIGHBOURS_handle_reply (uint32_t type,
567static int 1067static int
568add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value) 1068add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
569{ 1069{
570 struct GNUNET_CONTAINER_BloomFilter *bloom = cls; 1070 struct BloomConstructorContext *ctx = cls;
1071 GNUNET_HashCode mh;
571 1072
572 GNUNET_CONTAINER_bloomfilter_add (bloom, key); 1073 GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
1074 GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
573 return GNUNET_YES; 1075 return GNUNET_YES;
574} 1076}
575 1077
@@ -589,7 +1091,7 @@ send_find_peer_message (void *cls,
589 struct GNUNET_DHT_FindPeerMessage *find_peer_msg; 1091 struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
590 struct DHT_MessageContext msg_ctx; 1092 struct DHT_MessageContext msg_ctx;
591 struct GNUNET_TIME_Relative next_send_time; 1093 struct GNUNET_TIME_Relative next_send_time;
592 struct GNUNET_CONTAINER_BloomFilter *temp_bloom; 1094 struct BloomConstructorContext bcc;
593 1095
594 find_peer_task = GNUNET_SCHEDULER_NO_TASK; 1096 find_peer_task = GNUNET_SCHEDULER_NO_TASK;
595 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) 1097 if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
@@ -602,38 +1104,21 @@ send_find_peer_message (void *cls,
602 newly_found_peers = 0; 1104 newly_found_peers = 0;
603 return; 1105 return;
604 } 1106 }
605 1107 bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
606 // FIXME: build message... 1108 bcc.bloom =
607 find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage)); 1109 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
608 find_peer_msg->header.size = 1110 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers,
609 htons (sizeof (struct GNUNET_DHT_FindPeerMessage)); 1111 &add_known_to_bloom,
610 find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER); 1112 &bcc);
611 temp_bloom = 1113 // FIXME: pass priority!?
612 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K); 1114 GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
613 GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom, 1115 GNUNET_DHT_RO_FIND_PEER,
614 temp_bloom); 1116 16 /* FIXME: replication level? */,
615 GNUNET_assert (GNUNET_OK == 1117 0,
616 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom, 1118 &my_identity.hashPubKey,
617 find_peer_msg-> 1119 NULL, 0,
618 bloomfilter, 1120 bcc.bloom, bcc.bf_mutator, NULL);
619 DHT_BLOOM_SIZE)); 1121 GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
620 GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
621
622 memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
623 memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
624 msg_ctx.unique_id =
625 GNUNET_ntohll (GNUNET_CRYPTO_random_u64
626 (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
627 msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
628 msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
629 msg_ctx.network_size = log_of_network_size_estimate;
630 msg_ctx.peer = my_identity;
631 msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
632 msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
633 // FIXME: transmit message...
634 demultiplex_message (&find_peer_msg->header, &msg_ctx);
635 GNUNET_free (find_peer_msg);
636
637 /* schedule next round */ 1122 /* schedule next round */
638 newly_found_peers = 0; 1123 newly_found_peers = 0;
639 next_send_time.rel_value = 1124 next_send_time.rel_value =
@@ -674,9 +1159,10 @@ core_init (void *cls, struct GNUNET_CORE_Handle *server,
674 1159
675 1160
676/** 1161/**
677 * Core handler for p2p get requests. 1162 * Core handler for p2p put requests.
678 * 1163 *
679 * @param cls closure 1164 * @param cls closure
1165 * @param peer sender of the request
680 * @param message message 1166 * @param message message
681 * @param peer peer identity this notification is about 1167 * @param peer peer identity this notification is about
682 * @param atsi performance data 1168 * @param atsi performance data
@@ -684,84 +1170,105 @@ core_init (void *cls, struct GNUNET_CORE_Handle *server,
684 * GNUNET_SYSERR to close it (signal serious error) 1170 * GNUNET_SYSERR to close it (signal serious error)
685 */ 1171 */
686static int 1172static int
687handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, 1173handle_dht_p2p_put (void *cls,
1174 const struct GNUNET_PeerIdentity *peer,
688 const struct GNUNET_MessageHeader *message, 1175 const struct GNUNET_MessageHeader *message,
689 const struct GNUNET_TRANSPORT_ATS_Information 1176 const struct GNUNET_TRANSPORT_ATS_Information
690 *atsi) 1177 *atsi)
691{ 1178{
692 struct GNUNET_DHT_P2PRouteMessage *incoming = 1179 const struct PeerPutMessage *put;
693 (struct GNUNET_DHT_P2PRouteMessage *) message; 1180 const struct GNUNET_PeerIdentity *put_path;
694 struct GNUNET_MessageHeader *enc_msg = 1181 const void *payload;
695 (struct GNUNET_MessageHeader *) &incoming[1]; 1182 uint32_t putlen;
696 struct DHT_MessageContext *msg_ctx; 1183 uint16_t msize;
697 char *route_path; 1184 size_t payload_size;
698 int path_size; 1185 struct GNUNET_CONTAINER_BloomFilter *bf;
699 1186 GNUNET_HashCode test_key;
700 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) 1187
1188 msize = ntohs (message->size);
1189 if (msize < sizeof (struct PeerPutMessage))
701 { 1190 {
702 GNUNET_break_op (0); 1191 GNUNET_break_op (0);
703 return GNUNET_YES; 1192 return GNUNET_YES;
704 } 1193 }
705 1194 put = (const struct PeerPutMessage*) message;
706 if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value) 1195 putlen = ntohl (put->put_path_length);
707 { 1196 if ( (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) ||
708 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1197 (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
709 "Sending of previous replies took too long, backing off!\n"); 1198 {
710 increment_stats ("# route requests dropped due to high load"); 1199 GNUNET_break_op (0);
711 decrease_max_send_delay (get_max_send_delay ()); 1200 return GNUNET_YES;
712 return GNUNET_YES; 1201 }
713 } 1202 put_path = (const struct GNUNET_PeerIdentity*) &put[1];
714 msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext)); 1203 payload = &put_path[putlen];
715 msg_ctx->bloom = 1204 payload_size = msize - (sizeof (struct PeerPutMessage) +
716 GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE, 1205 putlen * sizeof (struct GNUNET_PeerIdentity));
717 DHT_BLOOM_K); 1206 switch (GNUNET_BLOCK_get_key (block_context,
718 GNUNET_assert (msg_ctx->bloom != NULL); 1207 ntohl (put->type),
719 msg_ctx->hop_count = ntohl (incoming->hop_count); 1208 payload, payload_size,
720 memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode)); 1209 &test_key))
721 msg_ctx->replication = ntohl (incoming->desired_replication_level);
722 msg_ctx->msg_options = ntohl (incoming->options);
723 if (GNUNET_DHT_RO_RECORD_ROUTE ==
724 (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
725 { 1210 {
726 path_size = 1211 case GNUNET_YES:
727 ntohl (incoming->outgoing_path_length) * 1212 if (0 != memcmp (&test_key, key, sizeof (GNUNET_HashCode)))
728 sizeof (struct GNUNET_PeerIdentity);
729 if (ntohs (message->size) !=
730 (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
731 path_size))
732 { 1213 {
733 GNUNET_break_op (0); 1214 GNUNET_break_op (0);
734 GNUNET_free (msg_ctx);
735 return GNUNET_YES; 1215 return GNUNET_YES;
736 } 1216 }
737 route_path = (char *) &incoming[1]; 1217 break;
738 route_path = route_path + ntohs (enc_msg->size); 1218 case GNUNET_NO:
739 msg_ctx->path_history = 1219 GNUNET_break_op (0);
740 GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size); 1220 return GNUNET_YES;
741 memcpy (msg_ctx->path_history, route_path, path_size); 1221 case GNUNET_SYSERR:
742 memcpy (&msg_ctx->path_history[path_size], &my_identity, 1222 /* cannot verify, good luck */
743 sizeof (struct GNUNET_PeerIdentity)); 1223 break;
744 msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
745 } 1224 }
746 msg_ctx->network_size = ntohl (incoming->network_size); 1225 bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
747 msg_ctx->peer = *peer; 1226 DHT_BLOOM_SIZE,
748 msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE; 1227 DHT_BLOOM_K);
749 msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
750 demultiplex_message (enc_msg, msg_ctx);
751 if (msg_ctx->bloom != NULL)
752 { 1228 {
753 GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom); 1229 struct GNUNET_PeerIdentity pp[putlen+1];
754 msg_ctx->bloom = NULL; 1230
1231 /* extend 'put path' by sender */
1232 memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
1233 pp[putlen] = *sender;
1234
1235 /* give to local clients */
1236 GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1237 &put->key,
1238 0, NULL,
1239 putlen + 1,
1240 pp,
1241 ntohl (put->type),
1242 payload_size,
1243 payload);
1244 /* store locally */
1245 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
1246 &put->key,
1247 putlen + 1, pp,
1248 ntohl (put->type),
1249 payload_size,
1250 payload);
1251 /* route to other peers */
1252 GDS_NEIGHBOURS_handle_put (ntohl (put->type),
1253 ntohl (put->options),
1254 ntohl (put->desired_replication_level),
1255 GNUNET_TIME_absolute_ntoh (put->expiration_time),
1256 ntohl (put->hop_count),
1257 bf,
1258 putlen + 1, pp,
1259 payload,
1260 payload_size);
755 } 1261 }
756 GNUNET_free (msg_ctx); 1262 GNUNET_CONTAINER_bloomfilter_free (bf);
757 return GNUNET_YES; 1263 return GNUNET_YES;
758} 1264}
759 1265
760 1266
761/** 1267/**
762 * Core handler for p2p put requests. 1268 * Core handler for p2p get requests.
763 * 1269 *
764 * @param cls closure 1270 * @param cls closure
1271 * @param peer sender of the request
765 * @param message message 1272 * @param message message
766 * @param peer peer identity this notification is about 1273 * @param peer peer identity this notification is about
767 * @param atsi performance data 1274 * @param atsi performance data
@@ -769,11 +1276,18 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
769 * GNUNET_SYSERR to close it (signal serious error) 1276 * GNUNET_SYSERR to close it (signal serious error)
770 */ 1277 */
771static int 1278static int
772handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, 1279handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
773 const struct GNUNET_MessageHeader *message, 1280 const struct GNUNET_MessageHeader *message,
774 const struct GNUNET_TRANSPORT_ATS_Information 1281 const struct GNUNET_TRANSPORT_ATS_Information
775 *atsi) 1282 *atsi)
776{ 1283{
1284 // 1) validate GET
1285 // 2) store in routing table
1286 // 3) check options (i.e. FIND PEER)
1287 // 4) local lookup (=> need eval result!)
1288 // 5) p2p forwarding
1289
1290
777 struct GNUNET_DHT_P2PRouteMessage *incoming = 1291 struct GNUNET_DHT_P2PRouteMessage *incoming =
778 (struct GNUNET_DHT_P2PRouteMessage *) message; 1292 (struct GNUNET_DHT_P2PRouteMessage *) message;
779 struct GNUNET_MessageHeader *enc_msg = 1293 struct GNUNET_MessageHeader *enc_msg =
@@ -782,6 +1296,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
782 char *route_path; 1296 char *route_path;
783 int path_size; 1297 int path_size;
784 1298
1299 // FIXME
785 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) 1300 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
786 { 1301 {
787 GNUNET_break_op (0); 1302 GNUNET_break_op (0);
@@ -844,7 +1359,7 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
844 1359
845 1360
846/** 1361/**
847 * Core handler for p2p route results. 1362 * Core handler for p2p result messages.
848 * 1363 *
849 * @param cls closure 1364 * @param cls closure
850 * @param message message 1365 * @param message message
@@ -858,12 +1373,17 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
858 const struct GNUNET_TRANSPORT_ATS_Information 1373 const struct GNUNET_TRANSPORT_ATS_Information
859 *atsi) 1374 *atsi)
860{ 1375{
1376 // 1) validate result format
1377 // 2) append 'peer' to put path
1378 // 3) forward to local clients
1379 // 4) p2p routing
861 const struct GNUNET_DHT_P2PRouteResultMessage *incoming = 1380 const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
862 (const struct GNUNET_DHT_P2PRouteResultMessage *) message; 1381 (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
863 struct GNUNET_MessageHeader *enc_msg = 1382 struct GNUNET_MessageHeader *enc_msg =
864 (struct GNUNET_MessageHeader *) &incoming[1]; 1383 (struct GNUNET_MessageHeader *) &incoming[1];
865 struct DHT_MessageContext msg_ctx; 1384 struct DHT_MessageContext msg_ctx;
866 1385
1386 // FIXME
867 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1) 1387 if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
868 { 1388 {
869 GNUNET_break_op (0); 1389 GNUNET_break_op (0);
@@ -903,7 +1423,7 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
903 * Initialize neighbours subsystem. 1423 * Initialize neighbours subsystem.
904 */ 1424 */
905int 1425int
906GST_NEIGHBOURS_init () 1426GDS_NEIGHBOURS_init ()
907{ 1427{
908 static struct GNUNET_CORE_MessageHandler core_handlers[] = { 1428 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
909 {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, 1429 {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
@@ -918,18 +1438,16 @@ GST_NEIGHBOURS_init ()
918 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size", 1438 GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
919 &temp_config_num)) 1439 &temp_config_num))
920 bucket_size = (unsigned int) temp_config_num; 1440 bucket_size = (unsigned int) temp_config_num;
921 coreAPI = GNUNET_CORE_connect (GDS_cfg, /* Main configuration */ 1441 coreAPI = GNUNET_CORE_connect (GDS_cfg,
922 DEFAULT_CORE_QUEUE_SIZE, /* queue size */ 1442 DEFAULT_CORE_QUEUE_SIZE,
923 NULL, /* Closure passed to DHT functions */ 1443 NULL,
924 &core_init, /* Call core_init once connected */ 1444 &core_init,
925 &handle_core_connect, /* Handle connects */ 1445 &handle_core_connect,
926 &handle_core_disconnect, /* remove peers on disconnects */ 1446 &handle_core_disconnect,
927 NULL, /* Do we care about "status" updates? */ 1447 NULL, /* Do we care about "status" updates? */
928 NULL, /* Don't want notified about all incoming messages */ 1448 NULL, GNUNET_NO,
929 GNUNET_NO, /* For header only inbound notification */ 1449 NULL, GNUNET_NO,
930 NULL, /* Don't want notified about all outbound messages */ 1450 core_handlers);
931 GNUNET_NO, /* For header only outbound notification */
932 core_handlers); /* Register these handlers */
933 if (coreAPI == NULL) 1451 if (coreAPI == NULL)
934 return GNUNET_SYSERR; 1452 return GNUNET_SYSERR;
935 all_known_peers = GNUNET_CONTAINER_multihashmap_create (256); 1453 all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
@@ -941,7 +1459,7 @@ GST_NEIGHBOURS_init ()
941 * Shutdown neighbours subsystem. 1459 * Shutdown neighbours subsystem.
942 */ 1460 */
943void 1461void
944GST_NEIGHBOURS_done () 1462GDS_NEIGHBOURS_done ()
945{ 1463{
946 GNUNET_assert (coreAPI != NULL); 1464 GNUNET_assert (coreAPI != NULL);
947 GNUNET_CORE_disconnect (coreAPI); 1465 GNUNET_CORE_disconnect (coreAPI);
diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h
index 1f2ae08e6..2c20df2c4 100644
--- a/src/dht/gnunet-service-dht_neighbours.h
+++ b/src/dht/gnunet-service-dht_neighbours.h
@@ -29,12 +29,18 @@
29 29
30 30
31/** 31/**
32 * Perform a PUT operation. 32 * Perform a PUT operation. Forwards the given request to other
33 * peers. Does not store the data locally. Does not give the
34 * data to local clients. May do nothing if this is the only
35 * peer in the network (or if we are the closest peer in the
36 * network).
33 * 37 *
34 * @param type type of the block 38 * @param type type of the block
35 * @param options routing options 39 * @param options routing options
36 * @param desired_replication_level desired replication count 40 * @param desired_replication_level desired replication level
37 * @param expiration_time when does the content expire 41 * @param expiration_time when does the content expire
42 * @param hop_count how many hops has this message traversed so far
43 * @param bf Bloom filter of peers this PUT has already traversed
38 * @param key key for the content 44 * @param key key for the content
39 * @param put_path_length number of entries in put_path 45 * @param put_path_length number of entries in put_path
40 * @param put_path peers this request has traversed so far (if tracked) 46 * @param put_path peers this request has traversed so far (if tracked)
@@ -42,10 +48,12 @@
42 * @param data_size number of bytes in data 48 * @param data_size number of bytes in data
43 */ 49 */
44void 50void
45GST_NEIGHBOURS_handle_put (uint32_t type, 51GDS_NEIGHBOURS_handle_put (uint32_t type,
46 uint32_t options, 52 uint32_t options,
47 uint32_t desired_replication_level, 53 uint32_t desired_replication_level,
48 GNUNET_TIME_Absolute expiration_time, 54 GNUNET_TIME_Absolute expiration_time,
55 uint32_t hop_count,
56 struct GNUNET_CONTAINER_BloomFilter *bf,
49 const GNUNET_HashCode *key, 57 const GNUNET_HashCode *key,
50 unsigned int put_path_length, 58 unsigned int put_path_length,
51 struct GNUNET_PeerIdentity *put_path, 59 struct GNUNET_PeerIdentity *put_path,
@@ -54,11 +62,15 @@ GST_NEIGHBOURS_handle_put (uint32_t type,
54 62
55 63
56/** 64/**
57 * Perform a GET operation. 65 * Perform a GET operation. Forwards the given request to other
66 * peers. Does not lookup the key locally. May do nothing if this is
67 * the only peer in the network (or if we are the closest peer in the
68 * network).
58 * 69 *
59 * @param type type of the block 70 * @param type type of the block
60 * @param options routing options 71 * @param options routing options
61 * @param desired_replication_level desired replication count 72 * @param desired_replication_level desired replication count
73 * @param hop_count how many hops did this request traverse so far?
62 * @param key key for the content 74 * @param key key for the content
63 * @param xquery extended query 75 * @param xquery extended query
64 * @param xquery_size number of bytes in xquery 76 * @param xquery_size number of bytes in xquery
@@ -67,9 +79,10 @@ GST_NEIGHBOURS_handle_put (uint32_t type,
67 * @param peer_bf filter for peers not to select (again) 79 * @param peer_bf filter for peers not to select (again)
68 */ 80 */
69void 81void
70GST_NEIGHBOURS_handle_get (uint32_t type, 82GDS_NEIGHBOURS_handle_get (uint32_t type,
71 uint32_t options, 83 uint32_t options,
72 uint32_t desired_replication_level, 84 uint32_t desired_replication_level,
85 uint32_t hop_count,
73 const GNUNET_HashCode *key, 86 const GNUNET_HashCode *key,
74 const void *xquery, 87 const void *xquery,
75 size_t xquery_size, 88 size_t xquery_size,
@@ -79,10 +92,11 @@ GST_NEIGHBOURS_handle_get (uint32_t type,
79 92
80 93
81/** 94/**
82 * Handle a reply (route to origin). 95 * Handle a reply (route to origin). Only forwards the reply back to
96 * other peers waiting for it. Does not do local caching or
97 * forwarding to local clients.
83 * 98 *
84 * @param type type of the block 99 * @param type type of the block
85 * @param options routing options
86 * @param expiration_time when does the content expire 100 * @param expiration_time when does the content expire
87 * @param key key for the content 101 * @param key key for the content
88 * @param put_path_length number of entries in put_path 102 * @param put_path_length number of entries in put_path
@@ -93,8 +107,7 @@ GST_NEIGHBOURS_handle_get (uint32_t type,
93 * @param data_size number of bytes in data 107 * @param data_size number of bytes in data
94 */ 108 */
95void 109void
96GST_NEIGHBOURS_handle_reply (uint32_t type, 110GDS_NEIGHBOURS_handle_reply (uint32_t type,
97 uint32_t options,
98 GNUNET_TIME_Absolute expiration_time, 111 GNUNET_TIME_Absolute expiration_time,
99 const GNUNET_HashCode *key, 112 const GNUNET_HashCode *key,
100 unsigned int put_path_length, 113 unsigned int put_path_length,
@@ -109,13 +122,13 @@ GST_NEIGHBOURS_handle_reply (uint32_t type,
109 * Initialize neighbours subsystem. 122 * Initialize neighbours subsystem.
110 */ 123 */
111void 124void
112GST_NEIGHBOURS_init (void); 125GDS_NEIGHBOURS_init (void);
113 126
114/** 127/**
115 * Shutdown neighbours subsystem. 128 * Shutdown neighbours subsystem.
116 */ 129 */
117void 130void
118GST_NEIGHBOURS_done (void); 131GDS_NEIGHBOURS_done (void);
119 132
120 133
121#endif 134#endif