aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c1665
1 files changed, 0 insertions, 1665 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
deleted file mode 100644
index fd65102b3..000000000
--- a/src/dht/gnunet-service-dht_clients.c
+++ /dev/null
@@ -1,1665 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017, 2022 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_constants.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet-service-dht.h"
32#include "gnunet-service-dht_datacache.h"
33#include "gnunet-service-dht_neighbours.h"
34#include "dht.h"
35
36
37/**
38 * Enable slow sanity checks to debug issues.
39 * 0: do not check
40 * 1: check all external inputs
41 * 2: check internal computations as well
42 */
43#define SANITY_CHECKS 2
44
45/**
46 * Should routing details be logged to stderr (for debugging)?
47 */
48#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
49 __VA_ARGS__)
50
51#define LOG(kind, ...) GNUNET_log_from (kind, "dht-clients", __VA_ARGS__)
52
53
54/**
55 * Struct containing information about a client,
56 * handle to connect to it, and any pending messages
57 * that need to be sent to it.
58 */
59struct ClientHandle;
60
61
62/**
63 * Entry in the local forwarding map for a client's GET request.
64 */
65struct ClientQueryRecord
66{
67 /**
68 * The key this request was about
69 */
70 struct GNUNET_HashCode key;
71
72 /**
73 * Kept in a DLL with @e client.
74 */
75 struct ClientQueryRecord *next;
76
77 /**
78 * Kept in a DLL with @e client.
79 */
80 struct ClientQueryRecord *prev;
81
82 /**
83 * Client responsible for the request.
84 */
85 struct ClientHandle *ch;
86
87 /**
88 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
89 */
90 const void *xquery;
91
92 /**
93 * Array of (hashes of) replies we have already seen for this request.
94 */
95 struct GNUNET_HashCode *seen_replies;
96
97 /**
98 * Pointer to this nodes heap location in the retry-heap (for fast removal)
99 */
100 struct GNUNET_CONTAINER_HeapNode *hnode;
101
102 /**
103 * What's the delay between re-try operations that we currently use for this
104 * request?
105 */
106 struct GNUNET_TIME_Relative retry_frequency;
107
108 /**
109 * What's the next time we should re-try this request?
110 */
111 struct GNUNET_TIME_Absolute retry_time;
112
113 /**
114 * The unique identifier of this request
115 */
116 uint64_t unique_id;
117
118 /**
119 * Number of bytes in xquery.
120 */
121 size_t xquery_size;
122
123 /**
124 * Number of entries in @e seen_replies.
125 */
126 unsigned int seen_replies_count;
127
128 /**
129 * Desired replication level
130 */
131 uint32_t replication;
132
133 /**
134 * Any message options for this request
135 */
136 enum GNUNET_DHT_RouteOption msg_options;
137
138 /**
139 * The type for the data for the GET request.
140 */
141 enum GNUNET_BLOCK_Type type;
142};
143
144
145/**
146 * Struct containing parameters of monitoring requests.
147 */
148struct ClientMonitorRecord
149{
150 /**
151 * Next element in DLL.
152 */
153 struct ClientMonitorRecord *next;
154
155 /**
156 * Previous element in DLL.
157 */
158 struct ClientMonitorRecord *prev;
159
160 /**
161 * Client to notify of these requests.
162 */
163 struct ClientHandle *ch;
164
165 /**
166 * Key of data of interest. All bits zero for 'all'.
167 */
168 struct GNUNET_HashCode key;
169
170 /**
171 * Type of blocks that are of interest
172 */
173 enum GNUNET_BLOCK_Type type;
174
175 /**
176 * Flag whether to notify about GET messages.
177 */
178 int16_t get;
179
180 /**
181 * Flag whether to notify about GET_REPONSE messages.
182 */
183 int16_t get_resp;
184
185 /**
186 * Flag whether to notify about PUT messages.
187 */
188 uint16_t put;
189
190};
191
192
193/**
194 * Struct containing information about a client,
195 * handle to connect to it, and any pending messages
196 * that need to be sent to it.
197 */
198struct ClientHandle
199{
200 /**
201 * Linked list of active queries of this client.
202 */
203 struct ClientQueryRecord *cqr_head;
204
205 /**
206 * Linked list of active queries of this client.
207 */
208 struct ClientQueryRecord *cqr_tail;
209
210 /**
211 * The handle to this client
212 */
213 struct GNUNET_SERVICE_Client *client;
214
215 /**
216 * The message queue to this client
217 */
218 struct GNUNET_MQ_Handle *mq;
219};
220
221
222/**
223 * Our handle to the BLOCK library.
224 */
225struct GNUNET_BLOCK_Context *GDS_block_context;
226
227/**
228 * Handle for the statistics service.
229 */
230struct GNUNET_STATISTICS_Handle *GDS_stats;
231
232/**
233 * Handle for the service.
234 */
235struct GNUNET_SERVICE_Handle *GDS_service;
236
237/**
238 * The configuration the DHT service is running with
239 */
240const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
241
242/**
243 * List of active monitoring requests.
244 */
245static struct ClientMonitorRecord *monitor_head;
246
247/**
248 * List of active monitoring requests.
249 */
250static struct ClientMonitorRecord *monitor_tail;
251
252/**
253 * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
254 */
255static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
256
257/**
258 * Heap with all of our client's request, sorted by retry time (earliest on top).
259 */
260static struct GNUNET_CONTAINER_Heap *retry_heap;
261
262/**
263 * Task that re-transmits requests (using retry_heap).
264 */
265static struct GNUNET_SCHEDULER_Task *retry_task;
266
267
268/**
269 * Free data structures associated with the given query.
270 *
271 * @param record record to remove
272 */
273static void
274remove_client_query_record (struct ClientQueryRecord *record)
275{
276 struct ClientHandle *ch = record->ch;
277
278 GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
279 ch->cqr_tail,
280 record);
281 GNUNET_assert (GNUNET_YES ==
282 GNUNET_CONTAINER_multihashmap_remove (forward_map,
283 &record->key,
284 record));
285 if (NULL != record->hnode)
286 GNUNET_CONTAINER_heap_remove_node (record->hnode);
287 GNUNET_array_grow (record->seen_replies,
288 record->seen_replies_count,
289 0);
290 GNUNET_free (record);
291}
292
293
294/**
295 * Functions with this signature are called whenever a local client is
296 * connects to us.
297 *
298 * @param cls closure (NULL for dht)
299 * @param client identification of the client
300 * @param mq message queue for talking to @a client
301 * @return our `struct ClientHandle` for @a client
302 */
303static void *
304client_connect_cb (void *cls,
305 struct GNUNET_SERVICE_Client *client,
306 struct GNUNET_MQ_Handle *mq)
307{
308 struct ClientHandle *ch;
309
310 (void) cls;
311 ch = GNUNET_new (struct ClientHandle);
312 ch->client = client;
313 ch->mq = mq;
314 return ch;
315}
316
317
318/**
319 * Functions with this signature are called whenever a client
320 * is disconnected on the network level.
321 *
322 * @param cls closure (NULL for dht)
323 * @param client identification of the client
324 * @param app_ctx our `struct ClientHandle` for @a client
325 */
326static void
327client_disconnect_cb (void *cls,
328 struct GNUNET_SERVICE_Client *client,
329 void *app_ctx)
330{
331 struct ClientHandle *ch = app_ctx;
332
333 (void) cls;
334 (void) client;
335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
336 "Local client %p disconnects\n",
337 ch);
338 {
339 struct ClientMonitorRecord *next;
340
341 for (struct ClientMonitorRecord *monitor = monitor_head;
342 NULL != monitor;
343 monitor = next)
344 {
345 next = monitor->next;
346 if (monitor->ch != ch)
347 continue;
348 GNUNET_CONTAINER_DLL_remove (monitor_head,
349 monitor_tail,
350 monitor);
351 GNUNET_free (monitor);
352 }
353 }
354
355 {
356 struct ClientQueryRecord *cqr;
357
358 while (NULL != (cqr = ch->cqr_head))
359 remove_client_query_record (cqr);
360 }
361 GNUNET_free (ch);
362}
363
364
365/**
366 * Route the given request via the DHT. This includes updating
367 * the bloom filter and retransmission times, building the P2P
368 * message and initiating the routing operation.
369 *
370 * @param cqr request to transmit
371 */
372static void
373transmit_request (struct ClientQueryRecord *cqr)
374{
375 struct GNUNET_BLOCK_Group *bg;
376 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
377
378 GNUNET_STATISTICS_update (GDS_stats,
379 "# GET requests from clients injected",
380 1,
381 GNUNET_NO);
382 bg = GNUNET_BLOCK_group_create (GDS_block_context,
383 cqr->type,
384 NULL, /* raw data */
385 0, /* raw data size */
386 "seen-set-size",
387 cqr->seen_replies_count,
388 NULL);
389 GNUNET_BLOCK_group_set_seen (bg,
390 cqr->seen_replies,
391 cqr->seen_replies_count);
392 peer_bf
393 = GNUNET_CONTAINER_bloomfilter_init (NULL,
394 DHT_BLOOM_SIZE,
395 GNUNET_CONSTANTS_BLOOMFILTER_K);
396 LOG (GNUNET_ERROR_TYPE_DEBUG,
397 "Initiating GET for %s, replication %u, already have %u replies\n",
398 GNUNET_h2s (&cqr->key),
399 cqr->replication,
400 cqr->seen_replies_count);
401 GDS_NEIGHBOURS_handle_get (cqr->type,
402 cqr->msg_options,
403 cqr->replication,
404 0 /* hop count */,
405 &cqr->key,
406 cqr->xquery,
407 cqr->xquery_size,
408 bg,
409 peer_bf);
410 GNUNET_BLOCK_group_destroy (bg);
411 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
412
413 /* Exponential back-off for retries.
414 * max. is #GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
415 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
416 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
417}
418
419
420/**
421 * Task that looks at the #retry_heap and transmits all of the requests
422 * on the heap that are ready for transmission. Then re-schedules
423 * itself (unless the heap is empty).
424 *
425 * @param cls unused
426 */
427static void
428transmit_next_request_task (void *cls)
429{
430 struct ClientQueryRecord *cqr;
431
432 (void) cls;
433 retry_task = NULL;
434 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
435 {
436 cqr->hnode = NULL;
437 if (! GNUNET_TIME_absolute_is_past (cqr->retry_time))
438 {
439 cqr->hnode
440 = GNUNET_CONTAINER_heap_insert (retry_heap,
441 cqr,
442 cqr->retry_time.abs_value_us);
443 retry_task
444 = GNUNET_SCHEDULER_add_at (cqr->retry_time,
445 &transmit_next_request_task,
446 NULL);
447 return;
448 }
449 transmit_request (cqr);
450 cqr->hnode
451 = GNUNET_CONTAINER_heap_insert (retry_heap,
452 cqr,
453 cqr->retry_time.abs_value_us);
454 }
455}
456
457
458/**
459 * Check DHT PUT messages from the client.
460 *
461 * @param cls the client we received this message from
462 * @param dht_msg the actual message received
463 * @return #GNUNET_OK (always)
464 */
465static enum GNUNET_GenericReturnValue
466check_dht_local_put (void *cls,
467 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
468{
469 uint32_t replication_level = ntohl (dht_msg->desired_replication_level);
470
471 (void) cls;
472 if (replication_level > GNUNET_DHT_MAXIMUM_REPLICATION_LEVEL)
473 {
474 GNUNET_break_op (0);
475 return GNUNET_SYSERR;
476 }
477 return GNUNET_OK;
478}
479
480
481/**
482 * Handler for PUT messages.
483 *
484 * @param cls the client we received this message from
485 * @param dht_msg the actual message received
486 */
487static void
488handle_dht_local_put (void *cls,
489 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
490{
491 struct ClientHandle *ch = cls;
492 uint16_t size = ntohs (dht_msg->header.size);
493 enum GNUNET_DHT_RouteOption options
494 = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options);
495 uint32_t replication_level
496 = ntohl (dht_msg->desired_replication_level);
497 struct GDS_DATACACHE_BlockData bd = {
498 .key = dht_msg->key,
499 .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
500 .data = &dht_msg[1],
501 .data_size = size - sizeof (*dht_msg),
502 .type = ntohl (dht_msg->type)
503 };
504
505 LOG (GNUNET_ERROR_TYPE_DEBUG,
506 "Handling local PUT of %lu-bytes for query %s of type %u\n",
507 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
508 GNUNET_h2s (&dht_msg->key),
509 (unsigned int) bd.type);
510 if (GNUNET_OK !=
511 GNUNET_BLOCK_check_block (GDS_block_context,
512 bd.type,
513 bd.data,
514 bd.data_size))
515 {
516 GNUNET_break (0);
517 return;
518 }
519 GNUNET_STATISTICS_update (GDS_stats,
520 "# PUT requests received from clients",
521 1,
522 GNUNET_NO);
523 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
524 "CLIENT-PUT %s\n",
525 GNUNET_h2s_full (&dht_msg->key));
526 /* give to local clients */
527 GNUNET_break (GDS_CLIENTS_handle_reply (&bd,
528 &bd.key,
529 0, NULL /* get path */));
530
531 {
532 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
533
534 peer_bf
535 = GNUNET_CONTAINER_bloomfilter_init (NULL,
536 DHT_BLOOM_SIZE,
537 GNUNET_CONSTANTS_BLOOMFILTER_K);
538 /* store locally */
539 if ( (0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
540 (GDS_am_closest_peer (&dht_msg->key,
541 peer_bf)))
542 GDS_DATACACHE_handle_put (&bd);
543 /* route to other peers */
544 if (GNUNET_OK !=
545 GDS_NEIGHBOURS_handle_put (&bd,
546 options,
547 replication_level,
548 0 /* hop count */,
549 peer_bf))
550 {
551 GNUNET_STATISTICS_update (GDS_stats,
552 "# Local PUT requests not routed",
553 1,
554 GNUNET_NO);
555 }
556 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
557 }
558 GDS_CLIENTS_process_put (
559 options,
560 &bd,
561 0, /* hop count */
562 replication_level);
563 GNUNET_SERVICE_client_continue (ch->client);
564}
565
566
567/**
568 * Handle a result from local datacache for a GET operation.
569 *
570 * @param cls the `struct ClientHandle` of the client doing the query
571 * @param bd details about the block that was found
572 */
573static void
574handle_local_result (void *cls,
575 const struct GDS_DATACACHE_BlockData *bd)
576{
577 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
578 "Datacache provided result for query key %s\n",
579 GNUNET_h2s (&bd->key));
580 GNUNET_break (GDS_CLIENTS_handle_reply (bd,
581 &bd->key,
582 0, NULL /* get_path */));
583}
584
585
586/**
587 * Check DHT GET messages from the client.
588 *
589 * @param cls the client we received this message from
590 * @param message the actual message received
591 * @return #GNUNET_OK (always)
592 */
593static enum GNUNET_GenericReturnValue
594check_dht_local_get (void *cls,
595 const struct GNUNET_DHT_ClientGetMessage *get)
596{
597 (void) cls;
598 (void) get;
599 /* always well-formed */
600 return GNUNET_OK;
601}
602
603
604/**
605 * Handler for DHT GET messages from the client.
606 *
607 * @param cls the client we received this message from
608 * @param message the actual message received
609 */
610static void
611handle_dht_local_get (void *cls,
612 const struct GNUNET_DHT_ClientGetMessage *get)
613{
614 struct ClientHandle *ch = cls;
615 struct ClientQueryRecord *cqr;
616 uint16_t size = ntohs (get->header.size);
617 const char *xquery = (const char *) &get[1];
618 size_t xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
619
620 LOG (GNUNET_ERROR_TYPE_DEBUG,
621 "Received GET request for %s from local client %p, xq: %.*s\n",
622 GNUNET_h2s (&get->key),
623 ch->client,
624 (int) xquery_size,
625 xquery);
626 GNUNET_STATISTICS_update (GDS_stats,
627 "# GET requests received from clients",
628 1,
629 GNUNET_NO);
630 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
631 "CLIENT-GET %s\n",
632 GNUNET_h2s_full (&get->key));
633
634 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size);
635 cqr->key = get->key;
636 cqr->ch = ch;
637 cqr->xquery = (const void *) &cqr[1];
638 GNUNET_memcpy (&cqr[1],
639 xquery,
640 xquery_size);
641 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap,
642 cqr,
643 0);
644 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
645 cqr->retry_time = GNUNET_TIME_absolute_get ();
646 cqr->unique_id = get->unique_id;
647 cqr->xquery_size = xquery_size;
648 cqr->replication = ntohl (get->desired_replication_level);
649 cqr->msg_options = (enum GNUNET_DHT_RouteOption) ntohl (get->options);
650 cqr->type = ntohl (get->type);
651 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
652 ch->cqr_tail,
653 cqr);
654 GNUNET_CONTAINER_multihashmap_put (forward_map,
655 &cqr->key,
656 cqr,
657 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
658 GDS_CLIENTS_process_get (cqr->msg_options,
659 cqr->type,
660 0, /* hop count */
661 cqr->replication,
662 0, /* path length */
663 NULL,
664 &get->key);
665 /* start remote requests */
666 if (NULL != retry_task)
667 GNUNET_SCHEDULER_cancel (retry_task);
668 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
669 NULL);
670 /* perform local lookup */
671 GDS_DATACACHE_handle_get (&get->key,
672 cqr->type,
673 cqr->xquery,
674 xquery_size,
675 NULL,
676 &handle_local_result,
677 ch);
678 GNUNET_SERVICE_client_continue (ch->client);
679}
680
681
682/**
683 * Closure for #find_by_unique_id().
684 */
685struct FindByUniqueIdContext
686{
687 /**
688 * Where to store the result, if found.
689 */
690 struct ClientQueryRecord *cqr;
691
692 /**
693 * Unique ID to look for.
694 */
695 uint64_t unique_id;
696};
697
698
699/**
700 * Function called for each existing DHT record for the given
701 * query. Checks if it matches the UID given in the closure
702 * and if so returns the entry as a result.
703 *
704 * @param cls the search context
705 * @param key query for the lookup (not used)
706 * @param value the `struct ClientQueryRecord`
707 * @return #GNUNET_YES to continue iteration (result not yet found)
708 */
709static enum GNUNET_GenericReturnValue
710find_by_unique_id (void *cls,
711 const struct GNUNET_HashCode *key,
712 void *value)
713{
714 struct FindByUniqueIdContext *fui_ctx = cls;
715 struct ClientQueryRecord *cqr = value;
716
717 if (cqr->unique_id != fui_ctx->unique_id)
718 return GNUNET_YES;
719 fui_ctx->cqr = cqr;
720 return GNUNET_NO;
721}
722
723
724/**
725 * Check "GET result seen" messages from the client.
726 *
727 * @param cls the client we received this message from
728 * @param message the actual message received
729 * @return #GNUNET_OK if @a seen is well-formed
730 */
731static enum GNUNET_GenericReturnValue
732check_dht_local_get_result_seen (
733 void *cls,
734 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
735{
736 uint16_t size = ntohs (seen->header.size);
737 unsigned int hash_count =
738 (size - sizeof(*seen))
739 / sizeof(struct GNUNET_HashCode);
740
741 if (size != sizeof(*seen) + hash_count * sizeof(struct GNUNET_HashCode))
742 {
743 GNUNET_break (0);
744 return GNUNET_SYSERR;
745 }
746 return GNUNET_OK;
747}
748
749
750/**
751 * Handler for "GET result seen" messages from the client.
752 *
753 * @param cls the client we received this message from
754 * @param message the actual message received
755 */
756static void
757handle_dht_local_get_result_seen (
758 void *cls,
759 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
760{
761 struct ClientHandle *ch = cls;
762 uint16_t size = ntohs (seen->header.size);
763 unsigned int hash_count = (size - sizeof(*seen))
764 / sizeof(struct GNUNET_HashCode);
765 const struct GNUNET_HashCode *hc = (const struct GNUNET_HashCode*) &seen[1];
766 struct FindByUniqueIdContext fui_ctx = {
767 .unique_id = seen->unique_id
768 };
769 unsigned int old_count;
770 struct ClientQueryRecord *cqr;
771
772 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
773 &seen->key,
774 &find_by_unique_id,
775 &fui_ctx);
776 if (NULL == (cqr = fui_ctx.cqr))
777 {
778 GNUNET_break (0);
779 GNUNET_SERVICE_client_drop (ch->client);
780 return;
781 }
782 /* finally, update 'seen' list */
783 old_count = cqr->seen_replies_count;
784 GNUNET_array_grow (cqr->seen_replies,
785 cqr->seen_replies_count,
786 cqr->seen_replies_count + hash_count);
787 GNUNET_memcpy (&cqr->seen_replies[old_count],
788 hc,
789 sizeof(struct GNUNET_HashCode) * hash_count);
790}
791
792
793/**
794 * Closure for #remove_by_unique_id().
795 */
796struct RemoveByUniqueIdContext
797{
798 /**
799 * Client that issued the removal request.
800 */
801 struct ClientHandle *ch;
802
803 /**
804 * Unique ID of the request.
805 */
806 uint64_t unique_id;
807};
808
809
810/**
811 * Iterator over hash map entries that frees all entries
812 * that match the given client and unique ID.
813 *
814 * @param cls unique ID and client to search for in source routes
815 * @param key current key code
816 * @param value value in the hash map, a ClientQueryRecord
817 * @return #GNUNET_YES (we should continue to iterate)
818 */
819static enum GNUNET_GenericReturnValue
820remove_by_unique_id (void *cls,
821 const struct GNUNET_HashCode *key,
822 void *value)
823{
824 const struct RemoveByUniqueIdContext *ctx = cls;
825 struct ClientQueryRecord *cqr = value;
826
827 if (cqr->unique_id != ctx->unique_id)
828 return GNUNET_YES;
829 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
830 "Removing client %p's record for key %s (by unique id)\n",
831 ctx->ch->client,
832 GNUNET_h2s (key));
833 remove_client_query_record (cqr);
834 return GNUNET_YES;
835}
836
837
838/**
839 * Handler for any generic DHT stop messages, calls the appropriate handler
840 * depending on message type (if processed locally)
841 *
842 * @param cls client we received this message from
843 * @param message the actual message received
844 *
845 */
846static void
847handle_dht_local_get_stop (
848 void *cls,
849 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
850{
851 struct ClientHandle *ch = cls;
852 struct RemoveByUniqueIdContext ctx;
853
854 GNUNET_STATISTICS_update (GDS_stats,
855 "# GET STOP requests received from clients",
856 1,
857 GNUNET_NO);
858 LOG (GNUNET_ERROR_TYPE_DEBUG,
859 "Received GET STOP request for %s from local client %p\n",
860 GNUNET_h2s (&dht_stop_msg->key),
861 ch->client);
862 ctx.ch = ch;
863 ctx.unique_id = dht_stop_msg->unique_id;
864 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
865 &dht_stop_msg->key,
866 &remove_by_unique_id,
867 &ctx);
868 GNUNET_SERVICE_client_continue (ch->client);
869}
870
871
872/**
873 * Closure for #forward_reply()
874 */
875struct ForwardReplyContext
876{
877 /**
878 * Block details.
879 */
880 const struct GDS_DATACACHE_BlockData *bd;
881
882 /**
883 * GET path taken.
884 */
885 const struct GNUNET_DHT_PathElement *get_path;
886
887 /**
888 * Number of entries in @e get_path.
889 */
890 unsigned int get_path_length;
891
892};
893
894
895/**
896 * Iterator over hash map entries that send a given reply to
897 * each of the matching clients. With some tricky recycling
898 * of the buffer.
899 *
900 * @param cls the `struct ForwardReplyContext`
901 * @param query_hash hash of the query for which this may be a reply
902 * @param value value in the hash map, a ClientQueryRecord
903 * @return #GNUNET_YES (we should continue to iterate),
904 * if the result is mal-formed, #GNUNET_NO
905 */
906static enum GNUNET_GenericReturnValue
907forward_reply (void *cls,
908 const struct GNUNET_HashCode *query_hash,
909 void *value)
910{
911 struct ForwardReplyContext *frc = cls;
912 struct ClientQueryRecord *record = value;
913 struct GNUNET_MQ_Envelope *env;
914 struct GNUNET_DHT_ClientResultMessage *reply;
915 enum GNUNET_BLOCK_ReplyEvaluationResult eval;
916 bool do_free;
917 struct GNUNET_HashCode ch;
918 struct GNUNET_DHT_PathElement *paths;
919
920 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
921 "CLIENT-RESULT %s\n",
922 GNUNET_h2s_full (&frc->bd->key));
923 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
924 (record->type != frc->bd->type) )
925 {
926 LOG (GNUNET_ERROR_TYPE_DEBUG,
927 "Record type mismatch, not passing request for key %s to local client\n",
928 GNUNET_h2s (&frc->bd->key));
929 GNUNET_STATISTICS_update (GDS_stats,
930 "# Key match, type mismatches in REPLY to CLIENT",
931 1,
932 GNUNET_NO);
933 return GNUNET_YES; /* type mismatch */
934 }
935 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_APPROXIMATE)) &&
936 (0 != GNUNET_memcmp (&frc->bd->key,
937 query_hash)) )
938 {
939 GNUNET_STATISTICS_update (GDS_stats,
940 "# Inexact key match, but exact match required",
941 1,
942 GNUNET_NO);
943 return GNUNET_YES; /* type mismatch */
944 }
945 GNUNET_CRYPTO_hash (frc->bd->data,
946 frc->bd->data_size,
947 &ch);
948 for (unsigned int i = 0; i < record->seen_replies_count; i++)
949 if (0 ==
950 GNUNET_memcmp (&record->seen_replies[i],
951 &ch))
952 {
953 LOG (GNUNET_ERROR_TYPE_DEBUG,
954 "Duplicate reply, not passing request for key %s to local client\n",
955 GNUNET_h2s (&frc->bd->key));
956 GNUNET_STATISTICS_update (GDS_stats,
957 "# Duplicate REPLIES to CLIENT request dropped",
958 1,
959 GNUNET_NO);
960 return GNUNET_YES; /* duplicate */
961 }
962 eval
963 = GNUNET_BLOCK_check_reply (GDS_block_context,
964 record->type,
965 NULL,
966 &frc->bd->key,
967 record->xquery,
968 record->xquery_size,
969 frc->bd->data,
970 frc->bd->data_size);
971 LOG (GNUNET_ERROR_TYPE_DEBUG,
972 "Evaluation result is %d for key %s for local client's query\n",
973 (int) eval,
974 GNUNET_h2s (&frc->bd->key));
975 switch (eval)
976 {
977 case GNUNET_BLOCK_REPLY_OK_LAST:
978 do_free = true;
979 break;
980 case GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED:
981 case GNUNET_BLOCK_REPLY_OK_MORE:
982 GNUNET_array_append (record->seen_replies,
983 record->seen_replies_count,
984 ch);
985 do_free = false;
986 break;
987 case GNUNET_BLOCK_REPLY_OK_DUPLICATE:
988 /* should be impossible to encounter here */
989 GNUNET_break (0);
990 return GNUNET_YES;
991 case GNUNET_BLOCK_REPLY_IRRELEVANT:
992 return GNUNET_YES;
993 default:
994 GNUNET_break (0);
995 return GNUNET_NO;
996 }
997 GNUNET_STATISTICS_update (GDS_stats,
998 "# RESULTS queued for clients",
999 1,
1000 GNUNET_NO);
1001 env = GNUNET_MQ_msg_extra (reply,
1002 frc->bd->data_size
1003 + (frc->get_path_length + frc->bd->put_path_length)
1004 * sizeof(struct GNUNET_DHT_PathElement),
1005 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1006 reply->type = htonl (frc->bd->type);
1007 reply->get_path_length = htonl (frc->get_path_length);
1008 reply->put_path_length = htonl (frc->bd->put_path_length);
1009 reply->unique_id = record->unique_id;
1010 reply->expiration = GNUNET_TIME_absolute_hton (frc->bd->expiration_time);
1011 reply->key = *query_hash;
1012 paths = (struct GNUNET_DHT_PathElement *) &reply[1];
1013 GNUNET_memcpy (paths,
1014 frc->bd->put_path,
1015 sizeof(struct GNUNET_DHT_PathElement)
1016 * frc->bd->put_path_length);
1017 GNUNET_memcpy (&paths[frc->bd->put_path_length],
1018 frc->get_path,
1019 sizeof(struct GNUNET_DHT_PathElement)
1020 * frc->get_path_length);
1021 GNUNET_memcpy (&paths[frc->get_path_length + frc->bd->put_path_length],
1022 frc->bd->data,
1023 frc->bd->data_size);
1024 LOG (GNUNET_ERROR_TYPE_DEBUG,
1025 "Sending reply to query %s for client %p\n",
1026 GNUNET_h2s (query_hash),
1027 record->ch->client);
1028 GNUNET_MQ_send (record->ch->mq,
1029 env);
1030 if (GNUNET_YES == do_free)
1031 remove_client_query_record (record);
1032 return GNUNET_YES;
1033}
1034
1035
1036bool
1037GDS_CLIENTS_handle_reply (const struct GDS_DATACACHE_BlockData *bd,
1038 const struct GNUNET_HashCode *query_hash,
1039 unsigned int get_path_length,
1040 const struct GNUNET_DHT_PathElement *get_path)
1041{
1042 struct ForwardReplyContext frc;
1043 size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage)
1044 + bd->data_size
1045 + (get_path_length + bd->put_path_length)
1046 * sizeof(struct GNUNET_DHT_PathElement);
1047
1048 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1049 {
1050 GNUNET_break (0);
1051 return false;
1052 }
1053#if SANITY_CHECKS > 1
1054 if (0 !=
1055 GNUNET_DHT_verify_path (bd->data,
1056 bd->data_size,
1057 bd->expiration_time,
1058 bd->put_path,
1059 bd->put_path_length,
1060 get_path,
1061 get_path_length,
1062 &GDS_my_identity))
1063 {
1064 GNUNET_break (0);
1065 return false;
1066 }
1067#endif
1068 frc.bd = bd;
1069 frc.get_path = get_path;
1070 frc.get_path_length = get_path_length;
1071 LOG (GNUNET_ERROR_TYPE_DEBUG,
1072 "Forwarding reply for query hash %s with GPL %u and PPL %u to client\n",
1073 GNUNET_h2s (query_hash),
1074 get_path_length,
1075 bd->put_path_length);
1076 if (0 ==
1077 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1078 query_hash,
1079 &forward_reply,
1080 &frc))
1081 {
1082 LOG (GNUNET_ERROR_TYPE_DEBUG,
1083 "No matching client for reply for query %s\n",
1084 GNUNET_h2s (query_hash));
1085 GNUNET_STATISTICS_update (GDS_stats,
1086 "# REPLIES ignored for CLIENTS (no match)",
1087 1,
1088 GNUNET_NO);
1089 }
1090 return true;
1091}
1092
1093
1094/* **************** HELLO logic ***************** */
1095
1096/**
1097 * Handler for HELLO GET message. Reply to client
1098 * with a URL of our HELLO.
1099 *
1100 * @param cls the client we received this message from
1101 * @param msg the actual message received
1102 *
1103 */
1104static void
1105handle_dht_local_hello_get (void *cls,
1106 const struct GNUNET_MessageHeader *msg)
1107{
1108 struct ClientHandle *ch = cls;
1109 char *url = GNUNET_HELLO_builder_to_url (GDS_my_hello,
1110 &GDS_my_private_key);
1111 size_t slen = strlen (url) + 1;
1112 struct GNUNET_MessageHeader *hdr;
1113 struct GNUNET_MQ_Envelope *env;
1114
1115 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116 "Handling request from local client for my HELLO\n");
1117 env = GNUNET_MQ_msg_extra (hdr,
1118 slen,
1119 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL);
1120 memcpy (&hdr[1],
1121 url,
1122 slen);
1123 GNUNET_free (url);
1124 GNUNET_MQ_send (ch->mq,
1125 env);
1126 GNUNET_SERVICE_client_continue (ch->client);
1127}
1128
1129
1130/**
1131 * Process a client HELLO message received from the service.
1132 *
1133 * @param cls the client we received this message from
1134 * @param hdr HELLO URL message from the service.
1135 * @return #GNUNET_OK if @a hdr is well-formed
1136 */
1137static enum GNUNET_GenericReturnValue
1138check_dht_local_hello_offer (void *cls,
1139 const struct GNUNET_MessageHeader *hdr)
1140{
1141 uint16_t len = ntohs (hdr->size);
1142 const char *buf = (const char *) &hdr[1];
1143
1144 (void) cls;
1145 if ('\0' != buf[len - sizeof (*hdr) - 1])
1146 {
1147 GNUNET_break (0);
1148 return GNUNET_SYSERR;
1149 }
1150 return GNUNET_OK;
1151}
1152
1153
1154/**
1155 * Handler for HELLO OFFER message. Try to use the
1156 * HELLO to connect to another peer.
1157 *
1158 * @param cls the client we received this message from
1159 * @param msg the actual message received
1160 */
1161static void
1162handle_dht_local_hello_offer (void *cls,
1163 const struct GNUNET_MessageHeader *msg)
1164{
1165 struct ClientHandle *ch = cls;
1166 const char *url = (const char *) &msg[1];
1167 struct GNUNET_HELLO_Builder *b;
1168 struct GNUNET_PeerIdentity pid;
1169
1170 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1171 "Local client provided HELLO URL %s\n",
1172 url);
1173 b = GNUNET_HELLO_builder_from_url (url);
1174 if (NULL == b)
1175 {
1176 GNUNET_break (0);
1177 GNUNET_SERVICE_client_drop (ch->client);
1178 return;
1179 }
1180 GNUNET_SERVICE_client_continue (ch->client);
1181 GNUNET_HELLO_builder_iterate (b,
1182 &pid,
1183 &GDS_try_connect,
1184 &pid);
1185 GNUNET_HELLO_builder_free (b);
1186}
1187
1188
1189/* ************* logic for monitors ************** */
1190
1191
1192/**
1193 * Handler for monitor start messages
1194 *
1195 * @param cls the client we received this message from
1196 * @param msg the actual message received
1197 *
1198 */
1199static void
1200handle_dht_local_monitor (void *cls,
1201 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1202{
1203 struct ClientHandle *ch = cls;
1204 struct ClientMonitorRecord *r;
1205
1206 r = GNUNET_new (struct ClientMonitorRecord);
1207 r->ch = ch;
1208 r->type = ntohl (msg->type);
1209 r->get = ntohs (msg->get);
1210 r->get_resp = ntohs (msg->get_resp);
1211 r->put = ntohs (msg->put);
1212 if (0 != ntohs (msg->filter_key))
1213 r->key = msg->key;
1214 GNUNET_CONTAINER_DLL_insert (monitor_head,
1215 monitor_tail,
1216 r);
1217 GNUNET_SERVICE_client_continue (ch->client);
1218}
1219
1220
1221/**
1222 * Handler for monitor stop messages
1223 *
1224 * @param cls the client we received this message from
1225 * @param msg the actual message received
1226 */
1227static void
1228handle_dht_local_monitor_stop (
1229 void *cls,
1230 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1231{
1232 struct ClientHandle *ch = cls;
1233
1234 GNUNET_SERVICE_client_continue (ch->client);
1235 for (struct ClientMonitorRecord *r = monitor_head;
1236 NULL != r;
1237 r = r->next)
1238 {
1239 bool keys_match;
1240
1241 keys_match =
1242 (GNUNET_is_zero (&r->key))
1243 ? (0 == ntohs (msg->filter_key))
1244 : ( (0 != ntohs (msg->filter_key)) &&
1245 (! GNUNET_memcmp (&r->key,
1246 &msg->key)) );
1247 if ( (ch == r->ch) &&
1248 (ntohl (msg->type) == r->type) &&
1249 (r->get == msg->get) &&
1250 (r->get_resp == msg->get_resp) &&
1251 (r->put == msg->put) &&
1252 keys_match)
1253 {
1254 GNUNET_CONTAINER_DLL_remove (monitor_head,
1255 monitor_tail,
1256 r);
1257 GNUNET_free (r);
1258 return; /* Delete only ONE entry */
1259 }
1260 }
1261}
1262
1263
1264/**
1265 * Function to call by #for_matching_monitors().
1266 *
1267 * @param cls closure
1268 * @param m a matching monitor
1269 */
1270typedef void
1271(*MonitorAction)(void *cls,
1272 struct ClientMonitorRecord *m);
1273
1274
1275/**
1276 * Call @a cb on all monitors that watch for blocks of @a type
1277 * and key @a key.
1278 *
1279 * @param type the type to match
1280 * @param key the key to match
1281 * @param cb function to call
1282 * @param cb_cls closure for @a cb
1283 */
1284static void
1285for_matching_monitors (enum GNUNET_BLOCK_Type type,
1286 const struct GNUNET_HashCode *key,
1287 MonitorAction cb,
1288 void *cb_cls)
1289{
1290 struct ClientHandle **cl = NULL;
1291 unsigned int cl_size = 0;
1292
1293 for (struct ClientMonitorRecord *m = monitor_head;
1294 NULL != m;
1295 m = m->next)
1296 {
1297 if ( ( (GNUNET_BLOCK_TYPE_ANY == m->type) ||
1298 (m->type == type) ) &&
1299 ( (GNUNET_is_zero (&m->key)) ||
1300 (0 ==
1301 GNUNET_memcmp (key,
1302 &m->key)) ) )
1303 {
1304 unsigned int i;
1305
1306 /* Don't send duplicates */
1307 for (i = 0; i < cl_size; i++)
1308 if (cl[i] == m->ch)
1309 break;
1310 if (i < cl_size)
1311 continue;
1312 GNUNET_array_append (cl,
1313 cl_size,
1314 m->ch);
1315 cb (cb_cls,
1316 m);
1317 }
1318 }
1319 GNUNET_free (cl);
1320}
1321
1322
1323/**
1324 * Closure for #get_action();
1325 */
1326struct GetActionContext
1327{
1328 enum GNUNET_DHT_RouteOption options;
1329 enum GNUNET_BLOCK_Type type;
1330 uint32_t hop_count;
1331 uint32_t desired_replication_level;
1332 unsigned int get_path_length;
1333 const struct GNUNET_DHT_PathElement *get_path;
1334 const struct GNUNET_HashCode *key;
1335};
1336
1337
1338/**
1339 * Function called on monitors that match a GET.
1340 * Sends the GET notification to the monitor.
1341 *
1342 * @param cls a `struct GetActionContext`
1343 * @param m a matching monitor
1344 */
1345static void
1346get_action (void *cls,
1347 struct ClientMonitorRecord *m)
1348{
1349 struct GetActionContext *gac = cls;
1350 struct GNUNET_MQ_Envelope *env;
1351 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1352 struct GNUNET_DHT_PathElement *msg_path;
1353 size_t msize;
1354
1355 msize = gac->get_path_length * sizeof(struct GNUNET_DHT_PathElement);
1356 env = GNUNET_MQ_msg_extra (mmsg,
1357 msize,
1358 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1359 mmsg->options = htonl (gac->options);
1360 mmsg->type = htonl (gac->type);
1361 mmsg->hop_count = htonl (gac->hop_count);
1362 mmsg->desired_replication_level = htonl (gac->desired_replication_level);
1363 mmsg->get_path_length = htonl (gac->get_path_length);
1364 mmsg->key = *gac->key;
1365 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1366 GNUNET_memcpy (msg_path,
1367 gac->get_path,
1368 gac->get_path_length * sizeof(struct GNUNET_DHT_PathElement));
1369 GNUNET_MQ_send (m->ch->mq,
1370 env);
1371}
1372
1373
1374/**
1375 * Check if some client is monitoring GET messages and notify
1376 * them in that case. If tracked, @a path should include the local peer.
1377 *
1378 * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
1379 * @param type The type of data in the request.
1380 * @param hop_count Hop count so far.
1381 * @param path_length number of entries in path (or 0 if not recorded).
1382 * @param path peers on the GET path (or NULL if not recorded).
1383 * @param desired_replication_level Desired replication level.
1384 * @param key Key of the requested data.
1385 */
1386void
1387GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
1388 enum GNUNET_BLOCK_Type type,
1389 uint32_t hop_count,
1390 uint32_t desired_replication_level,
1391 unsigned int path_length,
1392 const struct GNUNET_DHT_PathElement *path,
1393 const struct GNUNET_HashCode *key)
1394{
1395 struct GetActionContext gac = {
1396 .options = options,
1397 .type = type,
1398 .hop_count = hop_count,
1399 .desired_replication_level = desired_replication_level,
1400 .get_path_length = path_length,
1401 .get_path = path,
1402 .key = key
1403 };
1404
1405 for_matching_monitors (type,
1406 key,
1407 &get_action,
1408 &gac);
1409}
1410
1411
1412/**
1413 * Closure for response_action().
1414 */
1415struct ResponseActionContext
1416{
1417 const struct GDS_DATACACHE_BlockData *bd;
1418 const struct GNUNET_DHT_PathElement *get_path;
1419 unsigned int get_path_length;
1420};
1421
1422
1423/**
1424 * Function called on monitors that match a response.
1425 * Sends the response notification to the monitor.
1426 *
1427 * @param cls a `struct ResponseActionContext`
1428 * @param m a matching monitor
1429 */
1430static void
1431response_action (void *cls,
1432 struct ClientMonitorRecord *m)
1433{
1434 const struct ResponseActionContext *resp_ctx = cls;
1435 const struct GDS_DATACACHE_BlockData *bd = resp_ctx->bd;
1436
1437 struct GNUNET_MQ_Envelope *env;
1438 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1439 struct GNUNET_DHT_PathElement *path;
1440 size_t msize;
1441
1442 msize = bd->data_size;
1443 msize += (resp_ctx->get_path_length + bd->put_path_length)
1444 * sizeof(struct GNUNET_DHT_PathElement);
1445 env = GNUNET_MQ_msg_extra (mmsg,
1446 msize,
1447 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1448 mmsg->type = htonl (bd->type);
1449 mmsg->put_path_length = htonl (bd->put_path_length);
1450 mmsg->get_path_length = htonl (resp_ctx->get_path_length);
1451 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1452 mmsg->key = bd->key;
1453 path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1454 GNUNET_memcpy (path,
1455 bd->put_path,
1456 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1457 GNUNET_memcpy (path,
1458 resp_ctx->get_path,
1459 resp_ctx->get_path_length
1460 * sizeof(struct GNUNET_DHT_PathElement));
1461 GNUNET_memcpy (&path[resp_ctx->get_path_length],
1462 bd->data,
1463 bd->data_size);
1464 GNUNET_MQ_send (m->ch->mq,
1465 env);
1466}
1467
1468
1469void
1470GDS_CLIENTS_process_get_resp (const struct GDS_DATACACHE_BlockData *bd,
1471 const struct GNUNET_DHT_PathElement *get_path,
1472 unsigned int get_path_length)
1473{
1474 struct ResponseActionContext rac = {
1475 .bd = bd,
1476 .get_path = get_path,
1477 .get_path_length = get_path_length
1478 };
1479
1480 for_matching_monitors (bd->type,
1481 &bd->key,
1482 &response_action,
1483 &rac);
1484}
1485
1486
1487/**
1488 * Closure for put_action().
1489 */
1490struct PutActionContext
1491{
1492 const struct GDS_DATACACHE_BlockData *bd;
1493 enum GNUNET_DHT_RouteOption options;
1494 uint32_t hop_count;
1495 uint32_t desired_replication_level;
1496};
1497
1498
1499/**
1500 * Function called on monitors that match a PUT.
1501 * Sends the PUT notification to the monitor.
1502 *
1503 * @param cls a `struct PutActionContext`
1504 * @param m a matching monitor
1505 */
1506static void
1507put_action (void *cls,
1508 struct ClientMonitorRecord *m)
1509{
1510 const struct PutActionContext *put_ctx = cls;
1511 const struct GDS_DATACACHE_BlockData *bd = put_ctx->bd;
1512 struct GNUNET_MQ_Envelope *env;
1513 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1514 struct GNUNET_DHT_PathElement *msg_path;
1515 size_t msize;
1516
1517 msize = bd->data_size
1518 + bd->put_path_length
1519 * sizeof(struct GNUNET_DHT_PathElement);
1520 env = GNUNET_MQ_msg_extra (mmsg,
1521 msize,
1522 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1523 mmsg->options = htonl (put_ctx->options);
1524 mmsg->type = htonl (bd->type);
1525 mmsg->hop_count = htonl (put_ctx->hop_count);
1526 mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
1527 mmsg->put_path_length = htonl (bd->put_path_length);
1528 mmsg->key = bd->key;
1529 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1530 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1531 GNUNET_memcpy (msg_path,
1532 bd->put_path,
1533 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1534 GNUNET_memcpy (&msg_path[bd->put_path_length],
1535 bd->data,
1536 bd->data_size);
1537 GNUNET_MQ_send (m->ch->mq,
1538 env);
1539}
1540
1541
1542void
1543GDS_CLIENTS_process_put (enum GNUNET_DHT_RouteOption options,
1544 const struct GDS_DATACACHE_BlockData *bd,
1545 uint32_t hop_count,
1546 uint32_t desired_replication_level)
1547{
1548 struct PutActionContext put_ctx = {
1549 .bd = bd,
1550 .hop_count = hop_count,
1551 .desired_replication_level = desired_replication_level,
1552 .options = options
1553 };
1554
1555 for_matching_monitors (bd->type,
1556 &bd->key,
1557 &put_action,
1558 &put_ctx);
1559}
1560
1561
1562/* ********************** Initialization logic ***************** */
1563
1564
1565/**
1566 * Initialize client subsystem.
1567 *
1568 * @param server the initialized server
1569 */
1570static void
1571GDS_CLIENTS_init (void)
1572{
1573 forward_map
1574 = GNUNET_CONTAINER_multihashmap_create (1024,
1575 GNUNET_YES);
1576 retry_heap
1577 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1578}
1579
1580
1581/**
1582 * Shutdown client subsystem.
1583 */
1584static void
1585GDS_CLIENTS_stop (void)
1586{
1587 if (NULL != retry_task)
1588 {
1589 GNUNET_SCHEDULER_cancel (retry_task);
1590 retry_task = NULL;
1591 }
1592}
1593
1594
1595/**
1596 * Define "main" method using service macro.
1597 *
1598 * @param name name of the service, like "dht" or "xdht"
1599 * @param run name of the initializaton method for the service
1600 */
1601#define GDS_DHT_SERVICE_INIT(name, run) \
1602 GNUNET_SERVICE_MAIN \
1603 (name, \
1604 GNUNET_SERVICE_OPTION_NONE, \
1605 run, \
1606 &client_connect_cb, \
1607 &client_disconnect_cb, \
1608 NULL, \
1609 GNUNET_MQ_hd_var_size (dht_local_put, \
1610 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1611 struct GNUNET_DHT_ClientPutMessage, \
1612 NULL), \
1613 GNUNET_MQ_hd_var_size (dht_local_get, \
1614 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1615 struct GNUNET_DHT_ClientGetMessage, \
1616 NULL), \
1617 GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1618 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1619 struct GNUNET_DHT_ClientGetStopMessage, \
1620 NULL), \
1621 GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1622 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1623 struct GNUNET_DHT_MonitorStartStopMessage, \
1624 NULL), \
1625 GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1626 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1627 struct GNUNET_DHT_MonitorStartStopMessage, \
1628 NULL), \
1629 GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1630 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1631 struct GNUNET_DHT_ClientGetResultSeenMessage, \
1632 NULL), \
1633 GNUNET_MQ_hd_fixed_size (dht_local_hello_get, \
1634 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_GET, \
1635 struct GNUNET_MessageHeader, \
1636 NULL), \
1637 GNUNET_MQ_hd_var_size (dht_local_hello_offer, \
1638 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL, \
1639 struct GNUNET_MessageHeader, \
1640 NULL), \
1641 GNUNET_MQ_handler_end ())
1642
1643
1644/**
1645 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1646 */
1647void __attribute__ ((destructor))
1648GDS_CLIENTS_done ()
1649{
1650 if (NULL != retry_heap)
1651 {
1652 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1653 GNUNET_CONTAINER_heap_destroy (retry_heap);
1654 retry_heap = NULL;
1655 }
1656 if (NULL != forward_map)
1657 {
1658 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1659 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1660 forward_map = NULL;
1661 }
1662}
1663
1664
1665/* end of gnunet-service-dht_clients.c */