aboutsummaryrefslogtreecommitdiff
path: root/src/dht/dht_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r--src/dht/dht_api.c1266
1 files changed, 0 insertions, 1266 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
deleted file mode 100644
index 96399cb5a..000000000
--- a/src/dht/dht_api.c
+++ /dev/null
@@ -1,1266 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2012, 2016, 2018 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/dht_api.c
23 * @brief library to access the DHT service
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27
28#include "platform.h"
29#include "gnunet_util_lib.h"
30#include "gnunet_constants.h"
31#include "gnunet_arm_service.h"
32#include "gnunet_hello_lib.h"
33#include "gnunet_protocols.h"
34#include "gnunet_dht_service.h"
35#include "dht.h"
36
37#define LOG(kind, ...) GNUNET_log_from (kind, "dht-api", __VA_ARGS__)
38
39
40/**
41 * Handle to a PUT request.
42 */
43struct GNUNET_DHT_PutHandle
44{
45 /**
46 * Kept in a DLL.
47 */
48 struct GNUNET_DHT_PutHandle *next;
49
50 /**
51 * Kept in a DLL.
52 */
53 struct GNUNET_DHT_PutHandle *prev;
54
55 /**
56 * Continuation to call when done.
57 */
58 GNUNET_SCHEDULER_TaskCallback cont;
59
60 /**
61 * Main handle to this DHT api
62 */
63 struct GNUNET_DHT_Handle *dht_handle;
64
65 /**
66 * Closure for @e cont.
67 */
68 void *cont_cls;
69
70 /**
71 * Envelope from the PUT operation.
72 */
73 struct GNUNET_MQ_Envelope *env;
74};
75
76/**
77 * Handle to a GET request
78 */
79struct GNUNET_DHT_GetHandle
80{
81 /**
82 * Iterator to call on data receipt
83 */
84 GNUNET_DHT_GetIterator iter;
85
86 /**
87 * Closure for @e iter.
88 */
89 void *iter_cls;
90
91 /**
92 * Main handle to this DHT api
93 */
94 struct GNUNET_DHT_Handle *dht_handle;
95
96 /**
97 * Array of hash codes over the results that we have already
98 * seen.
99 */
100 struct GNUNET_HashCode *seen_results;
101
102 /**
103 * Key that this get request is for
104 */
105 struct GNUNET_HashCode key;
106
107 /**
108 * Unique identifier for this request (for key collisions).
109 */
110 uint64_t unique_id;
111
112 /**
113 * Size of the extended query, allocated at the end of this struct.
114 */
115 size_t xquery_size;
116
117 /**
118 * Desired replication level.
119 */
120 uint32_t desired_replication_level;
121
122 /**
123 * Type of the block we are looking for.
124 */
125 enum GNUNET_BLOCK_Type type;
126
127 /**
128 * Routing options.
129 */
130 enum GNUNET_DHT_RouteOption options;
131
132 /**
133 * Size of the @e seen_results array. Note that not
134 * all positions might be used (as we over-allocate).
135 */
136 unsigned int seen_results_size;
137
138 /**
139 * Offset into the @e seen_results array marking the
140 * end of the positions that are actually used.
141 */
142 unsigned int seen_results_end;
143};
144
145
146/**
147 * Handle to a monitoring request.
148 */
149struct GNUNET_DHT_MonitorHandle
150{
151 /**
152 * DLL.
153 */
154 struct GNUNET_DHT_MonitorHandle *next;
155
156 /**
157 * DLL.
158 */
159 struct GNUNET_DHT_MonitorHandle *prev;
160
161 /**
162 * Main handle to this DHT api.
163 */
164 struct GNUNET_DHT_Handle *dht_handle;
165
166 /**
167 * Type of block looked for.
168 */
169 enum GNUNET_BLOCK_Type type;
170
171 /**
172 * Key being looked for, NULL == all.
173 */
174 struct GNUNET_HashCode *key;
175
176 /**
177 * Callback for each received message of type get.
178 */
179 GNUNET_DHT_MonitorGetCB get_cb;
180
181 /**
182 * Callback for each received message of type get response.
183 */
184 GNUNET_DHT_MonitorGetRespCB get_resp_cb;
185
186 /**
187 * Callback for each received message of type put.
188 */
189 GNUNET_DHT_MonitorPutCB put_cb;
190
191 /**
192 * Closure for @e get_cb, @e put_cb and @e get_resp_cb.
193 */
194 void *cb_cls;
195};
196
197
198/**
199 * Connection to the DHT service.
200 */
201struct GNUNET_DHT_Handle
202{
203 /**
204 * Configuration to use.
205 */
206 const struct GNUNET_CONFIGURATION_Handle *cfg;
207
208 /**
209 * Connection to DHT service.
210 */
211 struct GNUNET_MQ_Handle *mq;
212
213 /**
214 * Head of linked list of messages we would like to monitor.
215 */
216 struct GNUNET_DHT_MonitorHandle *monitor_head;
217
218 /**
219 * Tail of linked list of messages we would like to monitor.
220 */
221 struct GNUNET_DHT_MonitorHandle *monitor_tail;
222
223 /**
224 * Head of active PUT requests.
225 */
226 struct GNUNET_DHT_PutHandle *put_head;
227
228 /**
229 * Tail of active PUT requests.
230 */
231 struct GNUNET_DHT_PutHandle *put_tail;
232
233 /**
234 * Hash map containing the current outstanding unique GET requests
235 * (values are of type `struct GNUNET_DHT_GetHandle`).
236 */
237 struct GNUNET_CONTAINER_MultiHashMap *active_requests;
238
239 /**
240 * Task for trying to reconnect.
241 */
242 struct GNUNET_SCHEDULER_Task *reconnect_task;
243
244 /**
245 * How quickly should we retry? Used for exponential back-off on
246 * connect-errors.
247 */
248 struct GNUNET_TIME_Relative retry_time;
249
250 /**
251 * Generator for unique ids.
252 */
253 uint64_t uid_gen;
254};
255
256
257/**
258 * Try to (re)connect to the DHT service.
259 *
260 * @param h DHT handle to reconnect
261 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
262 */
263static int
264try_connect (struct GNUNET_DHT_Handle *h);
265
266
267/**
268 * Send GET message for a @a get_handle to DHT.
269 *
270 * @param gh GET to generate messages for.
271 */
272static void
273send_get (struct GNUNET_DHT_GetHandle *gh)
274{
275 struct GNUNET_DHT_Handle *h = gh->dht_handle;
276 struct GNUNET_MQ_Envelope *env;
277 struct GNUNET_DHT_ClientGetMessage *get_msg;
278
279 env = GNUNET_MQ_msg_extra (get_msg,
280 gh->xquery_size,
281 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET);
282 get_msg->options = htonl ((uint32_t) gh->options);
283 get_msg->desired_replication_level = htonl (gh->desired_replication_level);
284 get_msg->type = htonl (gh->type);
285 get_msg->key = gh->key;
286 get_msg->unique_id = gh->unique_id;
287 GNUNET_memcpy (&get_msg[1],
288 &gh[1],
289 gh->xquery_size);
290 GNUNET_MQ_send (h->mq,
291 env);
292}
293
294
295/**
296 * Send GET message(s) for indicating which results are already known
297 * for a @a get_handle to DHT. Complex as we need to send the list of
298 * known results, which means we may need multiple messages to block
299 * known results from the result set.
300 *
301 * @param gh GET to generate messages for
302 * @param transmission_offset_start at which offset should we start?
303 */
304static void
305send_get_known_results (struct GNUNET_DHT_GetHandle *gh,
306 unsigned int transmission_offset_start)
307{
308 struct GNUNET_DHT_Handle *h = gh->dht_handle;
309 struct GNUNET_MQ_Envelope *env;
310 struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
311 unsigned int delta;
312 unsigned int max;
313 unsigned int transmission_offset;
314
315 max = (GNUNET_MAX_MESSAGE_SIZE - sizeof(*msg))
316 / sizeof(struct GNUNET_HashCode);
317 transmission_offset = transmission_offset_start;
318 while (transmission_offset < gh->seen_results_end)
319 {
320 delta = gh->seen_results_end - transmission_offset;
321 if (delta > max)
322 delta = max;
323 env = GNUNET_MQ_msg_extra (msg,
324 delta * sizeof(struct GNUNET_HashCode),
325 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
326 msg->key = gh->key;
327 msg->unique_id = gh->unique_id;
328 GNUNET_memcpy (&msg[1],
329 &gh->seen_results[transmission_offset],
330 sizeof(struct GNUNET_HashCode) * delta);
331 GNUNET_MQ_send (h->mq,
332 env);
333 transmission_offset += delta;
334 }
335}
336
337
338/**
339 * Add the GET request corresponding to the given route handle
340 * to the pending queue (if it is not already in there).
341 *
342 * @param cls the `struct GNUNET_DHT_Handle *`
343 * @param key key for the request (not used)
344 * @param value the `struct GNUNET_DHT_GetHandle *`
345 * @return #GNUNET_YES (always)
346 */
347static int
348add_get_request_to_pending (void *cls,
349 const struct GNUNET_HashCode *key,
350 void *value)
351{
352 struct GNUNET_DHT_Handle *handle = cls;
353 struct GNUNET_DHT_GetHandle *gh = value;
354
355 LOG (GNUNET_ERROR_TYPE_DEBUG,
356 "Retransmitting request related to %s to DHT %p\n",
357 GNUNET_h2s (key),
358 handle);
359 send_get (gh);
360 send_get_known_results (gh, 0);
361 return GNUNET_YES;
362}
363
364
365/**
366 * Send #GNUNET_MESSAGE_TYPE_DHT_MONITOR_START message.
367 *
368 * @param mh monitor handle to generate start message for
369 */
370static void
371send_monitor_start (struct GNUNET_DHT_MonitorHandle *mh)
372{
373 struct GNUNET_DHT_Handle *h = mh->dht_handle;
374 struct GNUNET_MQ_Envelope *env;
375 struct GNUNET_DHT_MonitorStartStopMessage *m;
376
377 env = GNUNET_MQ_msg (m,
378 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
379 m->type = htonl (mh->type);
380 m->get = htons (NULL != mh->get_cb);
381 m->get_resp = htons (NULL != mh->get_resp_cb);
382 m->put = htons (NULL != mh->put_cb);
383 if (NULL != mh->key)
384 {
385 m->filter_key = htons (1);
386 m->key = *mh->key;
387 }
388 GNUNET_MQ_send (h->mq,
389 env);
390}
391
392
393/**
394 * Try reconnecting to the dht service.
395 *
396 * @param cls a `struct GNUNET_DHT_Handle`
397 */
398static void
399try_reconnect (void *cls)
400{
401 struct GNUNET_DHT_Handle *h = cls;
402 struct GNUNET_DHT_MonitorHandle *mh;
403
404 LOG (GNUNET_ERROR_TYPE_DEBUG,
405 "Reconnecting with DHT %p\n",
406 h);
407 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
408 h->reconnect_task = NULL;
409 if (GNUNET_YES != try_connect (h))
410 {
411 LOG (GNUNET_ERROR_TYPE_WARNING,
412 "DHT reconnect failed!\n");
413 h->reconnect_task
414 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
415 &try_reconnect,
416 h);
417 return;
418 }
419 GNUNET_CONTAINER_multihashmap_iterate (h->active_requests,
420 &add_get_request_to_pending,
421 h);
422 for (mh = h->monitor_head; NULL != mh; mh = mh->next)
423 send_monitor_start (mh);
424}
425
426
427/**
428 * Try reconnecting to the DHT service.
429 *
430 * @param h handle to dht to (possibly) disconnect and reconnect
431 */
432static void
433do_disconnect (struct GNUNET_DHT_Handle *h)
434{
435 struct GNUNET_DHT_PutHandle *ph;
436 GNUNET_SCHEDULER_TaskCallback cont;
437 void *cont_cls;
438
439 if (NULL == h->mq)
440 return;
441 GNUNET_MQ_destroy (h->mq);
442 h->mq = NULL;
443 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
444 "Disconnecting from DHT service, will try to reconnect in %s\n",
445 GNUNET_STRINGS_relative_time_to_string (h->retry_time,
446 GNUNET_YES));
447 /* notify client about all PUTs that (may) have failed due to disconnect */
448 while (NULL != (ph = h->put_head))
449 {
450 cont = ph->cont;
451 cont_cls = ph->cont_cls;
452 ph->env = NULL;
453 GNUNET_DHT_put_cancel (ph);
454 if (NULL != cont)
455 cont (cont_cls);
456 }
457 GNUNET_assert (NULL == h->reconnect_task);
458 h->reconnect_task
459 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
460 &try_reconnect,
461 h);
462}
463
464
465/**
466 * Generic error handler, called with the appropriate error code and
467 * the same closure specified at the creation of the message queue.
468 * Not every message queue implementation supports an error handler.
469 *
470 * @param cls closure with the `struct GNUNET_DHT_Handle *`
471 * @param error error code
472 */
473static void
474mq_error_handler (void *cls,
475 enum GNUNET_MQ_Error error)
476{
477 struct GNUNET_DHT_Handle *h = cls;
478
479 do_disconnect (h);
480}
481
482
483/**
484 * Verify integrity of a get monitor message from the service.
485 *
486 * @param cls The DHT handle.
487 * @param msg Monitor get message from the service.
488 * @return #GNUNET_OK if everything went fine,
489 * #GNUNET_SYSERR if the message is malformed.
490 */
491static int
492check_monitor_get (void *cls,
493 const struct GNUNET_DHT_MonitorGetMessage *msg)
494{
495 uint32_t plen = ntohl (msg->get_path_length);
496 uint16_t msize = ntohs (msg->header.size) - sizeof(*msg);
497
498 if ((plen > UINT16_MAX) ||
499 (plen * sizeof(struct GNUNET_PeerIdentity) != msize))
500 {
501 GNUNET_break (0);
502 return GNUNET_SYSERR;
503 }
504 return GNUNET_OK;
505}
506
507
508/**
509 * Process a get monitor message from the service.
510 *
511 * @param cls The DHT handle.
512 * @param msg Monitor get message from the service.
513 */
514static void
515handle_monitor_get (void *cls,
516 const struct GNUNET_DHT_MonitorGetMessage *msg)
517{
518 struct GNUNET_DHT_Handle *handle = cls;
519 struct GNUNET_DHT_MonitorHandle *mh;
520
521 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
522 {
523 if (NULL == mh->get_cb)
524 continue;
525 if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
526 (mh->type == ntohl (msg->type))) &&
527 ((NULL == mh->key) ||
528 (0 == memcmp (mh->key,
529 &msg->key,
530 sizeof(struct GNUNET_HashCode)))))
531 mh->get_cb (mh->cb_cls,
532 ntohl (msg->options),
533 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
534 ntohl (msg->hop_count),
535 ntohl (msg->desired_replication_level),
536 ntohl (msg->get_path_length),
537 (struct GNUNET_PeerIdentity *) &msg[1],
538 &msg->key);
539 }
540}
541
542
543/**
544 * Validate a get response monitor message from the service.
545 *
546 * @param cls The DHT handle.
547 * @param msg monitor get response message from the service
548 * @return #GNUNET_OK if everything went fine,
549 * #GNUNET_SYSERR if the message is malformed.
550 */
551static int
552check_monitor_get_resp (void *cls,
553 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
554{
555 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
556 uint32_t getl = ntohl (msg->get_path_length);
557 uint32_t putl = ntohl (msg->put_path_length);
558
559 if ((getl + putl < getl) ||
560 ((msize / sizeof(struct GNUNET_PeerIdentity)) < getl + putl))
561 {
562 GNUNET_break (0);
563 return GNUNET_SYSERR;
564 }
565 return GNUNET_OK;
566}
567
568
569/**
570 * Process a get response monitor message from the service.
571 *
572 * @param cls The DHT handle.
573 * @param msg monitor get response message from the service
574 */
575static void
576handle_monitor_get_resp (void *cls,
577 const struct GNUNET_DHT_MonitorGetRespMessage *msg)
578{
579 struct GNUNET_DHT_Handle *handle = cls;
580 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
581 const struct GNUNET_PeerIdentity *path;
582 uint32_t getl = ntohl (msg->get_path_length);
583 uint32_t putl = ntohl (msg->put_path_length);
584 struct GNUNET_DHT_MonitorHandle *mh;
585
586 path = (const struct GNUNET_PeerIdentity *) &msg[1];
587 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
588 {
589 if (NULL == mh->get_resp_cb)
590 continue;
591 if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
592 (mh->type == ntohl (msg->type))) &&
593 ((NULL == mh->key) ||
594 (0 == memcmp (mh->key,
595 &msg->key,
596 sizeof(struct GNUNET_HashCode)))))
597 mh->get_resp_cb (mh->cb_cls,
598 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
599 path,
600 getl,
601 &path[getl],
602 putl,
603 GNUNET_TIME_absolute_ntoh (msg->expiration_time),
604 &msg->key,
605 (const void *) &path[getl + putl],
606 msize - sizeof(struct GNUNET_PeerIdentity) * (putl
607 + getl));
608 }
609}
610
611
612/**
613 * Check validity of a put monitor message from the service.
614 *
615 * @param cls The DHT handle.
616 * @param msg Monitor put message from the service.
617 * @return #GNUNET_OK if everything went fine,
618 * #GNUNET_SYSERR if the message is malformed.
619 */
620static int
621check_monitor_put (void *cls,
622 const struct GNUNET_DHT_MonitorPutMessage *msg)
623{
624 size_t msize;
625 uint32_t putl;
626
627 msize = ntohs (msg->header.size) - sizeof(*msg);
628 putl = ntohl (msg->put_path_length);
629 if ((msize / sizeof(struct GNUNET_PeerIdentity)) < putl)
630 {
631 GNUNET_break (0);
632 return GNUNET_SYSERR;
633 }
634 return GNUNET_OK;
635}
636
637
638/**
639 * Process a put monitor message from the service.
640 *
641 * @param cls The DHT handle.
642 * @param msg Monitor put message from the service.
643 */
644static void
645handle_monitor_put (void *cls,
646 const struct GNUNET_DHT_MonitorPutMessage *msg)
647{
648 struct GNUNET_DHT_Handle *handle = cls;
649 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
650 uint32_t putl = ntohl (msg->put_path_length);
651 const struct GNUNET_PeerIdentity *path;
652 struct GNUNET_DHT_MonitorHandle *mh;
653
654 path = (const struct GNUNET_PeerIdentity *) &msg[1];
655 for (mh = handle->monitor_head; NULL != mh; mh = mh->next)
656 {
657 if (NULL == mh->put_cb)
658 continue;
659 if (((GNUNET_BLOCK_TYPE_ANY == mh->type) ||
660 (mh->type == ntohl (msg->type))) &&
661 ((NULL == mh->key) ||
662 (0 == memcmp (mh->key,
663 &msg->key,
664 sizeof(struct GNUNET_HashCode)))))
665 mh->put_cb (mh->cb_cls,
666 ntohl (msg->options),
667 (enum GNUNET_BLOCK_Type) ntohl (msg->type),
668 ntohl (msg->hop_count),
669 ntohl (msg->desired_replication_level),
670 putl,
671 path,
672 GNUNET_TIME_absolute_ntoh (msg->expiration_time),
673 &msg->key,
674 (const void *) &path[putl],
675 msize - sizeof(struct GNUNET_PeerIdentity) * putl);
676 }
677}
678
679
680/**
681 * Verify that client result message received from the service is well-formed.
682 *
683 * @param cls The DHT handle.
684 * @param msg Monitor put message from the service.
685 * @return #GNUNET_OK if everything went fine,
686 * #GNUNET_SYSERR if the message is malformed.
687 */
688static int
689check_client_result (void *cls,
690 const struct GNUNET_DHT_ClientResultMessage *msg)
691{
692 size_t msize = ntohs (msg->header.size) - sizeof(*msg);
693 uint32_t put_path_length = ntohl (msg->put_path_length);
694 uint32_t get_path_length = ntohl (msg->get_path_length);
695 size_t meta_length;
696
697 meta_length =
698 sizeof(struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
699 if ((msize < meta_length) ||
700 (get_path_length >
701 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)) ||
702 (put_path_length >
703 GNUNET_MAX_MESSAGE_SIZE / sizeof(struct GNUNET_PeerIdentity)))
704 {
705 GNUNET_break (0);
706 return GNUNET_SYSERR;
707 }
708 return GNUNET_OK;
709}
710
711
712/**
713 * Process a given reply that might match the given request.
714 *
715 * @param cls the `struct GNUNET_DHT_ClientResultMessage`
716 * @param key query of the request
717 * @param value the `struct GNUNET_DHT_GetHandle` of a request matching the same key
718 * @return #GNUNET_YES to continue to iterate over all results
719 */
720static int
721process_client_result (void *cls,
722 const struct GNUNET_HashCode *key,
723 void *value)
724{
725 const struct GNUNET_DHT_ClientResultMessage *crm = cls;
726 struct GNUNET_DHT_GetHandle *get_handle = value;
727 size_t msize = ntohs (crm->header.size) - sizeof(*crm);
728 uint32_t put_path_length = ntohl (crm->put_path_length);
729 uint32_t get_path_length = ntohl (crm->get_path_length);
730 const struct GNUNET_PeerIdentity *put_path;
731 const struct GNUNET_PeerIdentity *get_path;
732 struct GNUNET_HashCode hc;
733 size_t data_length;
734 size_t meta_length;
735 const void *data;
736
737 if (crm->unique_id != get_handle->unique_id)
738 {
739 /* UID mismatch */
740 LOG (GNUNET_ERROR_TYPE_DEBUG,
741 "Ignoring reply for %s: UID mismatch: %llu/%llu\n",
742 GNUNET_h2s (key),
743 (unsigned long long) crm->unique_id,
744 (unsigned long long) get_handle->unique_id);
745 return GNUNET_YES;
746 }
747 /* FIXME: might want to check that type matches */
748 meta_length =
749 sizeof(struct GNUNET_PeerIdentity) * (get_path_length + put_path_length);
750 data_length = msize - meta_length;
751 put_path = (const struct GNUNET_PeerIdentity *) &crm[1];
752 get_path = &put_path[put_path_length];
753 {
754 char *pp;
755 char *gp;
756
757 gp = GNUNET_STRINGS_pp2s (get_path,
758 get_path_length);
759 pp = GNUNET_STRINGS_pp2s (put_path,
760 put_path_length);
761 LOG (GNUNET_ERROR_TYPE_DEBUG,
762 "Giving %u byte reply for %s to application (GP: %s, PP: %s)\n",
763 (unsigned int) data_length,
764 GNUNET_h2s (key),
765 gp,
766 pp);
767 GNUNET_free (gp);
768 GNUNET_free (pp);
769 }
770 data = &get_path[get_path_length];
771 /* remember that we've seen this result */
772 GNUNET_CRYPTO_hash (data,
773 data_length,
774 &hc);
775 if (get_handle->seen_results_size == get_handle->seen_results_end)
776 GNUNET_array_grow (get_handle->seen_results,
777 get_handle->seen_results_size,
778 get_handle->seen_results_size * 2 + 1);
779 get_handle->seen_results[get_handle->seen_results_end++] = hc;
780 /* no need to block it explicitly, service already knows about it! */
781 get_handle->iter (get_handle->iter_cls,
782 GNUNET_TIME_absolute_ntoh (crm->expiration),
783 key,
784 get_path,
785 get_path_length,
786 put_path,
787 put_path_length,
788 ntohl (crm->type),
789 data_length,
790 data);
791 return GNUNET_YES;
792}
793
794
795/**
796 * Process a client result message received from the service.
797 *
798 * @param cls The DHT handle.
799 * @param msg Monitor put message from the service.
800 */
801static void
802handle_client_result (void *cls,
803 const struct GNUNET_DHT_ClientResultMessage *msg)
804{
805 struct GNUNET_DHT_Handle *handle = cls;
806
807 GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
808 &msg->key,
809 &process_client_result,
810 (void *) msg);
811}
812
813
814/**
815 * Process a MQ PUT transmission notification.
816 *
817 * @param cls The DHT handle.
818 */
819static void
820handle_put_cont (void *cls)
821{
822 struct GNUNET_DHT_PutHandle *ph = cls;
823 GNUNET_SCHEDULER_TaskCallback cont;
824 void *cont_cls;
825
826 cont = ph->cont;
827 cont_cls = ph->cont_cls;
828 ph->env = NULL;
829 GNUNET_DHT_put_cancel (ph);
830 if (NULL != cont)
831 cont (cont_cls);
832}
833
834
835/**
836 * Try to (re)connect to the DHT service.
837 *
838 * @param h DHT handle to reconnect
839 * @return #GNUNET_YES on success, #GNUNET_NO on failure.
840 */
841static int
842try_connect (struct GNUNET_DHT_Handle *h)
843{
844 struct GNUNET_MQ_MessageHandler handlers[] = {
845 GNUNET_MQ_hd_var_size (monitor_get,
846 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
847 struct GNUNET_DHT_MonitorGetMessage,
848 h),
849 GNUNET_MQ_hd_var_size (monitor_get_resp,
850 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
851 struct GNUNET_DHT_MonitorGetRespMessage,
852 h),
853 GNUNET_MQ_hd_var_size (monitor_put,
854 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
855 struct GNUNET_DHT_MonitorPutMessage,
856 h),
857 GNUNET_MQ_hd_var_size (client_result,
858 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT,
859 struct GNUNET_DHT_ClientResultMessage,
860 h),
861 GNUNET_MQ_handler_end ()
862 };
863
864 if (NULL != h->mq)
865 return GNUNET_OK;
866 h->mq = GNUNET_CLIENT_connect (h->cfg,
867 "dht",
868 handlers,
869 &mq_error_handler,
870 h);
871 if (NULL == h->mq)
872 {
873 LOG (GNUNET_ERROR_TYPE_WARNING,
874 "Failed to connect to the DHT service!\n");
875 return GNUNET_NO;
876 }
877 return GNUNET_YES;
878}
879
880
881/**
882 * Initialize the connection with the DHT service.
883 *
884 * @param cfg configuration to use
885 * @param ht_len size of the internal hash table to use for
886 * processing multiple GET/FIND requests in parallel
887 * @return handle to the DHT service, or NULL on error
888 */
889struct GNUNET_DHT_Handle *
890GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
891 unsigned int ht_len)
892{
893 struct GNUNET_DHT_Handle *handle;
894
895 handle = GNUNET_new (struct GNUNET_DHT_Handle);
896 handle->cfg = cfg;
897 handle->uid_gen
898 = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
899 UINT64_MAX);
900 handle->active_requests
901 = GNUNET_CONTAINER_multihashmap_create (ht_len,
902 GNUNET_YES);
903 if (GNUNET_NO == try_connect (handle))
904 {
905 GNUNET_DHT_disconnect (handle);
906 return NULL;
907 }
908 return handle;
909}
910
911
912/**
913 * Shutdown connection with the DHT service.
914 *
915 * @param handle handle of the DHT connection to stop
916 */
917void
918GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
919{
920 struct GNUNET_DHT_PutHandle *ph;
921
922 GNUNET_assert (0 ==
923 GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
924 while (NULL != (ph = handle->put_head))
925 {
926 if (NULL != ph->cont)
927 ph->cont (ph->cont_cls);
928 GNUNET_DHT_put_cancel (ph);
929 }
930 if (NULL != handle->mq)
931 {
932 GNUNET_MQ_destroy (handle->mq);
933 handle->mq = NULL;
934 }
935 if (NULL != handle->reconnect_task)
936 {
937 GNUNET_SCHEDULER_cancel (handle->reconnect_task);
938 handle->reconnect_task = NULL;
939 }
940 GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
941 GNUNET_free (handle);
942}
943
944
945/**
946 * Perform a PUT operation storing data in the DHT. FIXME: we should
947 * change the protocol to get a confirmation for the PUT from the DHT
948 * and call 'cont' only after getting the confirmation; otherwise, the
949 * client has no good way of telling if the 'PUT' message actually got
950 * to the DHT service!
951 *
952 * @param handle handle to DHT service
953 * @param key the key to store under
954 * @param desired_replication_level estimate of how many
955 * nearest peers this request should reach
956 * @param options routing options for this message
957 * @param type type of the value
958 * @param size number of bytes in data; must be less than 64k
959 * @param data the data to store
960 * @param exp desired expiration time for the value
961 * @param cont continuation to call when done (transmitting request to service)
962 * You must not call #GNUNET_DHT_disconnect in this continuation
963 * @param cont_cls closure for @a cont
964 */
965struct GNUNET_DHT_PutHandle *
966GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle,
967 const struct GNUNET_HashCode *key,
968 uint32_t desired_replication_level,
969 enum GNUNET_DHT_RouteOption options,
970 enum GNUNET_BLOCK_Type type,
971 size_t size,
972 const void *data,
973 struct GNUNET_TIME_Absolute exp,
974 GNUNET_SCHEDULER_TaskCallback cont,
975 void *cont_cls)
976{
977 struct GNUNET_MQ_Envelope *env;
978 struct GNUNET_DHT_ClientPutMessage *put_msg;
979 size_t msize;
980 struct GNUNET_DHT_PutHandle *ph;
981
982 msize = sizeof(struct GNUNET_DHT_ClientPutMessage) + size;
983 if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
984 (size >= GNUNET_MAX_MESSAGE_SIZE))
985 {
986 GNUNET_break (0);
987 return NULL;
988 }
989 if (NULL == handle->mq)
990 return NULL;
991 LOG (GNUNET_ERROR_TYPE_DEBUG,
992 "Sending PUT for %s to DHT via %p\n",
993 GNUNET_h2s (key),
994 handle);
995 ph = GNUNET_new (struct GNUNET_DHT_PutHandle);
996 ph->dht_handle = handle;
997 ph->cont = cont;
998 ph->cont_cls = cont_cls;
999 GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
1000 handle->put_tail,
1001 ph);
1002 env = GNUNET_MQ_msg_extra (put_msg,
1003 size,
1004 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
1005 GNUNET_MQ_notify_sent (env,
1006 &handle_put_cont,
1007 ph);
1008 ph->env = env;
1009 put_msg->type = htonl ((uint32_t) type);
1010 put_msg->options = htonl ((uint32_t) options);
1011 put_msg->desired_replication_level = htonl (desired_replication_level);
1012 put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
1013 put_msg->key = *key;
1014 GNUNET_memcpy (&put_msg[1],
1015 data,
1016 size);
1017 GNUNET_MQ_send (handle->mq,
1018 env);
1019 return ph;
1020}
1021
1022
1023/**
1024 * Cancels a DHT PUT operation. Note that the PUT request may still
1025 * go out over the network (we can't stop that); However, if the PUT
1026 * has not yet been sent to the service, cancelling the PUT will stop
1027 * this from happening (but there is no way for the user of this API
1028 * to tell if that is the case). The only use for this API is to
1029 * prevent a later call to 'cont' from #GNUNET_DHT_put (e.g. because
1030 * the system is shutting down).
1031 *
1032 * @param ph put operation to cancel ('cont' will no longer be called)
1033 */
1034void
1035GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
1036{
1037 struct GNUNET_DHT_Handle *handle = ph->dht_handle;
1038
1039 if (NULL != ph->env)
1040 GNUNET_MQ_notify_sent (ph->env,
1041 NULL,
1042 NULL);
1043 GNUNET_CONTAINER_DLL_remove (handle->put_head,
1044 handle->put_tail,
1045 ph);
1046 GNUNET_free (ph);
1047}
1048
1049
1050/**
1051 * Perform an asynchronous GET operation on the DHT identified. See
1052 * also #GNUNET_BLOCK_evaluate.
1053 *
1054 * @param handle handle to the DHT service
1055 * @param type expected type of the response object
1056 * @param key the key to look up
1057 * @param desired_replication_level estimate of how many
1058 nearest peers this request should reach
1059 * @param options routing options for this message
1060 * @param xquery extended query data (can be NULL, depending on type)
1061 * @param xquery_size number of bytes in @a xquery
1062 * @param iter function to call on each result
1063 * @param iter_cls closure for @a iter
1064 * @return handle to stop the async get
1065 */
1066struct GNUNET_DHT_GetHandle *
1067GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
1068 enum GNUNET_BLOCK_Type type,
1069 const struct GNUNET_HashCode *key,
1070 uint32_t desired_replication_level,
1071 enum GNUNET_DHT_RouteOption options,
1072 const void *xquery,
1073 size_t xquery_size,
1074 GNUNET_DHT_GetIterator iter,
1075 void *iter_cls)
1076{
1077 struct GNUNET_DHT_GetHandle *gh;
1078 size_t msize;
1079
1080 msize = sizeof(struct GNUNET_DHT_ClientGetMessage) + xquery_size;
1081 if ((msize >= GNUNET_MAX_MESSAGE_SIZE) ||
1082 (xquery_size >= GNUNET_MAX_MESSAGE_SIZE))
1083 {
1084 GNUNET_break (0);
1085 return NULL;
1086 }
1087 LOG (GNUNET_ERROR_TYPE_DEBUG,
1088 "Sending query for %s to DHT %p\n",
1089 GNUNET_h2s (key),
1090 handle);
1091 gh = GNUNET_malloc (sizeof(struct GNUNET_DHT_GetHandle)
1092 + xquery_size);
1093 gh->iter = iter;
1094 gh->iter_cls = iter_cls;
1095 gh->dht_handle = handle;
1096 gh->key = *key;
1097 gh->unique_id = ++handle->uid_gen;
1098 gh->xquery_size = xquery_size;
1099 gh->desired_replication_level = desired_replication_level;
1100 gh->type = type;
1101 gh->options = options;
1102 GNUNET_memcpy (&gh[1],
1103 xquery,
1104 xquery_size);
1105 GNUNET_CONTAINER_multihashmap_put (handle->active_requests,
1106 &gh->key,
1107 gh,
1108 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1109 if (NULL != handle->mq)
1110 send_get (gh);
1111 return gh;
1112}
1113
1114
1115/**
1116 * Tell the DHT not to return any of the following known results
1117 * to this client.
1118 *
1119 * @param get_handle get operation for which results should be filtered
1120 * @param num_results number of results to be blocked that are
1121 * provided in this call (size of the @a results array)
1122 * @param results array of hash codes over the 'data' of the results
1123 * to be blocked
1124 */
1125void
1126GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
1127 unsigned int num_results,
1128 const struct GNUNET_HashCode *results)
1129{
1130 unsigned int needed;
1131 unsigned int had;
1132
1133 had = get_handle->seen_results_end;
1134 needed = had + num_results;
1135 if (needed > get_handle->seen_results_size)
1136 GNUNET_array_grow (get_handle->seen_results,
1137 get_handle->seen_results_size,
1138 needed);
1139 GNUNET_memcpy (&get_handle->seen_results[get_handle->seen_results_end],
1140 results,
1141 num_results * sizeof(struct GNUNET_HashCode));
1142 get_handle->seen_results_end += num_results;
1143 if (NULL != get_handle->dht_handle->mq)
1144 send_get_known_results (get_handle,
1145 had);
1146}
1147
1148
1149/**
1150 * Stop async DHT-get.
1151 *
1152 * @param get_handle handle to the GET operation to stop
1153 */
1154void
1155GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
1156{
1157 struct GNUNET_DHT_Handle *handle = get_handle->dht_handle;
1158
1159 LOG (GNUNET_ERROR_TYPE_DEBUG,
1160 "Sending STOP for %s to DHT via %p\n",
1161 GNUNET_h2s (&get_handle->key),
1162 handle);
1163 if (NULL != handle->mq)
1164 {
1165 struct GNUNET_MQ_Envelope *env;
1166 struct GNUNET_DHT_ClientGetStopMessage *stop_msg;
1167
1168 env = GNUNET_MQ_msg (stop_msg,
1169 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP);
1170 stop_msg->reserved = htonl (0);
1171 stop_msg->unique_id = get_handle->unique_id;
1172 stop_msg->key = get_handle->key;
1173 GNUNET_MQ_send (handle->mq,
1174 env);
1175 }
1176 GNUNET_assert (GNUNET_YES ==
1177 GNUNET_CONTAINER_multihashmap_remove (handle->active_requests,
1178 &get_handle->key,
1179 get_handle));
1180 GNUNET_array_grow (get_handle->seen_results,
1181 get_handle->seen_results_end,
1182 0);
1183 GNUNET_free (get_handle);
1184}
1185
1186
1187/**
1188 * Start monitoring the local DHT service.
1189 *
1190 * @param handle Handle to the DHT service.
1191 * @param type Type of blocks that are of interest.
1192 * @param key Key of data of interest, NULL for all.
1193 * @param get_cb Callback to process monitored get messages.
1194 * @param get_resp_cb Callback to process monitored get response messages.
1195 * @param put_cb Callback to process monitored put messages.
1196 * @param cb_cls Closure for callbacks.
1197 * @return Handle to stop monitoring.
1198 */
1199struct GNUNET_DHT_MonitorHandle *
1200GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
1201 enum GNUNET_BLOCK_Type type,
1202 const struct GNUNET_HashCode *key,
1203 GNUNET_DHT_MonitorGetCB get_cb,
1204 GNUNET_DHT_MonitorGetRespCB get_resp_cb,
1205 GNUNET_DHT_MonitorPutCB put_cb,
1206 void *cb_cls)
1207{
1208 struct GNUNET_DHT_MonitorHandle *mh;
1209
1210 mh = GNUNET_new (struct GNUNET_DHT_MonitorHandle);
1211 mh->get_cb = get_cb;
1212 mh->get_resp_cb = get_resp_cb;
1213 mh->put_cb = put_cb;
1214 mh->cb_cls = cb_cls;
1215 mh->type = type;
1216 mh->dht_handle = handle;
1217 if (NULL != key)
1218 {
1219 mh->key = GNUNET_new (struct GNUNET_HashCode);
1220 *mh->key = *key;
1221 }
1222 GNUNET_CONTAINER_DLL_insert (handle->monitor_head,
1223 handle->monitor_tail,
1224 mh);
1225 if (NULL != handle->mq)
1226 send_monitor_start (mh);
1227 return mh;
1228}
1229
1230
1231/**
1232 * Stop monitoring.
1233 *
1234 * @param mh The handle to the monitor request returned by monitor_start.
1235 *
1236 * On return get_handle will no longer be valid, caller must not use again!!!
1237 */
1238void
1239GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *mh)
1240{
1241 struct GNUNET_DHT_Handle *handle = mh->dht_handle;
1242 struct GNUNET_DHT_MonitorStartStopMessage *m;
1243 struct GNUNET_MQ_Envelope *env;
1244
1245 GNUNET_CONTAINER_DLL_remove (handle->monitor_head,
1246 handle->monitor_tail,
1247 mh);
1248 env = GNUNET_MQ_msg (m,
1249 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
1250 m->type = htonl (mh->type);
1251 m->get = htons (NULL != mh->get_cb);
1252 m->get_resp = htons (NULL != mh->get_resp_cb);
1253 m->put = htons (NULL != mh->put_cb);
1254 if (NULL != mh->key)
1255 {
1256 m->filter_key = htons (1);
1257 m->key = *mh->key;
1258 }
1259 GNUNET_MQ_send (handle->mq,
1260 env);
1261 GNUNET_free (mh->key);
1262 GNUNET_free (mh);
1263}
1264
1265
1266/* end of dht_api.c */