aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c1668
1 files changed, 0 insertions, 1668 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
deleted file mode 100644
index 6a4f58d1f..000000000
--- a/src/dht/gnunet-service-dht_clients.c
+++ /dev/null
@@ -1,1668 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017, 2022 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_constants.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet-service-dht.h"
32#include "gnunet-service-dht_datacache.h"
33#include "gnunet-service-dht_neighbours.h"
34#include "dht.h"
35
36
37/**
38 * Enable slow sanity checks to debug issues.
39 * 0: do not check
40 * 1: check all external inputs
41 * 2: check internal computations as well
42 */
43#define SANITY_CHECKS 2
44
45/**
46 * Should routing details be logged to stderr (for debugging)?
47 */
48#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
49 __VA_ARGS__)
50
51#define LOG(kind, ...) GNUNET_log_from (kind, "dht-clients", __VA_ARGS__)
52
53
54/**
55 * Struct containing information about a client,
56 * handle to connect to it, and any pending messages
57 * that need to be sent to it.
58 */
59struct ClientHandle;
60
61
62/**
63 * Entry in the local forwarding map for a client's GET request.
64 */
65struct ClientQueryRecord
66{
67 /**
68 * The key this request was about
69 */
70 struct GNUNET_HashCode key;
71
72 /**
73 * Kept in a DLL with @e client.
74 */
75 struct ClientQueryRecord *next;
76
77 /**
78 * Kept in a DLL with @e client.
79 */
80 struct ClientQueryRecord *prev;
81
82 /**
83 * Client responsible for the request.
84 */
85 struct ClientHandle *ch;
86
87 /**
88 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
89 */
90 const void *xquery;
91
92 /**
93 * Array of (hashes of) replies we have already seen for this request.
94 */
95 struct GNUNET_HashCode *seen_replies;
96
97 /**
98 * Pointer to this nodes heap location in the retry-heap (for fast removal)
99 */
100 struct GNUNET_CONTAINER_HeapNode *hnode;
101
102 /**
103 * What's the delay between re-try operations that we currently use for this
104 * request?
105 */
106 struct GNUNET_TIME_Relative retry_frequency;
107
108 /**
109 * What's the next time we should re-try this request?
110 */
111 struct GNUNET_TIME_Absolute retry_time;
112
113 /**
114 * The unique identifier of this request
115 */
116 uint64_t unique_id;
117
118 /**
119 * Number of bytes in xquery.
120 */
121 size_t xquery_size;
122
123 /**
124 * Number of entries in @e seen_replies.
125 */
126 unsigned int seen_replies_count;
127
128 /**
129 * Desired replication level
130 */
131 uint32_t replication;
132
133 /**
134 * Any message options for this request
135 */
136 enum GNUNET_DHT_RouteOption msg_options;
137
138 /**
139 * The type for the data for the GET request.
140 */
141 enum GNUNET_BLOCK_Type type;
142};
143
144
145/**
146 * Struct containing parameters of monitoring requests.
147 */
148struct ClientMonitorRecord
149{
150 /**
151 * Next element in DLL.
152 */
153 struct ClientMonitorRecord *next;
154
155 /**
156 * Previous element in DLL.
157 */
158 struct ClientMonitorRecord *prev;
159
160 /**
161 * Client to notify of these requests.
162 */
163 struct ClientHandle *ch;
164
165 /**
166 * Key of data of interest. All bits zero for 'all'.
167 */
168 struct GNUNET_HashCode key;
169
170 /**
171 * Type of blocks that are of interest
172 */
173 enum GNUNET_BLOCK_Type type;
174
175 /**
176 * Flag whether to notify about GET messages.
177 */
178 int16_t get;
179
180 /**
181 * Flag whether to notify about GET_REPONSE messages.
182 */
183 int16_t get_resp;
184
185 /**
186 * Flag whether to notify about PUT messages.
187 */
188 uint16_t put;
189
190};
191
192
193/**
194 * Struct containing information about a client,
195 * handle to connect to it, and any pending messages
196 * that need to be sent to it.
197 */
198struct ClientHandle
199{
200 /**
201 * Linked list of active queries of this client.
202 */
203 struct ClientQueryRecord *cqr_head;
204
205 /**
206 * Linked list of active queries of this client.
207 */
208 struct ClientQueryRecord *cqr_tail;
209
210 /**
211 * The handle to this client
212 */
213 struct GNUNET_SERVICE_Client *client;
214
215 /**
216 * The message queue to this client
217 */
218 struct GNUNET_MQ_Handle *mq;
219};
220
221
222/**
223 * Our handle to the BLOCK library.
224 */
225struct GNUNET_BLOCK_Context *GDS_block_context;
226
227/**
228 * Handle for the statistics service.
229 */
230struct GNUNET_STATISTICS_Handle *GDS_stats;
231
232/**
233 * Handle for the service.
234 */
235struct GNUNET_SERVICE_Handle *GDS_service;
236
237/**
238 * The configuration the DHT service is running with
239 */
240const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
241
242/**
243 * List of active monitoring requests.
244 */
245static struct ClientMonitorRecord *monitor_head;
246
247/**
248 * List of active monitoring requests.
249 */
250static struct ClientMonitorRecord *monitor_tail;
251
252/**
253 * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
254 */
255static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
256
257/**
258 * Heap with all of our client's request, sorted by retry time (earliest on top).
259 */
260static struct GNUNET_CONTAINER_Heap *retry_heap;
261
262/**
263 * Task that re-transmits requests (using retry_heap).
264 */
265static struct GNUNET_SCHEDULER_Task *retry_task;
266
267
268/**
269 * Free data structures associated with the given query.
270 *
271 * @param record record to remove
272 */
273static void
274remove_client_query_record (struct ClientQueryRecord *record)
275{
276 struct ClientHandle *ch = record->ch;
277
278 GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
279 ch->cqr_tail,
280 record);
281 GNUNET_assert (GNUNET_YES ==
282 GNUNET_CONTAINER_multihashmap_remove (forward_map,
283 &record->key,
284 record));
285 if (NULL != record->hnode)
286 GNUNET_CONTAINER_heap_remove_node (record->hnode);
287 GNUNET_array_grow (record->seen_replies,
288 record->seen_replies_count,
289 0);
290 GNUNET_free (record);
291}
292
293
294/**
295 * Functions with this signature are called whenever a local client is
296 * connects to us.
297 *
298 * @param cls closure (NULL for dht)
299 * @param client identification of the client
300 * @param mq message queue for talking to @a client
301 * @return our `struct ClientHandle` for @a client
302 */
303static void *
304client_connect_cb (void *cls,
305 struct GNUNET_SERVICE_Client *client,
306 struct GNUNET_MQ_Handle *mq)
307{
308 struct ClientHandle *ch;
309
310 (void) cls;
311 ch = GNUNET_new (struct ClientHandle);
312 ch->client = client;
313 ch->mq = mq;
314 return ch;
315}
316
317
318/**
319 * Functions with this signature are called whenever a client
320 * is disconnected on the network level.
321 *
322 * @param cls closure (NULL for dht)
323 * @param client identification of the client
324 * @param app_ctx our `struct ClientHandle` for @a client
325 */
326static void
327client_disconnect_cb (void *cls,
328 struct GNUNET_SERVICE_Client *client,
329 void *app_ctx)
330{
331 struct ClientHandle *ch = app_ctx;
332
333 (void) cls;
334 (void) client;
335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
336 "Local client %p disconnects\n",
337 ch);
338 {
339 struct ClientMonitorRecord *next;
340
341 for (struct ClientMonitorRecord *monitor = monitor_head;
342 NULL != monitor;
343 monitor = next)
344 {
345 next = monitor->next;
346 if (monitor->ch != ch)
347 continue;
348 GNUNET_CONTAINER_DLL_remove (monitor_head,
349 monitor_tail,
350 monitor);
351 GNUNET_free (monitor);
352 }
353 }
354
355 {
356 struct ClientQueryRecord *cqr;
357
358 while (NULL != (cqr = ch->cqr_head))
359 remove_client_query_record (cqr);
360 }
361 GNUNET_free (ch);
362}
363
364
365/**
366 * Route the given request via the DHT. This includes updating
367 * the bloom filter and retransmission times, building the P2P
368 * message and initiating the routing operation.
369 *
370 * @param cqr request to transmit
371 */
372static void
373transmit_request (struct ClientQueryRecord *cqr)
374{
375 struct GNUNET_BLOCK_Group *bg;
376 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
377
378 GNUNET_STATISTICS_update (GDS_stats,
379 "# GET requests from clients injected",
380 1,
381 GNUNET_NO);
382 bg = GNUNET_BLOCK_group_create (GDS_block_context,
383 cqr->type,
384 GNUNET_CRYPTO_random_u32 (
385 GNUNET_CRYPTO_QUALITY_WEAK,
386 UINT32_MAX),
387 NULL, /* raw data */
388 0, /* raw data size */
389 "seen-set-size",
390 cqr->seen_replies_count,
391 NULL);
392 GNUNET_BLOCK_group_set_seen (bg,
393 cqr->seen_replies,
394 cqr->seen_replies_count);
395 peer_bf
396 = GNUNET_CONTAINER_bloomfilter_init (NULL,
397 DHT_BLOOM_SIZE,
398 GNUNET_CONSTANTS_BLOOMFILTER_K);
399 LOG (GNUNET_ERROR_TYPE_DEBUG,
400 "Initiating GET for %s, replication %u, already have %u replies\n",
401 GNUNET_h2s (&cqr->key),
402 cqr->replication,
403 cqr->seen_replies_count);
404 GDS_NEIGHBOURS_handle_get (cqr->type,
405 cqr->msg_options,
406 cqr->replication,
407 0 /* hop count */,
408 &cqr->key,
409 cqr->xquery,
410 cqr->xquery_size,
411 bg,
412 peer_bf);
413 GNUNET_BLOCK_group_destroy (bg);
414 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
415
416 /* Exponential back-off for retries.
417 * max. is #GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
418 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
419 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
420}
421
422
423/**
424 * Task that looks at the #retry_heap and transmits all of the requests
425 * on the heap that are ready for transmission. Then re-schedules
426 * itself (unless the heap is empty).
427 *
428 * @param cls unused
429 */
430static void
431transmit_next_request_task (void *cls)
432{
433 struct ClientQueryRecord *cqr;
434
435 (void) cls;
436 retry_task = NULL;
437 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
438 {
439 cqr->hnode = NULL;
440 if (! GNUNET_TIME_absolute_is_past (cqr->retry_time))
441 {
442 cqr->hnode
443 = GNUNET_CONTAINER_heap_insert (retry_heap,
444 cqr,
445 cqr->retry_time.abs_value_us);
446 retry_task
447 = GNUNET_SCHEDULER_add_at (cqr->retry_time,
448 &transmit_next_request_task,
449 NULL);
450 return;
451 }
452 transmit_request (cqr);
453 cqr->hnode
454 = GNUNET_CONTAINER_heap_insert (retry_heap,
455 cqr,
456 cqr->retry_time.abs_value_us);
457 }
458}
459
460
461/**
462 * Check DHT PUT messages from the client.
463 *
464 * @param cls the client we received this message from
465 * @param dht_msg the actual message received
466 * @return #GNUNET_OK (always)
467 */
468static enum GNUNET_GenericReturnValue
469check_dht_local_put (void *cls,
470 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
471{
472 uint32_t replication_level = ntohl (dht_msg->desired_replication_level);
473
474 (void) cls;
475 if (replication_level > GNUNET_DHT_MAXIMUM_REPLICATION_LEVEL)
476 {
477 GNUNET_break_op (0);
478 return GNUNET_SYSERR;
479 }
480 return GNUNET_OK;
481}
482
483
484/**
485 * Handler for PUT messages.
486 *
487 * @param cls the client we received this message from
488 * @param dht_msg the actual message received
489 */
490static void
491handle_dht_local_put (void *cls,
492 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
493{
494 struct ClientHandle *ch = cls;
495 uint16_t size = ntohs (dht_msg->header.size);
496 enum GNUNET_DHT_RouteOption options
497 = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options);
498 uint32_t replication_level
499 = ntohl (dht_msg->desired_replication_level);
500 struct GDS_DATACACHE_BlockData bd = {
501 .key = dht_msg->key,
502 .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
503 .data = &dht_msg[1],
504 .data_size = size - sizeof (*dht_msg),
505 .type = ntohl (dht_msg->type)
506 };
507
508 LOG (GNUNET_ERROR_TYPE_DEBUG,
509 "Handling local PUT of %lu-bytes for query %s of type %u\n",
510 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
511 GNUNET_h2s (&dht_msg->key),
512 (unsigned int) bd.type);
513 if (GNUNET_OK !=
514 GNUNET_BLOCK_check_block (GDS_block_context,
515 bd.type,
516 bd.data,
517 bd.data_size))
518 {
519 GNUNET_break (0);
520 return;
521 }
522 GNUNET_STATISTICS_update (GDS_stats,
523 "# PUT requests received from clients",
524 1,
525 GNUNET_NO);
526 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
527 "CLIENT-PUT %s\n",
528 GNUNET_h2s_full (&dht_msg->key));
529 /* give to local clients */
530 GNUNET_break (GDS_CLIENTS_handle_reply (&bd,
531 &bd.key,
532 0, NULL /* get path */));
533
534 {
535 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
536
537 peer_bf
538 = GNUNET_CONTAINER_bloomfilter_init (NULL,
539 DHT_BLOOM_SIZE,
540 GNUNET_CONSTANTS_BLOOMFILTER_K);
541 /* store locally */
542 if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
543 (GDS_am_closest_peer (&dht_msg->key,
544 peer_bf)))
545 GDS_DATACACHE_handle_put (&bd);
546 /* route to other peers */
547 if (GNUNET_OK !=
548 GDS_NEIGHBOURS_handle_put (&bd,
549 options,
550 replication_level,
551 0 /* hop count */,
552 peer_bf))
553 {
554 GNUNET_STATISTICS_update (GDS_stats,
555 "# Local PUT requests not routed",
556 1,
557 GNUNET_NO);
558 }
559 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
560 }
561 GDS_CLIENTS_process_put (
562 options,
563 &bd,
564 0, /* hop count */
565 replication_level);
566 GNUNET_SERVICE_client_continue (ch->client);
567}
568
569
570/**
571 * Handle a result from local datacache for a GET operation.
572 *
573 * @param cls the `struct ClientHandle` of the client doing the query
574 * @param bd details about the block that was found
575 */
576static void
577handle_local_result (void *cls,
578 const struct GDS_DATACACHE_BlockData *bd)
579{
580 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
581 "Datacache provided result for query key %s\n",
582 GNUNET_h2s (&bd->key));
583 GNUNET_break (GDS_CLIENTS_handle_reply (bd,
584 &bd->key,
585 0, NULL /* get_path */));
586}
587
588
589/**
590 * Check DHT GET messages from the client.
591 *
592 * @param cls the client we received this message from
593 * @param message the actual message received
594 * @return #GNUNET_OK (always)
595 */
596static enum GNUNET_GenericReturnValue
597check_dht_local_get (void *cls,
598 const struct GNUNET_DHT_ClientGetMessage *get)
599{
600 (void) cls;
601 (void) get;
602 /* always well-formed */
603 return GNUNET_OK;
604}
605
606
607/**
608 * Handler for DHT GET messages from the client.
609 *
610 * @param cls the client we received this message from
611 * @param message the actual message received
612 */
613static void
614handle_dht_local_get (void *cls,
615 const struct GNUNET_DHT_ClientGetMessage *get)
616{
617 struct ClientHandle *ch = cls;
618 struct ClientQueryRecord *cqr;
619 uint16_t size = ntohs (get->header.size);
620 const char *xquery = (const char *) &get[1];
621 size_t xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
622
623 LOG (GNUNET_ERROR_TYPE_DEBUG,
624 "Received GET request for %s from local client %p, xq: %.*s\n",
625 GNUNET_h2s (&get->key),
626 ch->client,
627 (int) xquery_size,
628 xquery);
629 GNUNET_STATISTICS_update (GDS_stats,
630 "# GET requests received from clients",
631 1,
632 GNUNET_NO);
633 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
634 "CLIENT-GET %s\n",
635 GNUNET_h2s_full (&get->key));
636
637 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size);
638 cqr->key = get->key;
639 cqr->ch = ch;
640 cqr->xquery = (const void *) &cqr[1];
641 GNUNET_memcpy (&cqr[1],
642 xquery,
643 xquery_size);
644 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap,
645 cqr,
646 0);
647 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
648 cqr->retry_time = GNUNET_TIME_absolute_get ();
649 cqr->unique_id = get->unique_id;
650 cqr->xquery_size = xquery_size;
651 cqr->replication = ntohl (get->desired_replication_level);
652 cqr->msg_options = (enum GNUNET_DHT_RouteOption) ntohl (get->options);
653 cqr->type = ntohl (get->type);
654 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
655 ch->cqr_tail,
656 cqr);
657 GNUNET_CONTAINER_multihashmap_put (forward_map,
658 &cqr->key,
659 cqr,
660 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
661 GDS_CLIENTS_process_get (cqr->msg_options,
662 cqr->type,
663 0, /* hop count */
664 cqr->replication,
665 0, /* path length */
666 NULL,
667 &get->key);
668 /* start remote requests */
669 if (NULL != retry_task)
670 GNUNET_SCHEDULER_cancel (retry_task);
671 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
672 NULL);
673 /* perform local lookup */
674 GDS_DATACACHE_handle_get (&get->key,
675 cqr->type,
676 cqr->xquery,
677 xquery_size,
678 NULL,
679 &handle_local_result,
680 ch);
681 GNUNET_SERVICE_client_continue (ch->client);
682}
683
684
685/**
686 * Closure for #find_by_unique_id().
687 */
688struct FindByUniqueIdContext
689{
690 /**
691 * Where to store the result, if found.
692 */
693 struct ClientQueryRecord *cqr;
694
695 /**
696 * Unique ID to look for.
697 */
698 uint64_t unique_id;
699};
700
701
702/**
703 * Function called for each existing DHT record for the given
704 * query. Checks if it matches the UID given in the closure
705 * and if so returns the entry as a result.
706 *
707 * @param cls the search context
708 * @param key query for the lookup (not used)
709 * @param value the `struct ClientQueryRecord`
710 * @return #GNUNET_YES to continue iteration (result not yet found)
711 */
712static enum GNUNET_GenericReturnValue
713find_by_unique_id (void *cls,
714 const struct GNUNET_HashCode *key,
715 void *value)
716{
717 struct FindByUniqueIdContext *fui_ctx = cls;
718 struct ClientQueryRecord *cqr = value;
719
720 if (cqr->unique_id != fui_ctx->unique_id)
721 return GNUNET_YES;
722 fui_ctx->cqr = cqr;
723 return GNUNET_NO;
724}
725
726
727/**
728 * Check "GET result seen" messages from the client.
729 *
730 * @param cls the client we received this message from
731 * @param message the actual message received
732 * @return #GNUNET_OK if @a seen is well-formed
733 */
734static enum GNUNET_GenericReturnValue
735check_dht_local_get_result_seen (
736 void *cls,
737 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
738{
739 uint16_t size = ntohs (seen->header.size);
740 unsigned int hash_count =
741 (size - sizeof(*seen))
742 / sizeof(struct GNUNET_HashCode);
743
744 if (size != sizeof(*seen) + hash_count * sizeof(struct GNUNET_HashCode))
745 {
746 GNUNET_break (0);
747 return GNUNET_SYSERR;
748 }
749 return GNUNET_OK;
750}
751
752
753/**
754 * Handler for "GET result seen" messages from the client.
755 *
756 * @param cls the client we received this message from
757 * @param message the actual message received
758 */
759static void
760handle_dht_local_get_result_seen (
761 void *cls,
762 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
763{
764 struct ClientHandle *ch = cls;
765 uint16_t size = ntohs (seen->header.size);
766 unsigned int hash_count = (size - sizeof(*seen))
767 / sizeof(struct GNUNET_HashCode);
768 const struct GNUNET_HashCode *hc = (const struct GNUNET_HashCode*) &seen[1];
769 struct FindByUniqueIdContext fui_ctx = {
770 .unique_id = seen->unique_id
771 };
772 unsigned int old_count;
773 struct ClientQueryRecord *cqr;
774
775 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
776 &seen->key,
777 &find_by_unique_id,
778 &fui_ctx);
779 if (NULL == (cqr = fui_ctx.cqr))
780 {
781 GNUNET_break (0);
782 GNUNET_SERVICE_client_drop (ch->client);
783 return;
784 }
785 /* finally, update 'seen' list */
786 old_count = cqr->seen_replies_count;
787 GNUNET_array_grow (cqr->seen_replies,
788 cqr->seen_replies_count,
789 cqr->seen_replies_count + hash_count);
790 GNUNET_memcpy (&cqr->seen_replies[old_count],
791 hc,
792 sizeof(struct GNUNET_HashCode) * hash_count);
793}
794
795
796/**
797 * Closure for #remove_by_unique_id().
798 */
799struct RemoveByUniqueIdContext
800{
801 /**
802 * Client that issued the removal request.
803 */
804 struct ClientHandle *ch;
805
806 /**
807 * Unique ID of the request.
808 */
809 uint64_t unique_id;
810};
811
812
813/**
814 * Iterator over hash map entries that frees all entries
815 * that match the given client and unique ID.
816 *
817 * @param cls unique ID and client to search for in source routes
818 * @param key current key code
819 * @param value value in the hash map, a ClientQueryRecord
820 * @return #GNUNET_YES (we should continue to iterate)
821 */
822static enum GNUNET_GenericReturnValue
823remove_by_unique_id (void *cls,
824 const struct GNUNET_HashCode *key,
825 void *value)
826{
827 const struct RemoveByUniqueIdContext *ctx = cls;
828 struct ClientQueryRecord *cqr = value;
829
830 if (cqr->unique_id != ctx->unique_id)
831 return GNUNET_YES;
832 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
833 "Removing client %p's record for key %s (by unique id)\n",
834 ctx->ch->client,
835 GNUNET_h2s (key));
836 remove_client_query_record (cqr);
837 return GNUNET_YES;
838}
839
840
841/**
842 * Handler for any generic DHT stop messages, calls the appropriate handler
843 * depending on message type (if processed locally)
844 *
845 * @param cls client we received this message from
846 * @param message the actual message received
847 *
848 */
849static void
850handle_dht_local_get_stop (
851 void *cls,
852 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
853{
854 struct ClientHandle *ch = cls;
855 struct RemoveByUniqueIdContext ctx;
856
857 GNUNET_STATISTICS_update (GDS_stats,
858 "# GET STOP requests received from clients",
859 1,
860 GNUNET_NO);
861 LOG (GNUNET_ERROR_TYPE_DEBUG,
862 "Received GET STOP request for %s from local client %p\n",
863 GNUNET_h2s (&dht_stop_msg->key),
864 ch->client);
865 ctx.ch = ch;
866 ctx.unique_id = dht_stop_msg->unique_id;
867 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
868 &dht_stop_msg->key,
869 &remove_by_unique_id,
870 &ctx);
871 GNUNET_SERVICE_client_continue (ch->client);
872}
873
874
875/**
876 * Closure for #forward_reply()
877 */
878struct ForwardReplyContext
879{
880 /**
881 * Block details.
882 */
883 const struct GDS_DATACACHE_BlockData *bd;
884
885 /**
886 * GET path taken.
887 */
888 const struct GNUNET_DHT_PathElement *get_path;
889
890 /**
891 * Number of entries in @e get_path.
892 */
893 unsigned int get_path_length;
894
895};
896
897
898/**
899 * Iterator over hash map entries that send a given reply to
900 * each of the matching clients. With some tricky recycling
901 * of the buffer.
902 *
903 * @param cls the `struct ForwardReplyContext`
904 * @param query_hash hash of the query for which this may be a reply
905 * @param value value in the hash map, a ClientQueryRecord
906 * @return #GNUNET_YES (we should continue to iterate),
907 * if the result is mal-formed, #GNUNET_NO
908 */
909static enum GNUNET_GenericReturnValue
910forward_reply (void *cls,
911 const struct GNUNET_HashCode *query_hash,
912 void *value)
913{
914 struct ForwardReplyContext *frc = cls;
915 struct ClientQueryRecord *record = value;
916 struct GNUNET_MQ_Envelope *env;
917 struct GNUNET_DHT_ClientResultMessage *reply;
918 enum GNUNET_BLOCK_ReplyEvaluationResult eval;
919 bool do_free;
920 struct GNUNET_HashCode ch;
921 struct GNUNET_DHT_PathElement *paths;
922
923 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
924 "CLIENT-RESULT %s\n",
925 GNUNET_h2s_full (&frc->bd->key));
926 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
927 (record->type != frc->bd->type) )
928 {
929 LOG (GNUNET_ERROR_TYPE_DEBUG,
930 "Record type mismatch, not passing request for key %s to local client\n",
931 GNUNET_h2s (&frc->bd->key));
932 GNUNET_STATISTICS_update (GDS_stats,
933 "# Key match, type mismatches in REPLY to CLIENT",
934 1,
935 GNUNET_NO);
936 return GNUNET_YES; /* type mismatch */
937 }
938 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_APPROXIMATE)) &&
939 (0 != GNUNET_memcmp (&frc->bd->key,
940 query_hash)) )
941 {
942 GNUNET_STATISTICS_update (GDS_stats,
943 "# Inexact key match, but exact match required",
944 1,
945 GNUNET_NO);
946 return GNUNET_YES; /* type mismatch */
947 }
948 GNUNET_CRYPTO_hash (frc->bd->data,
949 frc->bd->data_size,
950 &ch);
951 for (unsigned int i = 0; i < record->seen_replies_count; i++)
952 if (0 ==
953 GNUNET_memcmp (&record->seen_replies[i],
954 &ch))
955 {
956 LOG (GNUNET_ERROR_TYPE_DEBUG,
957 "Duplicate reply, not passing request for key %s to local client\n",
958 GNUNET_h2s (&frc->bd->key));
959 GNUNET_STATISTICS_update (GDS_stats,
960 "# Duplicate REPLIES to CLIENT request dropped",
961 1,
962 GNUNET_NO);
963 return GNUNET_YES; /* duplicate */
964 }
965 eval
966 = GNUNET_BLOCK_check_reply (GDS_block_context,
967 record->type,
968 NULL,
969 &frc->bd->key,
970 record->xquery,
971 record->xquery_size,
972 frc->bd->data,
973 frc->bd->data_size);
974 LOG (GNUNET_ERROR_TYPE_DEBUG,
975 "Evaluation result is %d for key %s for local client's query\n",
976 (int) eval,
977 GNUNET_h2s (&frc->bd->key));
978 switch (eval)
979 {
980 case GNUNET_BLOCK_REPLY_OK_LAST:
981 do_free = true;
982 break;
983 case GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED:
984 case GNUNET_BLOCK_REPLY_OK_MORE:
985 GNUNET_array_append (record->seen_replies,
986 record->seen_replies_count,
987 ch);
988 do_free = false;
989 break;
990 case GNUNET_BLOCK_REPLY_OK_DUPLICATE:
991 /* should be impossible to encounter here */
992 GNUNET_break (0);
993 return GNUNET_YES;
994 case GNUNET_BLOCK_REPLY_IRRELEVANT:
995 return GNUNET_YES;
996 default:
997 GNUNET_break (0);
998 return GNUNET_NO;
999 }
1000 GNUNET_STATISTICS_update (GDS_stats,
1001 "# RESULTS queued for clients",
1002 1,
1003 GNUNET_NO);
1004 env = GNUNET_MQ_msg_extra (reply,
1005 frc->bd->data_size
1006 + (frc->get_path_length + frc->bd->put_path_length)
1007 * sizeof(struct GNUNET_DHT_PathElement),
1008 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1009 reply->type = htonl (frc->bd->type);
1010 reply->get_path_length = htonl (frc->get_path_length);
1011 reply->put_path_length = htonl (frc->bd->put_path_length);
1012 reply->unique_id = record->unique_id;
1013 reply->expiration = GNUNET_TIME_absolute_hton (frc->bd->expiration_time);
1014 reply->key = *query_hash;
1015 paths = (struct GNUNET_DHT_PathElement *) &reply[1];
1016 GNUNET_memcpy (paths,
1017 frc->bd->put_path,
1018 sizeof(struct GNUNET_DHT_PathElement)
1019 * frc->bd->put_path_length);
1020 GNUNET_memcpy (&paths[frc->bd->put_path_length],
1021 frc->get_path,
1022 sizeof(struct GNUNET_DHT_PathElement)
1023 * frc->get_path_length);
1024 GNUNET_memcpy (&paths[frc->get_path_length + frc->bd->put_path_length],
1025 frc->bd->data,
1026 frc->bd->data_size);
1027 LOG (GNUNET_ERROR_TYPE_DEBUG,
1028 "Sending reply to query %s for client %p\n",
1029 GNUNET_h2s (query_hash),
1030 record->ch->client);
1031 GNUNET_MQ_send (record->ch->mq,
1032 env);
1033 if (GNUNET_YES == do_free)
1034 remove_client_query_record (record);
1035 return GNUNET_YES;
1036}
1037
1038
1039bool
1040GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd,
1041 const struct GNUNET_HashCode *query_hash,
1042 unsigned int get_path_length,
1043 const struct GNUNET_DHT_PathElement *get_path)
1044{
1045 struct ForwardReplyContext frc;
1046 size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage)
1047 + bd->data_size
1048 + (get_path_length + bd->put_path_length)
1049 * sizeof(struct GNUNET_DHT_PathElement);
1050
1051 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1052 {
1053 GNUNET_break (0);
1054 return false;
1055 }
1056#if SANITY_CHECKS > 1
1057 if (0 !=
1058 GNUNET_DHT_verify_path (bd->data,
1059 bd->data_size,
1060 bd->expiration_time,
1061 bd->put_path,
1062 bd->put_path_length,
1063 get_path,
1064 get_path_length,
1065 &GDS_my_identity))
1066 {
1067 GNUNET_break (0);
1068 return false;
1069 }
1070#endif
1071 frc.bd = bd;
1072 frc.get_path = get_path;
1073 frc.get_path_length = get_path_length;
1074 LOG (GNUNET_ERROR_TYPE_DEBUG,
1075 "Forwarding reply for query hash %s with GPL %u and PPL %u to client\n",
1076 GNUNET_h2s (query_hash),
1077 get_path_length,
1078 bd->put_path_length);
1079 if (0 ==
1080 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1081 query_hash,
1082 &forward_reply,
1083 &frc))
1084 {
1085 LOG (GNUNET_ERROR_TYPE_DEBUG,
1086 "No matching client for reply for query %s\n",
1087 GNUNET_h2s (query_hash));
1088 GNUNET_STATISTICS_update (GDS_stats,
1089 "# REPLIES ignored for CLIENTS (no match)",
1090 1,
1091 GNUNET_NO);
1092 }
1093 return true;
1094}
1095
1096
1097/* **************** HELLO logic ***************** */
1098
1099/**
1100 * Handler for HELLO GET message. Reply to client
1101 * with a URL of our HELLO.
1102 *
1103 * @param cls the client we received this message from
1104 * @param msg the actual message received
1105 *
1106 */
1107static void
1108handle_dht_local_hello_get (void *cls,
1109 const struct GNUNET_MessageHeader *msg)
1110{
1111 struct ClientHandle *ch = cls;
1112 char *url = GNUNET_HELLO_builder_to_url (GDS_my_hello,
1113 &GDS_my_private_key);
1114 size_t slen = strlen (url) + 1;
1115 struct GNUNET_MessageHeader *hdr;
1116 struct GNUNET_MQ_Envelope *env;
1117
1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119 "Handling request from local client for my HELLO\n");
1120 env = GNUNET_MQ_msg_extra (hdr,
1121 slen,
1122 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL);
1123 memcpy (&hdr[1],
1124 url,
1125 slen);
1126 GNUNET_free (url);
1127 GNUNET_MQ_send (ch->mq,
1128 env);
1129 GNUNET_SERVICE_client_continue (ch->client);
1130}
1131
1132
1133/**
1134 * Process a client HELLO message received from the service.
1135 *
1136 * @param cls the client we received this message from
1137 * @param hdr HELLO URL message from the service.
1138 * @return #GNUNET_OK if @a hdr is well-formed
1139 */
1140static enum GNUNET_GenericReturnValue
1141check_dht_local_hello_offer (void *cls,
1142 const struct GNUNET_MessageHeader *hdr)
1143{
1144 uint16_t len = ntohs (hdr->size);
1145 const char *buf = (const char *) &hdr[1];
1146
1147 (void) cls;
1148 if ('\0' != buf[len - sizeof (*hdr) - 1])
1149 {
1150 GNUNET_break (0);
1151 return GNUNET_SYSERR;
1152 }
1153 return GNUNET_OK;
1154}
1155
1156
1157/**
1158 * Handler for HELLO OFFER message. Try to use the
1159 * HELLO to connect to another peer.
1160 *
1161 * @param cls the client we received this message from
1162 * @param msg the actual message received
1163 */
1164static void
1165handle_dht_local_hello_offer (void *cls,
1166 const struct GNUNET_MessageHeader *msg)
1167{
1168 struct ClientHandle *ch = cls;
1169 const char *url = (const char *) &msg[1];
1170 struct GNUNET_HELLO_Builder *b;
1171 struct GNUNET_PeerIdentity pid;
1172
1173 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174 "Local client provided HELLO URL %s\n",
1175 url);
1176 b = GNUNET_HELLO_builder_from_url (url);
1177 if (NULL == b)
1178 {
1179 GNUNET_break (0);
1180 GNUNET_SERVICE_client_drop (ch->client);
1181 return;
1182 }
1183 GNUNET_SERVICE_client_continue (ch->client);
1184 GNUNET_HELLO_builder_iterate (b,
1185 &pid,
1186 &GDS_try_connect,
1187 &pid);
1188 GNUNET_HELLO_builder_free (b);
1189}
1190
1191
1192/* ************* logic for monitors ************** */
1193
1194
1195/**
1196 * Handler for monitor start messages
1197 *
1198 * @param cls the client we received this message from
1199 * @param msg the actual message received
1200 *
1201 */
1202static void
1203handle_dht_local_monitor (void *cls,
1204 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1205{
1206 struct ClientHandle *ch = cls;
1207 struct ClientMonitorRecord *r;
1208
1209 r = GNUNET_new (struct ClientMonitorRecord);
1210 r->ch = ch;
1211 r->type = ntohl (msg->type);
1212 r->get = ntohs (msg->get);
1213 r->get_resp = ntohs (msg->get_resp);
1214 r->put = ntohs (msg->put);
1215 if (0 != ntohs (msg->filter_key))
1216 r->key = msg->key;
1217 GNUNET_CONTAINER_DLL_insert (monitor_head,
1218 monitor_tail,
1219 r);
1220 GNUNET_SERVICE_client_continue (ch->client);
1221}
1222
1223
1224/**
1225 * Handler for monitor stop messages
1226 *
1227 * @param cls the client we received this message from
1228 * @param msg the actual message received
1229 */
1230static void
1231handle_dht_local_monitor_stop (
1232 void *cls,
1233 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1234{
1235 struct ClientHandle *ch = cls;
1236
1237 GNUNET_SERVICE_client_continue (ch->client);
1238 for (struct ClientMonitorRecord *r = monitor_head;
1239 NULL != r;
1240 r = r->next)
1241 {
1242 bool keys_match;
1243
1244 keys_match =
1245 (GNUNET_is_zero (&r->key))
1246 ? (0 == ntohs (msg->filter_key))
1247 : ( (0 != ntohs (msg->filter_key)) &&
1248 (! GNUNET_memcmp (&r->key,
1249 &msg->key)) );
1250 if ( (ch == r->ch) &&
1251 (ntohl (msg->type) == r->type) &&
1252 (r->get == msg->get) &&
1253 (r->get_resp == msg->get_resp) &&
1254 (r->put == msg->put) &&
1255 keys_match)
1256 {
1257 GNUNET_CONTAINER_DLL_remove (monitor_head,
1258 monitor_tail,
1259 r);
1260 GNUNET_free (r);
1261 return; /* Delete only ONE entry */
1262 }
1263 }
1264}
1265
1266
1267/**
1268 * Function to call by #for_matching_monitors().
1269 *
1270 * @param cls closure
1271 * @param m a matching monitor
1272 */
1273typedef void
1274(*MonitorAction)(void *cls,
1275 struct ClientMonitorRecord *m);
1276
1277
1278/**
1279 * Call @a cb on all monitors that watch for blocks of @a type
1280 * and key @a key.
1281 *
1282 * @param type the type to match
1283 * @param key the key to match
1284 * @param cb function to call
1285 * @param cb_cls closure for @a cb
1286 */
1287static void
1288for_matching_monitors (enum GNUNET_BLOCK_Type type,
1289 const struct GNUNET_HashCode *key,
1290 MonitorAction cb,
1291 void *cb_cls)
1292{
1293 struct ClientHandle **cl = NULL;
1294 unsigned int cl_size = 0;
1295
1296 for (struct ClientMonitorRecord *m = monitor_head;
1297 NULL != m;
1298 m = m->next)
1299 {
1300 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1301 (m->type == type) ) &&
1302 ( (GNUNET_is_zero (&m->key)) ||
1303 (0 ==
1304 GNUNET_memcmp (key,
1305 &m->key)) ) )
1306 {
1307 unsigned int i;
1308
1309 /* Don't send duplicates */
1310 for (i = 0; i < cl_size; i++)
1311 if (cl[i] == m->ch)
1312 break;
1313 if (i < cl_size)
1314 continue;
1315 GNUNET_array_append (cl,
1316 cl_size,
1317 m->ch);
1318 cb (cb_cls,
1319 m);
1320 }
1321 }
1322 GNUNET_free (cl);
1323}
1324
1325
1326/**
1327 * Closure for #get_action();
1328 */
1329struct GetActionContext
1330{
1331 enum GNUNET_DHT_RouteOption options;
1332 enum GNUNET_BLOCK_Type type;
1333 uint32_t hop_count;
1334 uint32_t desired_replication_level;
1335 unsigned int get_path_length;
1336 const struct GNUNET_DHT_PathElement *get_path;
1337 const struct GNUNET_HashCode *key;
1338};
1339
1340
1341/**
1342 * Function called on monitors that match a GET.
1343 * Sends the GET notification to the monitor.
1344 *
1345 * @param cls a `struct GetActionContext`
1346 * @param m a matching monitor
1347 */
1348static void
1349get_action (void *cls,
1350 struct ClientMonitorRecord *m)
1351{
1352 struct GetActionContext *gac = cls;
1353 struct GNUNET_MQ_Envelope *env;
1354 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1355 struct GNUNET_DHT_PathElement *msg_path;
1356 size_t msize;
1357
1358 msize = gac->get_path_length * sizeof(struct GNUNET_DHT_PathElement);
1359 env = GNUNET_MQ_msg_extra (mmsg,
1360 msize,
1361 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1362 mmsg->options = htonl (gac->options);
1363 mmsg->type = htonl (gac->type);
1364 mmsg->hop_count = htonl (gac->hop_count);
1365 mmsg->desired_replication_level = htonl (gac->desired_replication_level);
1366 mmsg->get_path_length = htonl (gac->get_path_length);
1367 mmsg->key = *gac->key;
1368 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1369 GNUNET_memcpy (msg_path,
1370 gac->get_path,
1371 gac->get_path_length * sizeof(struct GNUNET_DHT_PathElement));
1372 GNUNET_MQ_send (m->ch->mq,
1373 env);
1374}
1375
1376
1377/**
1378 * Check if some client is monitoring GET messages and notify
1379 * them in that case. If tracked, @a path should include the local peer.
1380 *
1381 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1382 * @param type The type of data in the request.
1383 * @param hop_count Hop count so far.
1384 * @param path_length number of entries in path (or 0 if not recorded).
1385 * @param path peers on the GET path (or NULL if not recorded).
1386 * @param desired_replication_level Desired replication level.
1387 * @param key Key of the requested data.
1388 */
1389void
1390GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
1391 enum GNUNET_BLOCK_Type type,
1392 uint32_t hop_count,
1393 uint32_t desired_replication_level,
1394 unsigned int path_length,
1395 const struct GNUNET_DHT_PathElement *path,
1396 const struct GNUNET_HashCode *key)
1397{
1398 struct GetActionContext gac = {
1399 .options = options,
1400 .type = type,
1401 .hop_count = hop_count,
1402 .desired_replication_level = desired_replication_level,
1403 .get_path_length = path_length,
1404 .get_path = path,
1405 .key = key
1406 };
1407
1408 for_matching_monitors (type,
1409 key,
1410 &get_action,
1411 &gac);
1412}
1413
1414
1415/**
1416 * Closure for response_action().
1417 */
1418struct ResponseActionContext
1419{
1420 const struct GDS_DATACACHE_BlockData *bd;
1421 const struct GNUNET_DHT_PathElement *get_path;
1422 unsigned int get_path_length;
1423};
1424
1425
1426/**
1427 * Function called on monitors that match a response.
1428 * Sends the response notification to the monitor.
1429 *
1430 * @param cls a `struct ResponseActionContext`
1431 * @param m a matching monitor
1432 */
1433static void
1434response_action (void *cls,
1435 struct ClientMonitorRecord *m)
1436{
1437 const struct ResponseActionContext *resp_ctx = cls;
1438 const struct GDS_DATACACHE_BlockData *bd = resp_ctx->bd;
1439
1440 struct GNUNET_MQ_Envelope *env;
1441 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1442 struct GNUNET_DHT_PathElement *path;
1443 size_t msize;
1444
1445 msize = bd->data_size;
1446 msize += (resp_ctx->get_path_length + bd->put_path_length)
1447 * sizeof(struct GNUNET_DHT_PathElement);
1448 env = GNUNET_MQ_msg_extra (mmsg,
1449 msize,
1450 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1451 mmsg->type = htonl (bd->type);
1452 mmsg->put_path_length = htonl (bd->put_path_length);
1453 mmsg->get_path_length = htonl (resp_ctx->get_path_length);
1454 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1455 mmsg->key = bd->key;
1456 path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1457 GNUNET_memcpy (path,
1458 bd->put_path,
1459 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1460 GNUNET_memcpy (path,
1461 resp_ctx->get_path,
1462 resp_ctx->get_path_length
1463 * sizeof(struct GNUNET_DHT_PathElement));
1464 GNUNET_memcpy (&path[resp_ctx->get_path_length],
1465 bd->data,
1466 bd->data_size);
1467 GNUNET_MQ_send (m->ch->mq,
1468 env);
1469}
1470
1471
1472void
1473GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd,
1474 const struct GNUNET_DHT_PathElement *get_path,
1475 unsigned int get_path_length)
1476{
1477 struct ResponseActionContext rac = {
1478 .bd = bd,
1479 .get_path = get_path,
1480 .get_path_length = get_path_length
1481 };
1482
1483 for_matching_monitors (bd->type,
1484 &bd->key,
1485 &response_action,
1486 &rac);
1487}
1488
1489
1490/**
1491 * Closure for put_action().
1492 */
1493struct PutActionContext
1494{
1495 const struct GDS_DATACACHE_BlockData *bd;
1496 enum GNUNET_DHT_RouteOption options;
1497 uint32_t hop_count;
1498 uint32_t desired_replication_level;
1499};
1500
1501
1502/**
1503 * Function called on monitors that match a PUT.
1504 * Sends the PUT notification to the monitor.
1505 *
1506 * @param cls a `struct PutActionContext`
1507 * @param m a matching monitor
1508 */
1509static void
1510put_action (void *cls,
1511 struct ClientMonitorRecord *m)
1512{
1513 const struct PutActionContext *put_ctx = cls;
1514 const struct GDS_DATACACHE_BlockData *bd = put_ctx->bd;
1515 struct GNUNET_MQ_Envelope *env;
1516 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1517 struct GNUNET_DHT_PathElement *msg_path;
1518 size_t msize;
1519
1520 msize = bd->data_size
1521 + bd->put_path_length
1522 * sizeof(struct GNUNET_DHT_PathElement);
1523 env = GNUNET_MQ_msg_extra (mmsg,
1524 msize,
1525 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1526 mmsg->options = htonl (put_ctx->options);
1527 mmsg->type = htonl (bd->type);
1528 mmsg->hop_count = htonl (put_ctx->hop_count);
1529 mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
1530 mmsg->put_path_length = htonl (bd->put_path_length);
1531 mmsg->key = bd->key;
1532 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1533 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1534 GNUNET_memcpy (msg_path,
1535 bd->put_path,
1536 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1537 GNUNET_memcpy (&msg_path[bd->put_path_length],
1538 bd->data,
1539 bd->data_size);
1540 GNUNET_MQ_send (m->ch->mq,
1541 env);
1542}
1543
1544
1545void
1546GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options,
1547 const struct GDS_DATACACHE_BlockData *bd,
1548 uint32_t hop_count,
1549 uint32_t desired_replication_level)
1550{
1551 struct PutActionContext put_ctx = {
1552 .bd = bd,
1553 .hop_count = hop_count,
1554 .desired_replication_level = desired_replication_level,
1555 .options = options
1556 };
1557
1558 for_matching_monitors (bd->type,
1559 &bd->key,
1560 &put_action,
1561 &put_ctx);
1562}
1563
1564
1565/* ********************** Initialization logic ***************** */
1566
1567
1568/**
1569 * Initialize client subsystem.
1570 *
1571 * @param server the initialized server
1572 */
1573static void
1574GDS_CLIENTS_init (void)
1575{
1576 forward_map
1577 = GNUNET_CONTAINER_multihashmap_create (1024,
1578 GNUNET_YES);
1579 retry_heap
1580 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1581}
1582
1583
1584/**
1585 * Shutdown client subsystem.
1586 */
1587static void
1588GDS_CLIENTS_stop (void)
1589{
1590 if (NULL != retry_task)
1591 {
1592 GNUNET_SCHEDULER_cancel (retry_task);
1593 retry_task = NULL;
1594 }
1595}
1596
1597
1598/**
1599 * Define "main" method using service macro.
1600 *
1601 * @param name name of the service, like "dht" or "xdht"
1602 * @param run name of the initializaton method for the service
1603 */
1604#define GDS_DHT_SERVICE_INIT(name, run) \
1605 GNUNET_SERVICE_MAIN \
1606 (name, \
1607 GNUNET_SERVICE_OPTION_NONE, \
1608 run, \
1609 &client_connect_cb, \
1610 &client_disconnect_cb, \
1611 NULL, \
1612 GNUNET_MQ_hd_var_size (dht_local_put, \
1613 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1614 struct GNUNET_DHT_ClientPutMessage, \
1615 NULL), \
1616 GNUNET_MQ_hd_var_size (dht_local_get, \
1617 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1618 struct GNUNET_DHT_ClientGetMessage, \
1619 NULL), \
1620 GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1621 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1622 struct GNUNET_DHT_ClientGetStopMessage, \
1623 NULL), \
1624 GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1625 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1626 struct GNUNET_DHT_MonitorStartStopMessage, \
1627 NULL), \
1628 GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1629 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1630 struct GNUNET_DHT_MonitorStartStopMessage, \
1631 NULL), \
1632 GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1633 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1634 struct GNUNET_DHT_ClientGetResultSeenMessage, \
1635 NULL), \
1636 GNUNET_MQ_hd_fixed_size (dht_local_hello_get, \
1637 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_GET, \
1638 struct GNUNET_MessageHeader, \
1639 NULL), \
1640 GNUNET_MQ_hd_var_size (dht_local_hello_offer, \
1641 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL, \
1642 struct GNUNET_MessageHeader, \
1643 NULL), \
1644 GNUNET_MQ_handler_end ())
1645
1646
1647/**
1648 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1649 */
1650void __attribute__ ((destructor))
1651GDS_CLIENTS_done ()
1652{
1653 if (NULL != retry_heap)
1654 {
1655 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1656 GNUNET_CONTAINER_heap_destroy (retry_heap);
1657 retry_heap = NULL;
1658 }
1659 if (NULL != forward_map)
1660 {
1661 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1662 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1663 forward_map = NULL;
1664 }
1665}
1666
1667
1668/* end of gnunet-service-dht_clients.c */