aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c1698
1 files changed, 0 insertions, 1698 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
deleted file mode 100644
index fdcc31f13..000000000
--- a/src/dht/gnunet-service-dht_clients.c
+++ /dev/null
@@ -1,1698 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017, 2022 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_constants.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet-service-dht.h"
32#include "gnunet-service-dht_datacache.h"
33#include "gnunet-service-dht_neighbours.h"
34#include "dht.h"
35
36
37/**
38 * Enable slow sanity checks to debug issues.
39 * 0: do not check
40 * 1: check all external inputs
41 * 2: check internal computations as well
42 */
43#define SANITY_CHECKS 2
44
45/**
46 * Should routing details be logged to stderr (for debugging)?
47 */
48#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
49 __VA_ARGS__)
50
51#define LOG(kind, ...) GNUNET_log_from (kind, "dht-clients", __VA_ARGS__)
52
53
54/**
55 * Struct containing information about a client,
56 * handle to connect to it, and any pending messages
57 * that need to be sent to it.
58 */
59struct ClientHandle;
60
61
62/**
63 * Entry in the local forwarding map for a client's GET request.
64 */
65struct ClientQueryRecord
66{
67 /**
68 * The key this request was about
69 */
70 struct GNUNET_HashCode key;
71
72 /**
73 * Kept in a DLL with @e client.
74 */
75 struct ClientQueryRecord *next;
76
77 /**
78 * Kept in a DLL with @e client.
79 */
80 struct ClientQueryRecord *prev;
81
82 /**
83 * Client responsible for the request.
84 */
85 struct ClientHandle *ch;
86
87 /**
88 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
89 */
90 const void *xquery;
91
92 /**
93 * Array of (hashes of) replies we have already seen for this request.
94 */
95 struct GNUNET_HashCode *seen_replies;
96
97 /**
98 * Pointer to this nodes heap location in the retry-heap (for fast removal)
99 */
100 struct GNUNET_CONTAINER_HeapNode *hnode;
101
102 /**
103 * What's the delay between re-try operations that we currently use for this
104 * request?
105 */
106 struct GNUNET_TIME_Relative retry_frequency;
107
108 /**
109 * What's the next time we should re-try this request?
110 */
111 struct GNUNET_TIME_Absolute retry_time;
112
113 /**
114 * The unique identifier of this request
115 */
116 uint64_t unique_id;
117
118 /**
119 * Number of bytes in xquery.
120 */
121 size_t xquery_size;
122
123 /**
124 * Number of entries in @e seen_replies.
125 */
126 unsigned int seen_replies_count;
127
128 /**
129 * Desired replication level
130 */
131 uint32_t replication;
132
133 /**
134 * Any message options for this request
135 */
136 enum GNUNET_DHT_RouteOption msg_options;
137
138 /**
139 * The type for the data for the GET request.
140 */
141 enum GNUNET_BLOCK_Type type;
142};
143
144
145/**
146 * Struct containing parameters of monitoring requests.
147 */
148struct ClientMonitorRecord
149{
150 /**
151 * Next element in DLL.
152 */
153 struct ClientMonitorRecord *next;
154
155 /**
156 * Previous element in DLL.
157 */
158 struct ClientMonitorRecord *prev;
159
160 /**
161 * Client to notify of these requests.
162 */
163 struct ClientHandle *ch;
164
165 /**
166 * Key of data of interest. All bits zero for 'all'.
167 */
168 struct GNUNET_HashCode key;
169
170 /**
171 * Type of blocks that are of interest
172 */
173 enum GNUNET_BLOCK_Type type;
174
175 /**
176 * Flag whether to notify about GET messages.
177 */
178 int16_t get;
179
180 /**
181 * Flag whether to notify about GET_REPONSE messages.
182 */
183 int16_t get_resp;
184
185 /**
186 * Flag whether to notify about PUT messages.
187 */
188 uint16_t put;
189
190};
191
192
193/**
194 * Struct containing information about a client,
195 * handle to connect to it, and any pending messages
196 * that need to be sent to it.
197 */
198struct ClientHandle
199{
200 /**
201 * Linked list of active queries of this client.
202 */
203 struct ClientQueryRecord *cqr_head;
204
205 /**
206 * Linked list of active queries of this client.
207 */
208 struct ClientQueryRecord *cqr_tail;
209
210 /**
211 * The handle to this client
212 */
213 struct GNUNET_SERVICE_Client *client;
214
215 /**
216 * The message queue to this client
217 */
218 struct GNUNET_MQ_Handle *mq;
219};
220
221
222/**
223 * Our handle to the BLOCK library.
224 */
225struct GNUNET_BLOCK_Context *GDS_block_context;
226
227/**
228 * Handle for the statistics service.
229 */
230struct GNUNET_STATISTICS_Handle *GDS_stats;
231
232/**
233 * Handle for the service.
234 */
235struct GNUNET_SERVICE_Handle *GDS_service;
236
237/**
238 * The configuration the DHT service is running with
239 */
240const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
241
242/**
243 * List of active monitoring requests.
244 */
245static struct ClientMonitorRecord *monitor_head;
246
247/**
248 * List of active monitoring requests.
249 */
250static struct ClientMonitorRecord *monitor_tail;
251
252/**
253 * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
254 */
255static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
256
257/**
258 * Heap with all of our client's request, sorted by retry time (earliest on top).
259 */
260static struct GNUNET_CONTAINER_Heap *retry_heap;
261
262/**
263 * Task that re-transmits requests (using retry_heap).
264 */
265static struct GNUNET_SCHEDULER_Task *retry_task;
266
267
268/**
269 * Free data structures associated with the given query.
270 *
271 * @param record record to remove
272 */
273static void
274remove_client_query_record (struct ClientQueryRecord *record)
275{
276 struct ClientHandle *ch = record->ch;
277
278 GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
279 ch->cqr_tail,
280 record);
281 GNUNET_assert (GNUNET_YES ==
282 GNUNET_CONTAINER_multihashmap_remove (forward_map,
283 &record->key,
284 record));
285 if (NULL != record->hnode)
286 GNUNET_CONTAINER_heap_remove_node (record->hnode);
287 GNUNET_array_grow (record->seen_replies,
288 record->seen_replies_count,
289 0);
290 GNUNET_free (record);
291}
292
293
294/**
295 * Functions with this signature are called whenever a local client is
296 * connects to us.
297 *
298 * @param cls closure (NULL for dht)
299 * @param client identification of the client
300 * @param mq message queue for talking to @a client
301 * @return our `struct ClientHandle` for @a client
302 */
303static void *
304client_connect_cb (void *cls,
305 struct GNUNET_SERVICE_Client *client,
306 struct GNUNET_MQ_Handle *mq)
307{
308 struct ClientHandle *ch;
309
310 (void) cls;
311 ch = GNUNET_new (struct ClientHandle);
312 ch->client = client;
313 ch->mq = mq;
314 return ch;
315}
316
317
318/**
319 * Functions with this signature are called whenever a client
320 * is disconnected on the network level.
321 *
322 * @param cls closure (NULL for dht)
323 * @param client identification of the client
324 * @param app_ctx our `struct ClientHandle` for @a client
325 */
326static void
327client_disconnect_cb (void *cls,
328 struct GNUNET_SERVICE_Client *client,
329 void *app_ctx)
330{
331 struct ClientHandle *ch = app_ctx;
332
333 (void) cls;
334 (void) client;
335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
336 "Local client %p disconnects\n",
337 ch);
338 {
339 struct ClientMonitorRecord *next;
340
341 for (struct ClientMonitorRecord *monitor = monitor_head;
342 NULL != monitor;
343 monitor = next)
344 {
345 next = monitor->next;
346 if (monitor->ch != ch)
347 continue;
348 GNUNET_CONTAINER_DLL_remove (monitor_head,
349 monitor_tail,
350 monitor);
351 GNUNET_free (monitor);
352 }
353 }
354
355 {
356 struct ClientQueryRecord *cqr;
357
358 while (NULL != (cqr = ch->cqr_head))
359 remove_client_query_record (cqr);
360 }
361 GNUNET_free (ch);
362}
363
364
365/**
366 * Route the given request via the DHT. This includes updating
367 * the bloom filter and retransmission times, building the P2P
368 * message and initiating the routing operation.
369 *
370 * @param cqr request to transmit
371 */
372static void
373transmit_request (struct ClientQueryRecord *cqr)
374{
375 struct GNUNET_BLOCK_Group *bg;
376 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
377
378 GNUNET_STATISTICS_update (GDS_stats,
379 "# GET requests from clients injected",
380 1,
381 GNUNET_NO);
382 bg = GNUNET_BLOCK_group_create (GDS_block_context,
383 cqr->type,
384 NULL, /* raw data */
385 0, /* raw data size */
386 "seen-set-size",
387 cqr->seen_replies_count,
388 NULL);
389 GNUNET_BLOCK_group_set_seen (bg,
390 cqr->seen_replies,
391 cqr->seen_replies_count);
392 peer_bf
393 = GNUNET_CONTAINER_bloomfilter_init (NULL,
394 DHT_BLOOM_SIZE,
395 GNUNET_CONSTANTS_BLOOMFILTER_K);
396 LOG (GNUNET_ERROR_TYPE_DEBUG,
397 "Initiating GET for %s, replication %u, already have %u replies\n",
398 GNUNET_h2s (&cqr->key),
399 cqr->replication,
400 cqr->seen_replies_count);
401 GDS_NEIGHBOURS_handle_get (cqr->type,
402 cqr->msg_options,
403 cqr->replication,
404 0 /* hop count */,
405 &cqr->key,
406 cqr->xquery,
407 cqr->xquery_size,
408 bg,
409 peer_bf);
410 GNUNET_BLOCK_group_destroy (bg);
411 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
412
413 /* Exponential back-off for retries.
414 * max. is #GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
415 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
416 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
417}
418
419
420/**
421 * Task that looks at the #retry_heap and transmits all of the requests
422 * on the heap that are ready for transmission. Then re-schedules
423 * itself (unless the heap is empty).
424 *
425 * @param cls unused
426 */
427static void
428transmit_next_request_task (void *cls)
429{
430 struct ClientQueryRecord *cqr;
431
432 (void) cls;
433 retry_task = NULL;
434 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
435 {
436 cqr->hnode = NULL;
437 if (! GNUNET_TIME_absolute_is_past (cqr->retry_time))
438 {
439 cqr->hnode
440 = GNUNET_CONTAINER_heap_insert (retry_heap,
441 cqr,
442 cqr->retry_time.abs_value_us);
443 retry_task
444 = GNUNET_SCHEDULER_add_at (cqr->retry_time,
445 &transmit_next_request_task,
446 NULL);
447 return;
448 }
449 transmit_request (cqr);
450 cqr->hnode
451 = GNUNET_CONTAINER_heap_insert (retry_heap,
452 cqr,
453 cqr->retry_time.abs_value_us);
454 }
455}
456
457
458/**
459 * Check DHT PUT messages from the client.
460 *
461 * @param cls the client we received this message from
462 * @param dht_msg the actual message received
463 * @return #GNUNET_OK (always)
464 */
465static enum GNUNET_GenericReturnValue
466check_dht_local_put (void *cls,
467 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
468{
469 uint32_t replication_level = ntohl (dht_msg->desired_replication_level);
470
471 (void) cls;
472 if (replication_level > GNUNET_DHT_MAXIMUM_REPLICATION_LEVEL)
473 {
474 GNUNET_break_op (0);
475 return GNUNET_SYSERR;
476 }
477 return GNUNET_OK;
478}
479
480
481/**
482 * Handler for PUT messages.
483 *
484 * @param cls the client we received this message from
485 * @param dht_msg the actual message received
486 */
487static void
488handle_dht_local_put (void *cls,
489 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
490{
491 struct ClientHandle *ch = cls;
492 uint16_t size = ntohs (dht_msg->header.size);
493 uint32_t replication_level
494 = ntohl (dht_msg->desired_replication_level);
495 struct GNUNET_DATACACHE_Block bd = {
496 .key = dht_msg->key,
497 .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
498 .data = &dht_msg[1],
499 .data_size = size - sizeof (*dht_msg),
500 .type = ntohl (dht_msg->type),
501 .ro = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options)
502 };
503
504 LOG (GNUNET_ERROR_TYPE_DEBUG,
505 "Handling local PUT of %lu-bytes for query %s of type %u\n",
506 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
507 GNUNET_h2s (&dht_msg->key),
508 (unsigned int) bd.type);
509 if (GNUNET_OK !=
510 GNUNET_BLOCK_check_block (GDS_block_context,
511 bd.type,
512 bd.data,
513 bd.data_size))
514 {
515 GNUNET_break (0);
516 return;
517 }
518 GNUNET_STATISTICS_update (GDS_stats,
519 "# PUT requests received from clients",
520 1,
521 GNUNET_NO);
522 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
523 "CLIENT-PUT %s\n",
524 GNUNET_h2s_full (&dht_msg->key));
525 /* give to local clients */
526 GNUNET_break (GDS_CLIENTS_handle_reply (&bd,
527 &bd.key,
528 0, NULL /* get path */));
529
530 {
531 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
532
533 peer_bf
534 = GNUNET_CONTAINER_bloomfilter_init (NULL,
535 DHT_BLOOM_SIZE,
536 GNUNET_CONSTANTS_BLOOMFILTER_K);
537 /* store locally */
538 if ( (0 != (bd.ro & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
539 (GDS_am_closest_peer (&dht_msg->key,
540 peer_bf)))
541 GDS_DATACACHE_handle_put (&bd);
542 /* route to other peers */
543 if (GNUNET_OK !=
544 GDS_NEIGHBOURS_handle_put (&bd,
545 replication_level,
546 0 /* hop count */,
547 peer_bf))
548 {
549 GNUNET_STATISTICS_update (GDS_stats,
550 "# Local PUT requests not routed",
551 1,
552 GNUNET_NO);
553 }
554 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
555 }
556 GDS_CLIENTS_process_put (
557 &bd,
558 0, /* hop count */
559 replication_level);
560 GNUNET_SERVICE_client_continue (ch->client);
561}
562
563
564/**
565 * Handle a result from local datacache for a GET operation.
566 *
567 * @param cls the `struct ClientHandle` of the client doing the query
568 * @param bd details about the block that was found
569 */
570static void
571handle_local_result (void *cls,
572 const struct GNUNET_DATACACHE_Block *bd)
573{
574 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
575 "Datacache provided result for query key %s\n",
576 GNUNET_h2s (&bd->key));
577 GNUNET_break (GDS_CLIENTS_handle_reply (bd,
578 &bd->key,
579 0, NULL /* get_path */));
580}
581
582
583/**
584 * Check DHT GET messages from the client.
585 *
586 * @param cls the client we received this message from
587 * @param message the actual message received
588 * @return #GNUNET_OK (always)
589 */
590static enum GNUNET_GenericReturnValue
591check_dht_local_get (void *cls,
592 const struct GNUNET_DHT_ClientGetMessage *get)
593{
594 (void) cls;
595 (void) get;
596 /* always well-formed */
597 return GNUNET_OK;
598}
599
600
601/**
602 * Handler for DHT GET messages from the client.
603 *
604 * @param cls the client we received this message from
605 * @param message the actual message received
606 */
607static void
608handle_dht_local_get (void *cls,
609 const struct GNUNET_DHT_ClientGetMessage *get)
610{
611 struct ClientHandle *ch = cls;
612 struct ClientQueryRecord *cqr;
613 uint16_t size = ntohs (get->header.size);
614 const char *xquery = (const char *) &get[1];
615 size_t xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
616
617 LOG (GNUNET_ERROR_TYPE_DEBUG,
618 "Received GET request for %s from local client %p, xq: %.*s\n",
619 GNUNET_h2s (&get->key),
620 ch->client,
621 (int) xquery_size,
622 xquery);
623 GNUNET_STATISTICS_update (GDS_stats,
624 "# GET requests received from clients",
625 1,
626 GNUNET_NO);
627 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
628 "CLIENT-GET %s\n",
629 GNUNET_h2s_full (&get->key));
630
631 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size);
632 cqr->key = get->key;
633 cqr->ch = ch;
634 cqr->xquery = (const void *) &cqr[1];
635 GNUNET_memcpy (&cqr[1],
636 xquery,
637 xquery_size);
638 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap,
639 cqr,
640 0);
641 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
642 cqr->retry_time = GNUNET_TIME_absolute_get ();
643 cqr->unique_id = get->unique_id;
644 cqr->xquery_size = xquery_size;
645 cqr->replication = ntohl (get->desired_replication_level);
646 cqr->msg_options = (enum GNUNET_DHT_RouteOption) ntohl (get->options);
647 cqr->type = ntohl (get->type);
648 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
649 ch->cqr_tail,
650 cqr);
651 GNUNET_CONTAINER_multihashmap_put (forward_map,
652 &cqr->key,
653 cqr,
654 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
655 GDS_CLIENTS_process_get (cqr->msg_options,
656 cqr->type,
657 0, /* hop count */
658 cqr->replication,
659 &get->key);
660 /* start remote requests */
661 if (NULL != retry_task)
662 GNUNET_SCHEDULER_cancel (retry_task);
663 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
664 NULL);
665 /* perform local lookup */
666 GDS_DATACACHE_handle_get (&get->key,
667 cqr->type,
668 cqr->xquery,
669 xquery_size,
670 NULL,
671 &handle_local_result,
672 ch);
673 GNUNET_SERVICE_client_continue (ch->client);
674}
675
676
677/**
678 * Closure for #find_by_unique_id().
679 */
680struct FindByUniqueIdContext
681{
682 /**
683 * Where to store the result, if found.
684 */
685 struct ClientQueryRecord *cqr;
686
687 /**
688 * Unique ID to look for.
689 */
690 uint64_t unique_id;
691};
692
693
694/**
695 * Function called for each existing DHT record for the given
696 * query. Checks if it matches the UID given in the closure
697 * and if so returns the entry as a result.
698 *
699 * @param cls the search context
700 * @param key query for the lookup (not used)
701 * @param value the `struct ClientQueryRecord`
702 * @return #GNUNET_YES to continue iteration (result not yet found)
703 */
704static enum GNUNET_GenericReturnValue
705find_by_unique_id (void *cls,
706 const struct GNUNET_HashCode *key,
707 void *value)
708{
709 struct FindByUniqueIdContext *fui_ctx = cls;
710 struct ClientQueryRecord *cqr = value;
711
712 if (cqr->unique_id != fui_ctx->unique_id)
713 return GNUNET_YES;
714 fui_ctx->cqr = cqr;
715 return GNUNET_NO;
716}
717
718
719/**
720 * Check "GET result seen" messages from the client.
721 *
722 * @param cls the client we received this message from
723 * @param message the actual message received
724 * @return #GNUNET_OK if @a seen is well-formed
725 */
726static enum GNUNET_GenericReturnValue
727check_dht_local_get_result_seen (
728 void *cls,
729 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
730{
731 uint16_t size = ntohs (seen->header.size);
732 unsigned int hash_count =
733 (size - sizeof(*seen))
734 / sizeof(struct GNUNET_HashCode);
735
736 if (size != sizeof(*seen) + hash_count * sizeof(struct GNUNET_HashCode))
737 {
738 GNUNET_break (0);
739 return GNUNET_SYSERR;
740 }
741 return GNUNET_OK;
742}
743
744
745/**
746 * Handler for "GET result seen" messages from the client.
747 *
748 * @param cls the client we received this message from
749 * @param message the actual message received
750 */
751static void
752handle_dht_local_get_result_seen (
753 void *cls,
754 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
755{
756 struct ClientHandle *ch = cls;
757 uint16_t size = ntohs (seen->header.size);
758 unsigned int hash_count = (size - sizeof(*seen))
759 / sizeof(struct GNUNET_HashCode);
760 const struct GNUNET_HashCode *hc = (const struct GNUNET_HashCode*) &seen[1];
761 struct FindByUniqueIdContext fui_ctx = {
762 .unique_id = seen->unique_id
763 };
764 unsigned int old_count;
765 struct ClientQueryRecord *cqr;
766
767 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
768 &seen->key,
769 &find_by_unique_id,
770 &fui_ctx);
771 if (NULL == (cqr = fui_ctx.cqr))
772 {
773 GNUNET_break (0);
774 GNUNET_SERVICE_client_drop (ch->client);
775 return;
776 }
777 /* finally, update 'seen' list */
778 old_count = cqr->seen_replies_count;
779 GNUNET_array_grow (cqr->seen_replies,
780 cqr->seen_replies_count,
781 cqr->seen_replies_count + hash_count);
782 GNUNET_memcpy (&cqr->seen_replies[old_count],
783 hc,
784 sizeof(struct GNUNET_HashCode) * hash_count);
785}
786
787
788/**
789 * Closure for #remove_by_unique_id().
790 */
791struct RemoveByUniqueIdContext
792{
793 /**
794 * Client that issued the removal request.
795 */
796 struct ClientHandle *ch;
797
798 /**
799 * Unique ID of the request.
800 */
801 uint64_t unique_id;
802};
803
804
805/**
806 * Iterator over hash map entries that frees all entries
807 * that match the given client and unique ID.
808 *
809 * @param cls unique ID and client to search for in source routes
810 * @param key current key code
811 * @param value value in the hash map, a ClientQueryRecord
812 * @return #GNUNET_YES (we should continue to iterate)
813 */
814static enum GNUNET_GenericReturnValue
815remove_by_unique_id (void *cls,
816 const struct GNUNET_HashCode *key,
817 void *value)
818{
819 const struct RemoveByUniqueIdContext *ctx = cls;
820 struct ClientQueryRecord *cqr = value;
821
822 if (cqr->unique_id != ctx->unique_id)
823 return GNUNET_YES;
824 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
825 "Removing client %p's record for key %s (by unique id)\n",
826 ctx->ch->client,
827 GNUNET_h2s (key));
828 remove_client_query_record (cqr);
829 return GNUNET_YES;
830}
831
832
833/**
834 * Handler for any generic DHT stop messages, calls the appropriate handler
835 * depending on message type (if processed locally)
836 *
837 * @param cls client we received this message from
838 * @param message the actual message received
839 *
840 */
841static void
842handle_dht_local_get_stop (
843 void *cls,
844 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
845{
846 struct ClientHandle *ch = cls;
847 struct RemoveByUniqueIdContext ctx;
848
849 GNUNET_STATISTICS_update (GDS_stats,
850 "# GET STOP requests received from clients",
851 1,
852 GNUNET_NO);
853 LOG (GNUNET_ERROR_TYPE_DEBUG,
854 "Received GET STOP request for %s from local client %p\n",
855 GNUNET_h2s (&dht_stop_msg->key),
856 ch->client);
857 ctx.ch = ch;
858 ctx.unique_id = dht_stop_msg->unique_id;
859 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
860 &dht_stop_msg->key,
861 &remove_by_unique_id,
862 &ctx);
863 GNUNET_SERVICE_client_continue (ch->client);
864}
865
866
867/**
868 * Closure for #forward_reply()
869 */
870struct ForwardReplyContext
871{
872 /**
873 * Block details.
874 */
875 const struct GNUNET_DATACACHE_Block *bd;
876
877 /**
878 * GET path taken.
879 */
880 const struct GNUNET_DHT_PathElement *get_path;
881
882 /**
883 * Number of entries in @e get_path.
884 */
885 unsigned int get_path_length;
886
887};
888
889
890/**
891 * Iterator over hash map entries that send a given reply to
892 * each of the matching clients. With some tricky recycling
893 * of the buffer.
894 *
895 * @param cls the `struct ForwardReplyContext`
896 * @param query_hash hash of the query for which this may be a reply
897 * @param value value in the hash map, a ClientQueryRecord
898 * @return #GNUNET_YES (we should continue to iterate),
899 * if the result is mal-formed, #GNUNET_NO
900 */
901static enum GNUNET_GenericReturnValue
902forward_reply (void *cls,
903 const struct GNUNET_HashCode *query_hash,
904 void *value)
905{
906 struct ForwardReplyContext *frc = cls;
907 struct ClientQueryRecord *record = value;
908 const struct GNUNET_DATACACHE_Block *bd = frc->bd;
909 struct GNUNET_MQ_Envelope *env;
910 struct GNUNET_DHT_ClientResultMessage *reply;
911 enum GNUNET_BLOCK_ReplyEvaluationResult eval;
912 bool do_free;
913 struct GNUNET_HashCode ch;
914 struct GNUNET_DHT_PathElement *paths;
915 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
916 size_t xsize = bd->data_size;
917
918 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
919 "CLIENT-RESULT %s\n",
920 GNUNET_h2s_full (&frc->bd->key));
921 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
922 (record->type != frc->bd->type) )
923 {
924 LOG (GNUNET_ERROR_TYPE_DEBUG,
925 "Record type mismatch, not passing request for key %s to local client\n",
926 GNUNET_h2s (&frc->bd->key));
927 GNUNET_STATISTICS_update (GDS_stats,
928 "# Key match, type mismatches in REPLY to CLIENT",
929 1,
930 GNUNET_NO);
931 return GNUNET_YES; /* type mismatch */
932 }
933 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_APPROXIMATE)) &&
934 (0 != GNUNET_memcmp (&frc->bd->key,
935 query_hash)) )
936 {
937 GNUNET_STATISTICS_update (GDS_stats,
938 "# Inexact key match, but exact match required",
939 1,
940 GNUNET_NO);
941 return GNUNET_YES; /* type mismatch */
942 }
943 GNUNET_CRYPTO_hash (frc->bd->data,
944 frc->bd->data_size,
945 &ch);
946 for (unsigned int i = 0; i < record->seen_replies_count; i++)
947 if (0 ==
948 GNUNET_memcmp (&record->seen_replies[i],
949 &ch))
950 {
951 LOG (GNUNET_ERROR_TYPE_DEBUG,
952 "Duplicate reply, not passing request for key %s to local client\n",
953 GNUNET_h2s (&frc->bd->key));
954 GNUNET_STATISTICS_update (GDS_stats,
955 "# Duplicate REPLIES to CLIENT request dropped",
956 1,
957 GNUNET_NO);
958 return GNUNET_YES; /* duplicate */
959 }
960 eval
961 = GNUNET_BLOCK_check_reply (GDS_block_context,
962 record->type,
963 NULL,
964 &frc->bd->key,
965 record->xquery,
966 record->xquery_size,
967 frc->bd->data,
968 frc->bd->data_size);
969 LOG (GNUNET_ERROR_TYPE_DEBUG,
970 "Evaluation result is %d for key %s for local client's query\n",
971 (int) eval,
972 GNUNET_h2s (&frc->bd->key));
973 switch (eval)
974 {
975 case GNUNET_BLOCK_REPLY_OK_LAST:
976 do_free = true;
977 break;
978 case GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED:
979 case GNUNET_BLOCK_REPLY_OK_MORE:
980 GNUNET_array_append (record->seen_replies,
981 record->seen_replies_count,
982 ch);
983 do_free = false;
984 break;
985 case GNUNET_BLOCK_REPLY_OK_DUPLICATE:
986 /* should be impossible to encounter here */
987 GNUNET_break (0);
988 return GNUNET_YES;
989 case GNUNET_BLOCK_REPLY_IRRELEVANT:
990 return GNUNET_YES;
991 default:
992 GNUNET_break (0);
993 return GNUNET_NO;
994 }
995 GNUNET_STATISTICS_update (GDS_stats,
996 "# RESULTS queued for clients",
997 1,
998 GNUNET_NO);
999 xsize += (frc->get_path_length + frc->bd->put_path_length)
1000 * sizeof(struct GNUNET_DHT_PathElement);
1001 if (truncated)
1002 xsize += sizeof (struct GNUNET_PeerIdentity);
1003 env = GNUNET_MQ_msg_extra (reply,
1004 xsize,
1005 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1006 reply->type = htonl (frc->bd->type);
1007 reply->options = htonl (bd->ro);
1008 reply->get_path_length = htonl (frc->get_path_length);
1009 reply->put_path_length = htonl (frc->bd->put_path_length);
1010 reply->unique_id = record->unique_id;
1011 reply->expiration = GNUNET_TIME_absolute_hton (frc->bd->expiration_time);
1012 reply->key = *query_hash;
1013 if (truncated)
1014 {
1015 void *tgt = &reply[1];
1016
1017 GNUNET_memcpy (tgt,
1018 &bd->trunc_peer,
1019 sizeof (struct GNUNET_PeerIdentity));
1020 paths = (struct GNUNET_DHT_PathElement *)
1021 (tgt + sizeof (struct GNUNET_PeerIdentity));
1022 }
1023 else
1024 {
1025 paths = (struct GNUNET_DHT_PathElement *) &reply[1];
1026 }
1027 GNUNET_memcpy (paths,
1028 frc->bd->put_path,
1029 sizeof(struct GNUNET_DHT_PathElement)
1030 * frc->bd->put_path_length);
1031 GNUNET_memcpy (&paths[frc->bd->put_path_length],
1032 frc->get_path,
1033 sizeof(struct GNUNET_DHT_PathElement)
1034 * frc->get_path_length);
1035 GNUNET_memcpy (&paths[frc->get_path_length + frc->bd->put_path_length],
1036 frc->bd->data,
1037 frc->bd->data_size);
1038 LOG (GNUNET_ERROR_TYPE_DEBUG,
1039 "Sending reply to query %s for client %p\n",
1040 GNUNET_h2s (query_hash),
1041 record->ch->client);
1042 GNUNET_MQ_send (record->ch->mq,
1043 env);
1044 if (GNUNET_YES == do_free)
1045 remove_client_query_record (record);
1046 return GNUNET_YES;
1047}
1048
1049
1050bool
1051GDS_CLIENTS_handle_reply (const struct GNUNET_DATACACHE_Block *bd,
1052 const struct GNUNET_HashCode *query_hash,
1053 unsigned int get_path_length,
1054 const struct GNUNET_DHT_PathElement *get_path)
1055{
1056 struct ForwardReplyContext frc;
1057 size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage)
1058 + bd->data_size
1059 + (get_path_length + bd->put_path_length)
1060 * sizeof(struct GNUNET_DHT_PathElement);
1061 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1062
1063 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1064 {
1065 GNUNET_break (0);
1066 return false;
1067 }
1068#if SANITY_CHECKS > 1
1069 if (0 !=
1070 GNUNET_DHT_verify_path (bd->data,
1071 bd->data_size,
1072 bd->expiration_time,
1073 truncated
1074 ? &bd->trunc_peer
1075 : NULL,
1076 bd->put_path,
1077 bd->put_path_length,
1078 get_path,
1079 get_path_length,
1080 &GDS_my_identity))
1081 {
1082 GNUNET_break (0);
1083 return false;
1084 }
1085#endif
1086 frc.bd = bd;
1087 frc.get_path = get_path;
1088 frc.get_path_length = get_path_length;
1089 LOG (GNUNET_ERROR_TYPE_DEBUG,
1090 "Forwarding reply for query hash %s with GPL %u and PPL %u to client\n",
1091 GNUNET_h2s (query_hash),
1092 get_path_length,
1093 bd->put_path_length);
1094 if (0 ==
1095 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1096 query_hash,
1097 &forward_reply,
1098 &frc))
1099 {
1100 LOG (GNUNET_ERROR_TYPE_DEBUG,
1101 "No matching client for reply for query %s\n",
1102 GNUNET_h2s (query_hash));
1103 GNUNET_STATISTICS_update (GDS_stats,
1104 "# REPLIES ignored for CLIENTS (no match)",
1105 1,
1106 GNUNET_NO);
1107 }
1108 return true;
1109}
1110
1111
1112/* **************** HELLO logic ***************** */
1113
1114/**
1115 * Handler for HELLO GET message. Reply to client
1116 * with a URL of our HELLO.
1117 *
1118 * @param cls the client we received this message from
1119 * @param msg the actual message received
1120 *
1121 */
1122static void
1123handle_dht_local_hello_get (void *cls,
1124 const struct GNUNET_MessageHeader *msg)
1125{
1126 struct ClientHandle *ch = cls;
1127 char *url = GNUNET_HELLO_builder_to_url (GDS_my_hello,
1128 &GDS_my_private_key);
1129 size_t slen = strlen (url) + 1;
1130 struct GNUNET_MessageHeader *hdr;
1131 struct GNUNET_MQ_Envelope *env;
1132
1133 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134 "Handling request from local client for my HELLO\n");
1135 env = GNUNET_MQ_msg_extra (hdr,
1136 slen,
1137 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL);
1138 memcpy (&hdr[1],
1139 url,
1140 slen);
1141 GNUNET_free (url);
1142 GNUNET_MQ_send (ch->mq,
1143 env);
1144 GNUNET_SERVICE_client_continue (ch->client);
1145}
1146
1147
1148/**
1149 * Process a client HELLO message received from the service.
1150 *
1151 * @param cls the client we received this message from
1152 * @param hdr HELLO URL message from the service.
1153 * @return #GNUNET_OK if @a hdr is well-formed
1154 */
1155static enum GNUNET_GenericReturnValue
1156check_dht_local_hello_offer (void *cls,
1157 const struct GNUNET_MessageHeader *hdr)
1158{
1159 uint16_t len = ntohs (hdr->size);
1160 const char *buf = (const char *) &hdr[1];
1161
1162 (void) cls;
1163 if ('\0' != buf[len - sizeof (*hdr) - 1])
1164 {
1165 GNUNET_break (0);
1166 return GNUNET_SYSERR;
1167 }
1168 return GNUNET_OK;
1169}
1170
1171
1172/**
1173 * Handler for HELLO OFFER message. Try to use the
1174 * HELLO to connect to another peer.
1175 *
1176 * @param cls the client we received this message from
1177 * @param msg the actual message received
1178 */
1179static void
1180handle_dht_local_hello_offer (void *cls,
1181 const struct GNUNET_MessageHeader *msg)
1182{
1183 struct ClientHandle *ch = cls;
1184 const char *url = (const char *) &msg[1];
1185 struct GNUNET_HELLO_Builder *b;
1186 struct GNUNET_PeerIdentity pid;
1187
1188 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189 "Local client provided HELLO URL %s\n",
1190 url);
1191 b = GNUNET_HELLO_builder_from_url (url);
1192 if (NULL == b)
1193 {
1194 GNUNET_break (0);
1195 GNUNET_SERVICE_client_drop (ch->client);
1196 return;
1197 }
1198 GNUNET_SERVICE_client_continue (ch->client);
1199 GNUNET_HELLO_builder_iterate (b,
1200 &pid,
1201 &GDS_try_connect,
1202 &pid);
1203 GNUNET_HELLO_builder_free (b);
1204}
1205
1206
1207/* ************* logic for monitors ************** */
1208
1209
1210/**
1211 * Handler for monitor start messages
1212 *
1213 * @param cls the client we received this message from
1214 * @param msg the actual message received
1215 *
1216 */
1217static void
1218handle_dht_local_monitor (void *cls,
1219 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1220{
1221 struct ClientHandle *ch = cls;
1222 struct ClientMonitorRecord *r;
1223
1224 r = GNUNET_new (struct ClientMonitorRecord);
1225 r->ch = ch;
1226 r->type = ntohl (msg->type);
1227 r->get = ntohs (msg->get);
1228 r->get_resp = ntohs (msg->get_resp);
1229 r->put = ntohs (msg->put);
1230 if (0 != ntohs (msg->filter_key))
1231 r->key = msg->key;
1232 GNUNET_CONTAINER_DLL_insert (monitor_head,
1233 monitor_tail,
1234 r);
1235 GNUNET_SERVICE_client_continue (ch->client);
1236}
1237
1238
1239/**
1240 * Handler for monitor stop messages
1241 *
1242 * @param cls the client we received this message from
1243 * @param msg the actual message received
1244 */
1245static void
1246handle_dht_local_monitor_stop (
1247 void *cls,
1248 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1249{
1250 struct ClientHandle *ch = cls;
1251
1252 GNUNET_SERVICE_client_continue (ch->client);
1253 for (struct ClientMonitorRecord *r = monitor_head;
1254 NULL != r;
1255 r = r->next)
1256 {
1257 bool keys_match;
1258
1259 keys_match =
1260 (GNUNET_is_zero (&r->key))
1261 ? (0 == ntohs (msg->filter_key))
1262 : ( (0 != ntohs (msg->filter_key)) &&
1263 (! GNUNET_memcmp (&r->key,
1264 &msg->key)) );
1265 if ( (ch == r->ch) &&
1266 (ntohl (msg->type) == r->type) &&
1267 (r->get == msg->get) &&
1268 (r->get_resp == msg->get_resp) &&
1269 (r->put == msg->put) &&
1270 keys_match)
1271 {
1272 GNUNET_CONTAINER_DLL_remove (monitor_head,
1273 monitor_tail,
1274 r);
1275 GNUNET_free (r);
1276 return; /* Delete only ONE entry */
1277 }
1278 }
1279}
1280
1281
1282/**
1283 * Function to call by #for_matching_monitors().
1284 *
1285 * @param cls closure
1286 * @param m a matching monitor
1287 */
1288typedef void
1289(*MonitorAction)(void *cls,
1290 struct ClientMonitorRecord *m);
1291
1292
1293/**
1294 * Call @a cb on all monitors that watch for blocks of @a type
1295 * and key @a key.
1296 *
1297 * @param type the type to match
1298 * @param key the key to match
1299 * @param cb function to call
1300 * @param cb_cls closure for @a cb
1301 */
1302static void
1303for_matching_monitors (enum GNUNET_BLOCK_Type type,
1304 const struct GNUNET_HashCode *key,
1305 MonitorAction cb,
1306 void *cb_cls)
1307{
1308 struct ClientHandle **cl = NULL;
1309 unsigned int cl_size = 0;
1310
1311 for (struct ClientMonitorRecord *m = monitor_head;
1312 NULL != m;
1313 m = m->next)
1314 {
1315 bool found = false;
1316
1317 if ( (GNUNET_BLOCK_TYPE_ANY != m->type) &&
1318 (m->type != type) )
1319 continue;
1320 if ( (! GNUNET_is_zero (&m->key)) &&
1321 (0 ==
1322 GNUNET_memcmp (key,
1323 &m->key)) )
1324 continue;
1325 /* Don't send duplicates */
1326 for (unsigned i = 0; i < cl_size; i++)
1327 if (cl[i] == m->ch)
1328 {
1329 found = true;
1330 break;
1331 }
1332 if (found)
1333 continue;
1334 GNUNET_array_append (cl,
1335 cl_size,
1336 m->ch);
1337 cb (cb_cls,
1338 m);
1339 }
1340 GNUNET_free (cl);
1341}
1342
1343
1344/**
1345 * Closure for #get_action();
1346 */
1347struct GetActionContext
1348{
1349 enum GNUNET_DHT_RouteOption options;
1350 enum GNUNET_BLOCK_Type type;
1351 uint32_t hop_count;
1352 uint32_t desired_replication_level;
1353 struct GNUNET_PeerIdentity trunc_peer;
1354 const struct GNUNET_HashCode *key;
1355};
1356
1357
1358/**
1359 * Function called on monitors that match a GET.
1360 * Sends the GET notification to the monitor.
1361 *
1362 * @param cls a `struct GetActionContext`
1363 * @param m a matching monitor
1364 */
1365static void
1366get_action (void *cls,
1367 struct ClientMonitorRecord *m)
1368{
1369 struct GetActionContext *gac = cls;
1370 struct GNUNET_MQ_Envelope *env;
1371 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1372
1373 env = GNUNET_MQ_msg (mmsg,
1374 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1375 mmsg->options = htonl (gac->options);
1376 mmsg->type = htonl (gac->type);
1377 mmsg->hop_count = htonl (gac->hop_count);
1378 mmsg->desired_replication_level = htonl (gac->desired_replication_level);
1379 mmsg->key = *gac->key;
1380 GNUNET_MQ_send (m->ch->mq,
1381 env);
1382}
1383
1384
1385/**
1386 * Check if some client is monitoring GET messages and notify
1387 * them in that case. If tracked, @a path should include the local peer.
1388 *
1389 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1390 * @param type The type of data in the request.
1391 * @param hop_count Hop count so far.
1392 * @param desired_replication_level Desired replication level.
1393 * @param key Key of the requested data.
1394 */
1395void
1396GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
1397 enum GNUNET_BLOCK_Type type,
1398 uint32_t hop_count,
1399 uint32_t desired_replication_level,
1400 const struct GNUNET_HashCode *key)
1401{
1402 struct GetActionContext gac = {
1403 .options = options,
1404 .type = type,
1405 .hop_count = hop_count,
1406 .desired_replication_level = desired_replication_level,
1407 .key = key
1408 };
1409
1410 for_matching_monitors (type,
1411 key,
1412 &get_action,
1413 &gac);
1414}
1415
1416
1417/**
1418 * Closure for response_action().
1419 */
1420struct ResponseActionContext
1421{
1422 const struct GNUNET_DATACACHE_Block *bd;
1423 const struct GNUNET_DHT_PathElement *get_path;
1424 unsigned int get_path_length;
1425};
1426
1427
1428/**
1429 * Function called on monitors that match a response.
1430 * Sends the response notification to the monitor.
1431 *
1432 * @param cls a `struct ResponseActionContext`
1433 * @param m a matching monitor
1434 */
1435static void
1436response_action (void *cls,
1437 struct ClientMonitorRecord *m)
1438{
1439 const struct ResponseActionContext *resp_ctx = cls;
1440 const struct GNUNET_DATACACHE_Block *bd = resp_ctx->bd;
1441 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1442 struct GNUNET_MQ_Envelope *env;
1443 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1444 struct GNUNET_DHT_PathElement *path;
1445 size_t msize;
1446
1447 msize = bd->data_size;
1448 msize += (resp_ctx->get_path_length + bd->put_path_length)
1449 * sizeof(struct GNUNET_DHT_PathElement);
1450 if (truncated)
1451 msize += sizeof (struct GNUNET_PeerIdentity);
1452 env = GNUNET_MQ_msg_extra (mmsg,
1453 msize,
1454 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1455 mmsg->type = htonl (bd->type);
1456 mmsg->put_path_length = htonl (bd->put_path_length);
1457 mmsg->get_path_length = htonl (resp_ctx->get_path_length);
1458 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1459 mmsg->key = bd->key;
1460 if (truncated)
1461 {
1462 void *tgt = &mmsg[1];
1463
1464 GNUNET_memcpy (tgt,
1465 &bd->trunc_peer,
1466 sizeof (struct GNUNET_PeerIdentity));
1467 path = (struct GNUNET_DHT_PathElement *)
1468 (tgt + sizeof (struct GNUNET_PeerIdentity));
1469 }
1470 else
1471 {
1472 path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1473 }
1474 GNUNET_memcpy (path,
1475 bd->put_path,
1476 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1477 GNUNET_memcpy (path,
1478 resp_ctx->get_path,
1479 resp_ctx->get_path_length
1480 * sizeof(struct GNUNET_DHT_PathElement));
1481 GNUNET_memcpy (&path[resp_ctx->get_path_length],
1482 bd->data,
1483 bd->data_size);
1484 GNUNET_MQ_send (m->ch->mq,
1485 env);
1486}
1487
1488
1489void
1490GDS_CLIENTS_process_get_resp (const struct GNUNET_DATACACHE_Block *bd,
1491 const struct GNUNET_DHT_PathElement *get_path,
1492 unsigned int get_path_length)
1493{
1494 struct ResponseActionContext rac = {
1495 .bd = bd,
1496 .get_path = get_path,
1497 .get_path_length = get_path_length
1498 };
1499
1500 for_matching_monitors (bd->type,
1501 &bd->key,
1502 &response_action,
1503 &rac);
1504}
1505
1506
1507/**
1508 * Closure for put_action().
1509 */
1510struct PutActionContext
1511{
1512 const struct GNUNET_DATACACHE_Block *bd;
1513 uint32_t hop_count;
1514 uint32_t desired_replication_level;
1515};
1516
1517
1518/**
1519 * Function called on monitors that match a PUT.
1520 * Sends the PUT notification to the monitor.
1521 *
1522 * @param cls a `struct PutActionContext`
1523 * @param m a matching monitor
1524 */
1525static void
1526put_action (void *cls,
1527 struct ClientMonitorRecord *m)
1528{
1529 const struct PutActionContext *put_ctx = cls;
1530 const struct GNUNET_DATACACHE_Block *bd = put_ctx->bd;
1531 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1532 struct GNUNET_MQ_Envelope *env;
1533 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1534 struct GNUNET_DHT_PathElement *msg_path;
1535 size_t msize;
1536
1537 msize = bd->data_size
1538 + bd->put_path_length
1539 * sizeof(struct GNUNET_DHT_PathElement);
1540 if (truncated)
1541 msize += sizeof (struct GNUNET_PeerIdentity);
1542 env = GNUNET_MQ_msg_extra (mmsg,
1543 msize,
1544 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1545 mmsg->options = htonl (bd->ro);
1546 mmsg->type = htonl (bd->type);
1547 mmsg->hop_count = htonl (put_ctx->hop_count);
1548 mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
1549 mmsg->put_path_length = htonl (bd->put_path_length);
1550 mmsg->key = bd->key;
1551 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1552 if (truncated)
1553 {
1554 void *tgt = &mmsg[1];
1555
1556 GNUNET_memcpy (tgt,
1557 &bd->trunc_peer,
1558 sizeof (struct GNUNET_PeerIdentity));
1559 msg_path = (struct GNUNET_DHT_PathElement *)
1560 (tgt + sizeof (struct GNUNET_PeerIdentity));
1561 }
1562 else
1563 {
1564 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1565 }
1566 GNUNET_memcpy (msg_path,
1567 bd->put_path,
1568 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1569 GNUNET_memcpy (&msg_path[bd->put_path_length],
1570 bd->data,
1571 bd->data_size);
1572 GNUNET_MQ_send (m->ch->mq,
1573 env);
1574}
1575
1576
1577void
1578GDS_CLIENTS_process_put (const struct GNUNET_DATACACHE_Block *bd,
1579 uint32_t hop_count,
1580 uint32_t desired_replication_level)
1581{
1582 struct PutActionContext put_ctx = {
1583 .bd = bd,
1584 .hop_count = hop_count,
1585 .desired_replication_level = desired_replication_level
1586 };
1587
1588 for_matching_monitors (bd->type,
1589 &bd->key,
1590 &put_action,
1591 &put_ctx);
1592}
1593
1594
1595/* ********************** Initialization logic ***************** */
1596
1597
1598/**
1599 * Initialize client subsystem.
1600 *
1601 * @param server the initialized server
1602 */
1603static void
1604GDS_CLIENTS_init (void)
1605{
1606 forward_map
1607 = GNUNET_CONTAINER_multihashmap_create (1024,
1608 GNUNET_YES);
1609 retry_heap
1610 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1611}
1612
1613
1614/**
1615 * Shutdown client subsystem.
1616 */
1617static void
1618GDS_CLIENTS_stop (void)
1619{
1620 if (NULL != retry_task)
1621 {
1622 GNUNET_SCHEDULER_cancel (retry_task);
1623 retry_task = NULL;
1624 }
1625}
1626
1627
1628/**
1629 * Define "main" method using service macro.
1630 *
1631 * @param name name of the service, like "dht" or "xdht"
1632 * @param run name of the initializaton method for the service
1633 */
1634#define GDS_DHT_SERVICE_INIT(name, run) \
1635 GNUNET_SERVICE_MAIN \
1636 (name, \
1637 GNUNET_SERVICE_OPTION_NONE, \
1638 run, \
1639 &client_connect_cb, \
1640 &client_disconnect_cb, \
1641 NULL, \
1642 GNUNET_MQ_hd_var_size (dht_local_put, \
1643 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1644 struct GNUNET_DHT_ClientPutMessage, \
1645 NULL), \
1646 GNUNET_MQ_hd_var_size (dht_local_get, \
1647 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1648 struct GNUNET_DHT_ClientGetMessage, \
1649 NULL), \
1650 GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1651 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1652 struct GNUNET_DHT_ClientGetStopMessage, \
1653 NULL), \
1654 GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1655 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1656 struct GNUNET_DHT_MonitorStartStopMessage, \
1657 NULL), \
1658 GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1659 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1660 struct GNUNET_DHT_MonitorStartStopMessage, \
1661 NULL), \
1662 GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1663 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1664 struct GNUNET_DHT_ClientGetResultSeenMessage, \
1665 NULL), \
1666 GNUNET_MQ_hd_fixed_size (dht_local_hello_get, \
1667 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_GET, \
1668 struct GNUNET_MessageHeader, \
1669 NULL), \
1670 GNUNET_MQ_hd_var_size (dht_local_hello_offer, \
1671 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL, \
1672 struct GNUNET_MessageHeader, \
1673 NULL), \
1674 GNUNET_MQ_handler_end ())
1675
1676
1677/**
1678 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1679 */
1680void __attribute__ ((destructor))
1681GDS_CLIENTS_done ()
1682{
1683 if (NULL != retry_heap)
1684 {
1685 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1686 GNUNET_CONTAINER_heap_destroy (retry_heap);
1687 retry_heap = NULL;
1688 }
1689 if (NULL != forward_map)
1690 {
1691 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1692 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1693 forward_map = NULL;
1694 }
1695}
1696
1697
1698/* end of gnunet-service-dht_clients.c */