aboutsummaryrefslogtreecommitdiff
path: root/src/service/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/service/fs/gnunet-service-fs_pr.c1887
1 files changed, 1887 insertions, 0 deletions
diff --git a/src/service/fs/gnunet-service-fs_pr.c b/src/service/fs/gnunet-service-fs_pr.c
new file mode 100644
index 000000000..f192c017d
--- /dev/null
+++ b/src/service/fs/gnunet-service-fs_pr.c
@@ -0,0 +1,1887 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2009-2013 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file fs/gnunet-service-fs_pr.c
23 * @brief API to handle pending requests
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
28#include "gnunet_load_lib.h"
29#include "gnunet-service-fs.h"
30#include "gnunet-service-fs_cp.h"
31#include "gnunet-service-fs_indexing.h"
32#include "gnunet-service-fs_pe.h"
33#include "gnunet-service-fs_pr.h"
34#include "gnunet-service-fs_cadet.h"
35
36
37/**
38 * Desired replication level for GETs.
39 */
40#define DHT_GET_REPLICATION 5
41
42/**
43 * Maximum size of the datastore queue for P2P operations. Needs to
44 * be large enough to queue #MAX_QUEUE_PER_PEER operations for roughly
45 * the number of active (connected) peers.
46 */
47#define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
48
49/**
50 * Bandwidth value of a 0-priority content (must be fairly high
51 * compared to query since content is typically significantly larger
52 * -- and more valuable since it can take many queries to get one
53 * piece of content).
54 */
55#define CONTENT_BANDWIDTH_VALUE 800
56
57/**
58 * Hard limit on the number of results we may get from the datastore per query.
59 */
60#define MAX_RESULTS (100 * 1024)
61
62/**
63 * Collect an instance number of statistics? May cause excessive IPC.
64 */
65#define INSANE_STATISTICS GNUNET_NO
66
67/**
68 * If obtaining a block via cadet fails, how often do we retry it before
69 * giving up for good (and sticking to non-anonymous transfer)?
70 */
71#define CADET_RETRY_MAX 3
72
73
74/**
75 * An active request.
76 */
77struct GSF_PendingRequest
78{
79 /**
80 * Public data for the request.
81 */
82 struct GSF_PendingRequestData public_data;
83
84 /**
85 * Function to call if we encounter a reply.
86 */
87 GSF_PendingRequestReplyHandler rh;
88
89 /**
90 * Closure for @e rh
91 */
92 void *rh_cls;
93
94 /**
95 * Array of hash codes of replies we've already seen.
96 */
97 struct GNUNET_HashCode *replies_seen;
98
99 /**
100 * Block group for filtering replies we've already seen.
101 */
102 struct GNUNET_BLOCK_Group *bg;
103
104 /**
105 * Entry for this pending request in the expiration heap, or NULL.
106 */
107 struct GNUNET_CONTAINER_HeapNode *hnode;
108
109 /**
110 * Datastore queue entry for this request (or NULL for none).
111 */
112 struct GNUNET_DATASTORE_QueueEntry *qe;
113
114 /**
115 * DHT request handle for this request (or NULL for none).
116 */
117 struct GNUNET_DHT_GetHandle *gh;
118
119 /**
120 * Cadet request handle for this request (or NULL for none).
121 */
122 struct GSF_CadetRequest *cadet_request;
123
124 /**
125 * Function to call upon completion of the local get
126 * request, or NULL for none.
127 */
128 GSF_LocalLookupContinuation llc_cont;
129
130 /**
131 * Closure for @e llc_cont.
132 */
133 void *llc_cont_cls;
134
135 /**
136 * Last result from the local datastore lookup evaluation.
137 */
138 enum GNUNET_BLOCK_ReplyEvaluationResult local_result;
139
140 /**
141 * Identity of the peer that we should use for the 'sender'
142 * (recipient of the response) when forwarding (0 for none).
143 */
144 GNUNET_PEER_Id sender_pid;
145
146 /**
147 * Identity of the peer that we should never forward this query
148 * to since it originated this query (0 for none).
149 */
150 GNUNET_PEER_Id origin_pid;
151
152 /**
153 * Time we started the last datastore lookup.
154 */
155 struct GNUNET_TIME_Absolute qe_start;
156
157 /**
158 * Task that warns us if the local datastore lookup takes too long.
159 */
160 struct GNUNET_SCHEDULER_Task *warn_task;
161
162 /**
163 * Do we have a first UID yet?
164 */
165 bool have_first_uid;
166
167 /**
168 * Have we seen a NULL result yet?
169 */
170 bool seen_null;
171
172 /**
173 * Unique ID of the first result from the local datastore;
174 * used to terminate the loop.
175 */
176 uint64_t first_uid;
177
178 /**
179 * Result count.
180 */
181 size_t result_count;
182
183 /**
184 * How often have we retried this request via 'cadet'?
185 * (used to bound overall retries).
186 */
187 unsigned int cadet_retry_count;
188
189 /**
190 * Number of valid entries in the 'replies_seen' array.
191 */
192 unsigned int replies_seen_count;
193
194 /**
195 * Length of the 'replies_seen' array.
196 */
197 unsigned int replies_seen_size;
198};
199
200
201/**
202 * All pending requests, ordered by the query. Entries
203 * are of type 'struct GSF_PendingRequest*'.
204 */
205static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
206
207
208/**
209 * Datastore 'PUT' load tracking.
210 */
211static struct GNUNET_LOAD_Value *datastore_put_load;
212
213
214/**
215 * Are we allowed to migrate content to this peer.
216 */
217static int active_to_migration;
218
219
220/**
221 * Heap with the request that will expire next at the top. Contains
222 * pointers of type "struct PendingRequest*"; these will *also* be
223 * aliased from the "requests_by_peer" data structures and the
224 * "requests_by_query" table. Note that requests from our clients
225 * don't expire and are thus NOT in the "requests_by_expiration"
226 * (or the "requests_by_peer" tables).
227 */
228static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
229
230
231/**
232 * Maximum number of requests (from other peers, overall) that we're
233 * willing to have pending at any given point in time. Can be changed
234 * via the configuration file (32k is just the default).
235 */
236static unsigned long long max_pending_requests = (32 * 1024);
237
238
239/**
240 * Recalculate our bloom filter for filtering replies. This function
241 * will create a new bloom filter from scratch, so it should only be
242 * called if we have no bloomfilter at all (and hence can create a
243 * fresh one of minimal size without problems) OR if our peer is the
244 * initiator (in which case we may resize to larger than minimum size).
245 *
246 * @param type type of the request
247 * @param pr request for which the BF is to be recomputed
248 */
249static void
250refresh_bloomfilter (enum GNUNET_BLOCK_Type type,
251 struct GSF_PendingRequest *pr)
252{
253 if (NULL != pr->bg)
254 {
255 GNUNET_BLOCK_group_destroy (pr->bg);
256 pr->bg = NULL;
257 }
258 if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
259 return; /* no need */
260 pr->bg =
261 GNUNET_BLOCK_group_create (GSF_block_ctx,
262 type,
263 NULL,
264 0,
265 "seen-set-size",
266 pr->replies_seen_count,
267 NULL);
268 if (NULL == pr->bg)
269 return;
270 GNUNET_break (GNUNET_OK ==
271 GNUNET_BLOCK_group_set_seen (pr->bg,
272 pr->replies_seen,
273 pr->replies_seen_count));
274}
275
276
277struct GSF_PendingRequest *
278GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
279 enum GNUNET_BLOCK_Type type,
280 const struct GNUNET_HashCode *query,
281 const struct GNUNET_PeerIdentity *target,
282 const char *bf_data,
283 size_t bf_size,
284 uint32_t anonymity_level,
285 uint32_t priority,
286 int32_t ttl,
287 GNUNET_PEER_Id sender_pid,
288 GNUNET_PEER_Id origin_pid,
289 const struct GNUNET_HashCode *replies_seen,
290 unsigned int replies_seen_count,
291 GSF_PendingRequestReplyHandler rh,
292 void *rh_cls)
293{
294 struct GSF_PendingRequest *pr;
295 struct GSF_PendingRequest *dpr;
296 size_t extra;
297 struct GNUNET_HashCode *eptr;
298
299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300 "Creating request handle for `%s' of type %d\n",
301 GNUNET_h2s (query),
302 type);
303#if INSANE_STATISTICS
304 GNUNET_STATISTICS_update (GSF_stats,
305 gettext_noop ("# Pending requests created"),
306 1,
307 GNUNET_NO);
308#endif
309 extra = 0;
310 if (NULL != target)
311 extra += sizeof(struct GNUNET_PeerIdentity);
312 pr = GNUNET_malloc (sizeof(struct GSF_PendingRequest) + extra);
313 pr->public_data.query = *query;
314 eptr = (struct GNUNET_HashCode *) &pr[1];
315 if (NULL != target)
316 {
317 pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
318 GNUNET_memcpy (eptr, target, sizeof(struct GNUNET_PeerIdentity));
319 }
320 pr->public_data.anonymity_level = anonymity_level;
321 pr->public_data.priority = priority;
322 pr->public_data.original_priority = priority;
323 pr->public_data.options = options;
324 pr->public_data.type = type;
325 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
326 pr->sender_pid = sender_pid;
327 pr->origin_pid = origin_pid;
328 pr->rh = rh;
329 pr->rh_cls = rh_cls;
330 GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
331 if (ttl >= 0)
332 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (
333 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, (uint32_t) ttl));
334 else
335 pr->public_data.ttl = GNUNET_TIME_absolute_subtract (
336 pr->public_data.start_time,
337 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
338 (uint32_t) (-ttl)));
339 if (replies_seen_count > 0)
340 {
341 pr->replies_seen_size = replies_seen_count;
342 pr->replies_seen =
343 GNUNET_new_array (pr->replies_seen_size, struct GNUNET_HashCode);
344 GNUNET_memcpy (pr->replies_seen,
345 replies_seen,
346 replies_seen_count * sizeof(struct GNUNET_HashCode));
347 pr->replies_seen_count = replies_seen_count;
348 }
349 if ((NULL != bf_data) &&
350 (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type))
351 {
352 pr->bg = GNUNET_BLOCK_group_create (GSF_block_ctx,
353 pr->public_data.type,
354 bf_data,
355 bf_size,
356 "seen-set-size",
357 0,
358 NULL);
359 }
360 else if ((replies_seen_count > 0) &&
361 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
362 {
363 refresh_bloomfilter (pr->public_data.type, pr);
364 }
365 GNUNET_CONTAINER_multihashmap_put (pr_map,
366 &pr->public_data.query,
367 pr,
368 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
369 if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
370 {
371 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
372 pr,
373 pr->public_data.ttl.abs_value_us);
374 /* make sure we don't track too many requests */
375 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
376 max_pending_requests)
377 {
378 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
379 GNUNET_assert (NULL != dpr);
380 if (pr == dpr)
381 break; /* let the request live briefly... */
382 if (NULL != dpr->rh)
383 dpr->rh (dpr->rh_cls,
384 GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED,
385 dpr,
386 UINT32_MAX,
387 GNUNET_TIME_UNIT_FOREVER_ABS,
388 GNUNET_TIME_UNIT_FOREVER_ABS,
389 GNUNET_BLOCK_TYPE_ANY,
390 NULL,
391 0);
392 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
393 }
394 }
395 GNUNET_STATISTICS_update (GSF_stats,
396 gettext_noop ("# Pending requests active"),
397 1,
398 GNUNET_NO);
399 return pr;
400}
401
402
403/**
404 * Obtain the public data associated with a pending request
405 *
406 * @param pr pending request
407 * @return associated public data
408 */
409struct GSF_PendingRequestData *
410GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
411{
412 return &pr->public_data;
413}
414
415
416/**
417 * Test if two pending requests are compatible (would generate
418 * the same query modulo filters and should thus be processed
419 * jointly).
420 *
421 * @param pra a pending request
422 * @param prb another pending request
423 * @return #GNUNET_OK if the requests are compatible
424 */
425int
426GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
427 struct GSF_PendingRequest *prb)
428{
429 if ((pra->public_data.type != prb->public_data.type) ||
430 (0 != memcmp (&pra->public_data.query,
431 &prb->public_data.query,
432 sizeof(struct GNUNET_HashCode))))
433 return GNUNET_NO;
434 return GNUNET_OK;
435}
436
437
438void
439GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
440 const struct GNUNET_HashCode *replies_seen,
441 unsigned int replies_seen_count)
442{
443 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
444 return; /* integer overflow */
445 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
446 {
447 /* we're responsible for the BF, full refresh */
448 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
449 GNUNET_array_grow (pr->replies_seen,
450 pr->replies_seen_size,
451 replies_seen_count + pr->replies_seen_count);
452 GNUNET_memcpy (&pr->replies_seen[pr->replies_seen_count],
453 replies_seen,
454 sizeof(struct GNUNET_HashCode) * replies_seen_count);
455 pr->replies_seen_count += replies_seen_count;
456 refresh_bloomfilter (pr->public_data.type, pr);
457 }
458 else
459 {
460 if (NULL == pr->bg)
461 {
462 /* we're not the initiator, but the initiator did not give us
463 * any bloom-filter, so we need to create one on-the-fly */
464 refresh_bloomfilter (pr->public_data.type, pr);
465 }
466 else
467 {
468 GNUNET_break (GNUNET_OK ==
469 GNUNET_BLOCK_group_set_seen (pr->bg,
470 replies_seen,
471 pr->replies_seen_count));
472 }
473 }
474 if (NULL != pr->gh)
475 GNUNET_DHT_get_filter_known_results (pr->gh,
476 replies_seen_count,
477 replies_seen);
478}
479
480
481/**
482 * Generate the message corresponding to the given pending request for
483 * transmission to other peers.
484 *
485 * @param pr request to generate the message for
486 * @return envelope with the request message
487 */
488struct GNUNET_MQ_Envelope *
489GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
490{
491 struct GNUNET_MQ_Envelope *env;
492 struct GetMessage *gm;
493 struct GNUNET_PeerIdentity *ext;
494 unsigned int k;
495 uint32_t bm;
496 uint32_t prio;
497 size_t bf_size;
498 struct GNUNET_TIME_Absolute now;
499 int64_t ttl;
500 int do_route;
501 void *bf_data;
502
503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504 "Building request message for `%s' of type %d\n",
505 GNUNET_h2s (&pr->public_data.query),
506 pr->public_data.type);
507 k = 0;
508 bm = 0;
509 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
510 if ((! do_route) && (pr->sender_pid == 0))
511 {
512 GNUNET_break (0);
513 do_route = GNUNET_YES;
514 }
515 if (! do_route)
516 {
517 bm |= GET_MESSAGE_BIT_RETURN_TO;
518 k++;
519 }
520 if (NULL != pr->public_data.target)
521 {
522 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
523 k++;
524 }
525 if (GNUNET_OK !=
526 GNUNET_BLOCK_group_serialize (pr->bg,
527 &bf_data,
528 &bf_size))
529 {
530 bf_size = 0;
531 bf_data = NULL;
532 }
533 env = GNUNET_MQ_msg_extra (gm,
534 bf_size + k * sizeof(struct GNUNET_PeerIdentity),
535 GNUNET_MESSAGE_TYPE_FS_GET);
536 gm->type = htonl (pr->public_data.type);
537 if (do_route)
538 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
539 pr->public_data.priority + 1);
540 else
541 prio = 0;
542 pr->public_data.priority -= prio;
543 pr->public_data.num_transmissions++;
544 pr->public_data.respect_offered += prio;
545 gm->priority = htonl (prio);
546 now = GNUNET_TIME_absolute_get ();
547 ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
548 gm->ttl = htonl (ttl / 1000LL / 1000LL);
549 gm->reserved = htonl (0);
550 gm->hash_bitmap = htonl (bm);
551 gm->query = pr->public_data.query;
552 ext = (struct GNUNET_PeerIdentity *) &gm[1];
553 k = 0;
554 if (! do_route)
555 GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]);
556 if (NULL != pr->public_data.target)
557 ext[k++] = *pr->public_data.target;
558 GNUNET_memcpy (&ext[k], bf_data, bf_size);
559 GNUNET_free (bf_data);
560 return env;
561}
562
563
564/**
565 * Iterator to free pending requests.
566 *
567 * @param cls closure, unused
568 * @param key current key code
569 * @param value value in the hash map (pending request)
570 * @return #GNUNET_YES (we should continue to iterate)
571 */
572static int
573clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
574{
575 struct GSF_PendingRequest *pr = value;
576 GSF_LocalLookupContinuation cont;
577
578 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
579 "Cleaning up pending request for `%s'.\n",
580 GNUNET_h2s (key));
581 if (NULL != pr->cadet_request)
582 {
583 pr->cadet_retry_count = CADET_RETRY_MAX;
584 GSF_cadet_query_cancel (pr->cadet_request);
585 pr->cadet_request = NULL;
586 }
587 if (NULL != (cont = pr->llc_cont))
588 {
589 pr->llc_cont = NULL;
590 cont (pr->llc_cont_cls,
591 pr,
592 pr->local_result);
593 }
594 GSF_plan_notify_request_done_ (pr);
595 GNUNET_free (pr->replies_seen);
596 GNUNET_BLOCK_group_destroy (pr->bg);
597 pr->bg = NULL;
598 GNUNET_PEER_change_rc (pr->sender_pid, -1);
599 pr->sender_pid = 0;
600 GNUNET_PEER_change_rc (pr->origin_pid, -1);
601 pr->origin_pid = 0;
602 if (NULL != pr->hnode)
603 {
604 GNUNET_CONTAINER_heap_remove_node (pr->hnode);
605 pr->hnode = NULL;
606 }
607 if (NULL != pr->qe)
608 {
609 GNUNET_DATASTORE_cancel (pr->qe);
610 pr->qe = NULL;
611 }
612 if (NULL != pr->gh)
613 {
614 GNUNET_DHT_get_stop (pr->gh);
615 pr->gh = NULL;
616 }
617 if (NULL != pr->warn_task)
618 {
619 GNUNET_SCHEDULER_cancel (pr->warn_task);
620 pr->warn_task = NULL;
621 }
622 GNUNET_assert (
623 GNUNET_OK ==
624 GNUNET_CONTAINER_multihashmap_remove (pr_map, &pr->public_data.query, pr));
625 GNUNET_STATISTICS_update (GSF_stats,
626 gettext_noop ("# Pending requests active"),
627 -1,
628 GNUNET_NO);
629 GNUNET_free (pr);
630 return GNUNET_YES;
631}
632
633
634/**
635 * Explicitly cancel a pending request.
636 *
637 * @param pr request to cancel
638 * @param full_cleanup fully purge the request
639 */
640void
641GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
642{
643 GSF_LocalLookupContinuation cont;
644
645 if (NULL == pr_map)
646 return; /* already cleaned up! */
647 if (GNUNET_NO == full_cleanup)
648 {
649 /* make request inactive (we're no longer interested in more results),
650 * but do NOT remove from our data-structures, we still need it there
651 * to prevent the request from looping */
652 pr->rh = NULL;
653 if (NULL != pr->cadet_request)
654 {
655 pr->cadet_retry_count = CADET_RETRY_MAX;
656 GSF_cadet_query_cancel (pr->cadet_request);
657 pr->cadet_request = NULL;
658 }
659 if (NULL != (cont = pr->llc_cont))
660 {
661 pr->llc_cont = NULL;
662 cont (pr->llc_cont_cls,
663 pr,
664 pr->local_result);
665 }
666 GSF_plan_notify_request_done_ (pr);
667 if (NULL != pr->qe)
668 {
669 GNUNET_DATASTORE_cancel (pr->qe);
670 pr->qe = NULL;
671 }
672 if (NULL != pr->gh)
673 {
674 GNUNET_DHT_get_stop (pr->gh);
675 pr->gh = NULL;
676 }
677 if (NULL != pr->warn_task)
678 {
679 GNUNET_SCHEDULER_cancel (pr->warn_task);
680 pr->warn_task = NULL;
681 }
682 return;
683 }
684 GNUNET_assert (GNUNET_YES ==
685 clean_request (NULL, &pr->public_data.query, pr));
686}
687
688
689void
690GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls)
691{
692 GNUNET_CONTAINER_multihashmap_iterate (
693 pr_map,
694 (GNUNET_CONTAINER_MultiHashMapIteratorCallback) it,
695 cls);
696}
697
698
699/**
700 * Closure for process_reply() function.
701 */
702struct ProcessReplyClosure
703{
704 /**
705 * The data for the reply.
706 */
707 const void *data;
708
709 /**
710 * Who gave us this reply? NULL for local host (or DHT)
711 */
712 struct GSF_ConnectedPeer *sender;
713
714 /**
715 * When the reply expires.
716 */
717 struct GNUNET_TIME_Absolute expiration;
718
719 /**
720 * Size of data.
721 */
722 size_t size;
723
724 /**
725 * Type of the block.
726 */
727 enum GNUNET_BLOCK_Type type;
728
729 /**
730 * How much was this reply worth to us?
731 */
732 uint32_t priority;
733
734 /**
735 * Anonymity requirements for this reply.
736 */
737 uint32_t anonymity_level;
738
739 /**
740 * Evaluation result (returned).
741 */
742 enum GNUNET_BLOCK_ReplyEvaluationResult eval;
743
744 /**
745 * Did we find a matching request?
746 */
747 int request_found;
748};
749
750
751/**
752 * Update the performance data for the sender (if any) since
753 * the sender successfully answered one of our queries.
754 *
755 * @param prq information about the sender
756 * @param pr request that was satisfied
757 */
758static void
759update_request_performance_data (struct ProcessReplyClosure *prq,
760 struct GSF_PendingRequest *pr)
761{
762 if (prq->sender == NULL)
763 return;
764 GSF_peer_update_performance_ (prq->sender,
765 pr->public_data.start_time,
766 prq->priority);
767}
768
769
770/**
771 * We have received a reply; handle it!
772 *
773 * @param cls response (a `struct ProcessReplyClosure`)
774 * @param key our query
775 * @param value value in the hash map (info about the query)
776 * @return #GNUNET_YES (we should continue to iterate)
777 */
778static enum GNUNET_GenericReturnValue
779process_reply (void *cls,
780 const struct GNUNET_HashCode *key,
781 void *value)
782{
783 struct ProcessReplyClosure *prq = cls;
784 struct GSF_PendingRequest *pr = value;
785 struct GNUNET_HashCode chash;
786 struct GNUNET_TIME_Absolute last_transmission;
787
788 if (NULL == pr->rh)
789 return GNUNET_YES;
790 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
791 "Matched result (type %u) for query `%s' with pending request\n",
792 (unsigned int) prq->type,
793 GNUNET_h2s (key));
794 GNUNET_STATISTICS_update (GSF_stats,
795 gettext_noop ("# replies received and matched"),
796 1,
797 GNUNET_NO);
798 prq->eval = GNUNET_BLOCK_check_reply (GSF_block_ctx,
799 prq->type,
800 pr->bg,
801 key,
802 NULL, 0,
803 prq->data,
804 prq->size);
805 switch (prq->eval)
806 {
807 case GNUNET_BLOCK_REPLY_OK_MORE:
808 update_request_performance_data (prq, pr);
809 break;
810 case GNUNET_BLOCK_REPLY_OK_LAST:
811 /* short cut: stop processing early, no BF-update, etc. */
812 update_request_performance_data (prq, pr);
813 GNUNET_LOAD_update (GSF_rt_entry_lifetime,
814 GNUNET_TIME_absolute_get_duration (
815 pr->public_data.start_time)
816 .rel_value_us);
817 if (GNUNET_YES !=
818 GSF_request_plan_reference_get_last_transmission_ (pr->public_data
819 .pr_head,
820 prq->sender,
821 &last_transmission))
822 last_transmission = GNUNET_TIME_UNIT_FOREVER_ABS;
823 /* pass on to other peers / local clients */
824 pr->rh (pr->rh_cls,
825 prq->eval,
826 pr,
827 prq->anonymity_level,
828 prq->expiration,
829 last_transmission,
830 prq->type,
831 prq->data,
832 prq->size);
833 return GNUNET_YES;
834 case GNUNET_BLOCK_REPLY_OK_DUPLICATE:
835#if INSANE_STATISTICS
836 GNUNET_STATISTICS_update (GSF_stats,
837 "# duplicate replies discarded (bloomfilter)",
838 1,
839 GNUNET_NO);
840#endif
841 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
842 "Duplicate response, discarding.\n");
843 return GNUNET_YES; /* duplicate */
844 case GNUNET_BLOCK_REPLY_IRRELEVANT:
845 GNUNET_STATISTICS_update (GSF_stats,
846 "# irrelevant replies discarded",
847 1,
848 GNUNET_NO);
849 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850 "Irrelevant response, ignoring.\n");
851 return GNUNET_YES;
852 case GNUNET_BLOCK_REPLY_TYPE_NOT_SUPPORTED:
853 GNUNET_break (0); /* bad installation? */
854 return GNUNET_NO;
855 }
856 /* update bloomfilter */
857 GNUNET_CRYPTO_hash (prq->data,
858 prq->size,
859 &chash);
860 GSF_pending_request_update_ (pr,
861 &chash,
862 1);
863 if (NULL == prq->sender)
864 {
865 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866 "Found result for query `%s' in local datastore\n",
867 GNUNET_h2s (key));
868 GNUNET_STATISTICS_update (GSF_stats,
869 gettext_noop ("# results found locally"),
870 1,
871 GNUNET_NO);
872 }
873 else
874 {
875 GSF_dht_lookup_ (pr);
876 }
877 prq->priority += pr->public_data.original_priority;
878 pr->public_data.priority = 0;
879 pr->public_data.original_priority = 0;
880 pr->public_data.results_found++;
881 prq->request_found = GNUNET_YES;
882 /* finally, pass on to other peer / local client */
883 if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data
884 .pr_head,
885 prq->sender,
886 &last_transmission))
887 last_transmission = GNUNET_TIME_UNIT_FOREVER_ABS;
888 pr->rh (pr->rh_cls,
889 prq->eval,
890 pr,
891 prq->anonymity_level,
892 prq->expiration,
893 last_transmission,
894 prq->type,
895 prq->data,
896 prq->size);
897 return GNUNET_YES;
898}
899
900
901/**
902 * Context for put_migration_continuation().
903 */
904struct PutMigrationContext
905{
906 /**
907 * Start time for the operation.
908 */
909 struct GNUNET_TIME_Absolute start;
910
911 /**
912 * Request origin.
913 */
914 struct GNUNET_PeerIdentity origin;
915
916 /**
917 * #GNUNET_YES if we had a matching request for this block,
918 * #GNUNET_NO if not.
919 */
920 int requested;
921};
922
923
924/**
925 * Continuation called to notify client about result of the
926 * operation.
927 *
928 * @param cls closure
929 * @param success #GNUNET_SYSERR on failure
930 * @param min_expiration minimum expiration time required for content to be stored
931 * @param msg NULL on success, otherwise an error message
932 */
933static void
934put_migration_continuation (void *cls,
935 int success,
936 struct GNUNET_TIME_Absolute min_expiration,
937 const char *msg)
938{
939 struct PutMigrationContext *pmc = cls;
940 struct GSF_ConnectedPeer *cp;
941 struct GNUNET_TIME_Relative mig_pause;
942 struct GSF_PeerPerformanceData *ppd;
943
944 if (NULL != datastore_put_load)
945 {
946 if (GNUNET_SYSERR != success)
947 {
948 GNUNET_LOAD_update (datastore_put_load,
949 GNUNET_TIME_absolute_get_duration (pmc->start)
950 .rel_value_us);
951 }
952 else
953 {
954 /* on queue failure / timeout, increase the put load dramatically */
955 GNUNET_LOAD_update (datastore_put_load,
956 GNUNET_TIME_UNIT_MINUTES.rel_value_us);
957 }
958 }
959 cp = GSF_peer_get_ (&pmc->origin);
960 if (GNUNET_OK == success)
961 {
962 if (NULL != cp)
963 {
964 ppd = GSF_get_peer_performance_data_ (cp);
965 ppd->migration_delay.rel_value_us /= 2;
966 }
967 GNUNET_free (pmc);
968 return;
969 }
970 if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
971 {
972 ppd = GSF_get_peer_performance_data_ (cp);
973 if (min_expiration.abs_value_us > 0)
974 {
975 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976 "Asking to stop migration for %s because datastore is full\n",
977 GNUNET_STRINGS_relative_time_to_string (
978 GNUNET_TIME_absolute_get_remaining (min_expiration),
979 GNUNET_YES));
980 GSF_block_peer_migration_ (cp, min_expiration);
981 }
982 else
983 {
984 ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS,
985 ppd->migration_delay);
986 ppd->migration_delay =
987 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, ppd->migration_delay);
988 mig_pause.rel_value_us =
989 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
990 ppd->migration_delay.rel_value_us);
991 ppd->migration_delay =
992 GNUNET_TIME_relative_saturating_multiply (ppd->migration_delay, 2);
993 GNUNET_log (
994 GNUNET_ERROR_TYPE_DEBUG,
995 "Replicated content already exists locally, asking to stop migration for %s\n",
996 GNUNET_STRINGS_relative_time_to_string (mig_pause, GNUNET_YES));
997 GSF_block_peer_migration_ (cp,
998 GNUNET_TIME_relative_to_absolute (mig_pause));
999 }
1000 }
1001 GNUNET_free (pmc);
1002 GNUNET_STATISTICS_update (GSF_stats,
1003 gettext_noop ("# Datastore `PUT' failures"),
1004 1,
1005 GNUNET_NO);
1006}
1007
1008
1009/**
1010 * Test if the DATABASE (PUT) load on this peer is too high
1011 * to even consider processing the query at
1012 * all.
1013 *
1014 * @param priority the priority of the item
1015 * @return #GNUNET_YES if the load is too high to do anything (load high)
1016 * #GNUNET_NO to process normally (load normal or low)
1017 */
1018static int
1019test_put_load_too_high (uint32_t priority)
1020{
1021 double ld;
1022
1023 if (NULL == datastore_put_load)
1024 return GNUNET_NO;
1025 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1026 return GNUNET_NO; /* very fast */
1027 ld = GNUNET_LOAD_get_load (datastore_put_load);
1028 if (ld < 2.0 * (1 + priority))
1029 return GNUNET_NO;
1030 GNUNET_STATISTICS_update (GSF_stats,
1031 gettext_noop (
1032 "# storage requests dropped due to high load"),
1033 1,
1034 GNUNET_NO);
1035 return GNUNET_YES;
1036}
1037
1038
1039/**
1040 * Iterator called on each result obtained for a DHT
1041 * operation that expects a reply
1042 *
1043 * @param cls closure
1044 * @param exp when will this value expire
1045 * @param key key of the result
1046 * @param trunc_peer truncated peer, NULL for none
1047 * @param get_path peers on reply path (or NULL if not recorded)
1048 * @param get_path_length number of entries in @a get_path
1049 * @param put_path peers on the PUT path (or NULL if not recorded)
1050 * @param put_path_length number of entries in @a get_path
1051 * @param type type of the result
1052 * @param size number of bytes in @a data
1053 * @param data pointer to the result data
1054 */
1055static void
1056handle_dht_reply (void *cls,
1057 struct GNUNET_TIME_Absolute exp,
1058 const struct GNUNET_HashCode *key,
1059 const struct GNUNET_PeerIdentity *trunc_peer,
1060 const struct GNUNET_DHT_PathElement *get_path,
1061 unsigned int get_path_length,
1062 const struct GNUNET_DHT_PathElement *put_path,
1063 unsigned int put_path_length,
1064 enum GNUNET_BLOCK_Type type,
1065 size_t size,
1066 const void *data)
1067{
1068 struct GSF_PendingRequest *pr = cls;
1069 struct ProcessReplyClosure prq;
1070 struct PutMigrationContext *pmc;
1071
1072 GNUNET_STATISTICS_update (GSF_stats,
1073 gettext_noop ("# Replies received from DHT"),
1074 1,
1075 GNUNET_NO);
1076 memset (&prq, 0, sizeof(prq));
1077 prq.data = data;
1078 prq.expiration = exp;
1079 /* do not allow migrated content to live longer than 1 year */
1080 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1081 GNUNET_TIME_UNIT_YEARS),
1082 prq.expiration);
1083 prq.size = size;
1084 prq.type = type;
1085 process_reply (&prq,
1086 key,
1087 pr);
1088 if ((GNUNET_YES == active_to_migration) &&
1089 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1090 {
1091 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092 "Replicating result for query `%s' with priority %u\n",
1093 GNUNET_h2s (key),
1094 prq.priority);
1095 pmc = GNUNET_new (struct PutMigrationContext);
1096 pmc->start = GNUNET_TIME_absolute_get ();
1097 pmc->requested = GNUNET_YES;
1098 if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1099 0,
1100 key,
1101 size,
1102 data,
1103 type,
1104 prq.priority,
1105 1 /* anonymity */,
1106 0 /* replication */,
1107 exp,
1108 1 + prq.priority,
1109 MAX_DATASTORE_QUEUE,
1110 &put_migration_continuation,
1111 pmc))
1112 {
1113 put_migration_continuation (pmc,
1114 GNUNET_SYSERR,
1115 GNUNET_TIME_UNIT_ZERO_ABS,
1116 NULL);
1117 }
1118 }
1119}
1120
1121
1122/**
1123 * Consider looking up the data in the DHT (anonymity-level permitting).
1124 *
1125 * @param pr the pending request to process
1126 */
1127void
1128GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1129{
1130 const void *xquery;
1131 size_t xquery_size;
1132 struct GNUNET_PeerIdentity pi;
1133 char buf[sizeof(struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
1134
1135 if (0 != pr->public_data.anonymity_level)
1136 return;
1137 if (NULL != pr->gh)
1138 {
1139 GNUNET_DHT_get_stop (pr->gh);
1140 pr->gh = NULL;
1141 }
1142 xquery = NULL;
1143 xquery_size = 0;
1144 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1145 {
1146 GNUNET_assert (0 != pr->sender_pid);
1147 GNUNET_PEER_resolve (pr->sender_pid, &pi);
1148 GNUNET_memcpy (&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity));
1149 xquery_size += sizeof(struct GNUNET_PeerIdentity);
1150 }
1151 pr->gh = GNUNET_DHT_get_start (GSF_dht,
1152 pr->public_data.type,
1153 &pr->public_data.query,
1154 DHT_GET_REPLICATION,
1155 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1156 xquery,
1157 xquery_size,
1158 &handle_dht_reply,
1159 pr);
1160 if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1161 GNUNET_DHT_get_filter_known_results (pr->gh,
1162 pr->replies_seen_count,
1163 pr->replies_seen);
1164}
1165
1166
1167/**
1168 * Function called with a reply from the cadet.
1169 *
1170 * @param cls the pending request struct
1171 * @param type type of the block, ANY on error
1172 * @param expiration expiration time for the block
1173 * @param data_size number of bytes in @a data, 0 on error
1174 * @param data reply block data, NULL on error
1175 */
1176static void
1177cadet_reply_proc (void *cls,
1178 enum GNUNET_BLOCK_Type type,
1179 struct GNUNET_TIME_Absolute expiration,
1180 size_t data_size,
1181 const void *data)
1182{
1183 struct GSF_PendingRequest *pr = cls;
1184 struct ProcessReplyClosure prq;
1185 struct GNUNET_HashCode query;
1186
1187 pr->cadet_request = NULL;
1188 if (GNUNET_OK !=
1189 GNUNET_BLOCK_check_block (GSF_block_ctx,
1190 type,
1191 data,
1192 data_size))
1193 {
1194 GNUNET_break_op (0);
1195 return;
1196 }
1197 if (GNUNET_BLOCK_TYPE_ANY == type)
1198 {
1199 GNUNET_break (NULL == data);
1200 GNUNET_break (0 == data_size);
1201 pr->cadet_retry_count++;
1202 if (pr->cadet_retry_count >= CADET_RETRY_MAX)
1203 return; /* give up on cadet */
1204 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1205 /* retry -- without delay, as this is non-anonymous
1206 and cadet/cadet connect will take some time anyway */
1207 pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1208 &pr->public_data.query,
1209 pr->public_data.type,
1210 &cadet_reply_proc,
1211 pr);
1212 return;
1213 }
1214 if (GNUNET_YES !=
1215 GNUNET_BLOCK_get_key (GSF_block_ctx,
1216 type,
1217 data,
1218 data_size,
1219 &query))
1220 {
1221 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "Failed to derive key for block of type %d\n",
1223 (int) type);
1224 GNUNET_break_op (0);
1225 return;
1226 }
1227 GNUNET_STATISTICS_update (GSF_stats,
1228 gettext_noop ("# Replies received from CADET"),
1229 1,
1230 GNUNET_NO);
1231 memset (&prq, 0, sizeof(prq));
1232 prq.data = data;
1233 prq.expiration = expiration;
1234 /* do not allow migrated content to live longer than 1 year */
1235 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1236 GNUNET_TIME_UNIT_YEARS),
1237 prq.expiration);
1238 prq.size = data_size;
1239 prq.type = type;
1240 process_reply (&prq,
1241 &query,
1242 pr);
1243}
1244
1245
1246/**
1247 * Consider downloading via cadet (if possible)
1248 *
1249 * @param pr the pending request to process
1250 */
1251void
1252GSF_cadet_lookup_ (struct GSF_PendingRequest *pr)
1253{
1254 if (0 != pr->public_data.anonymity_level)
1255 return;
1256 if (0 == pr->public_data.target)
1257 {
1258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259 "Cannot do cadet-based download, target peer not known\n");
1260 return;
1261 }
1262 if (NULL != pr->cadet_request)
1263 return;
1264 pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1265 &pr->public_data.query,
1266 pr->public_data.type,
1267 &cadet_reply_proc,
1268 pr);
1269}
1270
1271
1272/**
1273 * Task that issues a warning if the datastore lookup takes too long.
1274 *
1275 * @param cls the `struct GSF_PendingRequest`
1276 */
1277static void
1278warn_delay_task (void *cls)
1279{
1280 struct GSF_PendingRequest *pr = cls;
1281
1282 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1283 _ ("Datastore lookup already took %s!\n"),
1284 GNUNET_STRINGS_relative_time_to_string (
1285 GNUNET_TIME_absolute_get_duration (pr->qe_start),
1286 GNUNET_YES));
1287 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1288 &warn_delay_task,
1289 pr);
1290}
1291
1292
1293/**
1294 * Task that issues a warning if the datastore lookup takes too long.
1295 *
1296 * @param cls the `struct GSF_PendingRequest`
1297 */
1298static void
1299odc_warn_delay_task (void *cls)
1300{
1301 struct GSF_PendingRequest *pr = cls;
1302
1303 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1304 _ ("On-demand lookup already took %s!\n"),
1305 GNUNET_STRINGS_relative_time_to_string (
1306 GNUNET_TIME_absolute_get_duration (pr->qe_start),
1307 GNUNET_YES));
1308 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1309 &odc_warn_delay_task,
1310 pr);
1311}
1312
1313
1314/* Call our continuation (if we have any) */
1315static void
1316call_continuation (struct GSF_PendingRequest *pr)
1317{
1318 GSF_LocalLookupContinuation cont = pr->llc_cont;
1319
1320 GNUNET_assert (NULL == pr->qe);
1321 if (NULL != pr->warn_task)
1322 {
1323 GNUNET_SCHEDULER_cancel (pr->warn_task);
1324 pr->warn_task = NULL;
1325 }
1326 if (NULL == cont)
1327 return; /* no continuation */
1328 pr->llc_cont = NULL;
1329 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1330 {
1331 if (GNUNET_BLOCK_REPLY_OK_LAST != pr->local_result)
1332 {
1333 /* Signal that we are done and that there won't be any
1334 additional results to allow client to clean up state. */
1335 pr->rh (pr->rh_cls,
1336 GNUNET_BLOCK_REPLY_OK_LAST,
1337 pr,
1338 UINT32_MAX,
1339 GNUNET_TIME_UNIT_ZERO_ABS,
1340 GNUNET_TIME_UNIT_FOREVER_ABS,
1341 GNUNET_BLOCK_TYPE_ANY,
1342 NULL,
1343 0);
1344 }
1345 /* Finally, call our continuation to signal that we are
1346 done with local processing of this request; i.e. to
1347 start reading again from the client. */
1348 cont (pr->llc_cont_cls,
1349 NULL,
1350 GNUNET_BLOCK_REPLY_OK_LAST);
1351 return;
1352 }
1353
1354 cont (pr->llc_cont_cls,
1355 pr,
1356 pr->local_result);
1357}
1358
1359
1360/* Update stats and call continuation */
1361static void
1362no_more_local_results (struct GSF_PendingRequest *pr)
1363{
1364 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1365 "No further local responses available.\n");
1366#if INSANE_STATISTICS
1367 if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
1368 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type))
1369 GNUNET_STATISTICS_update (GSF_stats,
1370 gettext_noop (
1371 "# requested DBLOCK or IBLOCK not found"),
1372 1,
1373 GNUNET_NO);
1374#endif
1375 call_continuation (pr);
1376}
1377
1378
1379/* forward declaration */
1380static void
1381process_local_reply (void *cls,
1382 const struct GNUNET_HashCode *key,
1383 size_t size,
1384 const void *data,
1385 enum GNUNET_BLOCK_Type type,
1386 uint32_t priority,
1387 uint32_t anonymity,
1388 uint32_t replication,
1389 struct GNUNET_TIME_Absolute expiration,
1390 uint64_t uid);
1391
1392
1393/* Start a local query */
1394static void
1395start_local_query (struct GSF_PendingRequest *pr,
1396 uint64_t next_uid,
1397 bool random)
1398{
1399 pr->qe_start = GNUNET_TIME_absolute_get ();
1400 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1401 &warn_delay_task,
1402 pr);
1403 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1404 next_uid,
1405 random,
1406 &pr->public_data.query,
1407 pr->public_data.type ==
1408 GNUNET_BLOCK_TYPE_FS_DBLOCK
1409 ? GNUNET_BLOCK_TYPE_ANY
1410 : pr->public_data.type,
1411 (0 != (GSF_PRO_PRIORITY_UNLIMITED
1412 & pr->public_data.options))
1413 ? UINT_MAX
1414 : 1
1415 /* queue priority */,
1416 (0 != (GSF_PRO_PRIORITY_UNLIMITED
1417 & pr->public_data.options))
1418 ? UINT_MAX
1419 : GSF_datastore_queue_size
1420 /* max queue size */,
1421 &process_local_reply,
1422 pr);
1423 if (NULL != pr->qe)
1424 return;
1425 GNUNET_log (
1426 GNUNET_ERROR_TYPE_DEBUG,
1427 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1428 GNUNET_h2s (&pr->public_data.query),
1429 pr->public_data.type,
1430 (unsigned long long) next_uid);
1431 GNUNET_STATISTICS_update (GSF_stats,
1432 gettext_noop (
1433 "# Datastore lookups concluded (error queueing)"),
1434 1,
1435 GNUNET_NO);
1436 call_continuation (pr);
1437}
1438
1439
1440/**
1441 * We're processing (local) results for a search request
1442 * from another peer. Pass applicable results to the
1443 * peer and if we are done either clean up (operation
1444 * complete) or forward to other peers (more results possible).
1445 *
1446 * @param cls our closure (`struct GSF_PendingRequest *`)
1447 * @param key key for the content
1448 * @param size number of bytes in @a data
1449 * @param data content stored
1450 * @param type type of the content
1451 * @param priority priority of the content
1452 * @param anonymity anonymity-level for the content
1453 * @param replication replication-level for the content
1454 * @param expiration expiration time for the content
1455 * @param uid unique identifier for the datum;
1456 * maybe 0 if no unique identifier is available
1457 */
1458static void
1459process_local_reply (void *cls,
1460 const struct GNUNET_HashCode *key,
1461 size_t size,
1462 const void *data,
1463 enum GNUNET_BLOCK_Type type,
1464 uint32_t priority,
1465 uint32_t anonymity,
1466 uint32_t replication,
1467 struct GNUNET_TIME_Absolute expiration,
1468 uint64_t uid)
1469{
1470 struct GSF_PendingRequest *pr = cls;
1471 struct ProcessReplyClosure prq;
1472 struct GNUNET_HashCode query;
1473 unsigned int old_rf;
1474
1475 GNUNET_SCHEDULER_cancel (pr->warn_task);
1476 pr->warn_task = NULL;
1477 if (NULL == pr->qe)
1478 goto called_from_on_demand;
1479 pr->qe = NULL;
1480 if (
1481 (NULL == key) && pr->seen_null &&
1482 ! pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1483 {
1484 /* No results */
1485#if INSANE_STATISTICS
1486 GNUNET_STATISTICS_update (GSF_stats,
1487 gettext_noop (
1488 "# Datastore lookups concluded (no results)"),
1489 1,
1490 GNUNET_NO);
1491#endif
1492 no_more_local_results (pr);
1493 return;
1494 }
1495 if (((NULL == key) &&
1496 pr->seen_null) || /* We have hit the end for the 2nd time OR */
1497 (pr->seen_null && pr->have_first_uid &&
1498 (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1499 {
1500 /* Seen all results */
1501 GNUNET_STATISTICS_update (GSF_stats,
1502 gettext_noop (
1503 "# Datastore lookups concluded (seen all)"),
1504 1,
1505 GNUNET_NO);
1506 no_more_local_results (pr);
1507 return;
1508 }
1509 if (NULL == key)
1510 {
1511 GNUNET_assert (! pr->seen_null);
1512 pr->seen_null = true;
1513 start_local_query (pr, 0 /* next_uid */, false /* random */);
1514 return;
1515 }
1516 if (! pr->have_first_uid)
1517 {
1518 pr->first_uid = uid;
1519 pr->have_first_uid = true;
1520 }
1521 pr->result_count++;
1522 if (pr->result_count > MAX_RESULTS)
1523 {
1524 GNUNET_STATISTICS_update (
1525 GSF_stats,
1526 gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1527 1,
1528 GNUNET_NO);
1529 no_more_local_results (pr);
1530 return;
1531 }
1532 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1533 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1534 GNUNET_h2s (key),
1535 type,
1536 (unsigned long long) uid);
1537 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1538 {
1539 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1540 "Found ONDEMAND block, performing on-demand encoding\n");
1541 GNUNET_STATISTICS_update (GSF_stats,
1542 gettext_noop (
1543 "# on-demand blocks matched requests"),
1544 1,
1545 GNUNET_NO);
1546 pr->qe_start = GNUNET_TIME_absolute_get ();
1547 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1548 &odc_warn_delay_task,
1549 pr);
1550 if (GNUNET_OK == GNUNET_FS_handle_on_demand_block (key,
1551 size,
1552 data,
1553 type,
1554 priority,
1555 anonymity,
1556 replication,
1557 expiration,
1558 uid,
1559 &process_local_reply,
1560 pr))
1561 {
1562 GNUNET_STATISTICS_update (GSF_stats,
1563 gettext_noop (
1564 "# on-demand lookups performed successfully"),
1565 1,
1566 GNUNET_NO);
1567 return; /* we're done */
1568 }
1569 GNUNET_STATISTICS_update (GSF_stats,
1570 gettext_noop ("# on-demand lookups failed"),
1571 1,
1572 GNUNET_NO);
1573 GNUNET_SCHEDULER_cancel (pr->warn_task);
1574 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1575 return;
1576 }
1577called_from_on_demand:
1578 old_rf = pr->public_data.results_found;
1579 memset (&prq, 0, sizeof(prq));
1580 prq.data = data;
1581 prq.expiration = expiration;
1582 prq.size = size;
1583 if (GNUNET_OK !=
1584 GNUNET_BLOCK_get_key (GSF_block_ctx,
1585 type,
1586 data,
1587 size,
1588 &query))
1589 {
1590 GNUNET_break (0);
1591 GNUNET_DATASTORE_remove (GSF_dsh,
1592 key,
1593 size,
1594 data,
1595 UINT_MAX,
1596 UINT_MAX,
1597 NULL,
1598 NULL);
1599 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1600 return;
1601 }
1602 prq.type = type;
1603 prq.priority = priority;
1604 prq.request_found = GNUNET_NO;
1605 prq.anonymity_level = anonymity;
1606 if ((0 == old_rf) && (0 == pr->public_data.results_found))
1607 GSF_update_datastore_delay_ (pr->public_data.start_time);
1608 process_reply (&prq,
1609 key,
1610 pr);
1611 pr->local_result = prq.eval;
1612 if (GNUNET_BLOCK_REPLY_OK_LAST == prq.eval)
1613 {
1614 GNUNET_STATISTICS_update (
1615 GSF_stats,
1616 gettext_noop ("# Datastore lookups concluded (found last result)"),
1617 1,
1618 GNUNET_NO);
1619 call_continuation (pr);
1620 return;
1621 }
1622 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1623 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1624 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1625 {
1626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1627 GNUNET_STATISTICS_update (GSF_stats,
1628 gettext_noop (
1629 "# Datastore lookups concluded (load too high)"),
1630 1,
1631 GNUNET_NO);
1632 call_continuation (pr);
1633 return;
1634 }
1635 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1636}
1637
1638
1639/**
1640 * Is the given target a legitimate peer for forwarding the given request?
1641 *
1642 * @param pr request
1643 * @param target
1644 * @return #GNUNET_YES if this request could be forwarded to the given peer
1645 */
1646int
1647GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr,
1648 const struct GNUNET_PeerIdentity *target)
1649{
1650 struct GNUNET_PeerIdentity pi;
1651
1652 if (0 == pr->origin_pid)
1653 return GNUNET_YES;
1654 GNUNET_PEER_resolve (pr->origin_pid, &pi);
1655 return (0 == memcmp (&pi, target, sizeof(struct GNUNET_PeerIdentity)))
1656 ? GNUNET_NO
1657 : GNUNET_YES;
1658}
1659
1660
1661/**
1662 * Look up the request in the local datastore.
1663 *
1664 * @param pr the pending request to process
1665 * @param cont function to call at the end
1666 * @param cont_cls closure for @a cont
1667 */
1668void
1669GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1670 GSF_LocalLookupContinuation cont,
1671 void *cont_cls)
1672{
1673 GNUNET_assert (NULL == pr->gh);
1674 GNUNET_assert (NULL == pr->cadet_request);
1675 GNUNET_assert (NULL == pr->llc_cont);
1676 pr->llc_cont = cont;
1677 pr->llc_cont_cls = cont_cls;
1678#if INSANE_STATISTICS
1679 GNUNET_STATISTICS_update (GSF_stats,
1680 gettext_noop ("# Datastore lookups initiated"),
1681 1,
1682 GNUNET_NO);
1683#endif
1684 start_local_query (pr, 0 /* next_uid */, true /* random */);
1685}
1686
1687
1688/**
1689 * Handle P2P "CONTENT" message. Checks that the message is
1690 * well-formed and then checks if there are any pending requests for
1691 * this content and possibly passes it on (to local clients or other
1692 * peers). Does NOT perform migration (content caching at this peer).
1693 *
1694 * @param cls the other peer involved
1695 * @param put the actual message
1696 */
1697void
1698handle_p2p_put (void *cls,
1699 const struct PutMessage *put)
1700{
1701 struct GSF_ConnectedPeer *cp = cls;
1702 uint16_t msize;
1703 size_t dsize;
1704 enum GNUNET_BLOCK_Type type;
1705 struct GNUNET_TIME_Absolute expiration;
1706 struct GNUNET_HashCode query;
1707 struct ProcessReplyClosure prq;
1708 struct GNUNET_TIME_Relative block_time;
1709 double putl;
1710 struct PutMigrationContext *pmc;
1711
1712 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1713 "Received P2P PUT from %s\n",
1714 GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
1715 GSF_cover_content_count++;
1716 msize = ntohs (put->header.size);
1717 dsize = msize - sizeof(struct PutMessage);
1718 type = ntohl (put->type);
1719 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1720 /* do not allow migrated content to live longer than 1 year */
1721 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1722 GNUNET_TIME_UNIT_YEARS),
1723 expiration);
1724 if (GNUNET_OK !=
1725 GNUNET_BLOCK_check_block (GSF_block_ctx,
1726 type,
1727 &put[1],
1728 dsize))
1729 {
1730 GNUNET_break_op (0);
1731 return;
1732 }
1733 if (GNUNET_OK !=
1734 GNUNET_BLOCK_get_key (GSF_block_ctx,
1735 type,
1736 &put[1],
1737 dsize,
1738 &query))
1739 {
1740 GNUNET_break_op (0);
1741 return;
1742 }
1743 GNUNET_STATISTICS_update (GSF_stats,
1744 gettext_noop ("# GAP PUT messages received"),
1745 1,
1746 GNUNET_NO);
1747 /* now, lookup 'query' */
1748 prq.data = (const void *) &put[1];
1749 prq.sender = cp;
1750 prq.size = dsize;
1751 prq.type = type;
1752 prq.expiration = expiration;
1753 prq.priority = 0;
1754 prq.anonymity_level = UINT32_MAX;
1755 prq.request_found = GNUNET_NO;
1756 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
1757 &query,
1758 &process_reply,
1759 &prq);
1760 if (NULL != cp)
1761 {
1762 GSF_connected_peer_change_preference_ (cp,
1763 CONTENT_BANDWIDTH_VALUE
1764 + 1000 * prq.priority);
1765 GSF_get_peer_performance_data_ (cp)->respect += prq.priority;
1766 }
1767 if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1768 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1769 {
1770 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771 "Replicating result for query `%s' with priority %u\n",
1772 GNUNET_h2s (&query),
1773 prq.priority);
1774 pmc = GNUNET_new (struct PutMigrationContext);
1775 pmc->start = GNUNET_TIME_absolute_get ();
1776 pmc->requested = prq.request_found;
1777 GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid);
1778 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1779 &pmc->origin);
1780 if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1781 0,
1782 &query,
1783 dsize,
1784 &put[1],
1785 type,
1786 prq.priority,
1787 1 /* anonymity */,
1788 0 /* replication */,
1789 expiration,
1790 1 + prq.priority,
1791 MAX_DATASTORE_QUEUE,
1792 &put_migration_continuation,
1793 pmc))
1794 {
1795 put_migration_continuation (pmc,
1796 GNUNET_SYSERR,
1797 GNUNET_TIME_UNIT_ZERO_ABS,
1798 NULL);
1799 }
1800 }
1801 else if (NULL != cp)
1802 {
1803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804 "Choosing not to keep content `%s' (%d/%d)\n",
1805 GNUNET_h2s (&query),
1806 active_to_migration,
1807 test_put_load_too_high (prq.priority));
1808 }
1809 putl = GNUNET_LOAD_get_load (datastore_put_load);
1810 if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1811 ((GNUNET_YES != active_to_migration) ||
1812 (putl > 2.5 * (1 + prq.priority))))
1813 {
1814 if (GNUNET_YES != active_to_migration)
1815 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1816 block_time = GNUNET_TIME_relative_multiply (
1817 GNUNET_TIME_UNIT_MILLISECONDS,
1818 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1819 (unsigned int) (60000 * putl * putl)));
1820 GNUNET_log (
1821 GNUNET_ERROR_TYPE_DEBUG,
1822 "Asking to stop migration for %s because of load %f and events %d/%d\n",
1823 GNUNET_STRINGS_relative_time_to_string (block_time, GNUNET_YES),
1824 putl,
1825 active_to_migration,
1826 (GNUNET_NO == prq.request_found));
1827 GSF_block_peer_migration_ (cp,
1828 GNUNET_TIME_relative_to_absolute (block_time));
1829 }
1830}
1831
1832
1833/**
1834 * Check if the given request is still active.
1835 *
1836 * @param pr pending request
1837 * @return #GNUNET_YES if the request is still active
1838 */
1839int
1840GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr)
1841{
1842 return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1843}
1844
1845
1846/**
1847 * Setup the subsystem.
1848 */
1849void
1850GSF_pending_request_init_ ()
1851{
1852 if (GNUNET_OK !=
1853 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1854 "fs",
1855 "MAX_PENDING_REQUESTS",
1856 &max_pending_requests))
1857 {
1858 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
1859 "fs",
1860 "MAX_PENDING_REQUESTS");
1861 }
1862 active_to_migration =
1863 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1864 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1865 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
1866 requests_by_expiration_heap =
1867 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1868}
1869
1870
1871/**
1872 * Shutdown the subsystem.
1873 */
1874void
1875GSF_pending_request_done_ ()
1876{
1877 GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL);
1878 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1879 pr_map = NULL;
1880 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1881 requests_by_expiration_heap = NULL;
1882 GNUNET_LOAD_value_free (datastore_put_load);
1883 datastore_put_load = NULL;
1884}
1885
1886
1887/* end of gnunet-service-fs_pr.c */