aboutsummaryrefslogtreecommitdiff
path: root/src/service/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/service/dht/gnunet-service-dht_clients.c1705
1 files changed, 1705 insertions, 0 deletions
diff --git a/src/service/dht/gnunet-service-dht_clients.c b/src/service/dht/gnunet-service-dht_clients.c
new file mode 100644
index 000000000..c666265fe
--- /dev/null
+++ b/src/service/dht/gnunet-service-dht_clients.c
@@ -0,0 +1,1705 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017, 2022 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_constants.h"
29#include "gnunet_protocols.h"
30#include "gnunet_hello_uri_lib.h"
31#include "gnunet_statistics_service.h"
32#include "gnunet-service-dht.h"
33#include "gnunet-service-dht_datacache.h"
34#include "gnunet-service-dht_neighbours.h"
35#include "dht.h"
36
37
38/**
39 * Enable slow sanity checks to debug issues.
40 * 0: do not check
41 * 1: check all external inputs
42 * 2: check internal computations as well
43 */
44#define SANITY_CHECKS 0
45
46/**
47 * Should routing details be logged to stderr (for debugging)?
48 */
49#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
50 __VA_ARGS__)
51
52#define LOG(kind, ...) GNUNET_log_from (kind, "dht-clients", __VA_ARGS__)
53
54
55/**
56 * Struct containing information about a client,
57 * handle to connect to it, and any pending messages
58 * that need to be sent to it.
59 */
60struct ClientHandle;
61
62
63/**
64 * Entry in the local forwarding map for a client's GET request.
65 */
66struct ClientQueryRecord
67{
68 /**
69 * The key this request was about
70 */
71 struct GNUNET_HashCode key;
72
73 /**
74 * Kept in a DLL with @e client.
75 */
76 struct ClientQueryRecord *next;
77
78 /**
79 * Kept in a DLL with @e client.
80 */
81 struct ClientQueryRecord *prev;
82
83 /**
84 * Client responsible for the request.
85 */
86 struct ClientHandle *ch;
87
88 /**
89 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
90 */
91 const void *xquery;
92
93 /**
94 * Array of (hashes of) replies we have already seen for this request.
95 */
96 struct GNUNET_HashCode *seen_replies;
97
98 /**
99 * Pointer to this nodes heap location in the retry-heap (for fast removal)
100 */
101 struct GNUNET_CONTAINER_HeapNode *hnode;
102
103 /**
104 * What's the delay between re-try operations that we currently use for this
105 * request?
106 */
107 struct GNUNET_TIME_Relative retry_frequency;
108
109 /**
110 * What's the next time we should re-try this request?
111 */
112 struct GNUNET_TIME_Absolute retry_time;
113
114 /**
115 * The unique identifier of this request
116 */
117 uint64_t unique_id;
118
119 /**
120 * Number of bytes in xquery.
121 */
122 size_t xquery_size;
123
124 /**
125 * Number of entries in @e seen_replies.
126 */
127 unsigned int seen_replies_count;
128
129 /**
130 * Desired replication level
131 */
132 uint32_t replication;
133
134 /**
135 * Any message options for this request
136 */
137 enum GNUNET_DHT_RouteOption msg_options;
138
139 /**
140 * The type for the data for the GET request.
141 */
142 enum GNUNET_BLOCK_Type type;
143};
144
145
146/**
147 * Struct containing parameters of monitoring requests.
148 */
149struct ClientMonitorRecord
150{
151 /**
152 * Next element in DLL.
153 */
154 struct ClientMonitorRecord *next;
155
156 /**
157 * Previous element in DLL.
158 */
159 struct ClientMonitorRecord *prev;
160
161 /**
162 * Client to notify of these requests.
163 */
164 struct ClientHandle *ch;
165
166 /**
167 * Key of data of interest. All bits zero for 'all'.
168 */
169 struct GNUNET_HashCode key;
170
171 /**
172 * Type of blocks that are of interest
173 */
174 enum GNUNET_BLOCK_Type type;
175
176 /**
177 * Flag whether to notify about GET messages.
178 */
179 int16_t get;
180
181 /**
182 * Flag whether to notify about GET_REPONSE messages.
183 */
184 int16_t get_resp;
185
186 /**
187 * Flag whether to notify about PUT messages.
188 */
189 uint16_t put;
190
191};
192
193
194/**
195 * Struct containing information about a client,
196 * handle to connect to it, and any pending messages
197 * that need to be sent to it.
198 */
199struct ClientHandle
200{
201 /**
202 * Linked list of active queries of this client.
203 */
204 struct ClientQueryRecord *cqr_head;
205
206 /**
207 * Linked list of active queries of this client.
208 */
209 struct ClientQueryRecord *cqr_tail;
210
211 /**
212 * The handle to this client
213 */
214 struct GNUNET_SERVICE_Client *client;
215
216 /**
217 * The message queue to this client
218 */
219 struct GNUNET_MQ_Handle *mq;
220};
221
222
223/**
224 * Our handle to the BLOCK library.
225 */
226struct GNUNET_BLOCK_Context *GDS_block_context;
227
228/**
229 * Handle for the statistics service.
230 */
231struct GNUNET_STATISTICS_Handle *GDS_stats;
232
233/**
234 * Handle for the service.
235 */
236struct GNUNET_SERVICE_Handle *GDS_service;
237
238/**
239 * The configuration the DHT service is running with
240 */
241const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
242
243/**
244 * List of active monitoring requests.
245 */
246static struct ClientMonitorRecord *monitor_head;
247
248/**
249 * List of active monitoring requests.
250 */
251static struct ClientMonitorRecord *monitor_tail;
252
253/**
254 * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
255 */
256static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
257
258/**
259 * Heap with all of our client's request, sorted by retry time (earliest on top).
260 */
261static struct GNUNET_CONTAINER_Heap *retry_heap;
262
263/**
264 * Task that re-transmits requests (using retry_heap).
265 */
266static struct GNUNET_SCHEDULER_Task *retry_task;
267
268
269/**
270 * Free data structures associated with the given query.
271 *
272 * @param record record to remove
273 */
274static void
275remove_client_query_record (struct ClientQueryRecord *record)
276{
277 struct ClientHandle *ch = record->ch;
278
279 GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
280 ch->cqr_tail,
281 record);
282 GNUNET_assert (GNUNET_YES ==
283 GNUNET_CONTAINER_multihashmap_remove (forward_map,
284 &record->key,
285 record));
286 if (NULL != record->hnode)
287 GNUNET_CONTAINER_heap_remove_node (record->hnode);
288 GNUNET_array_grow (record->seen_replies,
289 record->seen_replies_count,
290 0);
291 GNUNET_free (record);
292}
293
294
295/**
296 * Functions with this signature are called whenever a local client is
297 * connects to us.
298 *
299 * @param cls closure (NULL for dht)
300 * @param client identification of the client
301 * @param mq message queue for talking to @a client
302 * @return our `struct ClientHandle` for @a client
303 */
304static void *
305client_connect_cb (void *cls,
306 struct GNUNET_SERVICE_Client *client,
307 struct GNUNET_MQ_Handle *mq)
308{
309 struct ClientHandle *ch;
310
311 (void) cls;
312 ch = GNUNET_new (struct ClientHandle);
313 ch->client = client;
314 ch->mq = mq;
315 return ch;
316}
317
318
319/**
320 * Functions with this signature are called whenever a client
321 * is disconnected on the network level.
322 *
323 * @param cls closure (NULL for dht)
324 * @param client identification of the client
325 * @param app_ctx our `struct ClientHandle` for @a client
326 */
327static void
328client_disconnect_cb (void *cls,
329 struct GNUNET_SERVICE_Client *client,
330 void *app_ctx)
331{
332 struct ClientHandle *ch = app_ctx;
333
334 (void) cls;
335 (void) client;
336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337 "Local client %p disconnects\n",
338 ch);
339 {
340 struct ClientMonitorRecord *next;
341
342 for (struct ClientMonitorRecord *monitor = monitor_head;
343 NULL != monitor;
344 monitor = next)
345 {
346 next = monitor->next;
347 if (monitor->ch != ch)
348 continue;
349 GNUNET_CONTAINER_DLL_remove (monitor_head,
350 monitor_tail,
351 monitor);
352 GNUNET_free (monitor);
353 }
354 }
355
356 {
357 struct ClientQueryRecord *cqr;
358
359 while (NULL != (cqr = ch->cqr_head))
360 remove_client_query_record (cqr);
361 }
362 GNUNET_free (ch);
363}
364
365
366/**
367 * Route the given request via the DHT. This includes updating
368 * the bloom filter and retransmission times, building the P2P
369 * message and initiating the routing operation.
370 *
371 * @param cqr request to transmit
372 */
373static void
374transmit_request (struct ClientQueryRecord *cqr)
375{
376 struct GNUNET_BLOCK_Group *bg;
377 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
378
379 GNUNET_STATISTICS_update (GDS_stats,
380 "# GET requests from clients injected",
381 1,
382 GNUNET_NO);
383 bg = GNUNET_BLOCK_group_create (GDS_block_context,
384 cqr->type,
385 NULL, /* raw data */
386 0, /* raw data size */
387 "seen-set-size",
388 cqr->seen_replies_count,
389 NULL);
390 GNUNET_BLOCK_group_set_seen (bg,
391 cqr->seen_replies,
392 cqr->seen_replies_count);
393 peer_bf
394 = GNUNET_CONTAINER_bloomfilter_init (NULL,
395 DHT_BLOOM_SIZE,
396 GNUNET_CONSTANTS_BLOOMFILTER_K);
397 LOG (GNUNET_ERROR_TYPE_DEBUG,
398 "Initiating GET for %s, replication %u, already have %u replies\n",
399 GNUNET_h2s (&cqr->key),
400 cqr->replication,
401 cqr->seen_replies_count);
402 GDS_NEIGHBOURS_handle_get (cqr->type,
403 cqr->msg_options,
404 cqr->replication,
405 0 /* hop count */,
406 &cqr->key,
407 cqr->xquery,
408 cqr->xquery_size,
409 bg,
410 peer_bf);
411 GNUNET_BLOCK_group_destroy (bg);
412 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
413
414 /* Exponential back-off for retries.
415 * max. is #GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
416 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
417 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
418}
419
420
421/**
422 * Task that looks at the #retry_heap and transmits all of the requests
423 * on the heap that are ready for transmission. Then re-schedules
424 * itself (unless the heap is empty).
425 *
426 * @param cls unused
427 */
428static void
429transmit_next_request_task (void *cls)
430{
431 struct ClientQueryRecord *cqr;
432
433 (void) cls;
434 retry_task = NULL;
435 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
436 {
437 cqr->hnode = NULL;
438 if (! GNUNET_TIME_absolute_is_past (cqr->retry_time))
439 {
440 cqr->hnode
441 = GNUNET_CONTAINER_heap_insert (retry_heap,
442 cqr,
443 cqr->retry_time.abs_value_us);
444 retry_task
445 = GNUNET_SCHEDULER_add_at (cqr->retry_time,
446 &transmit_next_request_task,
447 NULL);
448 return;
449 }
450 transmit_request (cqr);
451 cqr->hnode
452 = GNUNET_CONTAINER_heap_insert (retry_heap,
453 cqr,
454 cqr->retry_time.abs_value_us);
455 }
456}
457
458
459/**
460 * Check DHT PUT messages from the client.
461 *
462 * @param cls the client we received this message from
463 * @param dht_msg the actual message received
464 * @return #GNUNET_OK (always)
465 */
466static enum GNUNET_GenericReturnValue
467check_dht_local_put (void *cls,
468 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
469{
470 uint32_t replication_level = ntohl (dht_msg->desired_replication_level);
471
472 (void) cls;
473 if (replication_level > GNUNET_DHT_MAXIMUM_REPLICATION_LEVEL)
474 {
475 GNUNET_break_op (0);
476 return GNUNET_SYSERR;
477 }
478 return GNUNET_OK;
479}
480
481
482/**
483 * Handler for PUT messages.
484 *
485 * @param cls the client we received this message from
486 * @param dht_msg the actual message received
487 */
488static void
489handle_dht_local_put (void *cls,
490 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
491{
492 struct ClientHandle *ch = cls;
493 uint16_t size = ntohs (dht_msg->header.size);
494 uint32_t replication_level
495 = ntohl (dht_msg->desired_replication_level);
496 struct GNUNET_DATACACHE_Block bd = {
497 .key = dht_msg->key,
498 .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
499 .data = &dht_msg[1],
500 .data_size = size - sizeof (*dht_msg),
501 .type = ntohl (dht_msg->type),
502 .ro = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options)
503 };
504
505 LOG (GNUNET_ERROR_TYPE_DEBUG,
506 "Handling local PUT of %lu-bytes for query %s of type %u\n",
507 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
508 GNUNET_h2s (&dht_msg->key),
509 (unsigned int) bd.type);
510#if SANITY_CHECKS > 0
511 if (GNUNET_OK !=
512 GNUNET_BLOCK_check_block (GDS_block_context,
513 bd.type,
514 bd.data,
515 bd.data_size))
516 {
517 GNUNET_break (0);
518 return;
519 }
520#endif
521 GNUNET_STATISTICS_update (GDS_stats,
522 "# PUT requests received from clients",
523 1,
524 GNUNET_NO);
525 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
526 "CLIENT-PUT %s\n",
527 GNUNET_h2s_full (&dht_msg->key));
528 /* give to local clients */
529 GNUNET_break (GDS_CLIENTS_handle_reply (&bd,
530 &bd.key,
531 0, NULL /* get path */));
532
533 {
534 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
535
536 peer_bf
537 = GNUNET_CONTAINER_bloomfilter_init (NULL,
538 DHT_BLOOM_SIZE,
539 GNUNET_CONSTANTS_BLOOMFILTER_K);
540 /* store locally */
541 if ( (0 != (bd.ro & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
542 (GDS_am_closest_peer (&dht_msg->key,
543 peer_bf)))
544 GDS_DATACACHE_handle_put (&bd);
545 /* route to other peers */
546 if (GNUNET_OK !=
547 GDS_NEIGHBOURS_handle_put (&bd,
548 replication_level,
549 0 /* hop count */,
550 peer_bf))
551 {
552 GNUNET_STATISTICS_update (GDS_stats,
553 "# Local PUT requests not routed",
554 1,
555 GNUNET_NO);
556 }
557 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
558 }
559 GDS_CLIENTS_process_put (
560 &bd,
561 0, /* hop count */
562 replication_level);
563 GNUNET_SERVICE_client_continue (ch->client);
564}
565
566
567/**
568 * Handle a result from local datacache for a GET operation.
569 *
570 * @param cls the `struct ClientHandle` of the client doing the query
571 * @param bd details about the block that was found
572 */
573static void
574handle_local_result (void *cls,
575 const struct GNUNET_DATACACHE_Block *bd)
576{
577 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
578 "Datacache provided result for query key %s\n",
579 GNUNET_h2s (&bd->key));
580 GNUNET_break (GDS_CLIENTS_handle_reply (bd,
581 &bd->key,
582 0, NULL /* get_path */));
583}
584
585
586/**
587 * Check DHT GET messages from the client.
588 *
589 * @param cls the client we received this message from
590 * @param get the actual message received
591 * @return #GNUNET_OK (always)
592 */
593static enum GNUNET_GenericReturnValue
594check_dht_local_get (void *cls,
595 const struct GNUNET_DHT_ClientGetMessage *get)
596{
597 (void) cls;
598 (void) get;
599 /* always well-formed */
600 return GNUNET_OK;
601}
602
603
604/**
605 * Handler for DHT GET messages from the client.
606 *
607 * @param cls the client we received this message from
608 * @param get the actual message received
609 */
610static void
611handle_dht_local_get (void *cls,
612 const struct GNUNET_DHT_ClientGetMessage *get)
613{
614 struct ClientHandle *ch = cls;
615 struct ClientQueryRecord *cqr;
616 uint16_t size = ntohs (get->header.size);
617 const char *xquery = (const char *) &get[1];
618 size_t xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
619
620 LOG (GNUNET_ERROR_TYPE_DEBUG,
621 "Received GET request for %s from local client %p, xq: %.*s\n",
622 GNUNET_h2s (&get->key),
623 ch->client,
624 (int) xquery_size,
625 xquery);
626 GNUNET_STATISTICS_update (GDS_stats,
627 "# GET requests received from clients",
628 1,
629 GNUNET_NO);
630 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
631 "CLIENT-GET %s\n",
632 GNUNET_h2s_full (&get->key));
633
634 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size);
635 cqr->key = get->key;
636 cqr->ch = ch;
637 cqr->xquery = (const void *) &cqr[1];
638 GNUNET_memcpy (&cqr[1],
639 xquery,
640 xquery_size);
641 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap,
642 cqr,
643 0);
644 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
645 cqr->retry_time = GNUNET_TIME_absolute_get ();
646 cqr->unique_id = get->unique_id;
647 cqr->xquery_size = xquery_size;
648 cqr->replication = ntohl (get->desired_replication_level);
649 cqr->msg_options = (enum GNUNET_DHT_RouteOption) ntohl (get->options);
650 cqr->type = ntohl (get->type);
651 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
652 ch->cqr_tail,
653 cqr);
654 GNUNET_CONTAINER_multihashmap_put (forward_map,
655 &cqr->key,
656 cqr,
657 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
658 GDS_CLIENTS_process_get (cqr->msg_options,
659 cqr->type,
660 0, /* hop count */
661 cqr->replication,
662 &get->key);
663 /* start remote requests */
664 if (NULL != retry_task)
665 GNUNET_SCHEDULER_cancel (retry_task);
666 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
667 NULL);
668 /* perform local lookup */
669 GDS_DATACACHE_handle_get (&get->key,
670 cqr->type,
671 cqr->xquery,
672 xquery_size,
673 NULL,
674 &handle_local_result,
675 ch);
676 GNUNET_SERVICE_client_continue (ch->client);
677}
678
679
680/**
681 * Closure for #find_by_unique_id().
682 */
683struct FindByUniqueIdContext
684{
685 /**
686 * Where to store the result, if found.
687 */
688 struct ClientQueryRecord *cqr;
689
690 /**
691 * Unique ID to look for.
692 */
693 uint64_t unique_id;
694};
695
696
697/**
698 * Function called for each existing DHT record for the given
699 * query. Checks if it matches the UID given in the closure
700 * and if so returns the entry as a result.
701 *
702 * @param cls the search context
703 * @param key query for the lookup (not used)
704 * @param value the `struct ClientQueryRecord`
705 * @return #GNUNET_YES to continue iteration (result not yet found)
706 */
707static enum GNUNET_GenericReturnValue
708find_by_unique_id (void *cls,
709 const struct GNUNET_HashCode *key,
710 void *value)
711{
712 struct FindByUniqueIdContext *fui_ctx = cls;
713 struct ClientQueryRecord *cqr = value;
714
715 if (cqr->unique_id != fui_ctx->unique_id)
716 return GNUNET_YES;
717 fui_ctx->cqr = cqr;
718 return GNUNET_NO;
719}
720
721
722/**
723 * Check "GET result seen" messages from the client.
724 *
725 * @param cls the client we received this message from
726 * @param seen the actual message received
727 * @return #GNUNET_OK if @a seen is well-formed
728 */
729static enum GNUNET_GenericReturnValue
730check_dht_local_get_result_seen (
731 void *cls,
732 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
733{
734 uint16_t size = ntohs (seen->header.size);
735 unsigned int hash_count =
736 (size - sizeof(*seen))
737 / sizeof(struct GNUNET_HashCode);
738
739 if (size != sizeof(*seen) + hash_count * sizeof(struct GNUNET_HashCode))
740 {
741 GNUNET_break (0);
742 return GNUNET_SYSERR;
743 }
744 return GNUNET_OK;
745}
746
747
748/**
749 * Handler for "GET result seen" messages from the client.
750 *
751 * @param cls the client we received this message from
752 * @param seen the actual message received
753 */
754static void
755handle_dht_local_get_result_seen (
756 void *cls,
757 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
758{
759 struct ClientHandle *ch = cls;
760 uint16_t size = ntohs (seen->header.size);
761 unsigned int hash_count = (size - sizeof(*seen))
762 / sizeof(struct GNUNET_HashCode);
763 const struct GNUNET_HashCode *hc = (const struct GNUNET_HashCode*) &seen[1];
764 struct FindByUniqueIdContext fui_ctx = {
765 .unique_id = seen->unique_id
766 };
767 unsigned int old_count;
768 struct ClientQueryRecord *cqr;
769
770 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
771 &seen->key,
772 &find_by_unique_id,
773 &fui_ctx);
774 if (NULL == (cqr = fui_ctx.cqr))
775 {
776 GNUNET_break (0);
777 GNUNET_SERVICE_client_drop (ch->client);
778 return;
779 }
780 /* finally, update 'seen' list */
781 old_count = cqr->seen_replies_count;
782 GNUNET_array_grow (cqr->seen_replies,
783 cqr->seen_replies_count,
784 cqr->seen_replies_count + hash_count);
785 GNUNET_memcpy (&cqr->seen_replies[old_count],
786 hc,
787 sizeof(struct GNUNET_HashCode) * hash_count);
788}
789
790
791/**
792 * Closure for #remove_by_unique_id().
793 */
794struct RemoveByUniqueIdContext
795{
796 /**
797 * Client that issued the removal request.
798 */
799 struct ClientHandle *ch;
800
801 /**
802 * Unique ID of the request.
803 */
804 uint64_t unique_id;
805};
806
807
808/**
809 * Iterator over hash map entries that frees all entries
810 * that match the given client and unique ID.
811 *
812 * @param cls unique ID and client to search for in source routes
813 * @param key current key code
814 * @param value value in the hash map, a ClientQueryRecord
815 * @return #GNUNET_YES (we should continue to iterate)
816 */
817static enum GNUNET_GenericReturnValue
818remove_by_unique_id (void *cls,
819 const struct GNUNET_HashCode *key,
820 void *value)
821{
822 const struct RemoveByUniqueIdContext *ctx = cls;
823 struct ClientQueryRecord *cqr = value;
824
825 if (cqr->unique_id != ctx->unique_id)
826 return GNUNET_YES;
827 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828 "Removing client %p's record for key %s (by unique id)\n",
829 ctx->ch->client,
830 GNUNET_h2s (key));
831 remove_client_query_record (cqr);
832 return GNUNET_YES;
833}
834
835
836/**
837 * Handler for any generic DHT stop messages, calls the appropriate handler
838 * depending on message type (if processed locally)
839 *
840 * @param cls client we received this message from
841 * @param dht_stop_msg the actual message received
842 *
843 */
844static void
845handle_dht_local_get_stop (
846 void *cls,
847 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
848{
849 struct ClientHandle *ch = cls;
850 struct RemoveByUniqueIdContext ctx;
851
852 GNUNET_STATISTICS_update (GDS_stats,
853 "# GET STOP requests received from clients",
854 1,
855 GNUNET_NO);
856 LOG (GNUNET_ERROR_TYPE_DEBUG,
857 "Received GET STOP request for %s from local client %p\n",
858 GNUNET_h2s (&dht_stop_msg->key),
859 ch->client);
860 ctx.ch = ch;
861 ctx.unique_id = dht_stop_msg->unique_id;
862 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
863 &dht_stop_msg->key,
864 &remove_by_unique_id,
865 &ctx);
866 GNUNET_SERVICE_client_continue (ch->client);
867}
868
869
870/**
871 * Closure for #forward_reply()
872 */
873struct ForwardReplyContext
874{
875 /**
876 * Block details.
877 */
878 const struct GNUNET_DATACACHE_Block *bd;
879
880 /**
881 * GET path taken.
882 */
883 const struct GNUNET_DHT_PathElement *get_path;
884
885 /**
886 * Number of entries in @e get_path.
887 */
888 unsigned int get_path_length;
889
890};
891
892
893/**
894 * Iterator over hash map entries that send a given reply to
895 * each of the matching clients. With some tricky recycling
896 * of the buffer.
897 *
898 * @param cls the `struct ForwardReplyContext`
899 * @param query_hash hash of the query for which this may be a reply
900 * @param value value in the hash map, a ClientQueryRecord
901 * @return #GNUNET_YES (we should continue to iterate),
902 * if the result is mal-formed, #GNUNET_NO
903 */
904static enum GNUNET_GenericReturnValue
905forward_reply (void *cls,
906 const struct GNUNET_HashCode *query_hash,
907 void *value)
908{
909 struct ForwardReplyContext *frc = cls;
910 struct ClientQueryRecord *record = value;
911 const struct GNUNET_DATACACHE_Block *bd = frc->bd;
912 struct GNUNET_MQ_Envelope *env;
913 struct GNUNET_DHT_ClientResultMessage *reply;
914 enum GNUNET_BLOCK_ReplyEvaluationResult eval;
915 bool do_free;
916 struct GNUNET_HashCode ch;
917 struct GNUNET_DHT_PathElement *paths;
918 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
919 size_t xsize = bd->data_size;
920
921 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
922 "CLIENT-RESULT %s\n",
923 GNUNET_h2s_full (&bd->key));
924 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
925 (record->type != bd->type) )
926 {
927 LOG (GNUNET_ERROR_TYPE_DEBUG,
928 "Record type mismatch, not passing request for key %s to local client\n",
929 GNUNET_h2s (&bd->key));
930 GNUNET_STATISTICS_update (GDS_stats,
931 "# Key match, type mismatches in REPLY to CLIENT",
932 1,
933 GNUNET_NO);
934 return GNUNET_YES; /* type mismatch */
935 }
936 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_APPROXIMATE)) &&
937 (0 != GNUNET_memcmp (&bd->key,
938 query_hash)) )
939 {
940 GNUNET_STATISTICS_update (GDS_stats,
941 "# Inexact key match, but exact match required",
942 1,
943 GNUNET_NO);
944 return GNUNET_YES; /* type mismatch */
945 }
946 GNUNET_CRYPTO_hash (bd->data,
947 bd->data_size,
948 &ch);
949 for (unsigned int i = 0; i < record->seen_replies_count; i++)
950 if (0 ==
951 GNUNET_memcmp (&record->seen_replies[i],
952 &ch))
953 {
954 LOG (GNUNET_ERROR_TYPE_DEBUG,
955 "Duplicate reply, not passing request for key %s to local client\n",
956 GNUNET_h2s (&bd->key));
957 GNUNET_STATISTICS_update (GDS_stats,
958 "# Duplicate REPLIES to CLIENT request dropped",
959 1,
960 GNUNET_NO);
961 return GNUNET_YES; /* duplicate */
962 }
963 eval
964 = GNUNET_BLOCK_check_reply (GDS_block_context,
965 record->type,
966 NULL,
967 &bd->key,
968 record->xquery,
969 record->xquery_size,
970 bd->data,
971 bd->data_size);
972 LOG (GNUNET_ERROR_TYPE_DEBUG,
973 "Evaluation result is %d for key %s for local client's query\n",
974 (int) eval,
975 GNUNET_h2s (&bd->key));
976 switch (eval)
977 {
978 case GNUNET_BLOCK_REPLY_OK_LAST:
979 do_free = true;
980 break;
981 case GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED:
982 case GNUNET_BLOCK_REPLY_OK_MORE:
983 GNUNET_array_append (record->seen_replies,
984 record->seen_replies_count,
985 ch);
986 do_free = false;
987 break;
988 case GNUNET_BLOCK_REPLY_OK_DUPLICATE:
989 /* should be impossible to encounter here */
990 GNUNET_break (0);
991 return GNUNET_YES;
992 case GNUNET_BLOCK_REPLY_IRRELEVANT:
993 return GNUNET_YES;
994 default:
995 GNUNET_break (0);
996 return GNUNET_NO;
997 }
998 GNUNET_STATISTICS_update (GDS_stats,
999 "# RESULTS queued for clients",
1000 1,
1001 GNUNET_NO);
1002 xsize += (frc->get_path_length + bd->put_path_length)
1003 * sizeof(struct GNUNET_DHT_PathElement);
1004 if (truncated)
1005 xsize += sizeof (struct GNUNET_PeerIdentity);
1006
1007#if SUPER_REDUNDANT_CHECK
1008 GNUNET_break (0 ==
1009 GNUNET_DHT_verify_path (bd->data,
1010 bd->data_size,
1011 bd->expiration_time,
1012 truncated
1013 ? &bd->trunc_peer
1014 : NULL,
1015 bd->put_path,
1016 bd->put_path_length,
1017 frc->get_path,
1018 frc->get_path_length,
1019 &GDS_my_identity));
1020#endif
1021
1022 env = GNUNET_MQ_msg_extra (reply,
1023 xsize,
1024 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1025 reply->type = htonl (bd->type);
1026 reply->options = htonl (bd->ro);
1027 reply->get_path_length = htonl (frc->get_path_length);
1028 reply->put_path_length = htonl (bd->put_path_length);
1029 reply->unique_id = record->unique_id;
1030 reply->expiration = GNUNET_TIME_absolute_hton (bd->expiration_time);
1031 reply->key = *query_hash;
1032 if (truncated)
1033 {
1034 void *tgt = &reply[1];
1035
1036 GNUNET_memcpy (tgt,
1037 &bd->trunc_peer,
1038 sizeof (struct GNUNET_PeerIdentity));
1039 paths = (struct GNUNET_DHT_PathElement *)
1040 (tgt + sizeof (struct GNUNET_PeerIdentity));
1041 }
1042 else
1043 {
1044 paths = (struct GNUNET_DHT_PathElement *) &reply[1];
1045 }
1046 GNUNET_memcpy (paths,
1047 bd->put_path,
1048 sizeof(struct GNUNET_DHT_PathElement)
1049 * bd->put_path_length);
1050 GNUNET_memcpy (&paths[bd->put_path_length],
1051 frc->get_path,
1052 sizeof(struct GNUNET_DHT_PathElement)
1053 * frc->get_path_length);
1054 GNUNET_memcpy (&paths[frc->get_path_length + bd->put_path_length],
1055 bd->data,
1056 bd->data_size);
1057 LOG (GNUNET_ERROR_TYPE_DEBUG,
1058 "Sending reply to query %s for client %p\n",
1059 GNUNET_h2s (query_hash),
1060 record->ch->client);
1061 GNUNET_MQ_send (record->ch->mq,
1062 env);
1063 if (GNUNET_YES == do_free)
1064 remove_client_query_record (record);
1065 return GNUNET_YES;
1066}
1067
1068
1069bool
1070GDS_CLIENTS_handle_reply (const struct GNUNET_DATACACHE_Block *bd,
1071 const struct GNUNET_HashCode *query_hash,
1072 unsigned int get_path_length,
1073 const struct GNUNET_DHT_PathElement *get_path)
1074{
1075 struct ForwardReplyContext frc;
1076 size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage)
1077 + bd->data_size
1078 + (get_path_length + bd->put_path_length)
1079 * sizeof(struct GNUNET_DHT_PathElement);
1080#if SANITY_CHECKS > 1
1081 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1082#endif
1083
1084 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1085 {
1086 GNUNET_break (0);
1087 return false;
1088 }
1089#if SANITY_CHECKS > 1
1090 if (0 !=
1091 GNUNET_DHT_verify_path (bd->data,
1092 bd->data_size,
1093 bd->expiration_time,
1094 truncated
1095 ? &bd->trunc_peer
1096 : NULL,
1097 bd->put_path,
1098 bd->put_path_length,
1099 get_path,
1100 get_path_length,
1101 &GDS_my_identity))
1102 {
1103 GNUNET_break (0);
1104 return false;
1105 }
1106#endif
1107 frc.bd = bd;
1108 frc.get_path = get_path;
1109 frc.get_path_length = get_path_length;
1110 LOG (GNUNET_ERROR_TYPE_DEBUG,
1111 "Forwarding reply for query hash %s with GPL %u and PPL %u to client\n",
1112 GNUNET_h2s (query_hash),
1113 get_path_length,
1114 bd->put_path_length);
1115 if (0 ==
1116 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1117 query_hash,
1118 &forward_reply,
1119 &frc))
1120 {
1121 LOG (GNUNET_ERROR_TYPE_DEBUG,
1122 "No matching client for reply for query %s\n",
1123 GNUNET_h2s (query_hash));
1124 GNUNET_STATISTICS_update (GDS_stats,
1125 "# REPLIES ignored for CLIENTS (no match)",
1126 1,
1127 GNUNET_NO);
1128 }
1129 return true;
1130}
1131
1132
1133/* **************** HELLO logic ***************** */
1134
1135/**
1136 * Handler for HELLO GET message. Reply to client
1137 * with a URL of our HELLO.
1138 *
1139 * @param cls the client we received this message from
1140 * @param msg the actual message received
1141 *
1142 */
1143static void
1144handle_dht_local_hello_get (void *cls,
1145 const struct GNUNET_MessageHeader *msg)
1146{
1147 struct ClientHandle *ch = cls;
1148 char *url = GNUNET_HELLO_builder_to_url (GDS_my_hello,
1149 &GDS_my_private_key);
1150 size_t slen = strlen (url) + 1;
1151 struct GNUNET_MessageHeader *hdr;
1152 struct GNUNET_MQ_Envelope *env;
1153
1154 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1155 "Handling request from local client for my HELLO\n");
1156 env = GNUNET_MQ_msg_extra (hdr,
1157 slen,
1158 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL);
1159 memcpy (&hdr[1],
1160 url,
1161 slen);
1162 GNUNET_free (url);
1163 GNUNET_MQ_send (ch->mq,
1164 env);
1165 GNUNET_SERVICE_client_continue (ch->client);
1166}
1167
1168
1169/**
1170 * Process a client HELLO message received from the service.
1171 *
1172 * @param cls the client we received this message from
1173 * @param hdr HELLO URL message from the service.
1174 * @return #GNUNET_OK if @a hdr is well-formed
1175 */
1176static enum GNUNET_GenericReturnValue
1177check_dht_local_hello_offer (void *cls,
1178 const struct GNUNET_MessageHeader *hdr)
1179{
1180 uint16_t len = ntohs (hdr->size);
1181 const char *buf = (const char *) &hdr[1];
1182
1183 (void) cls;
1184 if ('\0' != buf[len - sizeof (*hdr) - 1])
1185 {
1186 GNUNET_break (0);
1187 return GNUNET_SYSERR;
1188 }
1189 return GNUNET_OK;
1190}
1191
1192
1193/**
1194 * Handler for HELLO OFFER message. Try to use the
1195 * HELLO to connect to another peer.
1196 *
1197 * @param cls the client we received this message from
1198 * @param msg the actual message received
1199 */
1200static void
1201handle_dht_local_hello_offer (void *cls,
1202 const struct GNUNET_MessageHeader *msg)
1203{
1204 struct ClientHandle *ch = cls;
1205 const char *url = (const char *) &msg[1];
1206 struct GNUNET_HELLO_Builder *b;
1207
1208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209 "Local client provided HELLO URL %s\n",
1210 url);
1211 b = GNUNET_HELLO_builder_from_url (url);
1212 if (NULL == b)
1213 {
1214 GNUNET_break (0);
1215 GNUNET_SERVICE_client_drop (ch->client);
1216 return;
1217 }
1218 GNUNET_SERVICE_client_continue (ch->client);
1219 GNUNET_HELLO_builder_iterate (b,
1220 &GDS_try_connect,
1221 NULL);
1222 GNUNET_HELLO_builder_free (b);
1223}
1224
1225
1226/* ************* logic for monitors ************** */
1227
1228
1229/**
1230 * Handler for monitor start messages
1231 *
1232 * @param cls the client we received this message from
1233 * @param msg the actual message received
1234 *
1235 */
1236static void
1237handle_dht_local_monitor (void *cls,
1238 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1239{
1240 struct ClientHandle *ch = cls;
1241 struct ClientMonitorRecord *r;
1242
1243 r = GNUNET_new (struct ClientMonitorRecord);
1244 r->ch = ch;
1245 r->type = ntohl (msg->type);
1246 r->get = ntohs (msg->get);
1247 r->get_resp = ntohs (msg->get_resp);
1248 r->put = ntohs (msg->put);
1249 if (0 != ntohs (msg->filter_key))
1250 r->key = msg->key;
1251 GNUNET_CONTAINER_DLL_insert (monitor_head,
1252 monitor_tail,
1253 r);
1254 GNUNET_SERVICE_client_continue (ch->client);
1255}
1256
1257
1258/**
1259 * Handler for monitor stop messages
1260 *
1261 * @param cls the client we received this message from
1262 * @param msg the actual message received
1263 */
1264static void
1265handle_dht_local_monitor_stop (
1266 void *cls,
1267 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1268{
1269 struct ClientHandle *ch = cls;
1270
1271 GNUNET_SERVICE_client_continue (ch->client);
1272 for (struct ClientMonitorRecord *r = monitor_head;
1273 NULL != r;
1274 r = r->next)
1275 {
1276 bool keys_match;
1277
1278 keys_match =
1279 (GNUNET_is_zero (&r->key))
1280 ? (0 == ntohs (msg->filter_key))
1281 : ( (0 != ntohs (msg->filter_key)) &&
1282 (! GNUNET_memcmp (&r->key,
1283 &msg->key)) );
1284 if ( (ch == r->ch) &&
1285 (ntohl (msg->type) == r->type) &&
1286 (r->get == msg->get) &&
1287 (r->get_resp == msg->get_resp) &&
1288 (r->put == msg->put) &&
1289 keys_match)
1290 {
1291 GNUNET_CONTAINER_DLL_remove (monitor_head,
1292 monitor_tail,
1293 r);
1294 GNUNET_free (r);
1295 return; /* Delete only ONE entry */
1296 }
1297 }
1298}
1299
1300
1301/**
1302 * Function to call by #for_matching_monitors().
1303 *
1304 * @param cls closure
1305 * @param m a matching monitor
1306 */
1307typedef void
1308(*MonitorAction)(void *cls,
1309 struct ClientMonitorRecord *m);
1310
1311
1312/**
1313 * Call @a cb on all monitors that watch for blocks of @a type
1314 * and key @a key.
1315 *
1316 * @param type the type to match
1317 * @param key the key to match
1318 * @param cb function to call
1319 * @param cb_cls closure for @a cb
1320 */
1321static void
1322for_matching_monitors (enum GNUNET_BLOCK_Type type,
1323 const struct GNUNET_HashCode *key,
1324 MonitorAction cb,
1325 void *cb_cls)
1326{
1327 struct ClientHandle **cl = NULL;
1328 unsigned int cl_size = 0;
1329
1330 for (struct ClientMonitorRecord *m = monitor_head;
1331 NULL != m;
1332 m = m->next)
1333 {
1334 bool found = false;
1335
1336 if ( (GNUNET_BLOCK_TYPE_ANY != m->type) &&
1337 (m->type != type) )
1338 continue;
1339 if ( (! GNUNET_is_zero (&m->key)) &&
1340 (0 ==
1341 GNUNET_memcmp (key,
1342 &m->key)) )
1343 continue;
1344 /* Don't send duplicates */
1345 for (unsigned i = 0; i < cl_size; i++)
1346 if (cl[i] == m->ch)
1347 {
1348 found = true;
1349 break;
1350 }
1351 if (found)
1352 continue;
1353 GNUNET_array_append (cl,
1354 cl_size,
1355 m->ch);
1356 cb (cb_cls,
1357 m);
1358 }
1359 GNUNET_free (cl);
1360}
1361
1362
1363/**
1364 * Closure for #get_action();
1365 */
1366struct GetActionContext
1367{
1368 enum GNUNET_DHT_RouteOption options;
1369 enum GNUNET_BLOCK_Type type;
1370 uint32_t hop_count;
1371 uint32_t desired_replication_level;
1372 struct GNUNET_PeerIdentity trunc_peer;
1373 const struct GNUNET_HashCode *key;
1374};
1375
1376
1377/**
1378 * Function called on monitors that match a GET.
1379 * Sends the GET notification to the monitor.
1380 *
1381 * @param cls a `struct GetActionContext`
1382 * @param m a matching monitor
1383 */
1384static void
1385get_action (void *cls,
1386 struct ClientMonitorRecord *m)
1387{
1388 struct GetActionContext *gac = cls;
1389 struct GNUNET_MQ_Envelope *env;
1390 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1391
1392 env = GNUNET_MQ_msg (mmsg,
1393 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1394 mmsg->options = htonl (gac->options);
1395 mmsg->type = htonl (gac->type);
1396 mmsg->hop_count = htonl (gac->hop_count);
1397 mmsg->desired_replication_level = htonl (gac->desired_replication_level);
1398 mmsg->key = *gac->key;
1399 GNUNET_MQ_send (m->ch->mq,
1400 env);
1401}
1402
1403
1404void
1405GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
1406 enum GNUNET_BLOCK_Type type,
1407 uint32_t hop_count,
1408 uint32_t desired_replication_level,
1409 const struct GNUNET_HashCode *key)
1410{
1411 struct GetActionContext gac = {
1412 .options = options,
1413 .type = type,
1414 .hop_count = hop_count,
1415 .desired_replication_level = desired_replication_level,
1416 .key = key
1417 };
1418
1419 for_matching_monitors (type,
1420 key,
1421 &get_action,
1422 &gac);
1423}
1424
1425
1426/**
1427 * Closure for response_action().
1428 */
1429struct ResponseActionContext
1430{
1431 const struct GNUNET_DATACACHE_Block *bd;
1432 const struct GNUNET_DHT_PathElement *get_path;
1433 unsigned int get_path_length;
1434};
1435
1436
1437/**
1438 * Function called on monitors that match a response.
1439 * Sends the response notification to the monitor.
1440 *
1441 * @param cls a `struct ResponseActionContext`
1442 * @param m a matching monitor
1443 */
1444static void
1445response_action (void *cls,
1446 struct ClientMonitorRecord *m)
1447{
1448 const struct ResponseActionContext *resp_ctx = cls;
1449 const struct GNUNET_DATACACHE_Block *bd = resp_ctx->bd;
1450 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1451 struct GNUNET_MQ_Envelope *env;
1452 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1453 struct GNUNET_DHT_PathElement *path;
1454 size_t msize;
1455
1456 msize = bd->data_size;
1457 msize += (resp_ctx->get_path_length + bd->put_path_length)
1458 * sizeof(struct GNUNET_DHT_PathElement);
1459 if (truncated)
1460 msize += sizeof (struct GNUNET_PeerIdentity);
1461 env = GNUNET_MQ_msg_extra (mmsg,
1462 msize,
1463 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1464 mmsg->type = htonl (bd->type);
1465 mmsg->put_path_length = htonl (bd->put_path_length);
1466 mmsg->get_path_length = htonl (resp_ctx->get_path_length);
1467 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1468 mmsg->key = bd->key;
1469 if (truncated)
1470 {
1471 void *tgt = &mmsg[1];
1472
1473 GNUNET_memcpy (tgt,
1474 &bd->trunc_peer,
1475 sizeof (struct GNUNET_PeerIdentity));
1476 path = (struct GNUNET_DHT_PathElement *)
1477 (tgt + sizeof (struct GNUNET_PeerIdentity));
1478 }
1479 else
1480 {
1481 path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1482 }
1483 GNUNET_memcpy (path,
1484 bd->put_path,
1485 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1486 GNUNET_memcpy (path,
1487 resp_ctx->get_path,
1488 resp_ctx->get_path_length
1489 * sizeof(struct GNUNET_DHT_PathElement));
1490 GNUNET_memcpy (&path[resp_ctx->get_path_length],
1491 bd->data,
1492 bd->data_size);
1493 GNUNET_MQ_send (m->ch->mq,
1494 env);
1495}
1496
1497
1498void
1499GDS_CLIENTS_process_get_resp (const struct GNUNET_DATACACHE_Block *bd,
1500 const struct GNUNET_DHT_PathElement *get_path,
1501 unsigned int get_path_length)
1502{
1503 struct ResponseActionContext rac = {
1504 .bd = bd,
1505 .get_path = get_path,
1506 .get_path_length = get_path_length
1507 };
1508
1509 for_matching_monitors (bd->type,
1510 &bd->key,
1511 &response_action,
1512 &rac);
1513}
1514
1515
1516/**
1517 * Closure for put_action().
1518 */
1519struct PutActionContext
1520{
1521 const struct GNUNET_DATACACHE_Block *bd;
1522 uint32_t hop_count;
1523 uint32_t desired_replication_level;
1524};
1525
1526
1527/**
1528 * Function called on monitors that match a PUT.
1529 * Sends the PUT notification to the monitor.
1530 *
1531 * @param cls a `struct PutActionContext`
1532 * @param m a matching monitor
1533 */
1534static void
1535put_action (void *cls,
1536 struct ClientMonitorRecord *m)
1537{
1538 const struct PutActionContext *put_ctx = cls;
1539 const struct GNUNET_DATACACHE_Block *bd = put_ctx->bd;
1540 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1541 struct GNUNET_MQ_Envelope *env;
1542 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1543 struct GNUNET_DHT_PathElement *msg_path;
1544 size_t msize;
1545
1546 msize = bd->data_size
1547 + bd->put_path_length
1548 * sizeof(struct GNUNET_DHT_PathElement);
1549 if (truncated)
1550 msize += sizeof (struct GNUNET_PeerIdentity);
1551 env = GNUNET_MQ_msg_extra (mmsg,
1552 msize,
1553 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1554 mmsg->options = htonl (bd->ro);
1555 mmsg->type = htonl (bd->type);
1556 mmsg->hop_count = htonl (put_ctx->hop_count);
1557 mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
1558 mmsg->put_path_length = htonl (bd->put_path_length);
1559 mmsg->key = bd->key;
1560 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1561 if (truncated)
1562 {
1563 void *tgt = &mmsg[1];
1564
1565 GNUNET_memcpy (tgt,
1566 &bd->trunc_peer,
1567 sizeof (struct GNUNET_PeerIdentity));
1568 msg_path = (struct GNUNET_DHT_PathElement *)
1569 (tgt + sizeof (struct GNUNET_PeerIdentity));
1570 }
1571 else
1572 {
1573 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1574 }
1575 GNUNET_memcpy (msg_path,
1576 bd->put_path,
1577 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1578 GNUNET_memcpy (&msg_path[bd->put_path_length],
1579 bd->data,
1580 bd->data_size);
1581 GNUNET_MQ_send (m->ch->mq,
1582 env);
1583}
1584
1585
1586void
1587GDS_CLIENTS_process_put (const struct GNUNET_DATACACHE_Block *bd,
1588 uint32_t hop_count,
1589 uint32_t desired_replication_level)
1590{
1591 struct PutActionContext put_ctx = {
1592 .bd = bd,
1593 .hop_count = hop_count,
1594 .desired_replication_level = desired_replication_level
1595 };
1596
1597 for_matching_monitors (bd->type,
1598 &bd->key,
1599 &put_action,
1600 &put_ctx);
1601}
1602
1603
1604/* ********************** Initialization logic ***************** */
1605
1606
1607/**
1608 * Initialize client subsystem.
1609 */
1610static void
1611GDS_CLIENTS_init (void)
1612{
1613 forward_map
1614 = GNUNET_CONTAINER_multihashmap_create (1024,
1615 GNUNET_YES);
1616 retry_heap
1617 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1618}
1619
1620
1621/**
1622 * Shutdown client subsystem.
1623 */
1624static void
1625GDS_CLIENTS_stop (void)
1626{
1627 if (NULL != retry_task)
1628 {
1629 GNUNET_SCHEDULER_cancel (retry_task);
1630 retry_task = NULL;
1631 }
1632}
1633
1634
1635/**
1636 * Define "main" method using service macro.
1637 *
1638 * @param name name of the service, like "dht" or "xdht"
1639 * @param run name of the initializaton method for the service
1640 */
1641#define GDS_DHT_SERVICE_INIT(name, run) \
1642 GNUNET_SERVICE_MAIN \
1643 (name, \
1644 GNUNET_SERVICE_OPTION_NONE, \
1645 run, \
1646 &client_connect_cb, \
1647 &client_disconnect_cb, \
1648 NULL, \
1649 GNUNET_MQ_hd_var_size (dht_local_put, \
1650 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1651 struct GNUNET_DHT_ClientPutMessage, \
1652 NULL), \
1653 GNUNET_MQ_hd_var_size (dht_local_get, \
1654 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1655 struct GNUNET_DHT_ClientGetMessage, \
1656 NULL), \
1657 GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1658 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1659 struct GNUNET_DHT_ClientGetStopMessage, \
1660 NULL), \
1661 GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1662 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1663 struct GNUNET_DHT_MonitorStartStopMessage, \
1664 NULL), \
1665 GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1666 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1667 struct GNUNET_DHT_MonitorStartStopMessage, \
1668 NULL), \
1669 GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1670 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1671 struct GNUNET_DHT_ClientGetResultSeenMessage, \
1672 NULL), \
1673 GNUNET_MQ_hd_fixed_size (dht_local_hello_get, \
1674 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_GET, \
1675 struct GNUNET_MessageHeader, \
1676 NULL), \
1677 GNUNET_MQ_hd_var_size (dht_local_hello_offer, \
1678 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL, \
1679 struct GNUNET_MessageHeader, \
1680 NULL), \
1681 GNUNET_MQ_handler_end ())
1682
1683
1684/**
1685 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1686 */
1687void __attribute__ ((destructor))
1688GDS_CLIENTS_done ()
1689{
1690 if (NULL != retry_heap)
1691 {
1692 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1693 GNUNET_CONTAINER_heap_destroy (retry_heap);
1694 retry_heap = NULL;
1695 }
1696 if (NULL != forward_map)
1697 {
1698 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1699 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1700 forward_map = NULL;
1701 }
1702}
1703
1704
1705/* end of gnunet-service-dht_clients.c */