aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c1659
1 files changed, 0 insertions, 1659 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
deleted file mode 100644
index 74dd42daf..000000000
--- a/src/fs/gnunet-service-fs_cp.c
+++ /dev/null
@@ -1,1659 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file fs/gnunet-service-fs_cp.c
22 * @brief API to handle 'connected peers'
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_load_lib.h"
28#include "gnunet-service-fs.h"
29#include "gnunet-service-fs_cp.h"
30#include "gnunet-service-fs_pe.h"
31#include "gnunet-service-fs_pr.h"
32#include "gnunet-service-fs_push.h"
33#include "gnunet_peerstore_service.h"
34
35
36/**
37 * Ratio for moving average delay calculation. The previous
38 * average goes in with a factor of (n-1) into the calculation.
39 * Must be > 0.
40 */
41#define RUNAVG_DELAY_N 16
42
43/**
44 * How often do we flush respect values to disk?
45 */
46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
47 GNUNET_TIME_UNIT_MINUTES, 5)
48
49/**
50 * After how long do we discard a reply?
51 */
52#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
53 2)
54
55/**
56 * Collect an instance number of statistics? May cause excessive IPC.
57 */
58#define INSANE_STATISTICS GNUNET_NO
59
60
61/**
62 * Handle to cancel a transmission request.
63 */
64struct GSF_PeerTransmitHandle
65{
66 /**
67 * Kept in a doubly-linked list.
68 */
69 struct GSF_PeerTransmitHandle *next;
70
71 /**
72 * Kept in a doubly-linked list.
73 */
74 struct GSF_PeerTransmitHandle *prev;
75
76 /**
77 * Time when this transmission request was issued.
78 */
79 struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81 /**
82 * Envelope with the actual message.
83 */
84 struct GNUNET_MQ_Envelope *env;
85
86 /**
87 * Peer this request targets.
88 */
89 struct GSF_ConnectedPeer *cp;
90
91 /**
92 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
93 */
94 int is_query;
95
96 /**
97 * Priority of this request.
98 */
99 uint32_t priority;
100};
101
102
103/**
104 * Handle for an entry in our delay list.
105 */
106struct GSF_DelayedHandle
107{
108 /**
109 * Kept in a doubly-linked list.
110 */
111 struct GSF_DelayedHandle *next;
112
113 /**
114 * Kept in a doubly-linked list.
115 */
116 struct GSF_DelayedHandle *prev;
117
118 /**
119 * Peer this transmission belongs to.
120 */
121 struct GSF_ConnectedPeer *cp;
122
123 /**
124 * Envelope of the message that was delayed.
125 */
126 struct GNUNET_MQ_Envelope *env;
127
128 /**
129 * Task for the delay.
130 */
131 struct GNUNET_SCHEDULER_Task *delay_task;
132
133 /**
134 * Size of the message.
135 */
136 size_t msize;
137};
138
139
140/**
141 * Information per peer and request.
142 */
143struct PeerRequest
144{
145 /**
146 * Handle to generic request (generic: from peer or local client).
147 */
148 struct GSF_PendingRequest *pr;
149
150 /**
151 * Which specific peer issued this request?
152 */
153 struct GSF_ConnectedPeer *cp;
154
155 /**
156 * Task for asynchronous stopping of this request.
157 */
158 struct GNUNET_SCHEDULER_Task *kill_task;
159};
160
161
162/**
163 * A connected peer.
164 */
165struct GSF_ConnectedPeer
166{
167 /**
168 * Performance data for this peer.
169 */
170 struct GSF_PeerPerformanceData ppd;
171
172 /**
173 * Time until when we blocked this peer from migrating
174 * data to us.
175 */
176 struct GNUNET_TIME_Absolute last_migration_block;
177
178 /**
179 * Task scheduled to revive migration to this peer.
180 */
181 struct GNUNET_SCHEDULER_Task *mig_revive_task;
182
183 /**
184 * Messages (replies, queries, content migration) we would like to
185 * send to this peer in the near future. Sorted by priority, head.
186 */
187 struct GSF_PeerTransmitHandle *pth_head;
188
189 /**
190 * Messages (replies, queries, content migration) we would like to
191 * send to this peer in the near future. Sorted by priority, tail.
192 */
193 struct GSF_PeerTransmitHandle *pth_tail;
194
195 /**
196 * Messages (replies, queries, content migration) we would like to
197 * send to this peer in the near future. Sorted by priority, head.
198 */
199 struct GSF_DelayedHandle *delayed_head;
200
201 /**
202 * Messages (replies, queries, content migration) we would like to
203 * send to this peer in the near future. Sorted by priority, tail.
204 */
205 struct GSF_DelayedHandle *delayed_tail;
206
207 /**
208 * Task scheduled if we need to retry bandwidth reservation later.
209 */
210 struct GNUNET_SCHEDULER_Task *rc_delay_task;
211
212 /**
213 * Active requests from this neighbour, map of query to `struct PeerRequest`.
214 */
215 struct GNUNET_CONTAINER_MultiHashMap *request_map;
216
217 /**
218 * Handle for an active request for transmission to this
219 * peer.
220 */
221 struct GNUNET_MQ_Handle *mq;
222
223 /**
224 * Increase in traffic preference still to be submitted
225 * to the core service for this peer.
226 */
227 uint64_t inc_preference;
228
229 /**
230 * Number of entries in @e delayed_head DLL.
231 */
232 unsigned int delay_queue_size;
233
234 /**
235 * Respect rating for this peer on disk.
236 */
237 uint32_t disk_respect;
238
239 /**
240 * Which offset in @e last_p2p_replies will be updated next?
241 * (we go round-robin).
242 */
243 unsigned int last_p2p_replies_woff;
244
245 /**
246 * Which offset in @e last_client_replies will be updated next?
247 * (we go round-robin).
248 */
249 unsigned int last_client_replies_woff;
250
251 /**
252 * Current offset into @e last_request_times ring buffer.
253 */
254 unsigned int last_request_times_off;
255
256 /**
257 * Handle to the PEERSTORE iterate request for peer respect value
258 */
259 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
260};
261
262
263/**
264 * Map from peer identities to `struct GSF_ConnectPeer` entries.
265 */
266static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
267
268/**
269 * Handle to peerstore service.
270 */
271static struct GNUNET_PEERSTORE_Handle *peerstore;
272
273/**
274 * Task used to flush respect values to disk.
275 */
276static struct GNUNET_SCHEDULER_Task *fr_task;
277
278
279/**
280 * Update the latency information kept for the given peer.
281 *
282 * @param id peer record to update
283 * @param latency current latency value
284 */
285void
286GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
287 struct GNUNET_TIME_Relative latency)
288{
289 struct GSF_ConnectedPeer *cp;
290
291 cp = GSF_peer_get_ (id);
292 if (NULL == cp)
293 return; /* we're not yet connected at the core level, ignore */
294 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
295 latency);
296}
297
298
299/**
300 * Return the performance data record for the given peer
301 *
302 * @param cp peer to query
303 * @return performance data record for the peer
304 */
305struct GSF_PeerPerformanceData *
306GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
307{
308 return &cp->ppd;
309}
310
311
312/**
313 * Core is ready to transmit to a peer, get the message.
314 *
315 * @param cp which peer to send a message to
316 */
317static void
318peer_transmit (struct GSF_ConnectedPeer *cp);
319
320
321/**
322 * If ready (bandwidth reserved), try to schedule transmission via
323 * core for the given handle.
324 *
325 * @param pth transmission handle to schedule
326 */
327static void
328schedule_transmission (struct GSF_PeerTransmitHandle *pth)
329{
330 struct GSF_ConnectedPeer *cp;
331 struct GNUNET_PeerIdentity target;
332
333 cp = pth->cp;
334 GNUNET_assert (0 != cp->ppd.pid);
335 GNUNET_PEER_resolve (cp->ppd.pid, &target);
336
337 peer_transmit (cp);
338}
339
340
341/**
342 * Core is ready to transmit to a peer, get the message.
343 *
344 * @param cp which peer to send a message to
345 */
346static void
347peer_transmit (struct GSF_ConnectedPeer *cp)
348{
349 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
350 struct GSF_PeerTransmitHandle *pos;
351
352 if (NULL == pth)
353 return;
354 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
355 cp->pth_tail,
356 pth);
357 if (GNUNET_YES == pth->is_query)
358 {
359 cp->ppd.last_request_times[(cp->last_request_times_off++)
360 % MAX_QUEUE_PER_PEER] =
361 GNUNET_TIME_absolute_get ();
362 GNUNET_assert (0 < cp->ppd.pending_queries--);
363 }
364 else if (GNUNET_NO == pth->is_query)
365 {
366 GNUNET_assert (0 < cp->ppd.pending_replies--);
367 }
368 GNUNET_LOAD_update (cp->ppd.transmission_delay,
369 GNUNET_TIME_absolute_get_duration
370 (pth->transmission_request_start_time).rel_value_us);
371 GNUNET_MQ_send (cp->mq,
372 pth->env);
373 GNUNET_free (pth);
374 if (NULL != (pos = cp->pth_head))
375 {
376 GNUNET_assert (pos != pth);
377 schedule_transmission (pos);
378 }
379}
380
381
382/**
383 * Function called by PEERSTORE with peer respect record
384 *
385 * @param cls handle to connected peer entry
386 * @param record peerstore record information
387 * @param emsg error message, or NULL if no errors
388 */
389static void
390peer_respect_cb (void *cls,
391 const struct GNUNET_PEERSTORE_Record *record,
392 const char *emsg)
393{
394 struct GSF_ConnectedPeer *cp = cls;
395
396 GNUNET_assert (NULL != cp->respect_iterate_req);
397 if ((NULL != record) &&
398 (sizeof(cp->disk_respect) == record->value_size))
399 {
400 cp->disk_respect = *((uint32_t *) record->value);
401 cp->ppd.respect += *((uint32_t *) record->value);
402 }
403 GSF_push_start_ (cp);
404 if (NULL != record)
405 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
406 cp->respect_iterate_req = NULL;
407}
408
409
410/**
411 * Function called for each pending request whenever a new
412 * peer connects, giving us a chance to decide about submitting
413 * the existing request to the new peer.
414 *
415 * @param cls the `struct GSF_ConnectedPeer` of the new peer
416 * @param key query for the request
417 * @param pr handle to the pending request
418 * @return #GNUNET_YES to continue to iterate
419 */
420static int
421consider_peer_for_forwarding (void *cls,
422 const struct GNUNET_HashCode *key,
423 struct GSF_PendingRequest *pr)
424{
425 struct GSF_ConnectedPeer *cp = cls;
426 struct GNUNET_PeerIdentity pid;
427
428 if (GNUNET_YES !=
429 GSF_pending_request_test_active_ (pr))
430 return GNUNET_YES; /* request is not actually active, skip! */
431 GSF_connected_peer_get_identity_ (cp, &pid);
432 if (GNUNET_YES !=
433 GSF_pending_request_test_target_ (pr, &pid))
434 {
435 GNUNET_STATISTICS_update (GSF_stats,
436 gettext_noop ("# Loopback routes suppressed"),
437 1,
438 GNUNET_NO);
439 return GNUNET_YES;
440 }
441 GSF_plan_add_ (cp, pr);
442 return GNUNET_YES;
443}
444
445
446void *
447GSF_peer_connect_handler (void *cls,
448 const struct GNUNET_PeerIdentity *peer,
449 struct GNUNET_MQ_Handle *mq)
450{
451 struct GSF_ConnectedPeer *cp;
452
453 if (0 ==
454 GNUNET_memcmp (&GSF_my_id,
455 peer))
456 return NULL;
457 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
458 "Connected to peer %s\n",
459 GNUNET_i2s (peer));
460 cp = GNUNET_new (struct GSF_ConnectedPeer);
461 cp->ppd.pid = GNUNET_PEER_intern (peer);
462 cp->ppd.peer = peer;
463 cp->mq = mq;
464 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
465
466 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
467 GNUNET_YES);
468 GNUNET_break (GNUNET_OK ==
469 GNUNET_CONTAINER_multipeermap_put (cp_map,
470 GSF_connected_peer_get_identity2_ (
471 cp),
472 cp,
473 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
474 GNUNET_STATISTICS_set (GSF_stats,
475 gettext_noop ("# peers connected"),
476 GNUNET_CONTAINER_multipeermap_size (cp_map),
477 GNUNET_NO);
478 cp->respect_iterate_req
479 = GNUNET_PEERSTORE_iterate (peerstore,
480 "fs",
481 peer,
482 "respect",
483 &peer_respect_cb,
484 cp);
485 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
486 cp);
487 return cp;
488}
489
490
491/**
492 * It may be time to re-start migrating content to this
493 * peer. Check, and if so, restart migration.
494 *
495 * @param cls the `struct GSF_ConnectedPeer`
496 */
497static void
498revive_migration (void *cls)
499{
500 struct GSF_ConnectedPeer *cp = cls;
501 struct GNUNET_TIME_Relative bt;
502
503 cp->mig_revive_task = NULL;
504 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
505 if (0 != bt.rel_value_us)
506 {
507 /* still time left... */
508 cp->mig_revive_task =
509 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
510 return;
511 }
512 GSF_push_start_ (cp);
513}
514
515
516struct GSF_ConnectedPeer *
517GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
518{
519 if (NULL == cp_map)
520 return NULL;
521 return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
522}
523
524
525/**
526 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
527 *
528 * @param cls closure, the `struct GSF_ConnectedPeer`
529 * @param msm the actual message
530 */
531void
532handle_p2p_migration_stop (void *cls,
533 const struct MigrationStopMessage *msm)
534{
535 struct GSF_ConnectedPeer *cp = cls;
536 struct GNUNET_TIME_Relative bt;
537
538 GNUNET_STATISTICS_update (GSF_stats,
539 gettext_noop ("# migration stop messages received"),
540 1, GNUNET_NO);
541 bt = GNUNET_TIME_relative_ntoh (msm->duration);
542 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
543 _ ("Migration of content to peer `%s' blocked for %s\n"),
544 GNUNET_i2s (cp->ppd.peer),
545 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
546 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
547 if ((NULL == cp->mig_revive_task) &&
548 (NULL == cp->respect_iterate_req))
549 {
550 GSF_push_stop_ (cp);
551 cp->mig_revive_task =
552 GNUNET_SCHEDULER_add_delayed (bt,
553 &revive_migration, cp);
554 }
555}
556
557
558/**
559 * Free resources associated with the given peer request.
560 *
561 * @param peerreq request to free
562 */
563static void
564free_pending_request (struct PeerRequest *peerreq)
565{
566 struct GSF_ConnectedPeer *cp = peerreq->cp;
567 struct GSF_PendingRequestData *prd;
568
569 prd = GSF_pending_request_get_data_ (peerreq->pr);
570 if (NULL != peerreq->kill_task)
571 {
572 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
573 peerreq->kill_task = NULL;
574 }
575 GNUNET_STATISTICS_update (GSF_stats,
576 gettext_noop ("# P2P searches active"),
577 -1,
578 GNUNET_NO);
579 GNUNET_break (GNUNET_YES ==
580 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
581 &prd->query,
582 peerreq));
583 GNUNET_free (peerreq);
584}
585
586
587/**
588 * Cancel all requests associated with the peer.
589 *
590 * @param cls unused
591 * @param query hash code of the request
592 * @param value the `struct GSF_PendingRequest`
593 * @return #GNUNET_YES (continue to iterate)
594 */
595static int
596cancel_pending_request (void *cls,
597 const struct GNUNET_HashCode *query,
598 void *value)
599{
600 struct PeerRequest *peerreq = value;
601 struct GSF_PendingRequest *pr = peerreq->pr;
602
603 free_pending_request (peerreq);
604 GSF_pending_request_cancel_ (pr,
605 GNUNET_NO);
606 return GNUNET_OK;
607}
608
609
610/**
611 * Free the given request.
612 *
613 * @param cls the request to free
614 */
615static void
616peer_request_destroy (void *cls)
617{
618 struct PeerRequest *peerreq = cls;
619 struct GSF_PendingRequest *pr = peerreq->pr;
620 struct GSF_PendingRequestData *prd;
621
622 peerreq->kill_task = NULL;
623 prd = GSF_pending_request_get_data_ (pr);
624 cancel_pending_request (NULL,
625 &prd->query,
626 peerreq);
627}
628
629
630/**
631 * The artificial delay is over, transmit the message now.
632 *
633 * @param cls the `struct GSF_DelayedHandle` with the message
634 */
635static void
636transmit_delayed_now (void *cls)
637{
638 struct GSF_DelayedHandle *dh = cls;
639 struct GSF_ConnectedPeer *cp = dh->cp;
640
641 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
642 cp->delayed_tail,
643 dh);
644 cp->delay_queue_size--;
645 GSF_peer_transmit_ (cp,
646 GNUNET_NO,
647 UINT32_MAX,
648 dh->env);
649 GNUNET_free (dh);
650}
651
652
653/**
654 * Get the randomized delay a response should be subjected to.
655 *
656 * @return desired delay
657 */
658static struct GNUNET_TIME_Relative
659get_randomized_delay ()
660{
661 struct GNUNET_TIME_Relative ret;
662
663 ret =
664 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
665 GNUNET_CRYPTO_random_u32
666 (GNUNET_CRYPTO_QUALITY_WEAK,
667 2 * GSF_avg_latency.rel_value_us + 1));
668#if INSANE_STATISTICS
669 GNUNET_STATISTICS_update (GSF_stats,
670 gettext_noop
671 ("# artificial delays introduced (ms)"),
672 ret.rel_value_us / 1000LL, GNUNET_NO);
673#endif
674 return ret;
675}
676
677
678/**
679 * Handle a reply to a pending request. Also called if a request
680 * expires (then with data == NULL). The handler may be called
681 * many times (depending on the request type), but will not be
682 * called during or after a call to GSF_pending_request_cancel
683 * and will also not be called anymore after a call signalling
684 * expiration.
685 *
686 * @param cls `struct PeerRequest` this is an answer for
687 * @param eval evaluation of the result
688 * @param pr handle to the original pending request
689 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
690 * @param expiration when does @a data expire?
691 * @param last_transmission when did we last transmit a request for this block
692 * @param type type of the block
693 * @param data response data, NULL on request expiration
694 * @param data_len number of bytes in @a data
695 */
696static void
697handle_p2p_reply (void *cls,
698 enum GNUNET_BLOCK_ReplyEvaluationResult eval,
699 struct GSF_PendingRequest *pr,
700 uint32_t reply_anonymity_level,
701 struct GNUNET_TIME_Absolute expiration,
702 struct GNUNET_TIME_Absolute last_transmission,
703 enum GNUNET_BLOCK_Type type,
704 const void *data,
705 size_t data_len)
706{
707 struct PeerRequest *peerreq = cls;
708 struct GSF_ConnectedPeer *cp = peerreq->cp;
709 struct GSF_PendingRequestData *prd;
710 struct GNUNET_MQ_Envelope *env;
711 struct PutMessage *pm;
712 size_t msize;
713
714 GNUNET_assert (data_len + sizeof(struct PutMessage) <
715 GNUNET_MAX_MESSAGE_SIZE);
716 GNUNET_assert (peerreq->pr == pr);
717 prd = GSF_pending_request_get_data_ (pr);
718 if (NULL == data)
719 {
720 free_pending_request (peerreq);
721 return;
722 }
723 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
724 if ( (prd->type != type) &&
725 (GNUNET_BLOCK_TYPE_ANY != prd->type) )
726 {
727 GNUNET_STATISTICS_update (GSF_stats,
728 "# replies dropped due to type mismatch",
729 1, GNUNET_NO);
730 return;
731 }
732 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733 "Transmitting result for query `%s' to peer\n",
734 GNUNET_h2s (&prd->query));
735 GNUNET_STATISTICS_update (GSF_stats,
736 "# replies received for other peers",
737 1,
738 GNUNET_NO);
739 msize = sizeof(struct PutMessage) + data_len;
740 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
741 {
742 GNUNET_break (0);
743 return;
744 }
745 if ( (UINT32_MAX != reply_anonymity_level) &&
746 (reply_anonymity_level > 1) )
747 {
748 if (reply_anonymity_level - 1 > GSF_cover_content_count)
749 {
750 GNUNET_STATISTICS_update (GSF_stats,
751 "# replies dropped due to insufficient cover traffic",
752 1, GNUNET_NO);
753 return;
754 }
755 GSF_cover_content_count -= (reply_anonymity_level - 1);
756 }
757
758 env = GNUNET_MQ_msg_extra (pm,
759 data_len,
760 GNUNET_MESSAGE_TYPE_FS_PUT);
761 pm->type = htonl (type);
762 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
763 GNUNET_memcpy (&pm[1],
764 data,
765 data_len);
766 if ((UINT32_MAX != reply_anonymity_level) &&
767 (0 != reply_anonymity_level) &&
768 (GNUNET_YES == GSF_enable_randomized_delays))
769 {
770 struct GSF_DelayedHandle *dh;
771
772 dh = GNUNET_new (struct GSF_DelayedHandle);
773 dh->cp = cp;
774 dh->env = env;
775 dh->msize = msize;
776 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
777 cp->delayed_tail,
778 dh);
779 cp->delay_queue_size++;
780 dh->delay_task =
781 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
782 &transmit_delayed_now,
783 dh);
784 }
785 else
786 {
787 GSF_peer_transmit_ (cp,
788 GNUNET_NO,
789 UINT32_MAX,
790 env);
791 }
792 if (GNUNET_BLOCK_REPLY_OK_LAST != eval)
793 return;
794 if (NULL == peerreq->kill_task)
795 {
796 GNUNET_STATISTICS_update (GSF_stats,
797 "# P2P searches destroyed due to ultimate reply",
798 1,
799 GNUNET_NO);
800 peerreq->kill_task =
801 GNUNET_SCHEDULER_add_now (&peer_request_destroy,
802 peerreq);
803 }
804}
805
806
807/**
808 * Increase the peer's respect by a value.
809 *
810 * @param cp which peer to change the respect value on
811 * @param value is the int value by which the
812 * peer's credit is to be increased or decreased
813 * @returns the actual change in respect (positive or negative)
814 */
815static int
816change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
817{
818 if (0 == value)
819 return 0;
820 GNUNET_assert (NULL != cp);
821 if (value > 0)
822 {
823 if (cp->ppd.respect + value < cp->ppd.respect)
824 {
825 value = UINT32_MAX - cp->ppd.respect;
826 cp->ppd.respect = UINT32_MAX;
827 }
828 else
829 cp->ppd.respect += value;
830 }
831 else
832 {
833 if (cp->ppd.respect < -value)
834 {
835 value = -cp->ppd.respect;
836 cp->ppd.respect = 0;
837 }
838 else
839 cp->ppd.respect += value;
840 }
841 return value;
842}
843
844
845/**
846 * We've received a request with the specified priority. Bound it
847 * according to how much we respect the given peer.
848 *
849 * @param prio_in requested priority
850 * @param cp the peer making the request
851 * @return effective priority
852 */
853static int32_t
854bound_priority (uint32_t prio_in,
855 struct GSF_ConnectedPeer *cp)
856{
857#define N ((double) 128.0)
858 uint32_t ret;
859 double rret;
860 int ld;
861
862 ld = GSF_test_get_load_too_high_ (0);
863 if (GNUNET_SYSERR == ld)
864 {
865#if INSANE_STATISTICS
866 GNUNET_STATISTICS_update (GSF_stats,
867 gettext_noop
868 ("# requests done for free (low load)"), 1,
869 GNUNET_NO);
870#endif
871 return 0; /* excess resources */
872 }
873 if (prio_in > INT32_MAX)
874 prio_in = INT32_MAX;
875 ret = -change_peer_respect (cp, -(int) prio_in);
876 if (ret > 0)
877 {
878 if (ret > GSF_current_priorities + N)
879 rret = GSF_current_priorities + N;
880 else
881 rret = ret;
882 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
883 }
884 if ((GNUNET_YES == ld) && (ret > 0))
885 {
886 /* try with charging */
887 ld = GSF_test_get_load_too_high_ (ret);
888 }
889 if (GNUNET_YES == ld)
890 {
891 GNUNET_STATISTICS_update (GSF_stats,
892 gettext_noop
893 ("# request dropped, priority insufficient"), 1,
894 GNUNET_NO);
895 /* undo charge */
896 change_peer_respect (cp, (int) ret);
897 return -1; /* not enough resources */
898 }
899 else
900 {
901 GNUNET_STATISTICS_update (GSF_stats,
902 gettext_noop
903 ("# requests done for a price (normal load)"),
904 1,
905 GNUNET_NO);
906 }
907#undef N
908 return ret;
909}
910
911
912/**
913 * The priority level imposes a bound on the maximum
914 * value for the ttl that can be requested.
915 *
916 * @param ttl_in requested ttl
917 * @param prio given priority
918 * @return @a ttl_in if @a ttl_in is below the limit,
919 * otherwise the ttl-limit for the given @a prio
920 */
921static int32_t
922bound_ttl (int32_t ttl_in,
923 uint32_t prio)
924{
925 unsigned long long allowed;
926
927 if (ttl_in <= 0)
928 return ttl_in;
929 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
930 if (ttl_in > allowed)
931 {
932 if (allowed >= (1 << 30))
933 return 1 << 30;
934 return allowed;
935 }
936 return ttl_in;
937}
938
939
940/**
941 * Closure for #test_exist_cb().
942 */
943struct TestExistClosure
944{
945 /**
946 * Priority of the incoming request.
947 */
948 int32_t priority;
949
950 /**
951 * Relative TTL of the incoming request.
952 */
953 int32_t ttl;
954
955 /**
956 * Type of the incoming request.
957 */
958 enum GNUNET_BLOCK_Type type;
959
960 /**
961 * Set to #GNUNET_YES if we are done handling the query.
962 */
963 int finished;
964};
965
966
967/**
968 * Test if the query already exists. If so, merge it, otherwise
969 * keep `finished` at #GNUNET_NO.
970 *
971 * @param cls our `struct TestExistClosure`
972 * @param hc the key of the query
973 * @param value the existing `struct PeerRequest`.
974 * @return #GNUNET_YES to continue to iterate,
975 * #GNUNET_NO if we successfully merged
976 */
977static int
978test_exist_cb (void *cls,
979 const struct GNUNET_HashCode *hc,
980 void *value)
981{
982 struct TestExistClosure *tec = cls;
983 struct PeerRequest *peerreq = value;
984 struct GSF_PendingRequest *pr;
985 struct GSF_PendingRequestData *prd;
986
987 pr = peerreq->pr;
988 prd = GSF_pending_request_get_data_ (pr);
989 if (prd->type != tec->type)
990 return GNUNET_YES;
991 if (prd->ttl.abs_value_us >=
992 GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
993 {
994 /* existing request has higher TTL, drop new one! */
995 prd->priority += tec->priority;
996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997 "Have existing request with higher TTL, dropping new request.\n");
998 GNUNET_STATISTICS_update (GSF_stats,
999 gettext_noop
1000 ("# requests dropped due to higher-TTL request"),
1001 1, GNUNET_NO);
1002 tec->finished = GNUNET_YES;
1003 return GNUNET_NO;
1004 }
1005 /* existing request has lower TTL, drop old one! */
1006 tec->priority += prd->priority;
1007 free_pending_request (peerreq);
1008 GSF_pending_request_cancel_ (pr,
1009 GNUNET_YES);
1010 return GNUNET_NO;
1011}
1012
1013
1014/**
1015 * Handle P2P "QUERY" message. Creates the pending request entry
1016 * and sets up all of the data structures to that we will
1017 * process replies properly. Does not initiate forwarding or
1018 * local database lookups.
1019 *
1020 * @param cls the other peer involved (sender of the message)
1021 * @param gm the GET message
1022 */
1023void
1024handle_p2p_get (void *cls,
1025 const struct GetMessage *gm)
1026{
1027 struct GSF_ConnectedPeer *cps = cls;
1028 struct PeerRequest *peerreq;
1029 struct GSF_PendingRequest *pr;
1030 struct GSF_ConnectedPeer *cp;
1031 const struct GNUNET_PeerIdentity *target;
1032 enum GSF_PendingRequestOptions options;
1033 uint16_t msize;
1034 unsigned int bits;
1035 const struct GNUNET_PeerIdentity *opt;
1036 uint32_t bm;
1037 size_t bfsize;
1038 uint32_t ttl_decrement;
1039 struct TestExistClosure tec;
1040 GNUNET_PEER_Id spid;
1041 const struct GSF_PendingRequestData *prd;
1042
1043 msize = ntohs (gm->header.size);
1044 tec.type = ntohl (gm->type);
1045 bm = ntohl (gm->hash_bitmap);
1046 bits = 0;
1047 while (bm > 0)
1048 {
1049 if (1 == (bm & 1))
1050 bits++;
1051 bm >>= 1;
1052 }
1053 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1054 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct
1055 GNUNET_PeerIdentity);
1056 GNUNET_STATISTICS_update (GSF_stats,
1057 gettext_noop
1058 ("# GET requests received (from other peers)"),
1059 1,
1060 GNUNET_NO);
1061 GSF_cover_query_count++;
1062 bm = ntohl (gm->hash_bitmap);
1063 bits = 0;
1064 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1065 cp = GSF_peer_get_ (&opt[bits++]);
1066 else
1067 cp = cps;
1068 if (NULL == cp)
1069 {
1070 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1071 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1073 GNUNET_i2s (&opt[bits - 1]));
1074
1075 else
1076 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077 "Failed to find peer `%s' in connection set. Dropping query.\n",
1078 GNUNET_i2s (cps->ppd.peer));
1079 GNUNET_STATISTICS_update (GSF_stats,
1080 gettext_noop
1081 (
1082 "# requests dropped due to missing reverse route"),
1083 1,
1084 GNUNET_NO);
1085 return;
1086 }
1087 unsigned int queue_size = GNUNET_MQ_get_length (cp->mq);
1088 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1089 if (queue_size > MAX_QUEUE_PER_PEER)
1090 {
1091 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092 "Peer `%s' has too many replies queued already. Dropping query.\n",
1093 GNUNET_i2s (cps->ppd.peer));
1094 GNUNET_STATISTICS_update (GSF_stats,
1095 gettext_noop (
1096 "# requests dropped due to full reply queue"),
1097 1,
1098 GNUNET_NO);
1099 return;
1100 }
1101 /* note that we can really only check load here since otherwise
1102 * peers could find out that we are overloaded by not being
1103 * disconnected after sending us a malformed query... */
1104 tec.priority = bound_priority (ntohl (gm->priority),
1105 cps);
1106 if (tec.priority < 0)
1107 {
1108 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1109 "Dropping query from `%s', this peer is too busy.\n",
1110 GNUNET_i2s (cps->ppd.peer));
1111 return;
1112 }
1113 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1115 GNUNET_h2s (&gm->query),
1116 (unsigned int) tec.type,
1117 GNUNET_i2s (cps->ppd.peer),
1118 (unsigned int) bm);
1119 target =
1120 (0 !=
1121 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1122 options = GSF_PRO_DEFAULTS;
1123 spid = 0;
1124 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1
1125 + tec.priority))
1126 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1127 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2
1128 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1129 {
1130 /* don't have BW to send to peer, or would likely take longer than we have for it,
1131 * so at best indirect the query */
1132 tec.priority = 0;
1133 options |= GSF_PRO_FORWARD_ONLY;
1134 spid = GNUNET_PEER_intern (cps->ppd.peer);
1135 GNUNET_assert (0 != spid);
1136 }
1137 tec.ttl = bound_ttl (ntohl (gm->ttl),
1138 tec.priority);
1139 /* decrement ttl (always) */
1140 ttl_decrement =
1141 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1142 TTL_DECREMENT);
1143 if ((tec.ttl < 0) &&
1144 (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1145 {
1146 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1147 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1148 GNUNET_i2s (cps->ppd.peer),
1149 tec.ttl,
1150 ttl_decrement);
1151 GNUNET_STATISTICS_update (GSF_stats,
1152 gettext_noop
1153 ("# requests dropped due TTL underflow"), 1,
1154 GNUNET_NO);
1155 /* integer underflow => drop (should be very rare)! */
1156 return;
1157 }
1158 tec.ttl -= ttl_decrement;
1159
1160 /* test if the request already exists */
1161 tec.finished = GNUNET_NO;
1162 GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1163 &gm->query,
1164 &test_exist_cb,
1165 &tec);
1166 if (GNUNET_YES == tec.finished)
1167 return; /* merged into existing request, we're done */
1168
1169 peerreq = GNUNET_new (struct PeerRequest);
1170 peerreq->cp = cp;
1171 pr = GSF_pending_request_create_ (options,
1172 tec.type,
1173 &gm->query,
1174 target,
1175 (bfsize > 0)
1176 ? (const char *) &opt[bits]
1177 : NULL,
1178 bfsize,
1179 1 /* anonymity */,
1180 (uint32_t) tec.priority,
1181 tec.ttl,
1182 spid,
1183 GNUNET_PEER_intern (cps->ppd.peer),
1184 NULL, 0, /* replies_seen */
1185 &handle_p2p_reply,
1186 peerreq);
1187 GNUNET_assert (NULL != pr);
1188 prd = GSF_pending_request_get_data_ (pr);
1189 peerreq->pr = pr;
1190 GNUNET_break (GNUNET_OK ==
1191 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1192 &prd->query,
1193 peerreq,
1194 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1195 GNUNET_STATISTICS_update (GSF_stats,
1196 gettext_noop (
1197 "# P2P query messages received and processed"),
1198 1,
1199 GNUNET_NO);
1200 GNUNET_STATISTICS_update (GSF_stats,
1201 gettext_noop ("# P2P searches active"),
1202 1,
1203 GNUNET_NO);
1204 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1205 GSF_local_lookup_ (pr,
1206 &GSF_consider_forwarding,
1207 NULL);
1208}
1209
1210
1211/**
1212 * Transmit a message to the given peer as soon as possible.
1213 * If the peer disconnects before the transmission can happen,
1214 * the callback is invoked with a `NULL` @a buffer.
1215 *
1216 * @param cp target peer
1217 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1218 * @param priority how important is this request?
1219 * @param env message to send
1220 */
1221void
1222GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1223 int is_query,
1224 uint32_t priority,
1225 struct GNUNET_MQ_Envelope *env)
1226{
1227 struct GSF_PeerTransmitHandle *pth;
1228 struct GSF_PeerTransmitHandle *pos;
1229 struct GSF_PeerTransmitHandle *prev;
1230
1231 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1232 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1233 pth->env = env;
1234 pth->is_query = is_query;
1235 pth->priority = priority;
1236 pth->cp = cp;
1237 /* insertion sort (by priority, descending) */
1238 prev = NULL;
1239 pos = cp->pth_head;
1240 while ((NULL != pos) && (pos->priority > priority))
1241 {
1242 prev = pos;
1243 pos = pos->next;
1244 }
1245 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1246 cp->pth_tail,
1247 prev,
1248 pth);
1249 if (GNUNET_YES == is_query)
1250 cp->ppd.pending_queries++;
1251 else if (GNUNET_NO == is_query)
1252 cp->ppd.pending_replies++;
1253 schedule_transmission (pth);
1254}
1255
1256
1257/**
1258 * Report on receiving a reply; update the performance record of the given peer.
1259 *
1260 * @param cp responding peer (will be updated)
1261 * @param request_time time at which the original query was transmitted
1262 * @param request_priority priority of the original request
1263 */
1264void
1265GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1266 struct GNUNET_TIME_Absolute request_time,
1267 uint32_t request_priority)
1268{
1269 struct GNUNET_TIME_Relative delay;
1270
1271 delay = GNUNET_TIME_absolute_get_duration (request_time);
1272 cp->ppd.avg_reply_delay.rel_value_us =
1273 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1)
1274 + delay.rel_value_us) / RUNAVG_DELAY_N;
1275 cp->ppd.avg_priority =
1276 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1)
1277 + request_priority) / RUNAVG_DELAY_N;
1278}
1279
1280
1281/**
1282 * Report on receiving a reply in response to an initiating client.
1283 * Remember that this peer is good for this client.
1284 *
1285 * @param cp responding peer (will be updated)
1286 * @param initiator_client local client on responsible for query
1287 */
1288void
1289GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1290 struct GSF_LocalClient *initiator_client)
1291{
1292 cp->ppd.last_client_replies[cp->last_client_replies_woff++
1293 % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1294}
1295
1296
1297/**
1298 * Report on receiving a reply in response to an initiating peer.
1299 * Remember that this peer is good for this initiating peer.
1300 *
1301 * @param cp responding peer (will be updated)
1302 * @param initiator_peer other peer responsible for query
1303 */
1304void
1305GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1306 const struct GSF_ConnectedPeer *initiator_peer)
1307{
1308 unsigned int woff;
1309
1310 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1311 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1312 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1313 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1314 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1315}
1316
1317
1318/**
1319 * Write peer-respect information to a file - flush the buffer entry!
1320 *
1321 * @param cls unused
1322 * @param key peer identity
1323 * @param value the `struct GSF_ConnectedPeer` to flush
1324 * @return #GNUNET_OK to continue iteration
1325 */
1326static int
1327flush_respect (void *cls,
1328 const struct GNUNET_PeerIdentity *key,
1329 void *value)
1330{
1331 struct GSF_ConnectedPeer *cp = value;
1332 struct GNUNET_PeerIdentity pid;
1333
1334 if (cp->ppd.respect == cp->disk_respect)
1335 return GNUNET_OK; /* unchanged */
1336 GNUNET_assert (0 != cp->ppd.pid);
1337 GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1338 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1339 sizeof(cp->ppd.respect),
1340 GNUNET_TIME_UNIT_FOREVER_ABS,
1341 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1342 NULL,
1343 NULL);
1344 return GNUNET_OK;
1345}
1346
1347
1348void
1349GSF_peer_disconnect_handler (void *cls,
1350 const struct GNUNET_PeerIdentity *peer,
1351 void *internal_cls)
1352{
1353 struct GSF_ConnectedPeer *cp = internal_cls;
1354 struct GSF_PeerTransmitHandle *pth;
1355 struct GSF_DelayedHandle *dh;
1356
1357 if (NULL == cp)
1358 return; /* must have been disconnect from core with
1359 * 'peer' == my_id, ignore */
1360 flush_respect (NULL,
1361 peer,
1362 cp);
1363 GNUNET_assert (GNUNET_YES ==
1364 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1365 peer,
1366 cp));
1367 GNUNET_STATISTICS_set (GSF_stats,
1368 gettext_noop ("# peers connected"),
1369 GNUNET_CONTAINER_multipeermap_size (cp_map),
1370 GNUNET_NO);
1371 if (NULL != cp->respect_iterate_req)
1372 {
1373 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1374 cp->respect_iterate_req = NULL;
1375 }
1376 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1377 &cancel_pending_request,
1378 cp);
1379 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1380 cp->request_map = NULL;
1381 GSF_plan_notify_peer_disconnect_ (cp);
1382 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1383 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1384 P2P_SUCCESS_LIST_SIZE);
1385 memset (cp->ppd.last_p2p_replies,
1386 0,
1387 sizeof(cp->ppd.last_p2p_replies));
1388 GSF_push_stop_ (cp);
1389 while (NULL != (pth = cp->pth_head))
1390 {
1391 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1392 cp->pth_tail,
1393 pth);
1394 if (GNUNET_YES == pth->is_query)
1395 GNUNET_assert (0 < cp->ppd.pending_queries--);
1396 else if (GNUNET_NO == pth->is_query)
1397 GNUNET_assert (0 < cp->ppd.pending_replies--);
1398 GNUNET_free (pth);
1399 }
1400 while (NULL != (dh = cp->delayed_head))
1401 {
1402 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1403 cp->delayed_tail,
1404 dh);
1405 GNUNET_MQ_discard (dh->env);
1406 cp->delay_queue_size--;
1407 GNUNET_SCHEDULER_cancel (dh->delay_task);
1408 GNUNET_free (dh);
1409 }
1410 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1411 if (NULL != cp->mig_revive_task)
1412 {
1413 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1414 cp->mig_revive_task = NULL;
1415 }
1416 GNUNET_break (0 == cp->ppd.pending_queries);
1417 GNUNET_break (0 == cp->ppd.pending_replies);
1418 GNUNET_free (cp);
1419}
1420
1421
1422/**
1423 * Closure for #call_iterator().
1424 */
1425struct IterationContext
1426{
1427 /**
1428 * Function to call on each entry.
1429 */
1430 GSF_ConnectedPeerIterator it;
1431
1432 /**
1433 * Closure for @e it.
1434 */
1435 void *it_cls;
1436};
1437
1438
1439/**
1440 * Function that calls the callback for each peer.
1441 *
1442 * @param cls the `struct IterationContext *`
1443 * @param key identity of the peer
1444 * @param value the `struct GSF_ConnectedPeer *`
1445 * @return #GNUNET_YES to continue iteration
1446 */
1447static int
1448call_iterator (void *cls,
1449 const struct GNUNET_PeerIdentity *key,
1450 void *value)
1451{
1452 struct IterationContext *ic = cls;
1453 struct GSF_ConnectedPeer *cp = value;
1454
1455 ic->it (ic->it_cls,
1456 key, cp,
1457 &cp->ppd);
1458 return GNUNET_YES;
1459}
1460
1461
1462void
1463GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1464 void *it_cls)
1465{
1466 struct IterationContext ic;
1467
1468 ic.it = it;
1469 ic.it_cls = it_cls;
1470 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1471 &call_iterator,
1472 &ic);
1473}
1474
1475
1476/**
1477 * Obtain the identity of a connected peer.
1478 *
1479 * @param cp peer to get identity of
1480 * @param id identity to set (written to)
1481 */
1482void
1483GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1484 struct GNUNET_PeerIdentity *id)
1485{
1486 GNUNET_assert (0 != cp->ppd.pid);
1487 GNUNET_PEER_resolve (cp->ppd.pid, id);
1488}
1489
1490
1491/**
1492 * Obtain the identity of a connected peer.
1493 *
1494 * @param cp peer to get identity of
1495 * @return reference to peer identity, valid until peer disconnects (!)
1496 */
1497const struct GNUNET_PeerIdentity *
1498GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1499{
1500 GNUNET_assert (0 != cp->ppd.pid);
1501 return GNUNET_PEER_resolve2 (cp->ppd.pid);
1502}
1503
1504
1505/**
1506 * Ask a peer to stop migrating data to us until the given point
1507 * in time.
1508 *
1509 * @param cp peer to ask
1510 * @param block_time until when to block
1511 */
1512void
1513GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1514 struct GNUNET_TIME_Absolute block_time)
1515{
1516 struct GNUNET_MQ_Envelope *env;
1517 struct MigrationStopMessage *msm;
1518
1519 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1520 {
1521 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522 "Migration already blocked for another %s\n",
1523 GNUNET_STRINGS_relative_time_to_string (
1524 GNUNET_TIME_absolute_get_remaining
1525 (cp->
1526 last_migration_block), GNUNET_YES));
1527 return; /* already blocked */
1528 }
1529 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1530 GNUNET_STRINGS_relative_time_to_string (
1531 GNUNET_TIME_absolute_get_remaining (block_time),
1532 GNUNET_YES));
1533 cp->last_migration_block = block_time;
1534 env = GNUNET_MQ_msg (msm,
1535 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1536 msm->reserved = htonl (0);
1537 msm->duration
1538 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1539 (cp->last_migration_block));
1540 GNUNET_STATISTICS_update (GSF_stats,
1541 gettext_noop ("# migration stop messages sent"),
1542 1,
1543 GNUNET_NO);
1544 GSF_peer_transmit_ (cp,
1545 GNUNET_SYSERR,
1546 UINT32_MAX,
1547 env);
1548}
1549
1550
1551/**
1552 * Notify core about a preference we have for the given peer
1553 * (to allocate more resources towards it). The change will
1554 * be communicated the next time we reserve bandwidth with
1555 * core (not instantly).
1556 *
1557 * @param cp peer to reserve bandwidth from
1558 * @param pref preference change
1559 */
1560void
1561GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1562 uint64_t pref)
1563{
1564 cp->inc_preference += pref;
1565}
1566
1567
1568/**
1569 * Call this method periodically to flush respect information to disk.
1570 *
1571 * @param cls closure, not used
1572 */
1573static void
1574cron_flush_respect (void *cls)
1575{
1576 fr_task = NULL;
1577 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1578 &flush_respect,
1579 NULL);
1580 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1581 GNUNET_SCHEDULER_PRIORITY_HIGH,
1582 &cron_flush_respect,
1583 NULL);
1584}
1585
1586
1587/**
1588 * Initialize peer management subsystem.
1589 */
1590void
1591GSF_connected_peer_init_ ()
1592{
1593 cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1594 peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1595 fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1596 &cron_flush_respect, NULL);
1597}
1598
1599
1600/**
1601 * Shutdown peer management subsystem.
1602 */
1603void
1604GSF_connected_peer_done_ ()
1605{
1606 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1607 &flush_respect,
1608 NULL);
1609 GNUNET_SCHEDULER_cancel (fr_task);
1610 fr_task = NULL;
1611 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1612 cp_map = NULL;
1613 GNUNET_PEERSTORE_disconnect (peerstore,
1614 GNUNET_YES);
1615}
1616
1617
1618/**
1619 * Iterator to remove references to LC entry.
1620 *
1621 * @param cls the `struct GSF_LocalClient *` to look for
1622 * @param key current key code
1623 * @param value value in the hash map (peer entry)
1624 * @return #GNUNET_YES (we should continue to iterate)
1625 */
1626static int
1627clean_local_client (void *cls,
1628 const struct GNUNET_PeerIdentity *key,
1629 void *value)
1630{
1631 const struct GSF_LocalClient *lc = cls;
1632 struct GSF_ConnectedPeer *cp = value;
1633 unsigned int i;
1634
1635 for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1636 if (cp->ppd.last_client_replies[i] == lc)
1637 cp->ppd.last_client_replies[i] = NULL;
1638 return GNUNET_YES;
1639}
1640
1641
1642/**
1643 * Notification that a local client disconnected. Clean up all of our
1644 * references to the given handle.
1645 *
1646 * @param lc handle to the local client (henceforth invalid)
1647 */
1648void
1649GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1650{
1651 if (NULL == cp_map)
1652 return; /* already cleaned up */
1653 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1654 &clean_local_client,
1655 (void *) lc);
1656}
1657
1658
1659/* end of gnunet-service-fs_cp.c */