aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorSupriti Singh <supritisingh08@gmail.com>2014-03-28 10:24:39 +0000
committerSupriti Singh <supritisingh08@gmail.com>2014-03-28 10:24:39 +0000
commit8b570f0b07ba98e61a2523a4a3be768ba2ad22b5 (patch)
treed74b63d4279a327e6c7e9398ebc4c264e6755a56 /src/dht
parentf21600b4f3adcf4ef00fe4713fd81a8215972121 (diff)
downloadgnunet-8b570f0b07ba98e61a2523a4a3be768ba2ad22b5.tar.gz
gnunet-8b570f0b07ba98e61a2523a4a3be768ba2ad22b5.zip
Framework for put/get/monitor
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c4
-rw-r--r--src/dht/gnunet-service-xdht_clients.c1435
-rw-r--r--src/dht/gnunet-service-xdht_clients.h9
-rw-r--r--src/dht/gnunet-service-xdht_datacache.c4
-rw-r--r--src/dht/gnunet-service-xdht_neighbours.c774
-rw-r--r--src/dht/gnunet-service-xdht_neighbours.h77
-rw-r--r--src/dht/gnunet-service-xdht_routing.c12
7 files changed, 834 insertions, 1481 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 0cf46f562..cee3e7765 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -1601,8 +1601,7 @@ core_init (void *cls,
1601 * #GNUNET_SYSERR to close it (signal serious error) 1601 * #GNUNET_SYSERR to close it (signal serious error)
1602 */ 1602 */
1603static int 1603static int
1604handle_dht_p2p_put (void *cls, 1604handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1605 const struct GNUNET_PeerIdentity *peer,
1606 const struct GNUNET_MessageHeader *message) 1605 const struct GNUNET_MessageHeader *message)
1607{ 1606{
1608 const struct PeerPutMessage *put; 1607 const struct PeerPutMessage *put;
@@ -1645,6 +1644,7 @@ handle_dht_p2p_put (void *cls,
1645 payload_size = 1644 payload_size =
1646 msize - (sizeof (struct PeerPutMessage) + 1645 msize - (sizeof (struct PeerPutMessage) +
1647 putlen * sizeof (struct GNUNET_PeerIdentity)); 1646 putlen * sizeof (struct GNUNET_PeerIdentity));
1647
1648 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n", 1648 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
1649 GNUNET_h2s (&put->key), GNUNET_i2s (peer)); 1649 GNUNET_h2s (&put->key), GNUNET_i2s (peer));
1650 GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash); 1650 GNUNET_CRYPTO_hash (peer, sizeof (struct GNUNET_PeerIdentity), &phash);
diff --git a/src/dht/gnunet-service-xdht_clients.c b/src/dht/gnunet-service-xdht_clients.c
index f7c4df759..050d8a99e 100644
--- a/src/dht/gnunet-service-xdht_clients.c
+++ b/src/dht/gnunet-service-xdht_clients.c
@@ -19,10 +19,9 @@
19*/ 19*/
20 20
21/** 21/**
22 * @file dht/gnunet-service-xdht_clients.c 22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code 23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff 24 * @author Supriti Singh
25 * @author Nathan Evans
26 */ 25 */
27 26
28#include "platform.h" 27#include "platform.h"
@@ -35,14 +34,6 @@
35#include "gnunet-service-xdht_neighbours.h" 34#include "gnunet-service-xdht_neighbours.h"
36#include "dht.h" 35#include "dht.h"
37 36
38
39/**
40 * Should routing details be logged to stderr (for debugging)?
41 */
42#define LOG_TRAFFIC(kind,...) GNUNET_log_from (kind, "dht-traffic",__VA_ARGS__)
43
44#define LOG(kind,...) GNUNET_log_from (kind, "dht-clients",__VA_ARGS__)
45
46/** 37/**
47 * Linked list of messages to send to clients. 38 * Linked list of messages to send to clients.
48 */ 39 */
@@ -67,7 +58,6 @@ struct PendingMessage
67 58
68}; 59};
69 60
70
71/** 61/**
72 * Struct containing information about a client, 62 * Struct containing information about a client,
73 * handle to connect to it, and any pending messages 63 * handle to connect to it, and any pending messages
@@ -110,129 +100,6 @@ struct ClientList
110 100
111 101
112/** 102/**
113 * Entry in the DHT routing table for a client's GET request.
114 */
115struct ClientQueryRecord
116{
117
118 /**
119 * The key this request was about
120 */
121 struct GNUNET_HashCode key;
122
123 /**
124 * Client responsible for the request.
125 */
126 struct ClientList *client;
127
128 /**
129 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
130 */
131 const void *xquery;
132
133 /**
134 * Replies we have already seen for this request.
135 */
136 struct GNUNET_HashCode *seen_replies;
137
138 /**
139 * Pointer to this nodes heap location in the retry-heap (for fast removal)
140 */
141 struct GNUNET_CONTAINER_HeapNode *hnode;
142
143 /**
144 * What's the delay between re-try operations that we currently use for this
145 * request?
146 */
147 struct GNUNET_TIME_Relative retry_frequency;
148
149 /**
150 * What's the next time we should re-try this request?
151 */
152 struct GNUNET_TIME_Absolute retry_time;
153
154 /**
155 * The unique identifier of this request
156 */
157 uint64_t unique_id;
158
159 /**
160 * Number of bytes in xquery.
161 */
162 size_t xquery_size;
163
164 /**
165 * Number of entries in 'seen_replies'.
166 */
167 unsigned int seen_replies_count;
168
169 /**
170 * Desired replication level
171 */
172 uint32_t replication;
173
174 /**
175 * Any message options for this request
176 */
177 uint32_t msg_options;
178
179 /**
180 * The type for the data for the GET request.
181 */
182 enum GNUNET_BLOCK_Type type;
183
184};
185
186
187/**
188 * Struct containing paremeters of monitoring requests.
189 */
190struct ClientMonitorRecord
191{
192
193 /**
194 * Next element in DLL.
195 */
196 struct ClientMonitorRecord *next;
197
198 /**
199 * Previous element in DLL.
200 */
201 struct ClientMonitorRecord *prev;
202
203 /**
204 * Type of blocks that are of interest
205 */
206 enum GNUNET_BLOCK_Type type;
207
208 /**
209 * Key of data of interest, NULL for all.
210 */
211 struct GNUNET_HashCode *key;
212
213 /**
214 * Flag whether to notify about GET messages.
215 */
216 int16_t get;
217
218 /**
219 * Flag whether to notify about GET_REPONSE messages.
220 */
221 int16_t get_resp;
222
223 /**
224 * Flag whether to notify about PUT messages.
225 */
226 uint16_t put;
227
228 /**
229 * Client to notify of these requests.
230 */
231 struct ClientList *client;
232};
233
234
235/**
236 * List of active clients. 103 * List of active clients.
237 */ 104 */
238static struct ClientList *client_head; 105static struct ClientList *client_head;
@@ -243,29 +110,58 @@ static struct ClientList *client_head;
243static struct ClientList *client_tail; 110static struct ClientList *client_tail;
244 111
245/** 112/**
246 * List of active monitoring requests. 113 * Task run to check for messages that need to be sent to a client.
247 */ 114 *
248static struct ClientMonitorRecord *monitor_head; 115 * @param client a ClientList, containing the client and any messages to be sent to it
249
250/**
251 * List of active monitoring requests.
252 */
253static struct ClientMonitorRecord *monitor_tail;
254
255/**
256 * Hashmap for fast key based lookup, maps keys to 'struct ClientQueryRecord' entries.
257 */ 116 */
258static struct GNUNET_CONTAINER_MultiHashMap *forward_map; 117static void process_pending_messages (struct ClientList *client);
259
260/** 118/**
261 * Heap with all of our client's request, sorted by retry time (earliest on top). 119 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready
120 * request. A ClientList is passed as closure, take the head of the list
121 * and copy it into buf, which has the result of sending the message to the
122 * client.
123 *
124 * @param cls closure to this call
125 * @param size maximum number of bytes available to send
126 * @param buf where to copy the actual message to
127 *
128 * @return the number of bytes actually copied, 0 indicates failure
262 */ 129 */
263static struct GNUNET_CONTAINER_Heap *retry_heap; 130static size_t
131send_reply_to_client (void *cls, size_t size, void *buf)
132{
133 struct ClientList *client = cls;
134 char *cbuf = buf;
135 struct PendingMessage *reply;
136 size_t off;
137 size_t msize;
264 138
265/** 139 client->transmit_handle = NULL;
266 * Task that re-transmits requests (using retry_heap). 140 if (buf == NULL)
267 */ 141 {
268static GNUNET_SCHEDULER_TaskIdentifier retry_task; 142 /* client disconnected */
143 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
144 "Client %p disconnected, pending messages will be discarded\n",
145 client->client_handle);
146 return 0;
147 }
148 off = 0;
149 while ((NULL != (reply = client->pending_head)) &&
150 (size >= off + (msize = ntohs (reply->msg->size))))
151 {
152 GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
153 reply);
154 memcpy (&cbuf[off], reply->msg, msize);
155 GNUNET_free (reply);
156 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
157 msize, client->client_handle);
158 off += msize;
159 }
160 process_pending_messages (client);
161 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
162 (unsigned int) off, (unsigned int) size, client->client_handle);
163 return off;
164}
269 165
270 166
271/** 167/**
@@ -274,7 +170,27 @@ static GNUNET_SCHEDULER_TaskIdentifier retry_task;
274 * @param client a ClientList, containing the client and any messages to be sent to it 170 * @param client a ClientList, containing the client and any messages to be sent to it
275 */ 171 */
276static void 172static void
277process_pending_messages (struct ClientList *client); 173process_pending_messages (struct ClientList *client)
174{
175 if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
176 {
177 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
178 "Not asking for transmission to %p now: %s\n",
179 client->client_handle,
180 client->pending_head ==
181 NULL ? "no more messages" : "request already pending");
182 return;
183 }
184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
185 "Asking for transmission of %u bytes to client %p\n",
186 ntohs (client->pending_head->msg->size), client->client_handle);
187 client->transmit_handle =
188 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
189 ntohs (client->pending_head->
190 msg->size),
191 GNUNET_TIME_UNIT_FOREVER_REL,
192 &send_reply_to_client, client);
193}
278 194
279 195
280/** 196/**
@@ -292,7 +208,6 @@ add_pending_message (struct ClientList *client,
292 process_pending_messages (client); 208 process_pending_messages (client);
293} 209}
294 210
295
296/** 211/**
297 * Find a client if it exists, add it otherwise. 212 * Find a client if it exists, add it otherwise.
298 * 213 *
@@ -320,253 +235,67 @@ find_active_client (struct GNUNET_SERVER_Client *client)
320 235
321 236
322/** 237/**
323 * Iterator over hash map entries that frees all entries 238 * SUPU: Call made from dht_api.c
324 * associated with the given client. 239 * Handler for monitor stop messages
325 * 240 *
326 * @param cls client to search for in source routes 241 * @param cls closure for the service
327 * @param key current key code (ignored) 242 * @param client the client we received this message from
328 * @param value value in the hash map, a ClientQueryRecord 243 * @param message the actual message received
329 * @return #GNUNET_YES (we should continue to iterate)
330 */
331static int
332remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value)
333{
334 struct ClientList *client = cls;
335 struct ClientQueryRecord *record = value;
336
337 if (record->client != client)
338 return GNUNET_YES;
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340 "Removing client %p's record for key %s\n", client,
341 GNUNET_h2s (key));
342 GNUNET_assert (GNUNET_YES ==
343 GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
344 record));
345 if (NULL != record->hnode)
346 GNUNET_CONTAINER_heap_remove_node (record->hnode);
347 GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0);
348 GNUNET_free (record);
349 return GNUNET_YES;
350}
351
352
353/**
354 * Functions with this signature are called whenever a client
355 * is disconnected on the network level.
356 * 244 *
357 * @param cls closure (NULL for dht)
358 * @param client identification of the client; NULL
359 * for the last call when the server is destroyed
360 */
361static void
362handle_client_disconnect (void *cls,
363 struct GNUNET_SERVER_Client *client)
364{
365
366 struct ClientList *pos;
367 struct PendingMessage *reply;
368 struct ClientMonitorRecord *monitor;
369
370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371 "Local client %p disconnects\n",
372 client);
373 pos = find_active_client (client);
374 GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
375 if (pos->transmit_handle != NULL)
376 GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle);
377 while (NULL != (reply = pos->pending_head))
378 {
379 GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply);
380 GNUNET_free (reply);
381 }
382 monitor = monitor_head;
383 while (NULL != monitor)
384 {
385 if (monitor->client == pos)
386 {
387 struct ClientMonitorRecord *next;
388
389 GNUNET_free_non_null (monitor->key);
390 next = monitor->next;
391 GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, monitor);
392 GNUNET_free (monitor);
393 monitor = next;
394 }
395 else
396 monitor = monitor->next;
397 }
398 GNUNET_CONTAINER_multihashmap_iterate (forward_map, &remove_client_records,
399 pos);
400 GNUNET_free (pos);
401}
402
403
404/**
405 * Route the given request via the DHT. This includes updating
406 * the bloom filter and retransmission times, building the P2P
407 * message and initiating the routing operation.
408 */ 245 */
409static void 246static void
410transmit_request (struct ClientQueryRecord *cqr) 247handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client,
248 const struct GNUNET_MessageHeader *message)
411{ 249{
412 int32_t reply_bf_mutator; 250 //const struct GNUNET_DHT_MonitorStartStopMessage *msg;
413 struct GNUNET_CONTAINER_BloomFilter *reply_bf;
414 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
415
416 GNUNET_STATISTICS_update (GDS_stats,
417 gettext_noop
418 ("# GET requests from clients injected"), 1,
419 GNUNET_NO);
420 reply_bf_mutator =
421 (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
422 UINT32_MAX);
423 reply_bf =
424 GNUNET_BLOCK_construct_bloomfilter (reply_bf_mutator, cqr->seen_replies,
425 cqr->seen_replies_count);
426 peer_bf =
427 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
428 GNUNET_CONSTANTS_BLOOMFILTER_K);
429 LOG (GNUNET_ERROR_TYPE_DEBUG,
430 "Initiating GET for %s, replication %u, already have %u replies\n",
431 GNUNET_h2s(&cqr->key), cqr->replication, cqr->seen_replies_count);
432
433 GDS_NEIGHBOURS_handle_get (cqr->type, cqr->msg_options, cqr->replication,
434 0 /* hop count */ ,
435 &cqr->key, cqr->xquery, cqr->xquery_size, reply_bf,
436 reply_bf_mutator, peer_bf);
437 GNUNET_CONTAINER_bloomfilter_free (reply_bf);
438 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
439
440 /* exponential back-off for retries.
441 * max GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
442 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
443 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
444} 251}
445 252
446 253
447/** 254/**
448 * Task that looks at the 'retry_heap' and transmits all of the requests 255 * SUPU: Monitor call made from dht_api.c
449 * on the heap that are ready for transmission. Then re-schedules 256 * Handler for monitor start messages
450 * itself (unless the heap is empty). 257 *
258 * @param cls closure for the service
259 * @param client the client we received this message from
260 * @param message the actual message received
451 * 261 *
452 * @param cls unused
453 * @param tc scheduler context
454 */ 262 */
455static void 263static void
456transmit_next_request_task (void *cls, 264handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
457 const struct GNUNET_SCHEDULER_TaskContext *tc) 265 const struct GNUNET_MessageHeader *message)
458{ 266{
459 struct ClientQueryRecord *cqr; 267 //const struct GNUNET_DHT_MonitorStartStopMessage *msg;
460 struct GNUNET_TIME_Relative delay; 268 /* FIXME: At the moment I don't know exact usage of monitor message. But most
461 269 probably it will be just copy and paste from old implementation. */
462 retry_task = GNUNET_SCHEDULER_NO_TASK;
463 if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
464 return;
465 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
466 {
467 cqr->hnode = NULL;
468 delay = GNUNET_TIME_absolute_get_remaining (cqr->retry_time);
469 if (delay.rel_value_us > 0)
470 {
471 cqr->hnode =
472 GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
473 cqr->retry_time.abs_value_us);
474 retry_task =
475 GNUNET_SCHEDULER_add_delayed (delay, &transmit_next_request_task,
476 NULL);
477 return;
478 }
479 transmit_request (cqr);
480 cqr->hnode =
481 GNUNET_CONTAINER_heap_insert (retry_heap, cqr,
482 cqr->retry_time.abs_value_us);
483 }
484} 270}
485 271
486 272
487/** 273/**SUPU: Call to this function is made whenever a client does not want the
488 * Handler for PUT messages. 274 * get request any more. There is a function in dht_api.c but I don't know
275 * yet how the call is made to this function.
276 * Handler for any generic DHT stop messages, calls the appropriate handler
277 * depending on message type (if processed locally)
489 * 278 *
490 * @param cls closure for the service 279 * @param cls closure for the service
491 * @param client the client we received this message from 280 * @param client the client we received this message from
492 * @param message the actual message received 281 * @param message the actual message received
282 *
493 */ 283 */
494static void 284static void
495handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, 285handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
496 const struct GNUNET_MessageHeader *message) 286 const struct GNUNET_MessageHeader *message)
497{ 287{
498 const struct GNUNET_DHT_ClientPutMessage *dht_msg; 288 //const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg;
499 struct GNUNET_CONTAINER_BloomFilter *peer_bf; 289 /* FIXME: Whats the use of get_stop. A client notifies the server to stop asking
500 uint16_t size; 290 for the get message. But in case of x-vine, it asks for get only once. So,
501 struct PendingMessage *pm; 291 when it has already send the get message, after that if client asks it to
502 struct GNUNET_DHT_ClientPutConfirmationMessage *conf; 292 stop, it really can't do anything. Its better to wait for the result, discard
503 293 it and don't communicate with client about the result instead of generating
504 size = ntohs (message->size); 294 more traffic.*/
505 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
506 {
507 GNUNET_break (0);
508 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
509 return;
510 }
511 GNUNET_STATISTICS_update (GDS_stats,
512 gettext_noop
513 ("# PUT requests received from clients"), 1,
514 GNUNET_NO);
515 dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
516 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "XDHT CLIENT-PUT %s @ %u\n",
517 GNUNET_h2s (&dht_msg->key), getpid ());
518 /* give to local clients */
519 LOG (GNUNET_ERROR_TYPE_DEBUG,
520 "Handling local PUT of %u-bytes for query %s\n",
521 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
522 GNUNET_h2s (&dht_msg->key));
523 GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
524 &dht_msg->key, 0, NULL, 0, NULL,
525 ntohl (dht_msg->type),
526 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
527 &dht_msg[1]);
528 /* store locally */
529 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
530 &dht_msg->key, 0, NULL, ntohl (dht_msg->type),
531 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
532 &dht_msg[1]);
533 /* route to other peers */
534 peer_bf =
535 GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
536 GNUNET_CONSTANTS_BLOOMFILTER_K);
537
538 GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options),
539 ntohl (dht_msg->desired_replication_level),
540 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
541 0 /* hop count */ ,
542 peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
543 size -
544 sizeof (struct GNUNET_DHT_ClientPutMessage));
545 GDS_CLIENTS_process_put (ntohl (dht_msg->options),
546 ntohl (dht_msg->type),
547 0,
548 ntohl (dht_msg->desired_replication_level),
549 1,
550 GDS_NEIGHBOURS_get_id(),
551 GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
552 &dht_msg->key,
553 &dht_msg[1],
554 size - sizeof (struct GNUNET_DHT_ClientPutMessage));
555 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
556 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
557 sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
558 conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
559 conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
560 conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
561 conf->reserved = htonl (0);
562 conf->unique_id = dht_msg->unique_id;
563 pm->msg = &conf->header;
564 add_pending_message (find_active_client (client), pm);
565 GNUNET_SERVER_receive_done (client, GNUNET_OK);
566} 295}
567 296
568
569/** 297/**
298 * FIXME: Call to this function is made whenever we have a get request.
570 * Handler for DHT GET messages from the client. 299 * Handler for DHT GET messages from the client.
571 * 300 *
572 * @param cls closure for the service 301 * @param cls closure for the service
@@ -577,12 +306,12 @@ static void
577handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, 306handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
578 const struct GNUNET_MessageHeader *message) 307 const struct GNUNET_MessageHeader *message)
579{ 308{
580 const struct GNUNET_DHT_ClientGetMessage *get; 309 struct GNUNET_DHT_ClientGetMessage *get_msg;
581 struct ClientQueryRecord *cqr; 310 struct GNUNET_PeerIdentity *get_path;
582 size_t xquery_size; 311 struct GNUNET_PeerIdentity *my_identity;
583 const char *xquery; 312 unsigned int get_path_length;
584 uint16_t size; 313 uint16_t size;
585 314
586 size = ntohs (message->size); 315 size = ntohs (message->size);
587 if (size < sizeof (struct GNUNET_DHT_ClientGetMessage)) 316 if (size < sizeof (struct GNUNET_DHT_ClientGetMessage))
588 { 317 {
@@ -590,871 +319,122 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
590 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 319 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
591 return; 320 return;
592 } 321 }
593 xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); 322
594 get = (const struct GNUNET_DHT_ClientGetMessage *) message; 323 get_msg = (struct GNUNET_DHT_ClientGetMessage *) message;
595 xquery = (const char *) &get[1]; 324
596 GNUNET_STATISTICS_update (GDS_stats, 325 /* FIXME: Search locally? Why should we always search locally?
597 gettext_noop 326 Current implementation of datacache needs to be modified. If found here, then
598 ("# GET requests received from clients"), 1, 327 notify the requesting client. */
599 GNUNET_NO); 328
600 LOG (GNUNET_ERROR_TYPE_DEBUG, 329 /* FIXME: Call GDS_NEIGHBOURS_handle_get
601 "Received GET request for %s from local client %p, xq: %.*s\n", 330 Here you need to remember the whole path because you need to travel that path
602 GNUNET_h2s (&get->key), client, xquery_size, xquery); 331 and reach back here with the result. So, you should send your own id, client
603 332 id, get path, get path length. You also need to add yourself to the get path. */
604 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "XDHT CLIENT-GET %s @ %u\n", 333 my_identity = GDS_NEIGHBOURS_get_id();
605 GNUNET_h2s (&get->key), getpid ()); 334 get_path = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
606 335 memcpy (get_path, &my_identity, sizeof (struct GNUNET_PeerIdentity));
607 336 get_path_length = 1;
608 cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); 337
609 cqr->key = get->key; 338 /* FIXME:
610 cqr->client = find_active_client (client); 339 * 1. Find some unique identifier for the client.
611 cqr->xquery = (void *) &cqr[1]; 340 * 2. Also, I don't know the usage of block, replication and type. So, I
612 memcpy (&cqr[1], xquery, xquery_size); 341 * am not sending the parameters now. */
613 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); 342 GDS_NEIGHBOURS_handle_get (my_identity, get_path, get_path_length,
614 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS; 343 &(get_msg->key), NULL, NULL, NULL);
615 cqr->retry_time = GNUNET_TIME_absolute_get (); 344
616 cqr->unique_id = get->unique_id;
617 cqr->xquery_size = xquery_size;
618 cqr->replication = ntohl (get->desired_replication_level);
619 cqr->msg_options = ntohl (get->options);
620 cqr->type = ntohl (get->type);
621 // FIXME use cqr->key, set multihashmap create to GNUNET_YES
622 GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
623 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
624 GDS_CLIENTS_process_get (ntohl (get->options),
625 ntohl (get->type),
626 0,
627 ntohl (get->desired_replication_level),
628 1,
629 GDS_NEIGHBOURS_get_id(),
630 &get->key);
631 /* start remote requests */
632 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
633 GNUNET_SCHEDULER_cancel (retry_task);
634 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
635 /* perform local lookup */
636 GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size,
637 NULL, 0);
638 GNUNET_SERVER_receive_done (client, GNUNET_OK);
639}
640
641
642/**
643 * Closure for 'find_by_unique_id'.
644 */
645struct FindByUniqueIdContext
646{
647 /**
648 * Where to store the result, if found.
649 */
650 struct ClientQueryRecord *cqr;
651
652 uint64_t unique_id;
653};
654
655
656/**
657 * Function called for each existing DHT record for the given
658 * query. Checks if it matches the UID given in the closure
659 * and if so returns the entry as a result.
660 *
661 * @param cls the search context
662 * @param key query for the lookup (not used)
663 * @param value the 'struct ClientQueryRecord'
664 * @return GNUNET_YES to continue iteration (result not yet found)
665 */
666static int
667find_by_unique_id (void *cls,
668 const struct GNUNET_HashCode *key,
669 void *value)
670{
671 struct FindByUniqueIdContext *fui_ctx = cls;
672 struct ClientQueryRecord *cqr = value;
673
674 if (cqr->unique_id != fui_ctx->unique_id)
675 return GNUNET_YES;
676 fui_ctx->cqr = cqr;
677 return GNUNET_NO;
678} 345}
679 346
680 347
681/** 348/**
682 * Handler for "GET result seen" messages from the client. 349 * Handler for PUT messages.
683 *
684 * @param cls closure for the service 350 * @param cls closure for the service
685 * @param client the client we received this message from 351 * @param client the client we received this message from
686 * @param message the actual message received 352 * @param message the actual message received
687 */ 353 */
688static void 354static void
689handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, 355handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
690 const struct GNUNET_MessageHeader *message) 356 const struct GNUNET_MessageHeader *message)
691{ 357{
692 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; 358 struct GNUNET_DHT_ClientPutMessage *put_msg;
693 uint16_t size; 359 struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
694 unsigned int hash_count; 360 struct PendingMessage *pm;
695 unsigned int old_count; 361 uint16_t size; /* FIXME: When to use size_t and when uint16_t */
696 const struct GNUNET_HashCode *hc; 362
697 struct FindByUniqueIdContext fui_ctx;
698 struct ClientQueryRecord *cqr;
699
700 size = ntohs (message->size); 363 size = ntohs (message->size);
701 if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) 364 if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
702 {
703 GNUNET_break (0);
704 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
705 return;
706 }
707 seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message;
708 hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
709 if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
710 {
711 GNUNET_break (0);
712 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
713 return;
714 }
715 hc = (const struct GNUNET_HashCode*) &seen[1];
716 fui_ctx.unique_id = seen->unique_id;
717 fui_ctx.cqr = NULL;
718 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
719 &seen->key,
720 &find_by_unique_id,
721 &fui_ctx);
722 if (NULL == (cqr = fui_ctx.cqr))
723 { 365 {
724 GNUNET_break (0); 366 GNUNET_break (0);
725 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); 367 GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
726 return; 368 return;
727 } 369 }
728 /* finally, update 'seen' list */ 370
729 old_count = cqr->seen_replies_count; 371 /* FIXME:Should we define put_msg as const? */
730 GNUNET_array_grow (cqr->seen_replies, 372 put_msg = (struct GNUNET_DHT_ClientPutMessage *) message;
731 cqr->seen_replies_count, 373
732 cqr->seen_replies_count + hash_count); 374 /* store locally. FIXME: Is it secure to allow each peer to store the data? */
733 memcpy (&cqr->seen_replies[old_count], 375 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put_msg->expiration),
734 hc, 376 &put_msg->key, 0, NULL, ntohl (put_msg->type),
735 sizeof (struct GNUNET_HashCode) * hash_count); 377 size - sizeof (struct GNUNET_DHT_ClientPutMessage),
736} 378 &put_msg[1]);
737 379
738 380 /* FIXME: Right now I have just kept all the fields from the old function.
739/** 381 It may be possible that most of them are not needed. Check and remove if
740 * Closure for 'remove_by_unique_id'. 382 not needed. Usage of replication, options and type is still not clear. */
741 */ 383 GDS_NEIGHBOURS_handle_put (ntohl (put_msg->type), ntohl (put_msg->options),
742struct RemoveByUniqueIdContext 384 ntohl (put_msg->desired_replication_level),
743{ 385 GNUNET_TIME_absolute_ntoh (put_msg->expiration),
744 /** 386 0 /* hop count */ ,
745 * Client that issued the removal request. 387 &put_msg->key, 0, NULL, &put_msg[1],
746 */ 388 size -
747 struct ClientList *client; 389 sizeof (struct GNUNET_DHT_ClientPutMessage),
748 390 NULL, NULL, NULL);
749 /** 391
750 * Unique ID of the request. 392 /* FIXME: Here we send back the confirmation before verifying if put was successful
751 */ 393 or not. */
752 uint64_t unique_id; 394 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
753}; 395 sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
754 396 conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
755 397 conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
756/** 398 conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
757 * Iterator over hash map entries that frees all entries 399 conf->reserved = htonl (0);
758 * that match the given client and unique ID. 400 conf->unique_id = put_msg->unique_id;
759 * 401 pm->msg = &conf->header;
760 * @param cls unique ID and client to search for in source routes 402 add_pending_message (find_active_client (client), pm);
761 * @param key current key code
762 * @param value value in the hash map, a ClientQueryRecord
763 * @return GNUNET_YES (we should continue to iterate)
764 */
765static int
766remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value)
767{
768 const struct RemoveByUniqueIdContext *ctx = cls;
769 struct ClientQueryRecord *record = value;
770
771 if (record->unique_id != ctx->unique_id)
772 return GNUNET_YES;
773 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774 "Removing client %p's record for key %s (by unique id)\n",
775 ctx->client->client_handle, GNUNET_h2s (key));
776 return remove_client_records (ctx->client, key, record);
777}
778
779
780/**
781 * Handler for any generic DHT stop messages, calls the appropriate handler
782 * depending on message type (if processed locally)
783 *
784 * @param cls closure for the service
785 * @param client the client we received this message from
786 * @param message the actual message received
787 *
788 */
789static void
790handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
791 const struct GNUNET_MessageHeader *message)
792{
793 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
794 (const struct GNUNET_DHT_ClientGetStopMessage *) message;
795 struct RemoveByUniqueIdContext ctx;
796
797 GNUNET_STATISTICS_update (GDS_stats,
798 gettext_noop
799 ("# GET STOP requests received from clients"), 1,
800 GNUNET_NO);
801 LOG (GNUNET_ERROR_TYPE_DEBUG,
802 "Received GET STOP request for %s from local client %p\n",
803 client, GNUNET_h2s (&dht_stop_msg->key));
804 ctx.client = find_active_client (client);
805 ctx.unique_id = dht_stop_msg->unique_id;
806 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key,
807 &remove_by_unique_id, &ctx);
808 GNUNET_SERVER_receive_done (client, GNUNET_OK);
809}
810
811
812/**
813 * Handler for monitor start messages
814 *
815 * @param cls closure for the service
816 * @param client the client we received this message from
817 * @param message the actual message received
818 *
819 */
820static void
821handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
822 const struct GNUNET_MessageHeader *message)
823{
824 struct ClientMonitorRecord *r;
825 const struct GNUNET_DHT_MonitorStartStopMessage *msg;
826
827 msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
828 r = GNUNET_new (struct ClientMonitorRecord);
829
830 r->client = find_active_client(client);
831 r->type = ntohl(msg->type);
832 r->get = ntohs(msg->get);
833 r->get_resp = ntohs(msg->get_resp);
834 r->put = ntohs(msg->put);
835 if (0 == ntohs(msg->filter_key))
836 r->key = NULL;
837 else
838 {
839 r->key = GNUNET_new (struct GNUNET_HashCode);
840 memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode));
841 }
842 GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
843 GNUNET_SERVER_receive_done (client, GNUNET_OK);
844}
845
846/**
847 * Handler for monitor stop messages
848 *
849 * @param cls closure for the service
850 * @param client the client we received this message from
851 * @param message the actual message received
852 *
853 */
854static void
855handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client,
856 const struct GNUNET_MessageHeader *message)
857{
858 struct ClientMonitorRecord *r;
859 const struct GNUNET_DHT_MonitorStartStopMessage *msg;
860 int keys_match;
861
862 msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
863 r = monitor_head;
864
865 while (NULL != r)
866 {
867 if (NULL == r->key)
868 keys_match = (0 == ntohs(msg->filter_key));
869 else
870 {
871 keys_match = (0 != ntohs(msg->filter_key)
872 && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode)));
873 }
874 if (find_active_client(client) == r->client
875 && ntohl(msg->type) == r->type
876 && r->get == msg->get
877 && r->get_resp == msg->get_resp
878 && r->put == msg->put
879 && keys_match
880 )
881 {
882 GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r);
883 GNUNET_free_non_null (r->key);
884 GNUNET_free (r);
885 GNUNET_SERVER_receive_done (client, GNUNET_OK);
886 return; /* Delete only ONE entry */
887 }
888 r = r->next;
889 }
890
891 GNUNET_SERVER_receive_done (client, GNUNET_OK); 403 GNUNET_SERVER_receive_done (client, GNUNET_OK);
892} 404}
893 405
894
895/** 406/**
896 * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_ready 407 * Functions with this signature are called whenever a client
897 * request. A ClientList is passed as closure, take the head of the list 408 * is disconnected on the network level.
898 * and copy it into buf, which has the result of sending the message to the
899 * client.
900 *
901 * @param cls closure to this call
902 * @param size maximum number of bytes available to send
903 * @param buf where to copy the actual message to
904 *
905 * @return the number of bytes actually copied, 0 indicates failure
906 */
907static size_t
908send_reply_to_client (void *cls, size_t size, void *buf)
909{
910 struct ClientList *client = cls;
911 char *cbuf = buf;
912 struct PendingMessage *reply;
913 size_t off;
914 size_t msize;
915
916 client->transmit_handle = NULL;
917 if (buf == NULL)
918 {
919 /* client disconnected */
920 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921 "Client %p disconnected, pending messages will be discarded\n",
922 client->client_handle);
923 return 0;
924 }
925 off = 0;
926 while ((NULL != (reply = client->pending_head)) &&
927 (size >= off + (msize = ntohs (reply->msg->size))))
928 {
929 GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail,
930 reply);
931 memcpy (&cbuf[off], reply->msg, msize);
932 GNUNET_free (reply);
933 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
934 msize, client->client_handle);
935 off += msize;
936 }
937 process_pending_messages (client);
938 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
939 (unsigned int) off, (unsigned int) size, client->client_handle);
940 return off;
941}
942
943
944/**
945 * Task run to check for messages that need to be sent to a client.
946 * 409 *
947 * @param client a ClientList, containing the client and any messages to be sent to it 410 * @param cls closure (NULL for dht)
411 * @param client identification of the client; NULL
412 * for the last call when the server is destroyed
948 */ 413 */
949static void 414static void
950process_pending_messages (struct ClientList *client) 415handle_client_disconnect (void *cls,
951{ 416 struct GNUNET_SERVER_Client *client)
952 if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
953 {
954 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955 "Not asking for transmission to %p now: %s\n",
956 client->client_handle,
957 client->pending_head ==
958 NULL ? "no more messages" : "request already pending");
959 return;
960 }
961 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962 "Asking for transmission of %u bytes to client %p\n",
963 ntohs (client->pending_head->msg->size), client->client_handle);
964 client->transmit_handle =
965 GNUNET_SERVER_notify_transmit_ready (client->client_handle,
966 ntohs (client->pending_head->
967 msg->size),
968 GNUNET_TIME_UNIT_FOREVER_REL,
969 &send_reply_to_client, client);
970}
971
972
973/**
974 * Closure for 'forward_reply'
975 */
976struct ForwardReplyContext
977{
978
979 /**
980 * Actual message to send to matching clients.
981 */
982 struct PendingMessage *pm;
983
984 /**
985 * Embedded payload.
986 */
987 const void *data;
988
989 /**
990 * Type of the data.
991 */
992 enum GNUNET_BLOCK_Type type;
993
994 /**
995 * Number of bytes in data.
996 */
997 size_t data_size;
998
999 /**
1000 * Do we need to copy 'pm' because it was already used?
1001 */
1002 int do_copy;
1003
1004};
1005
1006
1007/**
1008 * Iterator over hash map entries that send a given reply to
1009 * each of the matching clients. With some tricky recycling
1010 * of the buffer.
1011 *
1012 * @param cls the 'struct ForwardReplyContext'
1013 * @param key current key
1014 * @param value value in the hash map, a ClientQueryRecord
1015 * @return GNUNET_YES (we should continue to iterate),
1016 * if the result is mal-formed, GNUNET_NO
1017 */
1018static int
1019forward_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
1020{
1021 struct ForwardReplyContext *frc = cls;
1022 struct ClientQueryRecord *record = value;
1023 struct PendingMessage *pm;
1024 struct GNUNET_DHT_ClientResultMessage *reply;
1025 enum GNUNET_BLOCK_EvaluationResult eval;
1026 int do_free;
1027 struct GNUNET_HashCode ch;
1028 unsigned int i;
1029
1030 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
1031 "XDHT CLIENT-RESULT %s\n",
1032 GNUNET_h2s (key));
1033 if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type))
1034 {
1035 LOG (GNUNET_ERROR_TYPE_DEBUG,
1036 "Record type missmatch, not passing request for key %s to local client\n",
1037 GNUNET_h2s (key));
1038 GNUNET_STATISTICS_update (GDS_stats,
1039 gettext_noop
1040 ("# Key match, type mismatches in REPLY to CLIENT"),
1041 1, GNUNET_NO);
1042 return GNUNET_YES; /* type mismatch */
1043 }
1044 GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
1045 for (i = 0; i < record->seen_replies_count; i++)
1046 if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode)))
1047 {
1048 LOG (GNUNET_ERROR_TYPE_DEBUG,
1049 "Duplicate reply, not passing request for key %s to local client\n",
1050 GNUNET_h2s (key));
1051 GNUNET_STATISTICS_update (GDS_stats,
1052 gettext_noop
1053 ("# Duplicate REPLIES to CLIENT request dropped"),
1054 1, GNUNET_NO);
1055 return GNUNET_YES; /* duplicate */
1056 }
1057 eval =
1058 GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
1059 record->xquery, record->xquery_size, frc->data,
1060 frc->data_size);
1061 LOG (GNUNET_ERROR_TYPE_DEBUG,
1062 "Evaluation result is %d for key %s for local client's query\n",
1063 (int) eval, GNUNET_h2s (key));
1064 switch (eval)
1065 {
1066 case GNUNET_BLOCK_EVALUATION_OK_LAST:
1067 do_free = GNUNET_YES;
1068 break;
1069 case GNUNET_BLOCK_EVALUATION_OK_MORE:
1070 GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch);
1071 do_free = GNUNET_NO;
1072 break;
1073 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
1074 /* should be impossible to encounter here */
1075 GNUNET_break (0);
1076 return GNUNET_YES;
1077 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
1078 GNUNET_break_op (0);
1079 return GNUNET_NO;
1080 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
1081 GNUNET_break (0);
1082 return GNUNET_NO;
1083 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
1084 GNUNET_break (0);
1085 return GNUNET_NO;
1086 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
1087 return GNUNET_YES;
1088 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
1089 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1090 _("Unsupported block type (%u) in request!\n"), record->type);
1091 return GNUNET_NO;
1092 default:
1093 GNUNET_break (0);
1094 return GNUNET_NO;
1095 }
1096 if (GNUNET_NO == frc->do_copy)
1097 {
1098 /* first time, we can use the original data */
1099 pm = frc->pm;
1100 frc->do_copy = GNUNET_YES;
1101 }
1102 else
1103 {
1104 /* two clients waiting for same reply, must copy for queueing */
1105 pm = GNUNET_malloc (sizeof (struct PendingMessage) +
1106 ntohs (frc->pm->msg->size));
1107 memcpy (pm, frc->pm,
1108 sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
1109 pm->next = pm->prev = NULL;
1110 pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
1111 }
1112 GNUNET_STATISTICS_update (GDS_stats,
1113 gettext_noop ("# RESULTS queued for clients"), 1,
1114 GNUNET_NO);
1115 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
1116 reply->unique_id = record->unique_id;
1117 LOG (GNUNET_ERROR_TYPE_DEBUG,
1118 "Queueing reply to query %s for client %p\n",
1119 GNUNET_h2s (key),
1120 record->client->client_handle);
1121 add_pending_message (record->client, pm);
1122 if (GNUNET_YES == do_free)
1123 remove_client_records (record->client, key, record);
1124 return GNUNET_YES;
1125}
1126
1127
1128/**
1129 * Handle a reply we've received from another peer. If the reply
1130 * matches any of our pending queries, forward it to the respective
1131 * client(s).
1132 *
1133 * @param expiration when will the reply expire
1134 * @param key the query this reply is for
1135 * @param get_path_length number of peers in @a get_path
1136 * @param get_path path the reply took on get
1137 * @param put_path_length number of peers in @a put_path
1138 * @param put_path path the reply took on put
1139 * @param type type of the reply
1140 * @param data_size number of bytes in @a data
1141 * @param data application payload data
1142 */
1143void
1144GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
1145 const struct GNUNET_HashCode *key,
1146 unsigned int get_path_length,
1147 const struct GNUNET_PeerIdentity *get_path,
1148 unsigned int put_path_length,
1149 const struct GNUNET_PeerIdentity *put_path,
1150 enum GNUNET_BLOCK_Type type, size_t data_size,
1151 const void *data)
1152{
1153 struct ForwardReplyContext frc;
1154 struct PendingMessage *pm;
1155 struct GNUNET_DHT_ClientResultMessage *reply;
1156 struct GNUNET_PeerIdentity *paths;
1157 size_t msize;
1158
1159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1160 "reply for key %s\n",
1161 GNUNET_h2s (key));
1162
1163 if (NULL == GNUNET_CONTAINER_multihashmap_get (forward_map, key))
1164 {
1165 GNUNET_STATISTICS_update (GDS_stats,
1166 gettext_noop
1167 ("# REPLIES ignored for CLIENTS (no match)"), 1,
1168 GNUNET_NO);
1169 return; /* no matching request, fast exit! */
1170 }
1171 msize =
1172 sizeof (struct GNUNET_DHT_ClientResultMessage) + data_size +
1173 (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity);
1174 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1175 {
1176 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1177 _("Could not pass reply to client, message too big!\n"));
1178 return;
1179 }
1180 pm = GNUNET_malloc (msize + sizeof (struct PendingMessage));
1181 reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
1182 pm->msg = &reply->header;
1183 reply->header.size = htons ((uint16_t) msize);
1184 reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1185 reply->type = htonl (type);
1186 reply->get_path_length = htonl (get_path_length);
1187 reply->put_path_length = htonl (put_path_length);
1188 reply->unique_id = 0; /* filled in later */
1189 reply->expiration = GNUNET_TIME_absolute_hton (expiration);
1190 reply->key = *key;
1191 paths = (struct GNUNET_PeerIdentity *) &reply[1];
1192 memcpy (paths, put_path,
1193 sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1194 memcpy (&paths[put_path_length], get_path,
1195 sizeof (struct GNUNET_PeerIdentity) * get_path_length);
1196 memcpy (&paths[get_path_length + put_path_length], data, data_size);
1197 frc.do_copy = GNUNET_NO;
1198 frc.pm = pm;
1199 frc.data = data;
1200 frc.data_size = data_size;
1201 frc.type = type;
1202 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, key, &forward_reply,
1203 &frc);
1204 if (GNUNET_NO == frc.do_copy)
1205 {
1206 /* did not match any of the requests, free! */
1207 GNUNET_STATISTICS_update (GDS_stats,
1208 gettext_noop
1209 ("# REPLIES ignored for CLIENTS (no match)"), 1,
1210 GNUNET_NO);
1211 GNUNET_free (pm);
1212 }
1213}
1214
1215
1216/**
1217 * Check if some client is monitoring GET messages and notify
1218 * them in that case.
1219 *
1220 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1221 * @param type The type of data in the request.
1222 * @param hop_count Hop count so far.
1223 * @param path_length number of entries in path (or 0 if not recorded).
1224 * @param path peers on the GET path (or NULL if not recorded).
1225 * @param desired_replication_level Desired replication level.
1226 * @param key Key of the requested data.
1227 */
1228void
1229GDS_CLIENTS_process_get (uint32_t options,
1230 enum GNUNET_BLOCK_Type type,
1231 uint32_t hop_count,
1232 uint32_t desired_replication_level,
1233 unsigned int path_length,
1234 const struct GNUNET_PeerIdentity *path,
1235 const struct GNUNET_HashCode * key)
1236{
1237 struct ClientMonitorRecord *m;
1238 struct ClientList **cl;
1239 unsigned int cl_size;
1240
1241 cl = NULL;
1242 cl_size = 0;
1243 for (m = monitor_head; NULL != m; m = m->next)
1244 {
1245 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1246 (NULL == m->key ||
1247 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1248 {
1249 struct PendingMessage *pm;
1250 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1251 struct GNUNET_PeerIdentity *msg_path;
1252 size_t msize;
1253 unsigned int i;
1254
1255 /* Don't send duplicates */
1256 for (i = 0; i < cl_size; i++)
1257 if (cl[i] == m->client)
1258 break;
1259 if (i < cl_size)
1260 continue;
1261 GNUNET_array_append (cl, cl_size, m->client);
1262
1263 msize = path_length * sizeof (struct GNUNET_PeerIdentity);
1264 msize += sizeof (struct GNUNET_DHT_MonitorGetMessage);
1265 msize += sizeof (struct PendingMessage);
1266 pm = GNUNET_malloc (msize);
1267 mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
1268 pm->msg = &mmsg->header;
1269 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1270 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1271 mmsg->options = htonl(options);
1272 mmsg->type = htonl(type);
1273 mmsg->hop_count = htonl(hop_count);
1274 mmsg->desired_replication_level = htonl(desired_replication_level);
1275 mmsg->get_path_length = htonl(path_length);
1276 memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
1277 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1278 if (path_length > 0)
1279 memcpy (msg_path, path,
1280 path_length * sizeof (struct GNUNET_PeerIdentity));
1281 add_pending_message (m->client, pm);
1282 }
1283 }
1284 GNUNET_free_non_null (cl);
1285}
1286
1287
1288/**
1289 * Check if some client is monitoring GET RESP messages and notify
1290 * them in that case.
1291 *
1292 * @param type The type of data in the result.
1293 * @param get_path Peers on GET path (or NULL if not recorded).
1294 * @param get_path_length number of entries in get_path.
1295 * @param put_path peers on the PUT path (or NULL if not recorded).
1296 * @param put_path_length number of entries in get_path.
1297 * @param exp Expiration time of the data.
1298 * @param key Key of the data.
1299 * @param data Pointer to the result data.
1300 * @param size Number of bytes in data.
1301 */
1302void
1303GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
1304 const struct GNUNET_PeerIdentity *get_path,
1305 unsigned int get_path_length,
1306 const struct GNUNET_PeerIdentity *put_path,
1307 unsigned int put_path_length,
1308 struct GNUNET_TIME_Absolute exp,
1309 const struct GNUNET_HashCode * key,
1310 const void *data,
1311 size_t size)
1312{ 417{
1313 struct ClientMonitorRecord *m; 418 /* You should maintain a list of client attached to this service. Then
1314 struct ClientList **cl; 419 search for the correct client and stop all its ongoing activites and
1315 unsigned int cl_size; 420 delete it from the list. */
1316
1317 cl = NULL;
1318 cl_size = 0;
1319 for (m = monitor_head; NULL != m; m = m->next)
1320 {
1321 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1322 (NULL == m->key ||
1323 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1324 {
1325 struct PendingMessage *pm;
1326 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1327 struct GNUNET_PeerIdentity *path;
1328 size_t msize;
1329 unsigned int i;
1330
1331 /* Don't send duplicates */
1332 for (i = 0; i < cl_size; i++)
1333 if (cl[i] == m->client)
1334 break;
1335 if (i < cl_size)
1336 continue;
1337 GNUNET_array_append (cl, cl_size, m->client);
1338
1339 msize = size;
1340 msize += (get_path_length + put_path_length)
1341 * sizeof (struct GNUNET_PeerIdentity);
1342 msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage);
1343 msize += sizeof (struct PendingMessage);
1344 pm = GNUNET_malloc (msize);
1345 mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
1346 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1347 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1348 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1349 mmsg->type = htonl(type);
1350 mmsg->put_path_length = htonl(put_path_length);
1351 mmsg->get_path_length = htonl(get_path_length);
1352 path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1353 if (put_path_length > 0)
1354 {
1355 memcpy (path, put_path,
1356 put_path_length * sizeof (struct GNUNET_PeerIdentity));
1357 path = &path[put_path_length];
1358 }
1359 if (get_path_length > 0)
1360 memcpy (path, get_path,
1361 get_path_length * sizeof (struct GNUNET_PeerIdentity));
1362 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1363 memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
1364 if (size > 0)
1365 memcpy (&path[get_path_length], data, size);
1366 add_pending_message (m->client, pm);
1367 }
1368 }
1369 GNUNET_free_non_null (cl);
1370} 421}
1371 422
1372 423
1373/** 424/**
1374 * Check if some client is monitoring PUT messages and notify 425 * Get result from neighbours file.
1375 * them in that case.
1376 *
1377 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1378 * @param type The type of data in the request.
1379 * @param hop_count Hop count so far.
1380 * @param path_length number of entries in path (or 0 if not recorded).
1381 * @param path peers on the PUT path (or NULL if not recorded).
1382 * @param desired_replication_level Desired replication level.
1383 * @param exp Expiration time of the data.
1384 * @param key Key under which data is to be stored.
1385 * @param data Pointer to the data carried.
1386 * @param size Number of bytes in data.
1387 */ 426 */
1388void 427void
1389GDS_CLIENTS_process_put (uint32_t options, 428GDS_CLIENTS_process_get_result()
1390 enum GNUNET_BLOCK_Type type,
1391 uint32_t hop_count,
1392 uint32_t desired_replication_level,
1393 unsigned int path_length,
1394 const struct GNUNET_PeerIdentity *path,
1395 struct GNUNET_TIME_Absolute exp,
1396 const struct GNUNET_HashCode * key,
1397 const void *data,
1398 size_t size)
1399{ 429{
1400 struct ClientMonitorRecord *m; 430
1401 struct ClientList **cl;
1402 unsigned int cl_size;
1403
1404 cl = NULL;
1405 cl_size = 0;
1406 for (m = monitor_head; NULL != m; m = m->next)
1407 {
1408 if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
1409 (NULL == m->key ||
1410 memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0))
1411 {
1412 struct PendingMessage *pm;
1413 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1414 struct GNUNET_PeerIdentity *msg_path;
1415 size_t msize;
1416 unsigned int i;
1417
1418 /* Don't send duplicates */
1419 for (i = 0; i < cl_size; i++)
1420 if (cl[i] == m->client)
1421 break;
1422 if (i < cl_size)
1423 continue;
1424 GNUNET_array_append (cl, cl_size, m->client);
1425
1426 msize = size;
1427 msize += path_length * sizeof (struct GNUNET_PeerIdentity);
1428 msize += sizeof (struct GNUNET_DHT_MonitorPutMessage);
1429 msize += sizeof (struct PendingMessage);
1430 pm = GNUNET_malloc (msize);
1431 mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
1432 pm->msg = (struct GNUNET_MessageHeader *) mmsg;
1433 mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
1434 mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1435 mmsg->options = htonl(options);
1436 mmsg->type = htonl(type);
1437 mmsg->hop_count = htonl(hop_count);
1438 mmsg->desired_replication_level = htonl(desired_replication_level);
1439 mmsg->put_path_length = htonl(path_length);
1440 msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
1441 if (path_length > 0)
1442 {
1443 memcpy (msg_path, path,
1444 path_length * sizeof (struct GNUNET_PeerIdentity));
1445 }
1446 mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
1447 memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode));
1448 if (size > 0)
1449 memcpy (&msg_path[path_length], data, size);
1450 add_pending_message (m->client, pm);
1451 }
1452 }
1453 GNUNET_free_non_null (cl);
1454} 431}
1455 432
1456 433
1457/** 434/**
435 * SUPU: Call to this function is made from gnunet-service-xdht.c
436 * Here we register handlers for each possible kind of message the service
437 * receives from the clients.
1458 * Initialize client subsystem. 438 * Initialize client subsystem.
1459 * 439 *
1460 * @param server the initialized server 440 * @param server the initialized server
@@ -1476,42 +456,19 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
1476 {&handle_dht_local_monitor_stop, NULL, 456 {&handle_dht_local_monitor_stop, NULL,
1477 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, 457 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
1478 sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, 458 sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
1479 {&handle_dht_local_get_result_seen, NULL,
1480 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0},
1481 {NULL, NULL, 0, 0} 459 {NULL, NULL, 0, 0}
1482 }; 460 };
1483 forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); 461
1484 retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1485 GNUNET_SERVER_add_handlers (server, plugin_handlers); 462 GNUNET_SERVER_add_handlers (server, plugin_handlers);
1486 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); 463 GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
1487} 464}
1488 465
1489
1490/** 466/**
467 * SUPU: Call made from gnunet-service-dht.c
1491 * Shutdown client subsystem. 468 * Shutdown client subsystem.
1492 */ 469 */
1493void 470void
1494GDS_CLIENTS_done () 471GDS_CLIENTS_done ()
1495{ 472{
1496 GNUNET_assert (client_head == NULL); 473
1497 GNUNET_assert (client_tail == NULL); 474} \ No newline at end of file
1498 if (GNUNET_SCHEDULER_NO_TASK != retry_task)
1499 {
1500 GNUNET_SCHEDULER_cancel (retry_task);
1501 retry_task = GNUNET_SCHEDULER_NO_TASK;
1502 }
1503 if (NULL != retry_heap)
1504 {
1505 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1506 GNUNET_CONTAINER_heap_destroy (retry_heap);
1507 retry_heap = NULL;
1508 }
1509 if (NULL != forward_map)
1510 {
1511 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1512 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1513 forward_map = NULL;
1514 }
1515}
1516
1517/* end of gnunet-service-dht_clients.c */
diff --git a/src/dht/gnunet-service-xdht_clients.h b/src/dht/gnunet-service-xdht_clients.h
index 975104cb5..828d2c08a 100644
--- a/src/dht/gnunet-service-xdht_clients.h
+++ b/src/dht/gnunet-service-xdht_clients.h
@@ -21,8 +21,7 @@
21/** 21/**
22 * @file dht/gnunet-service-xdht_clients.h 22 * @file dht/gnunet-service-xdht_clients.h
23 * @brief GNUnet DHT service's client management code 23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff 24 * @author Supriti Singh
25 * @author Nathan Evans
26 */ 25 */
27#ifndef GNUNET_SERVICE_XDHT_CLIENT_H 26#ifndef GNUNET_SERVICE_XDHT_CLIENT_H
28#define GNUNET_SERVICE_XDHT_CLIENT_H 27#define GNUNET_SERVICE_XDHT_CLIENT_H
@@ -130,6 +129,12 @@ GDS_CLIENTS_process_put (uint32_t options,
130 size_t size); 129 size_t size);
131 130
132/** 131/**
132 *
133 */
134void
135GDS_CLIENTS_process_get_result();
136
137/**
133 * Initialize client subsystem. 138 * Initialize client subsystem.
134 * 139 *
135 * @param server the initialized server 140 * @param server the initialized server
diff --git a/src/dht/gnunet-service-xdht_datacache.c b/src/dht/gnunet-service-xdht_datacache.c
index 116eb14e6..bdc247690 100644
--- a/src/dht/gnunet-service-xdht_datacache.c
+++ b/src/dht/gnunet-service-xdht_datacache.c
@@ -164,8 +164,8 @@ datacache_get_iterator (void *cls,
164 gettext_noop 164 gettext_noop
165 ("# Good RESULTS found in datacache"), 1, 165 ("# Good RESULTS found in datacache"), 1,
166 GNUNET_NO); 166 GNUNET_NO);
167 GDS_CLIENTS_handle_reply (exp, key, 0, NULL, put_path_length, put_path, 167 /* GDS_CLIENTS_handle_reply (exp, key, 0, NULL, put_path_length, put_path,
168 type, size, data); 168 type, size, data);*/
169 /* forward to other peers */ 169 /* forward to other peers */
170 GDS_ROUTING_process (type, exp, key, put_path_length, put_path, 0, NULL, 170 GDS_ROUTING_process (type, exp, key, put_path_length, put_path, 0, NULL,
171 data, size); 171 data, size);
diff --git a/src/dht/gnunet-service-xdht_neighbours.c b/src/dht/gnunet-service-xdht_neighbours.c
index 607cf3b43..4c290895b 100644
--- a/src/dht/gnunet-service-xdht_neighbours.c
+++ b/src/dht/gnunet-service-xdht_neighbours.c
@@ -51,7 +51,13 @@
51/* TODO: 51/* TODO:
52 1. Use a global array of all known peers in find_successor, Only when 52 1. Use a global array of all known peers in find_successor, Only when
53 a new peer is added in finger or friend peer map, then re calculate 53 a new peer is added in finger or friend peer map, then re calculate
54 the array. Or else use the old one. */ 54 the array. Or else use the old one.
55 2. Should we be using const in all the handle for the message we received
56 * and then copy the fields and make changes to the fields instead of sending
57 * them as they come.
58 * 3. Everywhere you are storing yourself as the first element in the trail.
59 * It is obviously taking too much space. Try to remove it and think of something
60 * better. */
55 61
56/** 62/**
57 * Maximum possible fingers of a peer. 63 * Maximum possible fingers of a peer.
@@ -78,6 +84,20 @@
78 */ 84 */
79#define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2) 85#define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
80 86
87/**
88 * FIXME: Use this variable. Should it be moved to routing.c
89 * Threshold on routing table entries for a peer.
90 */
91#define ROUTING_TABLE_THRESHOLD 64
92
93/**
94 * FIXME: Use this variable. When adding an entry in finger table, check
95 * this threshold value. At the moment, its just a random value. Also,
96 * implement teardown feature from the paper.
97 * Threshold on number of peers in a trail length
98 */
99#define TRAIL_LENGTH_THRESHOLD 64
100
81 101
82GNUNET_NETWORK_STRUCT_BEGIN 102GNUNET_NETWORK_STRUCT_BEGIN
83 103
@@ -115,50 +135,66 @@ struct PeerPutMessage
115 * Length of the PUT path that follows (if tracked). 135 * Length of the PUT path that follows (if tracked).
116 */ 136 */
117 uint32_t put_path_length GNUNET_PACKED; 137 uint32_t put_path_length GNUNET_PACKED;
118 138
119 /** 139 /**
120 * When does the content expire? 140 * Source peer
121 */ 141 */
122 struct GNUNET_TIME_AbsoluteNBO expiration_time; 142 struct GNUNET_PeerIdentity source_peer;
143
144 /**
145 * Current destination
146 */
147 struct GNUNET_PeerIdentity current_destination;
148
149 /**
150 * Current destination type
151 */
152 enum current_destination_type current_destination_type;
123 153
124 /** 154 /**
125 * Bloomfilter (for peer identities) to stop circular routes 155 * When does the content expire?
126 */ 156 */
127 char bloomfilter[DHT_BLOOM_SIZE]; 157 struct GNUNET_TIME_AbsoluteNBO expiration_time;
128 158
129 /** 159 /**
130 * The key we are storing under. 160 * The key to store the value under.
131 */ 161 */
132 struct GNUNET_HashCode key; 162 struct GNUNET_HashCode key GNUNET_PACKED;
163
133 164
134 /* put path (if tracked) */ 165 /* put path (if tracked) */
135 166
136 /* Payload */ 167 /* Payload */
137 168
138}; 169};
139 170
140 171
141/** 172/**
142 * P2P Result message 173 * P2P Result message
143 */ 174 */
144struct PeerResultMessage 175struct PeerGetResultMessage
145{ 176{
146 /** 177 /**
147 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT 178 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET_RESULT
148 */ 179 */
149 struct GNUNET_MessageHeader header; 180 struct GNUNET_MessageHeader header;
150 181
151 /** 182 /**
152 * Content type. 183 * Peer which is sending get result message.
153 */ 184 */
154 uint32_t type GNUNET_PACKED; 185 struct GNUNET_PeerIdentity source_peer;
155 186
156 /** 187 /**
157 * Length of the PUT path that follows (if tracked). 188 * Peer which will receive the get result message.
158 */ 189 */
159 uint32_t put_path_length GNUNET_PACKED; 190 struct GNUNET_PeerIdentity destination_peer;
160 191
161 /** 192 /**
193 * Current index in get path.
194 */
195 unsigned int current_path_index;
196
197 /**
162 * Length of the GET path that follows (if tracked). 198 * Length of the GET path that follows (if tracked).
163 */ 199 */
164 uint32_t get_path_length GNUNET_PACKED; 200 uint32_t get_path_length GNUNET_PACKED;
@@ -191,65 +227,43 @@ struct PeerGetMessage
191 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET 227 * Type: #GNUNET_MESSAGE_TYPE_DHT_P2P_GET
192 */ 228 */
193 struct GNUNET_MessageHeader header; 229 struct GNUNET_MessageHeader header;
194 230
195 /**
196 * Processing options
197 */
198 uint32_t options GNUNET_PACKED;
199
200 /**
201 * Desired content type.
202 */
203 uint32_t type GNUNET_PACKED;
204
205 /** 231 /**
206 * Hop count 232 * Source peer
207 */ 233 */
208 uint32_t hop_count GNUNET_PACKED; 234 struct GNUNET_PeerIdentity source_peer;
209 235
210 /** 236 /**
211 * Desired replication level for this request. 237 * Total number of peers in get path.
212 */ 238 */
213 uint32_t desired_replication_level GNUNET_PACKED; 239 unsigned int get_path_length;
214 240
215 /** 241 /**
216 * Size of the extended query. 242 *
217 */ 243 */
218 uint32_t xquery_size; 244 struct GNUNET_PeerIdentity current_destination;
219 245
220 /** 246 /**
221 * Bloomfilter mutator. 247 *
222 */ 248 */
223 uint32_t bf_mutator; 249 enum current_destination_type dest_type;
224 250
225 /** 251 /**
226 * Bloomfilter (for peer identities) to stop circular routes 252 * When does the content expire?
227 */ 253 */
228 char bloomfilter[DHT_BLOOM_SIZE]; 254 struct GNUNET_TIME_AbsoluteNBO expiration_time;
229 255
230 /** 256 /**
231 * The key we are looking for. 257 * The key we are looking for.
232 */ 258 */
233 struct GNUNET_HashCode key; 259 struct GNUNET_HashCode key;
260
261 /* Get path. */
234 262
235}; 263};
236 264
237 265
238/** 266/**
239 * FIXME: Change the comment to explain about usage of this in find successor.
240 * Field in trail setup message to understand if the message is sent to an
241 * intermediate finger, friend or me.
242 */
243enum current_destination_type
244{
245 FRIEND ,
246 FINGER ,
247 MY_ID ,
248 VALUE
249};
250
251
252/**
253 * P2P Trail setup message 267 * P2P Trail setup message
254 */ 268 */
255struct PeerTrailSetupMessage 269struct PeerTrailSetupMessage
@@ -1044,93 +1058,6 @@ GDS_NEIGHBOURS_send_notify_new_successor (struct GNUNET_PeerIdentity *source_pee
1044} 1058}
1045 1059
1046 1060
1047/**FIXME: Old implementation just to remove error
1048 * TODO: Modify this function to handle our get request.
1049 * Perform a GET operation. Forwards the given request to other
1050 * peers. Does not lookup the key locally. May do nothing if this is
1051 * the only peer in the network (or if we are the closest peer in the
1052 * network).
1053 *
1054 * @param type type of the block
1055 * @param options routing options
1056 * @param desired_replication_level desired replication count
1057 * @param hop_count how many hops did this request traverse so far?
1058 * @param key key for the content
1059 * @param xquery extended query
1060 * @param xquery_size number of bytes in @a xquery
1061 * @param reply_bf bloomfilter to filter duplicates
1062 * @param reply_bf_mutator mutator for @a reply_bf
1063 * @param peer_bf filter for peers not to select (again)
1064 */
1065void
1066GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
1067 enum GNUNET_DHT_RouteOption options,
1068 uint32_t desired_replication_level,
1069 uint32_t hop_count, const struct GNUNET_HashCode * key,
1070 const void *xquery, size_t xquery_size,
1071 const struct GNUNET_CONTAINER_BloomFilter *reply_bf,
1072 uint32_t reply_bf_mutator,
1073 struct GNUNET_CONTAINER_BloomFilter *peer_bf)
1074{
1075
1076 /*
1077 1. take the key, get the 64 bit value of the key.
1078 2. call find_successor to get the successor of the key.
1079 3. successor can be either a friend or finger.
1080 4. update the field in get message to reflect if its a friend or finger table
1081 5. add the put message to pending message and send it.
1082 */
1083
1084}
1085
1086/**FIXME: Old implementation just to remove error.
1087 * TODO: Modify this function to handle our put request.
1088 * Perform a PUT operation. Forwards the given request to other
1089 * peers. Does not store the data locally. Does not give the
1090 * data to local clients. May do nothing if this is the only
1091 * peer in the network (or if we are the closest peer in the
1092 * network).
1093 *
1094 * @param type type of the block
1095 * @param options routing options
1096 * @param desired_replication_level desired replication count
1097 * @param expiration_time when does the content expire
1098 * @param hop_count how many hops has this message traversed so far
1099 * @param bf Bloom filter of peers this PUT has already traversed
1100 * @param key key for the content
1101 * @param put_path_length number of entries in @a put_path
1102 * @param put_path peers this request has traversed so far (if tracked)
1103 * @param data payload to store
1104 * @param data_size number of bytes in @a data
1105 */
1106void
1107GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1108 enum GNUNET_DHT_RouteOption options,
1109 uint32_t desired_replication_level,
1110 struct GNUNET_TIME_Absolute expiration_time,
1111 uint32_t hop_count,
1112 struct GNUNET_CONTAINER_BloomFilter *bf,
1113 const struct GNUNET_HashCode *key,
1114 unsigned int put_path_length,
1115 struct GNUNET_PeerIdentity *put_path,
1116 const void *data, size_t data_size)
1117{
1118
1119 /*
1120 1. take the key, get the 64 bit value of the key.
1121 2. call find_successor to get the successor of the key.
1122 3. successor can be either a friend or finger.
1123 4. update the field in put message to reflect if its a friend or finger table
1124 5. add the put message to pending message and send it.
1125 */
1126 /* SUPU: Call is made to this function from client. It does not seem to be
1127 waiting for a confirmation So, once we got the request, we use the key and
1128 try to find the closest successor, but in this case when we reach to the
1129 closest successor in handle_dht_p2p_put, then just do datacache_put. As the calling
1130 function does not need any confirmation, we don't need the result back. */
1131}
1132
1133
1134/** 1061/**
1135 * Randomly choose one of your friends from the friends_peer map 1062 * Randomly choose one of your friends from the friends_peer map
1136 * @return Friend 1063 * @return Friend
@@ -1468,62 +1395,11 @@ core_init (void *cls,
1468 const struct GNUNET_PeerIdentity *identity) 1395 const struct GNUNET_PeerIdentity *identity)
1469{ 1396{
1470 my_identity = *identity; 1397 my_identity = *identity;
1471 1398
1472} 1399}
1473 1400
1474 1401
1475/**
1476 * Core handler for p2p put requests.
1477 *
1478 * @param cls closure
1479 * @param peer sender of the request
1480 * @param message message
1481 * @param peer peer identity this notification is about
1482 * @return #GNUNET_OK to keep the connection open,
1483 * #GNUNET_SYSERR to close it (signal serious error)
1484 */
1485static int
1486handle_dht_p2p_put (void *cls,
1487 const struct GNUNET_PeerIdentity *peer,
1488 const struct GNUNET_MessageHeader *message)
1489{
1490 /**
1491 1. Check if destination is friend or finger.
1492 2. If finger then get the next hop from routing table and
1493 * call GDS_NEGIHBOURS_handle_get.
1494 3. If friend then call find_successor to get the next hop and again
1495 * call GDS_NEIGHBOURS_handle_get to send to chosen hop.
1496 4. If you are the destination then do datacache_store.
1497 */
1498 return 0;
1499}
1500
1501 1402
1502/**
1503 * Core handler for p2p get requests.
1504 *
1505 * @param cls closure
1506 * @param peer sender of the request
1507 * @param message message
1508 * @return #GNUNET_OK to keep the connection open,
1509 * #GNUNET_SYSERR to close it (signal serious error)
1510 */
1511static int
1512handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1513 const struct GNUNET_MessageHeader *message)
1514{
1515 /**
1516 1. Check if destination is friend or finger.
1517 2. If finger then get the next hop from routing table and
1518 * call GDS_NEGIHBOURS_handle_get.
1519 3. If friend then call find_successor to get the next hop and again
1520 * call GDS_NEIGHBOURS_handle_get to send to chosen hop.
1521 4. If you are the destination then send the data back to source peer
1522 * Assuming we have trail setup we can
1523 * either store the whole trail or again do the search process..
1524 */
1525 return 0;
1526}
1527 1403
1528 1404
1529/** 1405/**
@@ -1665,7 +1541,7 @@ compare_peer_id (const void *p1, const void *p2)
1665 1541
1666 1542
1667/** 1543/**
1668 * Return the previous element of value in all_known_peers. 1544 * Return the successor of value in all_known_peers.
1669 * @param all_known_peers list of all the peers 1545 * @param all_known_peers list of all the peers
1670 * @param value value we have to search in the all_known_peers. 1546 * @param value value we have to search in the all_known_peers.
1671 * @return 1547 * @return
@@ -1719,7 +1595,7 @@ find_closest_successor(struct Sorting_List *all_known_peers, uint64_t value,
1719 * @return Peer identity of next destination i.e. successor of value. 1595 * @return Peer identity of next destination i.e. successor of value.
1720 */ 1596 */
1721static struct GNUNET_PeerIdentity * 1597static struct GNUNET_PeerIdentity *
1722find_successor(uint64_t value, struct GNUNET_PeerIdentity *current_destination, 1598find_successor (uint64_t value, struct GNUNET_PeerIdentity *current_destination,
1723 enum current_destination_type *type) 1599 enum current_destination_type *type)
1724{ 1600{
1725 struct GNUNET_CONTAINER_MultiPeerMapIterator *friend_iter; 1601 struct GNUNET_CONTAINER_MultiPeerMapIterator *friend_iter;
@@ -1825,6 +1701,511 @@ find_successor(uint64_t value, struct GNUNET_PeerIdentity *current_destination,
1825 } 1701 }
1826} 1702}
1827 1703
1704#if 0
1705static void
1706replicate_put()
1707{
1708 /* In this function, you should find 'r' (r = desired replication level) successors
1709 and send put message to all of these r successors. Now, I really don't know
1710 if in case of node failure it will be able to find data. Or if we start with
1711 a random peer id, do we even reach to correct successor ever in case of
1712 get. */
1713}
1714#endif
1715
1716/**
1717 *
1718 * @param source_peer
1719 * @param get_path
1720 * @param get_path_length
1721 * @param key
1722 */
1723void
1724GDS_NEIGHBOURS_handle_get (struct GNUNET_PeerIdentity *source_peer,
1725 struct GNUNET_PeerIdentity *get_path,
1726 unsigned int get_path_length,
1727 struct GNUNET_HashCode *key,
1728 struct GNUNET_PeerIdentity *target_peer,
1729 struct GNUNET_PeerIdentity *current_destination,
1730 enum current_destination_type *type)
1731{
1732 struct PeerGetMessage *get_msg;
1733 struct P2PPendingMessage *pending;
1734 struct GNUNET_PeerIdentity *gp;
1735 struct FriendInfo *target_friend;
1736 uint64_t key_value;
1737 size_t msize;
1738
1739 msize = sizeof (struct PeerGetMessage) +
1740 (get_path_length * sizeof (struct GNUNET_PeerIdentity));
1741
1742 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1743 {
1744 GNUNET_break (0);
1745 return;
1746 }
1747
1748 memcpy (&key_value, key, sizeof (uint64_t));
1749
1750 if (NULL == target_peer)
1751 {
1752 /* This is the first call made from client file. */
1753 struct GNUNET_PeerIdentity *next_hop;
1754 next_hop = find_successor (key_value, current_destination, type);
1755
1756 if (*type == MY_ID)
1757 {
1758 struct GNUNET_PeerIdentity *destination_peer;
1759 int current_path_index;
1760
1761 /* FIXME: You enter in this part of code only if the call is made from the
1762 client file. And in client file you already have done the datacache_get.
1763 So, ideally you don't need it. Remove it after checking. */
1764 if (get_path_length != 1)
1765 current_path_index = get_path_length - 2;
1766 destination_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
1767 memcpy (destination_peer, source_peer, sizeof (struct GNUNET_PeerIdentity));
1768 /* I am the final destination, then call GDS_NEIGHBOURS_send_get_result.*/
1769 GDS_NEIGHBOURS_send_get_result (&my_identity,get_path, get_path_length,
1770 destination_peer, current_path_index);
1771 return;
1772 }
1773 else
1774 {
1775 /* Find the friend corresponding to next_hop */
1776 target_friend = GNUNET_CONTAINER_multipeermap_get (friend_peermap, next_hop);
1777 }
1778 }
1779 else
1780 target_friend = GNUNET_CONTAINER_multipeermap_get (friend_peermap, target_peer);
1781
1782 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1783 pending->importance = 0; /* FIXME */
1784 /* FIXME: Do we have an expiration time for get request */
1785 get_msg = (struct PeerGetMessage *) &pending[1];
1786 pending->msg = &get_msg->header;
1787 get_msg->header.size = htons (msize);
1788 get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
1789 get_msg->get_path_length = htonl (get_path_length);
1790 get_msg->key = *key;
1791 memcpy (&(get_msg->source_peer), source_peer, sizeof (struct GNUNET_PeerIdentity));
1792 memcpy (&(get_msg->current_destination), current_destination, sizeof (struct GNUNET_PeerIdentity));
1793 get_msg->dest_type = htonl (*type);
1794
1795 gp = (struct GNUNET_PeerIdentity *) &get_msg[1];
1796 memcpy (gp, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity));
1797 GNUNET_CONTAINER_DLL_insert_tail (target_friend->head, target_friend->tail, pending);
1798 target_friend->pending_count++;
1799 process_friend_queue (target_friend);
1800
1801}
1802
1803
1804/**
1805 * FIXME: In this function, you just find the target friend and send the message
1806 * to next peer. In handle_dht_p2p_put, you should check the options and type
1807 * and check if you are final destination or not. if not then find the next
1808 * destination and send the message forward.
1809 * @param type type of the block
1810 * @param options routing options
1811 * @param desired_replication_level desired replication count
1812 * @param expiration_time when does the content expire
1813 * @param hop_count how many hops has this message traversed so far
1814 * @param key key for the content
1815 * @param put_path_length number of entries in @a put_path
1816 * @param put_path peers this request has traversed so far (if tracked)
1817 * @param data payload to store
1818 * @param data_size number of bytes in @a data
1819 * @param current_destination
1820 * @param dest_type
1821 * @param target_peer_id
1822 */
1823void
1824GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
1825 enum GNUNET_DHT_RouteOption options,
1826 uint32_t desired_replication_level,
1827 struct GNUNET_TIME_Absolute expiration_time,
1828 uint32_t hop_count,
1829 struct GNUNET_HashCode *key,
1830 unsigned int put_path_length,
1831 struct GNUNET_PeerIdentity *put_path,
1832 const void *data, size_t data_size,
1833 struct GNUNET_PeerIdentity *current_destination,
1834 enum current_destination_type *dest_type,
1835 struct GNUNET_PeerIdentity *target_peer)
1836{
1837 struct PeerPutMessage *ppm;
1838 struct P2PPendingMessage *pending;
1839 struct FriendInfo *target_friend;
1840 struct GNUNET_PeerIdentity *pp;
1841 size_t msize;
1842 uint64_t key_value;
1843
1844 msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size +
1845 sizeof (struct PeerPutMessage);
1846
1847 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1848 {
1849 put_path_length = 0;
1850 msize = data_size + sizeof (struct PeerPutMessage);
1851 }
1852
1853 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1854 {
1855 GNUNET_break (0);
1856 return;
1857 }
1858
1859 memcpy (&key_value, key, sizeof (uint64_t));
1860 if (target_peer == NULL)
1861 {
1862 /* This is the first time the call has been made from handle_dht_local_put.
1863 So, you need to search for the next peer to send this message to. */
1864 struct GNUNET_PeerIdentity *next_hop;
1865 next_hop = find_successor (key_value, current_destination, dest_type);
1866
1867 if (*dest_type == MY_ID)
1868 {
1869 /* FIXME: How do we handle different block types? */
1870 /* FIXME: Here depending on the replication level choose 'r' successors
1871 to this peer and send put to all of these peers. */
1872 //replicate_put();
1873 GDS_DATACACHE_handle_put (expiration_time, key, put_path_length, put_path,
1874 type, data_size, data);
1875 return;
1876 }
1877 else
1878 {
1879 /* Find the friend corresponding to next_hop */
1880 target_friend = GNUNET_CONTAINER_multipeermap_get (friend_peermap, next_hop);
1881 }
1882 }
1883 else
1884 target_friend = GNUNET_CONTAINER_multipeermap_get (friend_peermap, target_peer);
1885
1886 pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
1887 pending->importance = 0; /* FIXME */
1888 pending->timeout = expiration_time;
1889 ppm = (struct PeerPutMessage *) &pending[1];
1890 pending->msg = &ppm->header;
1891 ppm->header.size = htons (msize);
1892 ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
1893 ppm->options = htonl (options);
1894 ppm->type = htonl (type);
1895 ppm->hop_count = htonl (hop_count + 1);
1896 ppm->desired_replication_level = htonl (desired_replication_level);
1897 ppm->put_path_length = htonl (put_path_length);
1898 ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
1899 memcpy (&(ppm->current_destination), current_destination, sizeof (struct GNUNET_PeerIdentity));
1900 ppm->current_destination_type = htonl (*dest_type);
1901 ppm->key = *key;
1902
1903 pp = (struct GNUNET_PeerIdentity *) &ppm[1];
1904 memcpy (pp, put_path,
1905 sizeof (struct GNUNET_PeerIdentity) * put_path_length);
1906 memcpy (&pp[put_path_length], data, data_size);
1907 GNUNET_CONTAINER_DLL_insert_tail (target_friend->head, target_friend->tail, pending);
1908 target_friend->pending_count++;
1909 process_friend_queue (target_friend);
1910}
1911
1912
1913/**
1914 *
1915 * @param source_peer
1916 * @param get_path
1917 * @param get_path_length
1918 * @param destination_peer
1919 */
1920void
1921GDS_NEIGHBOURS_send_get_result (struct GNUNET_PeerIdentity *source_peer,
1922 struct GNUNET_PeerIdentity *get_path,
1923 unsigned int get_path_length,
1924 struct GNUNET_PeerIdentity *destination_peer,
1925 unsigned int current_path_index)
1926{
1927 /* Add get_result into pending message and send the data to target friend. */
1928#if 0
1929 struct PeerGetResultMessage *get_result;
1930 struct P2PPendingMessage *pending;
1931 size_t msize;
1932
1933 msize = (get_path_length * sizeof (struct GNUNET_PeerIdentity)) +
1934 sizeof (struct PeerGetResultMessage);
1935
1936 if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1937 {
1938 GNUNET_break (0);
1939 return;
1940 }
1941
1942#endif
1943}
1944
1945
1946/**
1947 *
1948 * @return
1949 */
1950static int
1951handle_dht_p2p_get_result ()
1952{
1953 /* If you are the source, go back to the client file and there search for
1954 the requesting client and send back the result. */
1955 return GNUNET_YES;
1956}
1957
1958
1959
1960/**
1961 * Core handler for p2p put requests.
1962 *
1963 * @param cls closure
1964 * @param peer sender of the request
1965 * @param message message
1966 * @param peer peer identity this notification is about
1967 * @return #GNUNET_OK to keep the connection open,
1968 * #GNUNET_SYSERR to close it (signal serious error)
1969 */
1970static int
1971handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
1972 const struct GNUNET_MessageHeader *message)
1973{
1974 struct PeerPutMessage *put;
1975 struct GNUNET_PeerIdentity *put_path;
1976 enum GNUNET_DHT_RouteOption options;
1977 enum current_destination_type current_dst_type;
1978 struct GNUNET_PeerIdentity *current_destination;
1979 struct GNUNET_PeerIdentity *source_peer;
1980 struct GNUNET_PeerIdentity *next_hop;
1981 struct GNUNET_HashCode test_key;
1982 uint64_t key_value;
1983 void *payload;
1984 size_t payload_size;
1985 size_t msize;
1986 uint32_t putlen;
1987
1988 msize = ntohs (message->size);
1989 if (msize < sizeof (struct PeerPutMessage))
1990 {
1991 GNUNET_break_op (0);
1992 return GNUNET_YES;
1993 }
1994
1995 put = (struct PeerPutMessage *) message;
1996 putlen = ntohl (put->put_path_length);
1997 if ((msize <
1998 sizeof (struct PeerPutMessage) +
1999 putlen * sizeof (struct GNUNET_PeerIdentity)) ||
2000 (putlen >
2001 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2002 {
2003 GNUNET_break_op (0);
2004 return GNUNET_YES;
2005 }
2006
2007 put_path = (struct GNUNET_PeerIdentity *) &put[1];
2008 payload = &put_path[putlen];
2009 options = ntohl (put->options);
2010 payload_size = msize - (sizeof (struct PeerPutMessage) +
2011 putlen * sizeof (struct GNUNET_PeerIdentity));
2012
2013 /* FIXME: I don't understand what exactly are we doing here. */
2014 switch (GNUNET_BLOCK_get_key
2015 (GDS_block_context, ntohl (put->type), payload, payload_size,
2016 &test_key))
2017 {
2018 case GNUNET_YES:
2019 if (0 != memcmp (&test_key, &put->key, sizeof (struct GNUNET_HashCode)))
2020 {
2021 char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key));
2022 GNUNET_break_op (0);
2023 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2024 "PUT with key `%s' for block with key %s\n",
2025 put_s, GNUNET_h2s_full (&test_key));
2026 GNUNET_free (put_s);
2027 return GNUNET_YES;
2028 }
2029 break;
2030 case GNUNET_NO:
2031 GNUNET_break_op (0);
2032 return GNUNET_YES;
2033 case GNUNET_SYSERR:
2034 /* cannot verify, good luck */
2035 break;
2036 }
2037
2038 /* FIXME: This part is also not clear to me.*/
2039 if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */
2040 {
2041 switch (GNUNET_BLOCK_evaluate (GDS_block_context,
2042 ntohl (put->type),
2043 NULL, /* query */
2044 NULL, 0, /* bloom filer */
2045 NULL, 0, /* xquery */
2046 payload, payload_size))
2047 {
2048 case GNUNET_BLOCK_EVALUATION_OK_MORE:
2049 case GNUNET_BLOCK_EVALUATION_OK_LAST:
2050 break;
2051
2052 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2053 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2054 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
2055 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2056 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2057 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2058 default:
2059 GNUNET_break_op (0);
2060 return GNUNET_OK;
2061 }
2062 }
2063
2064 struct GNUNET_PeerIdentity pp[putlen + 1];
2065
2066 /* extend 'put path' by sender */
2067 if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE))
2068 {
2069 memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
2070 pp[putlen] = *peer;
2071 putlen++;
2072 }
2073 else
2074 putlen = 0;
2075
2076 /* Copy the fields of message, call find successor or gds_routing_search,
2077 depending on the destination_type and if you are the final destination,
2078 do a datache put or if option is. else call gds_neighbours_handle_get with
2079 correct parameters. */
2080 current_destination = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
2081 memcpy (current_destination, &(put->current_destination), sizeof (struct GNUNET_PeerIdentity));
2082 current_dst_type = ntohl (put->current_destination_type);
2083 memcpy (&key_value, &(put->key), sizeof (uint64_t));
2084 source_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
2085 memcpy (source_peer, &(put->source_peer), sizeof (struct GNUNET_PeerIdentity));
2086
2087 if (current_dst_type == FRIEND)
2088 {
2089 next_hop = find_successor (key_value, current_destination, &current_dst_type);
2090 }
2091 else if (current_dst_type == FINGER)
2092 {
2093 next_hop = GDS_ROUTING_search (source_peer, current_destination);
2094 }
2095
2096 if (current_dst_type == MY_ID)
2097 {
2098 /* Here datacache_put*/
2099 /* FIXME: Here depending on replication, call replicate_put() to do the
2100 put operation on 'r' successors. */
2101 GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
2102 &(put->key),putlen, pp, ntohl (put->type), payload_size,
2103 payload);
2104 return GNUNET_YES;
2105 }
2106 else
2107 {
2108 /* here call gds_neighbours*/
2109 GDS_NEIGHBOURS_handle_put (ntohl (put->type),ntohl (put->options),
2110 ntohl (put->desired_replication_level),
2111 GNUNET_TIME_absolute_ntoh (put->expiration_time),
2112 ntohl (put->hop_count),&put->key, putlen,
2113 pp, payload, payload_size,
2114 current_destination, &current_dst_type, next_hop);
2115 return GNUNET_YES;
2116 }
2117 return GNUNET_SYSERR;
2118}
2119
2120
2121/**
2122 * FIXME: Handle expiration, options, block type, replication
2123 * referring the old code.
2124 * Core handler for p2p get requests.
2125 *
2126 * @param cls closure
2127 * @param peer sender of the request
2128 * @param message message
2129 * @return #GNUNET_OK to keep the connection open,
2130 * #GNUNET_SYSERR to close it (signal serious error)
2131 */
2132static int
2133handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
2134 const struct GNUNET_MessageHeader *message)
2135{
2136 struct PeerGetMessage *get;
2137 struct GNUNET_PeerIdentity *current_destination;
2138 uint64_t key_value;
2139 enum current_destination_type dest_type;
2140 struct GNUNET_PeerIdentity *next_hop;
2141 struct GNUNET_PeerIdentity *get_path;
2142 size_t msize;
2143 unsigned int get_length;
2144
2145 msize = ntohs (message->size);
2146 if (msize < sizeof (struct PeerGetMessage))
2147 {
2148 GNUNET_break_op (0);
2149 return GNUNET_YES;
2150 }
2151
2152 get = (struct PeerGetMessage *)message;
2153 get_length = ntohl (get->get_path_length);
2154 get_path = (struct GNUNET_PeerIdentity *)&get[1];
2155
2156 if ((msize <
2157 sizeof (struct PeerGetMessage) +
2158 get_length * sizeof (struct GNUNET_PeerIdentity)) ||
2159 (get_length >
2160 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)))
2161 {
2162 GNUNET_break_op (0);
2163 return GNUNET_YES;
2164 }
2165
2166 get = (struct PeerGetMessage *) message;
2167 current_destination = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
2168 memcpy (current_destination, &(get->current_destination), sizeof (struct GNUNET_PeerIdentity));
2169 memcpy (&key_value, &(get->key), sizeof (uint64_t));
2170 dest_type = ntohl (get->dest_type);
2171
2172 if (dest_type == FRIEND)
2173 {
2174 next_hop = find_successor (key_value, current_destination, &dest_type);
2175 }
2176 else if (dest_type == FINGER)
2177 {
2178 next_hop = GDS_ROUTING_search (&(get->source_peer), current_destination);
2179 }
2180
2181 if (dest_type == MY_ID)
2182 {
2183 struct GNUNET_PeerIdentity *destination_peer;
2184 int current_path_index;
2185
2186 /* Add yourself to the get path, increment the get length. */
2187 destination_peer = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
2188 memcpy (destination_peer, &(get->source_peer), sizeof (struct GNUNET_PeerIdentity));
2189 current_path_index = get_length - 2;
2190
2191 /* I am the final destination. Call GDS_NEIGHBOURS_send_get_result. */
2192 GDS_NEIGHBOURS_send_get_result (&my_identity, get_path, get_length,
2193 destination_peer, current_path_index);
2194 return GNUNET_YES;
2195 }
2196 else
2197 {
2198 /* FIXME: Add your self to the get path and increment the get length. */
2199
2200 /* FIXME: Does it matter if the dest_type is friend or finger. */
2201 GDS_NEIGHBOURS_handle_get (&(get->source_peer), get_path, get_length, &(get->key),
2202 next_hop, current_destination,&dest_type);
2203
2204 return GNUNET_YES;
2205 }
2206 return GNUNET_SYSERR;
2207}
2208
1828 2209
1829/** 2210/**
1830 * Handle a PeerTrailSetupMessage. 2211 * Handle a PeerTrailSetupMessage.
@@ -2434,6 +2815,7 @@ GDS_NEIGHBOURS_init()
2434{ 2815{
2435 static struct GNUNET_CORE_MessageHandler core_handlers[] = { 2816 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
2436 {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, 2817 {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
2818 {&handle_dht_p2p_get_result, GNUNET_MESSAGE_TYPE_DHT_P2P_GET_RESULT, 0},
2437 {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0}, 2819 {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0},
2438 {&handle_dht_p2p_trail_setup, GNUNET_MESSAGE_TYPE_DHT_P2P_TRAIL_SETUP, 0}, 2820 {&handle_dht_p2p_trail_setup, GNUNET_MESSAGE_TYPE_DHT_P2P_TRAIL_SETUP, 0},
2439 {&handle_dht_p2p_trail_setup_result, GNUNET_MESSAGE_TYPE_DHT_P2P_TRAIL_SETUP_RESULT, 0}, 2821 {&handle_dht_p2p_trail_setup_result, GNUNET_MESSAGE_TYPE_DHT_P2P_TRAIL_SETUP_RESULT, 0},
diff --git a/src/dht/gnunet-service-xdht_neighbours.h b/src/dht/gnunet-service-xdht_neighbours.h
index 0b6eaa6af..da5629ffb 100644
--- a/src/dht/gnunet-service-xdht_neighbours.h
+++ b/src/dht/gnunet-service-xdht_neighbours.h
@@ -32,6 +32,19 @@
32#include "gnunet_dht_service.h" 32#include "gnunet_dht_service.h"
33 33
34/** 34/**
35 * FIXME: Change the comment to explain about usage of this in find successor.
36 * Field in trail setup message to understand if the message is sent to an
37 * intermediate finger, friend or me.
38 */
39enum current_destination_type
40{
41 FRIEND ,
42 FINGER ,
43 MY_ID ,
44 VALUE
45};
46
47/**
35 * Perform a PUT operation. Forwards the given request to other 48 * Perform a PUT operation. Forwards the given request to other
36 * peers. Does not store the data locally. Does not give the 49 * peers. Does not store the data locally. Does not give the
37 * data to local clients. May do nothing if this is the only 50 * data to local clients. May do nothing if this is the only
@@ -40,15 +53,14 @@
40 * 53 *
41 * @param type type of the block 54 * @param type type of the block
42 * @param options routing options 55 * @param options routing options
43 * @param desired_replication_level desired replication level 56 * @param desired_replication_level desired replication count
44 * @param expiration_time when does the content expire 57 * @param expiration_time when does the content expire
45 * @param hop_count how many hops has this message traversed so far 58 * @param hop_count how many hops has this message traversed so far
46 * @param bf Bloom filter of peers this PUT has already traversed
47 * @param key key for the content 59 * @param key key for the content
48 * @param put_path_length number of entries in put_path 60 * @param put_path_length number of entries in @a put_path
49 * @param put_path peers this request has traversed so far (if tracked) 61 * @param put_path peers this request has traversed so far (if tracked)
50 * @param data payload to store 62 * @param data payload to store
51 * @param data_size number of bytes in data 63 * @param data_size number of bytes in @a data
52 */ 64 */
53void 65void
54GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, 66GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
@@ -56,39 +68,44 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
56 uint32_t desired_replication_level, 68 uint32_t desired_replication_level,
57 struct GNUNET_TIME_Absolute expiration_time, 69 struct GNUNET_TIME_Absolute expiration_time,
58 uint32_t hop_count, 70 uint32_t hop_count,
59 struct GNUNET_CONTAINER_BloomFilter *bf, 71 struct GNUNET_HashCode * key,
60 const struct GNUNET_HashCode * key,
61 unsigned int put_path_length, 72 unsigned int put_path_length,
62 struct GNUNET_PeerIdentity *put_path, 73 struct GNUNET_PeerIdentity *put_path,
63 const void *data, size_t data_size); 74 const void *data, size_t data_size,
75 struct GNUNET_PeerIdentity *current_destination,
76 enum current_destination_type *dest_type,
77 struct GNUNET_PeerIdentity *target_peer_id);
64 78
65 79
66/** 80/**
67 * Perform a GET operation. Forwards the given request to other 81 *
68 * peers. Does not lookup the key locally. May do nothing if this is 82 * @param source_peer
69 * the only peer in the network (or if we are the closest peer in the 83 * @param get_path
70 * network). 84 * @param get_path_length
71 * 85 * @param key
72 * @param type type of the block
73 * @param options routing options
74 * @param desired_replication_level desired replication count
75 * @param hop_count how many hops did this request traverse so far?
76 * @param key key for the content
77 * @param xquery extended query
78 * @param xquery_size number of bytes in xquery
79 * @param reply_bf bloomfilter to filter duplicates
80 * @param reply_bf_mutator mutator for reply_bf
81 * @param peer_bf filter for peers not to select (again, updated)
82 */ 86 */
83void 87void
84GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, 88GDS_NEIGHBOURS_handle_get (struct GNUNET_PeerIdentity *source_peer,
85 enum GNUNET_DHT_RouteOption options, 89 struct GNUNET_PeerIdentity *get_path,
86 uint32_t desired_replication_level, 90 unsigned int get_path_length,
87 uint32_t hop_count, const struct GNUNET_HashCode * key, 91 struct GNUNET_HashCode *key,
88 const void *xquery, size_t xquery_size, 92 struct GNUNET_PeerIdentity *target_peer,
89 const struct GNUNET_CONTAINER_BloomFilter *reply_bf, 93 struct GNUNET_PeerIdentity *current_destination,
90 uint32_t reply_bf_mutator, 94 enum current_destination_type *type);
91 struct GNUNET_CONTAINER_BloomFilter *peer_bf); 95
96/**
97 *
98 * @param source_peer
99 * @param get_path
100 * @param get_path_length
101 * @param destination_peer
102 */
103void
104GDS_NEIGHBOURS_send_get_result (struct GNUNET_PeerIdentity *source_peer,
105 struct GNUNET_PeerIdentity *get_path,
106 unsigned int get_path_length,
107 struct GNUNET_PeerIdentity *destination_peer,
108 unsigned int current_path_index);
92 109
93 110
94/** 111/**
diff --git a/src/dht/gnunet-service-xdht_routing.c b/src/dht/gnunet-service-xdht_routing.c
index d8912b54b..55ef1f4b2 100644
--- a/src/dht/gnunet-service-xdht_routing.c
+++ b/src/dht/gnunet-service-xdht_routing.c
@@ -28,14 +28,6 @@
28#include "gnunet-service-xdht_routing.h" 28#include "gnunet-service-xdht_routing.h"
29#include "gnunet-service-xdht.h" 29#include "gnunet-service-xdht.h"
30 30
31/* FIXME
32 * 1. We need field to understand which routing table is for which peer.
33 * 2. Better function names and variable names.
34 * 3. Use destination peer id as key for routing table.
35 * 4. What does GDS stands for?
36 *
37 */
38
39 31
40/** 32/**
41 * Number of requests we track at most (for routing replies). 33 * Number of requests we track at most (for routing replies).
@@ -109,7 +101,7 @@ GDS_ROUTING_add (struct GNUNET_PeerIdentity *source,
109} 101}
110 102
111 103
112/** 104/**FIXME: Test if its correct or not.
113 * Find the next hop to send packet to . 105 * Find the next hop to send packet to .
114 * @return next hop peer id 106 * @return next hop peer id
115 */ 107 */
@@ -153,7 +145,7 @@ GDS_ROUTING_process (enum GNUNET_BLOCK_Type type,
153 const struct GNUNET_PeerIdentity *get_path, 145 const struct GNUNET_PeerIdentity *get_path,
154 const void *data, size_t data_size) 146 const void *data, size_t data_size)
155{ 147{
156 148 return;
157} 149}
158 150
159 151