aboutsummaryrefslogtreecommitdiff
path: root/src/dht/gnunet-service-dht_clients.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r--src/dht/gnunet-service-dht_clients.c1706
1 files changed, 0 insertions, 1706 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
deleted file mode 100644
index 90bb4d1f7..000000000
--- a/src/dht/gnunet-service-dht_clients.c
+++ /dev/null
@@ -1,1706 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009, 2010, 2011, 2016, 2017, 2022 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file dht/gnunet-service-dht_clients.c
23 * @brief GNUnet DHT service's client management code
24 * @author Christian Grothoff
25 * @author Nathan Evans
26 */
27#include "platform.h"
28#include "gnunet_constants.h"
29#include "gnunet_protocols.h"
30#include "gnunet_statistics_service.h"
31#include "gnunet-service-dht.h"
32#include "gnunet-service-dht_datacache.h"
33#include "gnunet-service-dht_neighbours.h"
34#include "dht.h"
35
36
37/**
38 * Enable slow sanity checks to debug issues.
39 * 0: do not check
40 * 1: check all external inputs
41 * 2: check internal computations as well
42 */
43#define SANITY_CHECKS 0
44
45/**
46 * Should routing details be logged to stderr (for debugging)?
47 */
48#define LOG_TRAFFIC(kind, ...) GNUNET_log_from (kind, "dht-traffic", \
49 __VA_ARGS__)
50
51#define LOG(kind, ...) GNUNET_log_from (kind, "dht-clients", __VA_ARGS__)
52
53
54/**
55 * Struct containing information about a client,
56 * handle to connect to it, and any pending messages
57 * that need to be sent to it.
58 */
59struct ClientHandle;
60
61
62/**
63 * Entry in the local forwarding map for a client's GET request.
64 */
65struct ClientQueryRecord
66{
67 /**
68 * The key this request was about
69 */
70 struct GNUNET_HashCode key;
71
72 /**
73 * Kept in a DLL with @e client.
74 */
75 struct ClientQueryRecord *next;
76
77 /**
78 * Kept in a DLL with @e client.
79 */
80 struct ClientQueryRecord *prev;
81
82 /**
83 * Client responsible for the request.
84 */
85 struct ClientHandle *ch;
86
87 /**
88 * Extended query (see gnunet_block_lib.h), allocated at the end of this struct.
89 */
90 const void *xquery;
91
92 /**
93 * Array of (hashes of) replies we have already seen for this request.
94 */
95 struct GNUNET_HashCode *seen_replies;
96
97 /**
98 * Pointer to this nodes heap location in the retry-heap (for fast removal)
99 */
100 struct GNUNET_CONTAINER_HeapNode *hnode;
101
102 /**
103 * What's the delay between re-try operations that we currently use for this
104 * request?
105 */
106 struct GNUNET_TIME_Relative retry_frequency;
107
108 /**
109 * What's the next time we should re-try this request?
110 */
111 struct GNUNET_TIME_Absolute retry_time;
112
113 /**
114 * The unique identifier of this request
115 */
116 uint64_t unique_id;
117
118 /**
119 * Number of bytes in xquery.
120 */
121 size_t xquery_size;
122
123 /**
124 * Number of entries in @e seen_replies.
125 */
126 unsigned int seen_replies_count;
127
128 /**
129 * Desired replication level
130 */
131 uint32_t replication;
132
133 /**
134 * Any message options for this request
135 */
136 enum GNUNET_DHT_RouteOption msg_options;
137
138 /**
139 * The type for the data for the GET request.
140 */
141 enum GNUNET_BLOCK_Type type;
142};
143
144
145/**
146 * Struct containing parameters of monitoring requests.
147 */
148struct ClientMonitorRecord
149{
150 /**
151 * Next element in DLL.
152 */
153 struct ClientMonitorRecord *next;
154
155 /**
156 * Previous element in DLL.
157 */
158 struct ClientMonitorRecord *prev;
159
160 /**
161 * Client to notify of these requests.
162 */
163 struct ClientHandle *ch;
164
165 /**
166 * Key of data of interest. All bits zero for 'all'.
167 */
168 struct GNUNET_HashCode key;
169
170 /**
171 * Type of blocks that are of interest
172 */
173 enum GNUNET_BLOCK_Type type;
174
175 /**
176 * Flag whether to notify about GET messages.
177 */
178 int16_t get;
179
180 /**
181 * Flag whether to notify about GET_REPONSE messages.
182 */
183 int16_t get_resp;
184
185 /**
186 * Flag whether to notify about PUT messages.
187 */
188 uint16_t put;
189
190};
191
192
193/**
194 * Struct containing information about a client,
195 * handle to connect to it, and any pending messages
196 * that need to be sent to it.
197 */
198struct ClientHandle
199{
200 /**
201 * Linked list of active queries of this client.
202 */
203 struct ClientQueryRecord *cqr_head;
204
205 /**
206 * Linked list of active queries of this client.
207 */
208 struct ClientQueryRecord *cqr_tail;
209
210 /**
211 * The handle to this client
212 */
213 struct GNUNET_SERVICE_Client *client;
214
215 /**
216 * The message queue to this client
217 */
218 struct GNUNET_MQ_Handle *mq;
219};
220
221
222/**
223 * Our handle to the BLOCK library.
224 */
225struct GNUNET_BLOCK_Context *GDS_block_context;
226
227/**
228 * Handle for the statistics service.
229 */
230struct GNUNET_STATISTICS_Handle *GDS_stats;
231
232/**
233 * Handle for the service.
234 */
235struct GNUNET_SERVICE_Handle *GDS_service;
236
237/**
238 * The configuration the DHT service is running with
239 */
240const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
241
242/**
243 * List of active monitoring requests.
244 */
245static struct ClientMonitorRecord *monitor_head;
246
247/**
248 * List of active monitoring requests.
249 */
250static struct ClientMonitorRecord *monitor_tail;
251
252/**
253 * Hashmap for fast key based lookup, maps keys to `struct ClientQueryRecord` entries.
254 */
255static struct GNUNET_CONTAINER_MultiHashMap *forward_map;
256
257/**
258 * Heap with all of our client's request, sorted by retry time (earliest on top).
259 */
260static struct GNUNET_CONTAINER_Heap *retry_heap;
261
262/**
263 * Task that re-transmits requests (using retry_heap).
264 */
265static struct GNUNET_SCHEDULER_Task *retry_task;
266
267
268/**
269 * Free data structures associated with the given query.
270 *
271 * @param record record to remove
272 */
273static void
274remove_client_query_record (struct ClientQueryRecord *record)
275{
276 struct ClientHandle *ch = record->ch;
277
278 GNUNET_CONTAINER_DLL_remove (ch->cqr_head,
279 ch->cqr_tail,
280 record);
281 GNUNET_assert (GNUNET_YES ==
282 GNUNET_CONTAINER_multihashmap_remove (forward_map,
283 &record->key,
284 record));
285 if (NULL != record->hnode)
286 GNUNET_CONTAINER_heap_remove_node (record->hnode);
287 GNUNET_array_grow (record->seen_replies,
288 record->seen_replies_count,
289 0);
290 GNUNET_free (record);
291}
292
293
294/**
295 * Functions with this signature are called whenever a local client is
296 * connects to us.
297 *
298 * @param cls closure (NULL for dht)
299 * @param client identification of the client
300 * @param mq message queue for talking to @a client
301 * @return our `struct ClientHandle` for @a client
302 */
303static void *
304client_connect_cb (void *cls,
305 struct GNUNET_SERVICE_Client *client,
306 struct GNUNET_MQ_Handle *mq)
307{
308 struct ClientHandle *ch;
309
310 (void) cls;
311 ch = GNUNET_new (struct ClientHandle);
312 ch->client = client;
313 ch->mq = mq;
314 return ch;
315}
316
317
318/**
319 * Functions with this signature are called whenever a client
320 * is disconnected on the network level.
321 *
322 * @param cls closure (NULL for dht)
323 * @param client identification of the client
324 * @param app_ctx our `struct ClientHandle` for @a client
325 */
326static void
327client_disconnect_cb (void *cls,
328 struct GNUNET_SERVICE_Client *client,
329 void *app_ctx)
330{
331 struct ClientHandle *ch = app_ctx;
332
333 (void) cls;
334 (void) client;
335 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
336 "Local client %p disconnects\n",
337 ch);
338 {
339 struct ClientMonitorRecord *next;
340
341 for (struct ClientMonitorRecord *monitor = monitor_head;
342 NULL != monitor;
343 monitor = next)
344 {
345 next = monitor->next;
346 if (monitor->ch != ch)
347 continue;
348 GNUNET_CONTAINER_DLL_remove (monitor_head,
349 monitor_tail,
350 monitor);
351 GNUNET_free (monitor);
352 }
353 }
354
355 {
356 struct ClientQueryRecord *cqr;
357
358 while (NULL != (cqr = ch->cqr_head))
359 remove_client_query_record (cqr);
360 }
361 GNUNET_free (ch);
362}
363
364
365/**
366 * Route the given request via the DHT. This includes updating
367 * the bloom filter and retransmission times, building the P2P
368 * message and initiating the routing operation.
369 *
370 * @param cqr request to transmit
371 */
372static void
373transmit_request (struct ClientQueryRecord *cqr)
374{
375 struct GNUNET_BLOCK_Group *bg;
376 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
377
378 GNUNET_STATISTICS_update (GDS_stats,
379 "# GET requests from clients injected",
380 1,
381 GNUNET_NO);
382 bg = GNUNET_BLOCK_group_create (GDS_block_context,
383 cqr->type,
384 NULL, /* raw data */
385 0, /* raw data size */
386 "seen-set-size",
387 cqr->seen_replies_count,
388 NULL);
389 GNUNET_BLOCK_group_set_seen (bg,
390 cqr->seen_replies,
391 cqr->seen_replies_count);
392 peer_bf
393 = GNUNET_CONTAINER_bloomfilter_init (NULL,
394 DHT_BLOOM_SIZE,
395 GNUNET_CONSTANTS_BLOOMFILTER_K);
396 LOG (GNUNET_ERROR_TYPE_DEBUG,
397 "Initiating GET for %s, replication %u, already have %u replies\n",
398 GNUNET_h2s (&cqr->key),
399 cqr->replication,
400 cqr->seen_replies_count);
401 GDS_NEIGHBOURS_handle_get (cqr->type,
402 cqr->msg_options,
403 cqr->replication,
404 0 /* hop count */,
405 &cqr->key,
406 cqr->xquery,
407 cqr->xquery_size,
408 bg,
409 peer_bf);
410 GNUNET_BLOCK_group_destroy (bg);
411 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
412
413 /* Exponential back-off for retries.
414 * max. is #GNUNET_TIME_STD_EXPONENTIAL_BACKOFF_THRESHOLD (15 min) */
415 cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency);
416 cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency);
417}
418
419
420/**
421 * Task that looks at the #retry_heap and transmits all of the requests
422 * on the heap that are ready for transmission. Then re-schedules
423 * itself (unless the heap is empty).
424 *
425 * @param cls unused
426 */
427static void
428transmit_next_request_task (void *cls)
429{
430 struct ClientQueryRecord *cqr;
431
432 (void) cls;
433 retry_task = NULL;
434 while (NULL != (cqr = GNUNET_CONTAINER_heap_remove_root (retry_heap)))
435 {
436 cqr->hnode = NULL;
437 if (! GNUNET_TIME_absolute_is_past (cqr->retry_time))
438 {
439 cqr->hnode
440 = GNUNET_CONTAINER_heap_insert (retry_heap,
441 cqr,
442 cqr->retry_time.abs_value_us);
443 retry_task
444 = GNUNET_SCHEDULER_add_at (cqr->retry_time,
445 &transmit_next_request_task,
446 NULL);
447 return;
448 }
449 transmit_request (cqr);
450 cqr->hnode
451 = GNUNET_CONTAINER_heap_insert (retry_heap,
452 cqr,
453 cqr->retry_time.abs_value_us);
454 }
455}
456
457
458/**
459 * Check DHT PUT messages from the client.
460 *
461 * @param cls the client we received this message from
462 * @param dht_msg the actual message received
463 * @return #GNUNET_OK (always)
464 */
465static enum GNUNET_GenericReturnValue
466check_dht_local_put (void *cls,
467 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
468{
469 uint32_t replication_level = ntohl (dht_msg->desired_replication_level);
470
471 (void) cls;
472 if (replication_level > GNUNET_DHT_MAXIMUM_REPLICATION_LEVEL)
473 {
474 GNUNET_break_op (0);
475 return GNUNET_SYSERR;
476 }
477 return GNUNET_OK;
478}
479
480
481/**
482 * Handler for PUT messages.
483 *
484 * @param cls the client we received this message from
485 * @param dht_msg the actual message received
486 */
487static void
488handle_dht_local_put (void *cls,
489 const struct GNUNET_DHT_ClientPutMessage *dht_msg)
490{
491 struct ClientHandle *ch = cls;
492 uint16_t size = ntohs (dht_msg->header.size);
493 uint32_t replication_level
494 = ntohl (dht_msg->desired_replication_level);
495 struct GNUNET_DATACACHE_Block bd = {
496 .key = dht_msg->key,
497 .expiration_time = GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
498 .data = &dht_msg[1],
499 .data_size = size - sizeof (*dht_msg),
500 .type = ntohl (dht_msg->type),
501 .ro = (enum GNUNET_DHT_RouteOption) ntohl (dht_msg->options)
502 };
503
504 LOG (GNUNET_ERROR_TYPE_DEBUG,
505 "Handling local PUT of %lu-bytes for query %s of type %u\n",
506 (unsigned long) (size - sizeof(struct GNUNET_DHT_ClientPutMessage)),
507 GNUNET_h2s (&dht_msg->key),
508 (unsigned int) bd.type);
509#if SANITY_CHECKS > 0
510 if (GNUNET_OK !=
511 GNUNET_BLOCK_check_block (GDS_block_context,
512 bd.type,
513 bd.data,
514 bd.data_size))
515 {
516 GNUNET_break (0);
517 return;
518 }
519#endif
520 GNUNET_STATISTICS_update (GDS_stats,
521 "# PUT requests received from clients",
522 1,
523 GNUNET_NO);
524 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
525 "CLIENT-PUT %s\n",
526 GNUNET_h2s_full (&dht_msg->key));
527 /* give to local clients */
528 GNUNET_break (GDS_CLIENTS_handle_reply (&bd,
529 &bd.key,
530 0, NULL /* get path */));
531
532 {
533 struct GNUNET_CONTAINER_BloomFilter *peer_bf;
534
535 peer_bf
536 = GNUNET_CONTAINER_bloomfilter_init (NULL,
537 DHT_BLOOM_SIZE,
538 GNUNET_CONSTANTS_BLOOMFILTER_K);
539 /* store locally */
540 if ( (0 != (bd.ro & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
541 (GDS_am_closest_peer (&dht_msg->key,
542 peer_bf)))
543 GDS_DATACACHE_handle_put (&bd);
544 /* route to other peers */
545 if (GNUNET_OK !=
546 GDS_NEIGHBOURS_handle_put (&bd,
547 replication_level,
548 0 /* hop count */,
549 peer_bf))
550 {
551 GNUNET_STATISTICS_update (GDS_stats,
552 "# Local PUT requests not routed",
553 1,
554 GNUNET_NO);
555 }
556 GNUNET_CONTAINER_bloomfilter_free (peer_bf);
557 }
558 GDS_CLIENTS_process_put (
559 &bd,
560 0, /* hop count */
561 replication_level);
562 GNUNET_SERVICE_client_continue (ch->client);
563}
564
565
566/**
567 * Handle a result from local datacache for a GET operation.
568 *
569 * @param cls the `struct ClientHandle` of the client doing the query
570 * @param bd details about the block that was found
571 */
572static void
573handle_local_result (void *cls,
574 const struct GNUNET_DATACACHE_Block *bd)
575{
576 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
577 "Datacache provided result for query key %s\n",
578 GNUNET_h2s (&bd->key));
579 GNUNET_break (GDS_CLIENTS_handle_reply (bd,
580 &bd->key,
581 0, NULL /* get_path */));
582}
583
584
585/**
586 * Check DHT GET messages from the client.
587 *
588 * @param cls the client we received this message from
589 * @param get the actual message received
590 * @return #GNUNET_OK (always)
591 */
592static enum GNUNET_GenericReturnValue
593check_dht_local_get (void *cls,
594 const struct GNUNET_DHT_ClientGetMessage *get)
595{
596 (void) cls;
597 (void) get;
598 /* always well-formed */
599 return GNUNET_OK;
600}
601
602
603/**
604 * Handler for DHT GET messages from the client.
605 *
606 * @param cls the client we received this message from
607 * @param get the actual message received
608 */
609static void
610handle_dht_local_get (void *cls,
611 const struct GNUNET_DHT_ClientGetMessage *get)
612{
613 struct ClientHandle *ch = cls;
614 struct ClientQueryRecord *cqr;
615 uint16_t size = ntohs (get->header.size);
616 const char *xquery = (const char *) &get[1];
617 size_t xquery_size = size - sizeof(struct GNUNET_DHT_ClientGetMessage);
618
619 LOG (GNUNET_ERROR_TYPE_DEBUG,
620 "Received GET request for %s from local client %p, xq: %.*s\n",
621 GNUNET_h2s (&get->key),
622 ch->client,
623 (int) xquery_size,
624 xquery);
625 GNUNET_STATISTICS_update (GDS_stats,
626 "# GET requests received from clients",
627 1,
628 GNUNET_NO);
629 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
630 "CLIENT-GET %s\n",
631 GNUNET_h2s_full (&get->key));
632
633 cqr = GNUNET_malloc (sizeof(struct ClientQueryRecord) + xquery_size);
634 cqr->key = get->key;
635 cqr->ch = ch;
636 cqr->xquery = (const void *) &cqr[1];
637 GNUNET_memcpy (&cqr[1],
638 xquery,
639 xquery_size);
640 cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap,
641 cqr,
642 0);
643 cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS;
644 cqr->retry_time = GNUNET_TIME_absolute_get ();
645 cqr->unique_id = get->unique_id;
646 cqr->xquery_size = xquery_size;
647 cqr->replication = ntohl (get->desired_replication_level);
648 cqr->msg_options = (enum GNUNET_DHT_RouteOption) ntohl (get->options);
649 cqr->type = ntohl (get->type);
650 GNUNET_CONTAINER_DLL_insert (ch->cqr_head,
651 ch->cqr_tail,
652 cqr);
653 GNUNET_CONTAINER_multihashmap_put (forward_map,
654 &cqr->key,
655 cqr,
656 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
657 GDS_CLIENTS_process_get (cqr->msg_options,
658 cqr->type,
659 0, /* hop count */
660 cqr->replication,
661 &get->key);
662 /* start remote requests */
663 if (NULL != retry_task)
664 GNUNET_SCHEDULER_cancel (retry_task);
665 retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task,
666 NULL);
667 /* perform local lookup */
668 GDS_DATACACHE_handle_get (&get->key,
669 cqr->type,
670 cqr->xquery,
671 xquery_size,
672 NULL,
673 &handle_local_result,
674 ch);
675 GNUNET_SERVICE_client_continue (ch->client);
676}
677
678
679/**
680 * Closure for #find_by_unique_id().
681 */
682struct FindByUniqueIdContext
683{
684 /**
685 * Where to store the result, if found.
686 */
687 struct ClientQueryRecord *cqr;
688
689 /**
690 * Unique ID to look for.
691 */
692 uint64_t unique_id;
693};
694
695
696/**
697 * Function called for each existing DHT record for the given
698 * query. Checks if it matches the UID given in the closure
699 * and if so returns the entry as a result.
700 *
701 * @param cls the search context
702 * @param key query for the lookup (not used)
703 * @param value the `struct ClientQueryRecord`
704 * @return #GNUNET_YES to continue iteration (result not yet found)
705 */
706static enum GNUNET_GenericReturnValue
707find_by_unique_id (void *cls,
708 const struct GNUNET_HashCode *key,
709 void *value)
710{
711 struct FindByUniqueIdContext *fui_ctx = cls;
712 struct ClientQueryRecord *cqr = value;
713
714 if (cqr->unique_id != fui_ctx->unique_id)
715 return GNUNET_YES;
716 fui_ctx->cqr = cqr;
717 return GNUNET_NO;
718}
719
720
721/**
722 * Check "GET result seen" messages from the client.
723 *
724 * @param cls the client we received this message from
725 * @param seen the actual message received
726 * @return #GNUNET_OK if @a seen is well-formed
727 */
728static enum GNUNET_GenericReturnValue
729check_dht_local_get_result_seen (
730 void *cls,
731 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
732{
733 uint16_t size = ntohs (seen->header.size);
734 unsigned int hash_count =
735 (size - sizeof(*seen))
736 / sizeof(struct GNUNET_HashCode);
737
738 if (size != sizeof(*seen) + hash_count * sizeof(struct GNUNET_HashCode))
739 {
740 GNUNET_break (0);
741 return GNUNET_SYSERR;
742 }
743 return GNUNET_OK;
744}
745
746
747/**
748 * Handler for "GET result seen" messages from the client.
749 *
750 * @param cls the client we received this message from
751 * @param seen the actual message received
752 */
753static void
754handle_dht_local_get_result_seen (
755 void *cls,
756 const struct GNUNET_DHT_ClientGetResultSeenMessage *seen)
757{
758 struct ClientHandle *ch = cls;
759 uint16_t size = ntohs (seen->header.size);
760 unsigned int hash_count = (size - sizeof(*seen))
761 / sizeof(struct GNUNET_HashCode);
762 const struct GNUNET_HashCode *hc = (const struct GNUNET_HashCode*) &seen[1];
763 struct FindByUniqueIdContext fui_ctx = {
764 .unique_id = seen->unique_id
765 };
766 unsigned int old_count;
767 struct ClientQueryRecord *cqr;
768
769 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
770 &seen->key,
771 &find_by_unique_id,
772 &fui_ctx);
773 if (NULL == (cqr = fui_ctx.cqr))
774 {
775 GNUNET_break (0);
776 GNUNET_SERVICE_client_drop (ch->client);
777 return;
778 }
779 /* finally, update 'seen' list */
780 old_count = cqr->seen_replies_count;
781 GNUNET_array_grow (cqr->seen_replies,
782 cqr->seen_replies_count,
783 cqr->seen_replies_count + hash_count);
784 GNUNET_memcpy (&cqr->seen_replies[old_count],
785 hc,
786 sizeof(struct GNUNET_HashCode) * hash_count);
787}
788
789
790/**
791 * Closure for #remove_by_unique_id().
792 */
793struct RemoveByUniqueIdContext
794{
795 /**
796 * Client that issued the removal request.
797 */
798 struct ClientHandle *ch;
799
800 /**
801 * Unique ID of the request.
802 */
803 uint64_t unique_id;
804};
805
806
807/**
808 * Iterator over hash map entries that frees all entries
809 * that match the given client and unique ID.
810 *
811 * @param cls unique ID and client to search for in source routes
812 * @param key current key code
813 * @param value value in the hash map, a ClientQueryRecord
814 * @return #GNUNET_YES (we should continue to iterate)
815 */
816static enum GNUNET_GenericReturnValue
817remove_by_unique_id (void *cls,
818 const struct GNUNET_HashCode *key,
819 void *value)
820{
821 const struct RemoveByUniqueIdContext *ctx = cls;
822 struct ClientQueryRecord *cqr = value;
823
824 if (cqr->unique_id != ctx->unique_id)
825 return GNUNET_YES;
826 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827 "Removing client %p's record for key %s (by unique id)\n",
828 ctx->ch->client,
829 GNUNET_h2s (key));
830 remove_client_query_record (cqr);
831 return GNUNET_YES;
832}
833
834
835/**
836 * Handler for any generic DHT stop messages, calls the appropriate handler
837 * depending on message type (if processed locally)
838 *
839 * @param cls client we received this message from
840 * @param dht_stop_msg the actual message received
841 *
842 */
843static void
844handle_dht_local_get_stop (
845 void *cls,
846 const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg)
847{
848 struct ClientHandle *ch = cls;
849 struct RemoveByUniqueIdContext ctx;
850
851 GNUNET_STATISTICS_update (GDS_stats,
852 "# GET STOP requests received from clients",
853 1,
854 GNUNET_NO);
855 LOG (GNUNET_ERROR_TYPE_DEBUG,
856 "Received GET STOP request for %s from local client %p\n",
857 GNUNET_h2s (&dht_stop_msg->key),
858 ch->client);
859 ctx.ch = ch;
860 ctx.unique_id = dht_stop_msg->unique_id;
861 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
862 &dht_stop_msg->key,
863 &remove_by_unique_id,
864 &ctx);
865 GNUNET_SERVICE_client_continue (ch->client);
866}
867
868
869/**
870 * Closure for #forward_reply()
871 */
872struct ForwardReplyContext
873{
874 /**
875 * Block details.
876 */
877 const struct GNUNET_DATACACHE_Block *bd;
878
879 /**
880 * GET path taken.
881 */
882 const struct GNUNET_DHT_PathElement *get_path;
883
884 /**
885 * Number of entries in @e get_path.
886 */
887 unsigned int get_path_length;
888
889};
890
891
892/**
893 * Iterator over hash map entries that send a given reply to
894 * each of the matching clients. With some tricky recycling
895 * of the buffer.
896 *
897 * @param cls the `struct ForwardReplyContext`
898 * @param query_hash hash of the query for which this may be a reply
899 * @param value value in the hash map, a ClientQueryRecord
900 * @return #GNUNET_YES (we should continue to iterate),
901 * if the result is mal-formed, #GNUNET_NO
902 */
903static enum GNUNET_GenericReturnValue
904forward_reply (void *cls,
905 const struct GNUNET_HashCode *query_hash,
906 void *value)
907{
908 struct ForwardReplyContext *frc = cls;
909 struct ClientQueryRecord *record = value;
910 const struct GNUNET_DATACACHE_Block *bd = frc->bd;
911 struct GNUNET_MQ_Envelope *env;
912 struct GNUNET_DHT_ClientResultMessage *reply;
913 enum GNUNET_BLOCK_ReplyEvaluationResult eval;
914 bool do_free;
915 struct GNUNET_HashCode ch;
916 struct GNUNET_DHT_PathElement *paths;
917 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
918 size_t xsize = bd->data_size;
919
920 LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG,
921 "CLIENT-RESULT %s\n",
922 GNUNET_h2s_full (&bd->key));
923 if ( (record->type != GNUNET_BLOCK_TYPE_ANY) &&
924 (record->type != bd->type) )
925 {
926 LOG (GNUNET_ERROR_TYPE_DEBUG,
927 "Record type mismatch, not passing request for key %s to local client\n",
928 GNUNET_h2s (&bd->key));
929 GNUNET_STATISTICS_update (GDS_stats,
930 "# Key match, type mismatches in REPLY to CLIENT",
931 1,
932 GNUNET_NO);
933 return GNUNET_YES; /* type mismatch */
934 }
935 if ( (0 == (record->msg_options & GNUNET_DHT_RO_FIND_APPROXIMATE)) &&
936 (0 != GNUNET_memcmp (&bd->key,
937 query_hash)) )
938 {
939 GNUNET_STATISTICS_update (GDS_stats,
940 "# Inexact key match, but exact match required",
941 1,
942 GNUNET_NO);
943 return GNUNET_YES; /* type mismatch */
944 }
945 GNUNET_CRYPTO_hash (bd->data,
946 bd->data_size,
947 &ch);
948 for (unsigned int i = 0; i < record->seen_replies_count; i++)
949 if (0 ==
950 GNUNET_memcmp (&record->seen_replies[i],
951 &ch))
952 {
953 LOG (GNUNET_ERROR_TYPE_DEBUG,
954 "Duplicate reply, not passing request for key %s to local client\n",
955 GNUNET_h2s (&bd->key));
956 GNUNET_STATISTICS_update (GDS_stats,
957 "# Duplicate REPLIES to CLIENT request dropped",
958 1,
959 GNUNET_NO);
960 return GNUNET_YES; /* duplicate */
961 }
962 eval
963 = GNUNET_BLOCK_check_reply (GDS_block_context,
964 record->type,
965 NULL,
966 &bd->key,
967 record->xquery,
968 record->xquery_size,
969 bd->data,
970 bd->data_size);
971 LOG (GNUNET_ERROR_TYPE_DEBUG,
972 "Evaluation result is %d for key %s for local client's query\n",
973 (int) eval,
974 GNUNET_h2s (&bd->key));
975 switch (eval)
976 {
977 case GNUNET_BLOCK_REPLY_OK_LAST:
978 do_free = true;
979 break;
980 case GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED:
981 case GNUNET_BLOCK_REPLY_OK_MORE:
982 GNUNET_array_append (record->seen_replies,
983 record->seen_replies_count,
984 ch);
985 do_free = false;
986 break;
987 case GNUNET_BLOCK_REPLY_OK_DUPLICATE:
988 /* should be impossible to encounter here */
989 GNUNET_break (0);
990 return GNUNET_YES;
991 case GNUNET_BLOCK_REPLY_IRRELEVANT:
992 return GNUNET_YES;
993 default:
994 GNUNET_break (0);
995 return GNUNET_NO;
996 }
997 GNUNET_STATISTICS_update (GDS_stats,
998 "# RESULTS queued for clients",
999 1,
1000 GNUNET_NO);
1001 xsize += (frc->get_path_length + bd->put_path_length)
1002 * sizeof(struct GNUNET_DHT_PathElement);
1003 if (truncated)
1004 xsize += sizeof (struct GNUNET_PeerIdentity);
1005
1006#if SUPER_REDUNDANT_CHECK
1007 GNUNET_break (0 ==
1008 GNUNET_DHT_verify_path (bd->data,
1009 bd->data_size,
1010 bd->expiration_time,
1011 truncated
1012 ? &bd->trunc_peer
1013 : NULL,
1014 bd->put_path,
1015 bd->put_path_length,
1016 frc->get_path,
1017 frc->get_path_length,
1018 &GDS_my_identity));
1019#endif
1020
1021 env = GNUNET_MQ_msg_extra (reply,
1022 xsize,
1023 GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT);
1024 reply->type = htonl (bd->type);
1025 reply->options = htonl (bd->ro);
1026 reply->get_path_length = htonl (frc->get_path_length);
1027 reply->put_path_length = htonl (bd->put_path_length);
1028 reply->unique_id = record->unique_id;
1029 reply->expiration = GNUNET_TIME_absolute_hton (bd->expiration_time);
1030 reply->key = *query_hash;
1031 if (truncated)
1032 {
1033 void *tgt = &reply[1];
1034
1035 GNUNET_memcpy (tgt,
1036 &bd->trunc_peer,
1037 sizeof (struct GNUNET_PeerIdentity));
1038 paths = (struct GNUNET_DHT_PathElement *)
1039 (tgt + sizeof (struct GNUNET_PeerIdentity));
1040 }
1041 else
1042 {
1043 paths = (struct GNUNET_DHT_PathElement *) &reply[1];
1044 }
1045 GNUNET_memcpy (paths,
1046 bd->put_path,
1047 sizeof(struct GNUNET_DHT_PathElement)
1048 * bd->put_path_length);
1049 GNUNET_memcpy (&paths[bd->put_path_length],
1050 frc->get_path,
1051 sizeof(struct GNUNET_DHT_PathElement)
1052 * frc->get_path_length);
1053 GNUNET_memcpy (&paths[frc->get_path_length + bd->put_path_length],
1054 bd->data,
1055 bd->data_size);
1056 LOG (GNUNET_ERROR_TYPE_DEBUG,
1057 "Sending reply to query %s for client %p\n",
1058 GNUNET_h2s (query_hash),
1059 record->ch->client);
1060 GNUNET_MQ_send (record->ch->mq,
1061 env);
1062 if (GNUNET_YES == do_free)
1063 remove_client_query_record (record);
1064 return GNUNET_YES;
1065}
1066
1067
1068bool
1069GDS_CLIENTS_handle_reply (const struct GNUNET_DATACACHE_Block *bd,
1070 const struct GNUNET_HashCode *query_hash,
1071 unsigned int get_path_length,
1072 const struct GNUNET_DHT_PathElement *get_path)
1073{
1074 struct ForwardReplyContext frc;
1075 size_t msize = sizeof (struct GNUNET_DHT_ClientResultMessage)
1076 + bd->data_size
1077 + (get_path_length + bd->put_path_length)
1078 * sizeof(struct GNUNET_DHT_PathElement);
1079#if SANITY_CHECKS > 1
1080 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1081#endif
1082
1083 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
1084 {
1085 GNUNET_break (0);
1086 return false;
1087 }
1088#if SANITY_CHECKS > 1
1089 if (0 !=
1090 GNUNET_DHT_verify_path (bd->data,
1091 bd->data_size,
1092 bd->expiration_time,
1093 truncated
1094 ? &bd->trunc_peer
1095 : NULL,
1096 bd->put_path,
1097 bd->put_path_length,
1098 get_path,
1099 get_path_length,
1100 &GDS_my_identity))
1101 {
1102 GNUNET_break (0);
1103 return false;
1104 }
1105#endif
1106 frc.bd = bd;
1107 frc.get_path = get_path;
1108 frc.get_path_length = get_path_length;
1109 LOG (GNUNET_ERROR_TYPE_DEBUG,
1110 "Forwarding reply for query hash %s with GPL %u and PPL %u to client\n",
1111 GNUNET_h2s (query_hash),
1112 get_path_length,
1113 bd->put_path_length);
1114 if (0 ==
1115 GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
1116 query_hash,
1117 &forward_reply,
1118 &frc))
1119 {
1120 LOG (GNUNET_ERROR_TYPE_DEBUG,
1121 "No matching client for reply for query %s\n",
1122 GNUNET_h2s (query_hash));
1123 GNUNET_STATISTICS_update (GDS_stats,
1124 "# REPLIES ignored for CLIENTS (no match)",
1125 1,
1126 GNUNET_NO);
1127 }
1128 return true;
1129}
1130
1131
1132/* **************** HELLO logic ***************** */
1133
1134/**
1135 * Handler for HELLO GET message. Reply to client
1136 * with a URL of our HELLO.
1137 *
1138 * @param cls the client we received this message from
1139 * @param msg the actual message received
1140 *
1141 */
1142static void
1143handle_dht_local_hello_get (void *cls,
1144 const struct GNUNET_MessageHeader *msg)
1145{
1146 struct ClientHandle *ch = cls;
1147 char *url = GNUNET_HELLO_builder_to_url (GDS_my_hello,
1148 &GDS_my_private_key);
1149 size_t slen = strlen (url) + 1;
1150 struct GNUNET_MessageHeader *hdr;
1151 struct GNUNET_MQ_Envelope *env;
1152
1153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154 "Handling request from local client for my HELLO\n");
1155 env = GNUNET_MQ_msg_extra (hdr,
1156 slen,
1157 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL);
1158 memcpy (&hdr[1],
1159 url,
1160 slen);
1161 GNUNET_free (url);
1162 GNUNET_MQ_send (ch->mq,
1163 env);
1164 GNUNET_SERVICE_client_continue (ch->client);
1165}
1166
1167
1168/**
1169 * Process a client HELLO message received from the service.
1170 *
1171 * @param cls the client we received this message from
1172 * @param hdr HELLO URL message from the service.
1173 * @return #GNUNET_OK if @a hdr is well-formed
1174 */
1175static enum GNUNET_GenericReturnValue
1176check_dht_local_hello_offer (void *cls,
1177 const struct GNUNET_MessageHeader *hdr)
1178{
1179 uint16_t len = ntohs (hdr->size);
1180 const char *buf = (const char *) &hdr[1];
1181
1182 (void) cls;
1183 if ('\0' != buf[len - sizeof (*hdr) - 1])
1184 {
1185 GNUNET_break (0);
1186 return GNUNET_SYSERR;
1187 }
1188 return GNUNET_OK;
1189}
1190
1191
1192/**
1193 * Handler for HELLO OFFER message. Try to use the
1194 * HELLO to connect to another peer.
1195 *
1196 * @param cls the client we received this message from
1197 * @param msg the actual message received
1198 */
1199static void
1200handle_dht_local_hello_offer (void *cls,
1201 const struct GNUNET_MessageHeader *msg)
1202{
1203 struct ClientHandle *ch = cls;
1204 const char *url = (const char *) &msg[1];
1205 struct GNUNET_HELLO_Builder *b;
1206 struct GNUNET_PeerIdentity pid;
1207
1208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209 "Local client provided HELLO URL %s\n",
1210 url);
1211 b = GNUNET_HELLO_builder_from_url (url);
1212 if (NULL == b)
1213 {
1214 GNUNET_break (0);
1215 GNUNET_SERVICE_client_drop (ch->client);
1216 return;
1217 }
1218 GNUNET_SERVICE_client_continue (ch->client);
1219 GNUNET_HELLO_builder_iterate (b,
1220 &pid,
1221 &GDS_try_connect,
1222 &pid);
1223 GNUNET_HELLO_builder_free (b);
1224}
1225
1226
1227/* ************* logic for monitors ************** */
1228
1229
1230/**
1231 * Handler for monitor start messages
1232 *
1233 * @param cls the client we received this message from
1234 * @param msg the actual message received
1235 *
1236 */
1237static void
1238handle_dht_local_monitor (void *cls,
1239 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1240{
1241 struct ClientHandle *ch = cls;
1242 struct ClientMonitorRecord *r;
1243
1244 r = GNUNET_new (struct ClientMonitorRecord);
1245 r->ch = ch;
1246 r->type = ntohl (msg->type);
1247 r->get = ntohs (msg->get);
1248 r->get_resp = ntohs (msg->get_resp);
1249 r->put = ntohs (msg->put);
1250 if (0 != ntohs (msg->filter_key))
1251 r->key = msg->key;
1252 GNUNET_CONTAINER_DLL_insert (monitor_head,
1253 monitor_tail,
1254 r);
1255 GNUNET_SERVICE_client_continue (ch->client);
1256}
1257
1258
1259/**
1260 * Handler for monitor stop messages
1261 *
1262 * @param cls the client we received this message from
1263 * @param msg the actual message received
1264 */
1265static void
1266handle_dht_local_monitor_stop (
1267 void *cls,
1268 const struct GNUNET_DHT_MonitorStartStopMessage *msg)
1269{
1270 struct ClientHandle *ch = cls;
1271
1272 GNUNET_SERVICE_client_continue (ch->client);
1273 for (struct ClientMonitorRecord *r = monitor_head;
1274 NULL != r;
1275 r = r->next)
1276 {
1277 bool keys_match;
1278
1279 keys_match =
1280 (GNUNET_is_zero (&r->key))
1281 ? (0 == ntohs (msg->filter_key))
1282 : ( (0 != ntohs (msg->filter_key)) &&
1283 (! GNUNET_memcmp (&r->key,
1284 &msg->key)) );
1285 if ( (ch == r->ch) &&
1286 (ntohl (msg->type) == r->type) &&
1287 (r->get == msg->get) &&
1288 (r->get_resp == msg->get_resp) &&
1289 (r->put == msg->put) &&
1290 keys_match)
1291 {
1292 GNUNET_CONTAINER_DLL_remove (monitor_head,
1293 monitor_tail,
1294 r);
1295 GNUNET_free (r);
1296 return; /* Delete only ONE entry */
1297 }
1298 }
1299}
1300
1301
1302/**
1303 * Function to call by #for_matching_monitors().
1304 *
1305 * @param cls closure
1306 * @param m a matching monitor
1307 */
1308typedef void
1309(*MonitorAction)(void *cls,
1310 struct ClientMonitorRecord *m);
1311
1312
1313/**
1314 * Call @a cb on all monitors that watch for blocks of @a type
1315 * and key @a key.
1316 *
1317 * @param type the type to match
1318 * @param key the key to match
1319 * @param cb function to call
1320 * @param cb_cls closure for @a cb
1321 */
1322static void
1323for_matching_monitors (enum GNUNET_BLOCK_Type type,
1324 const struct GNUNET_HashCode *key,
1325 MonitorAction cb,
1326 void *cb_cls)
1327{
1328 struct ClientHandle **cl = NULL;
1329 unsigned int cl_size = 0;
1330
1331 for (struct ClientMonitorRecord *m = monitor_head;
1332 NULL != m;
1333 m = m->next)
1334 {
1335 bool found = false;
1336
1337 if ( (GNUNET_BLOCK_TYPE_ANY != m->type) &&
1338 (m->type != type) )
1339 continue;
1340 if ( (! GNUNET_is_zero (&m->key)) &&
1341 (0 ==
1342 GNUNET_memcmp (key,
1343 &m->key)) )
1344 continue;
1345 /* Don't send duplicates */
1346 for (unsigned i = 0; i < cl_size; i++)
1347 if (cl[i] == m->ch)
1348 {
1349 found = true;
1350 break;
1351 }
1352 if (found)
1353 continue;
1354 GNUNET_array_append (cl,
1355 cl_size,
1356 m->ch);
1357 cb (cb_cls,
1358 m);
1359 }
1360 GNUNET_free (cl);
1361}
1362
1363
1364/**
1365 * Closure for #get_action();
1366 */
1367struct GetActionContext
1368{
1369 enum GNUNET_DHT_RouteOption options;
1370 enum GNUNET_BLOCK_Type type;
1371 uint32_t hop_count;
1372 uint32_t desired_replication_level;
1373 struct GNUNET_PeerIdentity trunc_peer;
1374 const struct GNUNET_HashCode *key;
1375};
1376
1377
1378/**
1379 * Function called on monitors that match a GET.
1380 * Sends the GET notification to the monitor.
1381 *
1382 * @param cls a `struct GetActionContext`
1383 * @param m a matching monitor
1384 */
1385static void
1386get_action (void *cls,
1387 struct ClientMonitorRecord *m)
1388{
1389 struct GetActionContext *gac = cls;
1390 struct GNUNET_MQ_Envelope *env;
1391 struct GNUNET_DHT_MonitorGetMessage *mmsg;
1392
1393 env = GNUNET_MQ_msg (mmsg,
1394 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
1395 mmsg->options = htonl (gac->options);
1396 mmsg->type = htonl (gac->type);
1397 mmsg->hop_count = htonl (gac->hop_count);
1398 mmsg->desired_replication_level = htonl (gac->desired_replication_level);
1399 mmsg->key = *gac->key;
1400 GNUNET_MQ_send (m->ch->mq,
1401 env);
1402}
1403
1404
1405void
1406GDS_CLIENTS_process_get (enum GNUNET_DHT_RouteOption options,
1407 enum GNUNET_BLOCK_Type type,
1408 uint32_t hop_count,
1409 uint32_t desired_replication_level,
1410 const struct GNUNET_HashCode *key)
1411{
1412 struct GetActionContext gac = {
1413 .options = options,
1414 .type = type,
1415 .hop_count = hop_count,
1416 .desired_replication_level = desired_replication_level,
1417 .key = key
1418 };
1419
1420 for_matching_monitors (type,
1421 key,
1422 &get_action,
1423 &gac);
1424}
1425
1426
1427/**
1428 * Closure for response_action().
1429 */
1430struct ResponseActionContext
1431{
1432 const struct GNUNET_DATACACHE_Block *bd;
1433 const struct GNUNET_DHT_PathElement *get_path;
1434 unsigned int get_path_length;
1435};
1436
1437
1438/**
1439 * Function called on monitors that match a response.
1440 * Sends the response notification to the monitor.
1441 *
1442 * @param cls a `struct ResponseActionContext`
1443 * @param m a matching monitor
1444 */
1445static void
1446response_action (void *cls,
1447 struct ClientMonitorRecord *m)
1448{
1449 const struct ResponseActionContext *resp_ctx = cls;
1450 const struct GNUNET_DATACACHE_Block *bd = resp_ctx->bd;
1451 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1452 struct GNUNET_MQ_Envelope *env;
1453 struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
1454 struct GNUNET_DHT_PathElement *path;
1455 size_t msize;
1456
1457 msize = bd->data_size;
1458 msize += (resp_ctx->get_path_length + bd->put_path_length)
1459 * sizeof(struct GNUNET_DHT_PathElement);
1460 if (truncated)
1461 msize += sizeof (struct GNUNET_PeerIdentity);
1462 env = GNUNET_MQ_msg_extra (mmsg,
1463 msize,
1464 GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
1465 mmsg->type = htonl (bd->type);
1466 mmsg->put_path_length = htonl (bd->put_path_length);
1467 mmsg->get_path_length = htonl (resp_ctx->get_path_length);
1468 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1469 mmsg->key = bd->key;
1470 if (truncated)
1471 {
1472 void *tgt = &mmsg[1];
1473
1474 GNUNET_memcpy (tgt,
1475 &bd->trunc_peer,
1476 sizeof (struct GNUNET_PeerIdentity));
1477 path = (struct GNUNET_DHT_PathElement *)
1478 (tgt + sizeof (struct GNUNET_PeerIdentity));
1479 }
1480 else
1481 {
1482 path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1483 }
1484 GNUNET_memcpy (path,
1485 bd->put_path,
1486 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1487 GNUNET_memcpy (path,
1488 resp_ctx->get_path,
1489 resp_ctx->get_path_length
1490 * sizeof(struct GNUNET_DHT_PathElement));
1491 GNUNET_memcpy (&path[resp_ctx->get_path_length],
1492 bd->data,
1493 bd->data_size);
1494 GNUNET_MQ_send (m->ch->mq,
1495 env);
1496}
1497
1498
1499void
1500GDS_CLIENTS_process_get_resp (const struct GNUNET_DATACACHE_Block *bd,
1501 const struct GNUNET_DHT_PathElement *get_path,
1502 unsigned int get_path_length)
1503{
1504 struct ResponseActionContext rac = {
1505 .bd = bd,
1506 .get_path = get_path,
1507 .get_path_length = get_path_length
1508 };
1509
1510 for_matching_monitors (bd->type,
1511 &bd->key,
1512 &response_action,
1513 &rac);
1514}
1515
1516
1517/**
1518 * Closure for put_action().
1519 */
1520struct PutActionContext
1521{
1522 const struct GNUNET_DATACACHE_Block *bd;
1523 uint32_t hop_count;
1524 uint32_t desired_replication_level;
1525};
1526
1527
1528/**
1529 * Function called on monitors that match a PUT.
1530 * Sends the PUT notification to the monitor.
1531 *
1532 * @param cls a `struct PutActionContext`
1533 * @param m a matching monitor
1534 */
1535static void
1536put_action (void *cls,
1537 struct ClientMonitorRecord *m)
1538{
1539 const struct PutActionContext *put_ctx = cls;
1540 const struct GNUNET_DATACACHE_Block *bd = put_ctx->bd;
1541 bool truncated = (0 != (bd->ro & GNUNET_DHT_RO_TRUNCATED));
1542 struct GNUNET_MQ_Envelope *env;
1543 struct GNUNET_DHT_MonitorPutMessage *mmsg;
1544 struct GNUNET_DHT_PathElement *msg_path;
1545 size_t msize;
1546
1547 msize = bd->data_size
1548 + bd->put_path_length
1549 * sizeof(struct GNUNET_DHT_PathElement);
1550 if (truncated)
1551 msize += sizeof (struct GNUNET_PeerIdentity);
1552 env = GNUNET_MQ_msg_extra (mmsg,
1553 msize,
1554 GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
1555 mmsg->options = htonl (bd->ro);
1556 mmsg->type = htonl (bd->type);
1557 mmsg->hop_count = htonl (put_ctx->hop_count);
1558 mmsg->desired_replication_level = htonl (put_ctx->desired_replication_level);
1559 mmsg->put_path_length = htonl (bd->put_path_length);
1560 mmsg->key = bd->key;
1561 mmsg->expiration_time = GNUNET_TIME_absolute_hton (bd->expiration_time);
1562 if (truncated)
1563 {
1564 void *tgt = &mmsg[1];
1565
1566 GNUNET_memcpy (tgt,
1567 &bd->trunc_peer,
1568 sizeof (struct GNUNET_PeerIdentity));
1569 msg_path = (struct GNUNET_DHT_PathElement *)
1570 (tgt + sizeof (struct GNUNET_PeerIdentity));
1571 }
1572 else
1573 {
1574 msg_path = (struct GNUNET_DHT_PathElement *) &mmsg[1];
1575 }
1576 GNUNET_memcpy (msg_path,
1577 bd->put_path,
1578 bd->put_path_length * sizeof(struct GNUNET_DHT_PathElement));
1579 GNUNET_memcpy (&msg_path[bd->put_path_length],
1580 bd->data,
1581 bd->data_size);
1582 GNUNET_MQ_send (m->ch->mq,
1583 env);
1584}
1585
1586
1587void
1588GDS_CLIENTS_process_put (const struct GNUNET_DATACACHE_Block *bd,
1589 uint32_t hop_count,
1590 uint32_t desired_replication_level)
1591{
1592 struct PutActionContext put_ctx = {
1593 .bd = bd,
1594 .hop_count = hop_count,
1595 .desired_replication_level = desired_replication_level
1596 };
1597
1598 for_matching_monitors (bd->type,
1599 &bd->key,
1600 &put_action,
1601 &put_ctx);
1602}
1603
1604
1605/* ********************** Initialization logic ***************** */
1606
1607
1608/**
1609 * Initialize client subsystem.
1610 */
1611static void
1612GDS_CLIENTS_init (void)
1613{
1614 forward_map
1615 = GNUNET_CONTAINER_multihashmap_create (1024,
1616 GNUNET_YES);
1617 retry_heap
1618 = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1619}
1620
1621
1622/**
1623 * Shutdown client subsystem.
1624 */
1625static void
1626GDS_CLIENTS_stop (void)
1627{
1628 if (NULL != retry_task)
1629 {
1630 GNUNET_SCHEDULER_cancel (retry_task);
1631 retry_task = NULL;
1632 }
1633}
1634
1635
1636/**
1637 * Define "main" method using service macro.
1638 *
1639 * @param name name of the service, like "dht" or "xdht"
1640 * @param run name of the initializaton method for the service
1641 */
1642#define GDS_DHT_SERVICE_INIT(name, run) \
1643 GNUNET_SERVICE_MAIN \
1644 (name, \
1645 GNUNET_SERVICE_OPTION_NONE, \
1646 run, \
1647 &client_connect_cb, \
1648 &client_disconnect_cb, \
1649 NULL, \
1650 GNUNET_MQ_hd_var_size (dht_local_put, \
1651 GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, \
1652 struct GNUNET_DHT_ClientPutMessage, \
1653 NULL), \
1654 GNUNET_MQ_hd_var_size (dht_local_get, \
1655 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, \
1656 struct GNUNET_DHT_ClientGetMessage, \
1657 NULL), \
1658 GNUNET_MQ_hd_fixed_size (dht_local_get_stop, \
1659 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, \
1660 struct GNUNET_DHT_ClientGetStopMessage, \
1661 NULL), \
1662 GNUNET_MQ_hd_fixed_size (dht_local_monitor, \
1663 GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, \
1664 struct GNUNET_DHT_MonitorStartStopMessage, \
1665 NULL), \
1666 GNUNET_MQ_hd_fixed_size (dht_local_monitor_stop, \
1667 GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, \
1668 struct GNUNET_DHT_MonitorStartStopMessage, \
1669 NULL), \
1670 GNUNET_MQ_hd_var_size (dht_local_get_result_seen, \
1671 GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, \
1672 struct GNUNET_DHT_ClientGetResultSeenMessage, \
1673 NULL), \
1674 GNUNET_MQ_hd_fixed_size (dht_local_hello_get, \
1675 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_GET, \
1676 struct GNUNET_MessageHeader, \
1677 NULL), \
1678 GNUNET_MQ_hd_var_size (dht_local_hello_offer, \
1679 GNUNET_MESSAGE_TYPE_DHT_CLIENT_HELLO_URL, \
1680 struct GNUNET_MessageHeader, \
1681 NULL), \
1682 GNUNET_MQ_handler_end ())
1683
1684
1685/**
1686 * MINIMIZE heap size (way below 128k) since this process doesn't need much.
1687 */
1688void __attribute__ ((destructor))
1689GDS_CLIENTS_done ()
1690{
1691 if (NULL != retry_heap)
1692 {
1693 GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
1694 GNUNET_CONTAINER_heap_destroy (retry_heap);
1695 retry_heap = NULL;
1696 }
1697 if (NULL != forward_map)
1698 {
1699 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
1700 GNUNET_CONTAINER_multihashmap_destroy (forward_map);
1701 forward_map = NULL;
1702 }
1703}
1704
1705
1706/* end of gnunet-service-dht_clients.c */