aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_push.c
diff options
context:
space:
mode:
authorMartin Schanzenbach <schanzen@gnunet.org>2023-10-19 11:55:21 +0200
committerMartin Schanzenbach <schanzen@gnunet.org>2023-10-19 11:55:21 +0200
commit579d9473bb75072303789599b23be9b0203336fc (patch)
tree687506d1968bd2a391b71b8832d1e97905db3ca8 /src/fs/gnunet-service-fs_push.c
parentb56e4e05ad919c7191260fcf1d78b1f8d739871a (diff)
downloadgnunet-579d9473bb75072303789599b23be9b0203336fc.tar.gz
gnunet-579d9473bb75072303789599b23be9b0203336fc.zip
BUILD: Move fs to contrib/service
Diffstat (limited to 'src/fs/gnunet-service-fs_push.c')
-rw-r--r--src/fs/gnunet-service-fs_push.c672
1 files changed, 0 insertions, 672 deletions
diff --git a/src/fs/gnunet-service-fs_push.c b/src/fs/gnunet-service-fs_push.c
deleted file mode 100644
index 92dbba8e6..000000000
--- a/src/fs/gnunet-service-fs_push.c
+++ /dev/null
@@ -1,672 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file fs/gnunet-service-fs_push.c
23 * @brief API to push content from our datastore to other peers
24 * ('anonymous'-content P2P migration)
25 * @author Christian Grothoff
26 */
27#include "platform.h"
28#include "gnunet-service-fs.h"
29#include "gnunet-service-fs_cp.h"
30#include "gnunet-service-fs_indexing.h"
31#include "gnunet-service-fs_push.h"
32
33
34/**
35 * Maximum number of blocks we keep in memory for migration.
36 */
37#define MAX_MIGRATION_QUEUE 8
38
39/**
40 * Blocks are at most migrated to this number of peers
41 * plus one, each time they are fetched from the database.
42 */
43#define MIGRATION_LIST_SIZE 2
44
45/**
46 * How long must content remain valid for us to consider it for migration?
47 * If content will expire too soon, there is clearly no point in pushing
48 * it to other peers. This value gives the threshold for migration. Note
49 * that if this value is increased, the migration testcase may need to be
50 * adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).
51 */
52#define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply ( \
53 GNUNET_TIME_UNIT_MINUTES, 30)
54
55
56/**
57 * Block that is ready for migration to other peers. Actual data is at the end of the block.
58 */
59struct MigrationReadyBlock
60{
61 /**
62 * This is a doubly-linked list.
63 */
64 struct MigrationReadyBlock *next;
65
66 /**
67 * This is a doubly-linked list.
68 */
69 struct MigrationReadyBlock *prev;
70
71 /**
72 * Query for the block.
73 */
74 struct GNUNET_HashCode query;
75
76 /**
77 * When does this block expire?
78 */
79 struct GNUNET_TIME_Absolute expiration;
80
81 /**
82 * Peers we already forwarded this
83 * block to. Zero for empty entries.
84 */
85 GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
86
87 /**
88 * Size of the block.
89 */
90 size_t size;
91
92 /**
93 * Number of targets already used.
94 */
95 unsigned int used_targets;
96
97 /**
98 * Type of the block.
99 */
100 enum GNUNET_BLOCK_Type type;
101};
102
103
104/**
105 * Information about a peer waiting for migratable data.
106 */
107struct MigrationReadyPeer
108{
109 /**
110 * This is a doubly-linked list.
111 */
112 struct MigrationReadyPeer *next;
113
114 /**
115 * This is a doubly-linked list.
116 */
117 struct MigrationReadyPeer *prev;
118
119 /**
120 * Handle to peer.
121 */
122 struct GSF_ConnectedPeer *peer;
123
124 /**
125 * Envelope of the currently pushed message.
126 */
127 struct GNUNET_MQ_Envelope *env;
128};
129
130
131/**
132 * Head of linked list of blocks that can be migrated.
133 */
134static struct MigrationReadyBlock *mig_head;
135
136/**
137 * Tail of linked list of blocks that can be migrated.
138 */
139static struct MigrationReadyBlock *mig_tail;
140
141/**
142 * Head of linked list of peers.
143 */
144static struct MigrationReadyPeer *peer_head;
145
146/**
147 * Tail of linked list of peers.
148 */
149static struct MigrationReadyPeer *peer_tail;
150
151/**
152 * Request to datastore for migration (or NULL).
153 */
154static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
155
156/**
157 * ID of task that collects blocks for migration.
158 */
159static struct GNUNET_SCHEDULER_Task *mig_task;
160
161/**
162 * What is the maximum frequency at which we are allowed to
163 * poll the datastore for migration content?
164 */
165static struct GNUNET_TIME_Relative min_migration_delay;
166
167/**
168 * Size of the doubly-linked list of migration blocks.
169 */
170static unsigned int mig_size;
171
172/**
173 * Is this module enabled?
174 */
175static int enabled;
176
177/**
178 * Did we find anything in the datastore?
179 */
180static int value_found;
181
182
183/**
184 * Delete the given migration block.
185 *
186 * @param mb block to delete
187 */
188static void
189delete_migration_block (struct MigrationReadyBlock *mb)
190{
191 GNUNET_CONTAINER_DLL_remove (mig_head,
192 mig_tail,
193 mb);
194 GNUNET_PEER_decrement_rcs (mb->target_list,
195 MIGRATION_LIST_SIZE);
196 mig_size--;
197 GNUNET_free (mb);
198}
199
200
201/**
202 * Find content for migration to this peer.
203 *
204 * @param cls A `struct MigrationReadyPeer *` to find content for
205 */
206static void
207find_content (void *cls);
208
209
210/**
211 * Send the given block to the given peer.
212 *
213 * @param mrp target peer
214 * @param block the block
215 * @return #GNUNET_YES if the block was deleted (!)
216 */
217static int
218transmit_content (struct MigrationReadyPeer *mrp,
219 struct MigrationReadyBlock *block)
220{
221 struct PutMessage *msg;
222 unsigned int i;
223 struct GSF_PeerPerformanceData *ppd;
224 int ret;
225
226 ppd = GSF_get_peer_performance_data_ (mrp->peer);
227 GNUNET_assert (NULL == mrp->env);
228 mrp->env = GNUNET_MQ_msg_extra (msg,
229 block->size,
230 GNUNET_MESSAGE_TYPE_FS_PUT);
231 msg->type = htonl (block->type);
232 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
233 GNUNET_memcpy (&msg[1],
234 &block[1],
235 block->size);
236 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
237 {
238 if (block->target_list[i] == 0)
239 {
240 block->target_list[i] = ppd->pid;
241 GNUNET_PEER_change_rc (block->target_list[i],
242 1);
243 break;
244 }
245 }
246 if (MIGRATION_LIST_SIZE == i)
247 {
248 delete_migration_block (block);
249 ret = GNUNET_YES;
250 }
251 else
252 {
253 ret = GNUNET_NO;
254 }
255 GNUNET_MQ_notify_sent (mrp->env,
256 &find_content,
257 mrp);
258 GSF_peer_transmit_ (mrp->peer,
259 GNUNET_NO,
260 0 /* priority */,
261 mrp->env);
262 return ret;
263}
264
265
266/**
267 * Count the number of peers this block has
268 * already been forwarded to.
269 *
270 * @param block the block
271 * @return number of times block was forwarded
272 */
273static unsigned int
274count_targets (struct MigrationReadyBlock *block)
275{
276 unsigned int i;
277
278 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279 if (block->target_list[i] == 0)
280 return i;
281 return i;
282}
283
284
285/**
286 * Check if sending this block to this peer would
287 * be a good idea.
288 *
289 * @param mrp target peer
290 * @param block the block
291 * @return score (>= 0: feasible, negative: infeasible)
292 */
293static long
294score_content (struct MigrationReadyPeer *mrp,
295 struct MigrationReadyBlock *block)
296{
297 unsigned int i;
298 struct GSF_PeerPerformanceData *ppd;
299 struct GNUNET_PeerIdentity id;
300 struct GNUNET_HashCode hc;
301 uint32_t dist;
302
303 ppd = GSF_get_peer_performance_data_ (mrp->peer);
304 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
305 if (block->target_list[i] == ppd->pid)
306 return -1;
307 GNUNET_assert (0 != ppd->pid);
308 GNUNET_PEER_resolve (ppd->pid,
309 &id);
310 GNUNET_CRYPTO_hash (&id,
311 sizeof(struct GNUNET_PeerIdentity),
312 &hc);
313 dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
314 &hc);
315 /* closer distance, higher score: */
316 return UINT32_MAX - dist;
317}
318
319
320/**
321 * If the migration task is not currently running, consider
322 * (re)scheduling it with the appropriate delay.
323 */
324static void
325consider_gathering (void);
326
327
328static void
329find_content (void *cls)
330{
331 struct MigrationReadyPeer *mrp = cls;
332 struct MigrationReadyBlock *pos;
333 long score;
334 long best_score;
335 struct MigrationReadyBlock *best;
336
337 mrp->env = NULL;
338 best = NULL;
339 best_score = -1;
340 pos = mig_head;
341 while (NULL != pos)
342 {
343 score = score_content (mrp, pos);
344 if (score > best_score)
345 {
346 best_score = score;
347 best = pos;
348 }
349 pos = pos->next;
350 }
351 if (NULL == best)
352 {
353 if (mig_size < MAX_MIGRATION_QUEUE)
354 {
355 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
356 "No content found for pushing, waiting for queue to fill\n");
357 return; /* will fill up eventually... */
358 }
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360 "No suitable content found, purging content from full queue\n");
361 /* failed to find migration target AND
362 * queue is full, purge most-forwarded
363 * block from queue to make room for more */
364 pos = mig_head;
365 while (NULL != pos)
366 {
367 score = count_targets (pos);
368 if (score >= best_score)
369 {
370 best_score = score;
371 best = pos;
372 }
373 pos = pos->next;
374 }
375 GNUNET_assert (NULL != best);
376 delete_migration_block (best);
377 consider_gathering ();
378 return;
379 }
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381 "Preparing to push best content to peer\n");
382 transmit_content (mrp,
383 best);
384}
385
386
387/**
388 * Task that is run periodically to obtain blocks for content
389 * migration
390 *
391 * @param cls unused
392 */
393static void
394gather_migration_blocks (void *cls);
395
396
397/**
398 * If the migration task is not currently running, consider
399 * (re)scheduling it with the appropriate delay.
400 */
401static void
402consider_gathering ()
403{
404 struct GNUNET_TIME_Relative delay;
405
406 if (NULL == GSF_dsh)
407 return;
408 if (NULL != mig_qe)
409 return;
410 if (NULL != mig_task)
411 return;
412 if (mig_size >= MAX_MIGRATION_QUEUE)
413 return;
414 delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
415 mig_size);
416 delay = GNUNET_TIME_relative_divide (delay,
417 MAX_MIGRATION_QUEUE);
418 delay = GNUNET_TIME_relative_max (delay,
419 min_migration_delay);
420 if (GNUNET_NO == value_found)
421 {
422 /* wait at least 5s if the datastore is empty */
423 delay = GNUNET_TIME_relative_max (delay,
424 GNUNET_TIME_relative_multiply (
425 GNUNET_TIME_UNIT_SECONDS,
426 5));
427 }
428 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
429 "Scheduling gathering task (queue size: %u)\n",
430 mig_size);
431 mig_task = GNUNET_SCHEDULER_add_delayed (delay,
432 &gather_migration_blocks,
433 NULL);
434}
435
436
437/**
438 * Process content offered for migration.
439 *
440 * @param cls closure
441 * @param key key for the content
442 * @param size number of bytes in data
443 * @param data content stored
444 * @param type type of the content
445 * @param priority priority of the content
446 * @param anonymity anonymity-level for the content
447 * @param replication replication-level for the content
448 * @param expiration expiration time for the content
449 * @param uid unique identifier for the datum;
450 * maybe 0 if no unique identifier is available
451 */
452static void
453process_migration_content (void *cls,
454 const struct GNUNET_HashCode *key,
455 size_t size,
456 const void *data,
457 enum GNUNET_BLOCK_Type type,
458 uint32_t priority,
459 uint32_t anonymity,
460 uint32_t replication,
461 struct GNUNET_TIME_Absolute expiration,
462 uint64_t uid)
463{
464 struct MigrationReadyBlock *mb;
465 struct MigrationReadyPeer *pos;
466
467 mig_qe = NULL;
468 if (NULL == key)
469 {
470 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
471 "No content found for migration...\n");
472 consider_gathering ();
473 return;
474 }
475 value_found = GNUNET_YES;
476 if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
477 MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
478 {
479 /* content will expire soon, don't bother */
480 consider_gathering ();
481 return;
482 }
483 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
484 {
485 if (GNUNET_OK !=
486 GNUNET_FS_handle_on_demand_block (key,
487 size,
488 data,
489 type,
490 priority,
491 anonymity,
492 replication,
493 expiration,
494 uid,
495 &process_migration_content,
496 NULL))
497 consider_gathering ();
498 return;
499 }
500 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
502 GNUNET_h2s (key),
503 type, mig_size + 1,
504 MAX_MIGRATION_QUEUE);
505 mb = GNUNET_malloc (sizeof(struct MigrationReadyBlock) + size);
506 mb->query = *key;
507 mb->expiration = expiration;
508 mb->size = size;
509 mb->type = type;
510 GNUNET_memcpy (&mb[1], data, size);
511 GNUNET_CONTAINER_DLL_insert_after (mig_head,
512 mig_tail,
513 mig_tail,
514 mb);
515 mig_size++;
516 for (pos = peer_head; NULL != pos; pos = pos->next)
517 {
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "Preparing to push best content to peer %s\n",
520 GNUNET_i2s (GSF_connected_peer_get_identity2_ (pos->peer)));
521 if ((NULL == pos->env) &&
522 (GNUNET_YES == transmit_content (pos,
523 mb)))
524 {
525 break; /* 'mb' was freed! */
526 }
527 }
528 consider_gathering ();
529}
530
531
532/**
533 * Task that is run periodically to obtain blocks for content
534 * migration
535 *
536 * @param cls unused
537 */
538static void
539gather_migration_blocks (void *cls)
540{
541 mig_task = NULL;
542 if (mig_size >= MAX_MIGRATION_QUEUE)
543 return;
544 if (NULL == GSF_dsh)
545 return;
546 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
547 "Asking datastore for content for replication (queue size: %u)\n",
548 mig_size);
549 value_found = GNUNET_NO;
550 mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
551 0,
552 UINT_MAX,
553 &process_migration_content,
554 NULL);
555 if (NULL == mig_qe)
556 consider_gathering ();
557}
558
559
560/**
561 * A peer connected to us. Start pushing content
562 * to this peer.
563 *
564 * @param peer handle for the peer that connected
565 */
566void
567GSF_push_start_ (struct GSF_ConnectedPeer *peer)
568{
569 struct MigrationReadyPeer *mrp;
570
571 if (GNUNET_YES != enabled)
572 return;
573 for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
574 if (mrp->peer == peer)
575 break;
576 if (NULL != mrp)
577 {
578 /* same peer added twice, must not happen */
579 GNUNET_break (0);
580 return;
581 }
582
583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
584 "Adding peer %s to list for pushing\n",
585 GNUNET_i2s (GSF_connected_peer_get_identity2_ (peer)));
586
587 mrp = GNUNET_new (struct MigrationReadyPeer);
588 mrp->peer = peer;
589 find_content (mrp);
590 GNUNET_CONTAINER_DLL_insert (peer_head,
591 peer_tail,
592 mrp);
593}
594
595
596/**
597 * A peer disconnected from us. Stop pushing content
598 * to this peer.
599 *
600 * @param peer handle for the peer that disconnected
601 */
602void
603GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
604{
605 struct MigrationReadyPeer *pos;
606
607 for (pos = peer_head; NULL != pos; pos = pos->next)
608 if (pos->peer == peer)
609 break;
610 if (NULL == pos)
611 return;
612 if (NULL != pos->env)
613 GNUNET_MQ_send_cancel (pos->env);
614 GNUNET_CONTAINER_DLL_remove (peer_head,
615 peer_tail,
616 pos);
617 GNUNET_free (pos);
618}
619
620
621/**
622 * Setup the module.
623 */
624void
625GSF_push_init_ ()
626{
627 enabled =
628 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
629 "FS",
630 "CONTENT_PUSHING");
631 if (GNUNET_YES != enabled)
632 return;
633
634 if (GNUNET_OK !=
635 GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
636 "fs",
637 "MIN_MIGRATION_DELAY",
638 &min_migration_delay))
639 {
640 GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
641 "fs",
642 "MIN_MIGRATION_DELAY",
643 _ ("time required, content pushing disabled"));
644 return;
645 }
646 consider_gathering ();
647}
648
649
650/**
651 * Shutdown the module.
652 */
653void
654GSF_push_done_ ()
655{
656 if (NULL != mig_task)
657 {
658 GNUNET_SCHEDULER_cancel (mig_task);
659 mig_task = NULL;
660 }
661 if (NULL != mig_qe)
662 {
663 GNUNET_DATASTORE_cancel (mig_qe);
664 mig_qe = NULL;
665 }
666 while (NULL != mig_head)
667 delete_migration_block (mig_head);
668 GNUNET_assert (0 == mig_size);
669}
670
671
672/* end of gnunet-service-fs_push.c */