aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c1826
1 files changed, 0 insertions, 1826 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
deleted file mode 100644
index 30b895752..000000000
--- a/src/fs/gnunet-service-fs_cp.c
+++ /dev/null
@@ -1,1826 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file fs/gnunet-service-fs_cp.c
22 * @brief API to handle 'connected peers'
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_load_lib.h"
28#include "gnunet-service-fs.h"
29#include "gnunet-service-fs_cp.h"
30#include "gnunet-service-fs_pe.h"
31#include "gnunet-service-fs_pr.h"
32#include "gnunet-service-fs_push.h"
33#include "gnunet_peerstore_service.h"
34
35
36/**
37 * Ratio for moving average delay calculation. The previous
38 * average goes in with a factor of (n-1) into the calculation.
39 * Must be > 0.
40 */
41#define RUNAVG_DELAY_N 16
42
43/**
44 * How often do we flush respect values to disk?
45 */
46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
47 GNUNET_TIME_UNIT_MINUTES, 5)
48
49/**
50 * After how long do we discard a reply?
51 */
52#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
53 2)
54
55/**
56 * Collect an instance number of statistics? May cause excessive IPC.
57 */
58#define INSANE_STATISTICS GNUNET_NO
59
60
61/**
62 * Handle to cancel a transmission request.
63 */
64struct GSF_PeerTransmitHandle
65{
66 /**
67 * Kept in a doubly-linked list.
68 */
69 struct GSF_PeerTransmitHandle *next;
70
71 /**
72 * Kept in a doubly-linked list.
73 */
74 struct GSF_PeerTransmitHandle *prev;
75
76 /**
77 * Time when this transmission request was issued.
78 */
79 struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81 /**
82 * Envelope with the actual message.
83 */
84 struct GNUNET_MQ_Envelope *env;
85
86 /**
87 * Peer this request targets.
88 */
89 struct GSF_ConnectedPeer *cp;
90
91 /**
92 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
93 */
94 int is_query;
95
96 /**
97 * Did we get a reservation already?
98 */
99 int was_reserved;
100
101 /**
102 * Priority of this request.
103 */
104 uint32_t priority;
105};
106
107
108/**
109 * Handle for an entry in our delay list.
110 */
111struct GSF_DelayedHandle
112{
113 /**
114 * Kept in a doubly-linked list.
115 */
116 struct GSF_DelayedHandle *next;
117
118 /**
119 * Kept in a doubly-linked list.
120 */
121 struct GSF_DelayedHandle *prev;
122
123 /**
124 * Peer this transmission belongs to.
125 */
126 struct GSF_ConnectedPeer *cp;
127
128 /**
129 * Envelope of the message that was delayed.
130 */
131 struct GNUNET_MQ_Envelope *env;
132
133 /**
134 * Task for the delay.
135 */
136 struct GNUNET_SCHEDULER_Task *delay_task;
137
138 /**
139 * Size of the message.
140 */
141 size_t msize;
142};
143
144
145/**
146 * Information per peer and request.
147 */
148struct PeerRequest
149{
150 /**
151 * Handle to generic request (generic: from peer or local client).
152 */
153 struct GSF_PendingRequest *pr;
154
155 /**
156 * Which specific peer issued this request?
157 */
158 struct GSF_ConnectedPeer *cp;
159
160 /**
161 * Task for asynchronous stopping of this request.
162 */
163 struct GNUNET_SCHEDULER_Task *kill_task;
164};
165
166
167/**
168 * A connected peer.
169 */
170struct GSF_ConnectedPeer
171{
172 /**
173 * Performance data for this peer.
174 */
175 struct GSF_PeerPerformanceData ppd;
176
177 /**
178 * Time until when we blocked this peer from migrating
179 * data to us.
180 */
181 struct GNUNET_TIME_Absolute last_migration_block;
182
183 /**
184 * Task scheduled to revive migration to this peer.
185 */
186 struct GNUNET_SCHEDULER_Task *mig_revive_task;
187
188 /**
189 * Messages (replies, queries, content migration) we would like to
190 * send to this peer in the near future. Sorted by priority, head.
191 */
192 struct GSF_PeerTransmitHandle *pth_head;
193
194 /**
195 * Messages (replies, queries, content migration) we would like to
196 * send to this peer in the near future. Sorted by priority, tail.
197 */
198 struct GSF_PeerTransmitHandle *pth_tail;
199
200 /**
201 * Messages (replies, queries, content migration) we would like to
202 * send to this peer in the near future. Sorted by priority, head.
203 */
204 struct GSF_DelayedHandle *delayed_head;
205
206 /**
207 * Messages (replies, queries, content migration) we would like to
208 * send to this peer in the near future. Sorted by priority, tail.
209 */
210 struct GSF_DelayedHandle *delayed_tail;
211
212 /**
213 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
214 */
215 struct GNUNET_ATS_ReservationContext *rc;
216
217 /**
218 * Task scheduled if we need to retry bandwidth reservation later.
219 */
220 struct GNUNET_SCHEDULER_Task *rc_delay_task;
221
222 /**
223 * Active requests from this neighbour, map of query to `struct PeerRequest`.
224 */
225 struct GNUNET_CONTAINER_MultiHashMap *request_map;
226
227 /**
228 * Handle for an active request for transmission to this
229 * peer.
230 */
231 struct GNUNET_MQ_Handle *mq;
232
233 /**
234 * Increase in traffic preference still to be submitted
235 * to the core service for this peer.
236 */
237 uint64_t inc_preference;
238
239 /**
240 * Number of entries in @e delayed_head DLL.
241 */
242 unsigned int delay_queue_size;
243
244 /**
245 * Respect rating for this peer on disk.
246 */
247 uint32_t disk_respect;
248
249 /**
250 * Which offset in @e last_p2p_replies will be updated next?
251 * (we go round-robin).
252 */
253 unsigned int last_p2p_replies_woff;
254
255 /**
256 * Which offset in @e last_client_replies will be updated next?
257 * (we go round-robin).
258 */
259 unsigned int last_client_replies_woff;
260
261 /**
262 * Current offset into @e last_request_times ring buffer.
263 */
264 unsigned int last_request_times_off;
265
266 /**
267 * #GNUNET_YES if we did successfully reserve 32k bandwidth,
268 * #GNUNET_NO if not.
269 */
270 int did_reserve;
271
272 /**
273 * Handle to the PEERSTORE iterate request for peer respect value
274 */
275 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
276};
277
278
279/**
280 * Map from peer identities to `struct GSF_ConnectPeer` entries.
281 */
282static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
283
284/**
285 * Handle to peerstore service.
286 */
287static struct GNUNET_PEERSTORE_Handle *peerstore;
288
289/**
290 * Task used to flush respect values to disk.
291 */
292static struct GNUNET_SCHEDULER_Task *fr_task;
293
294
295/**
296 * Update the latency information kept for the given peer.
297 *
298 * @param id peer record to update
299 * @param latency current latency value
300 */
301void
302GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
303 struct GNUNET_TIME_Relative latency)
304{
305 struct GSF_ConnectedPeer *cp;
306
307 cp = GSF_peer_get_ (id);
308 if (NULL == cp)
309 return; /* we're not yet connected at the core level, ignore */
310 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
311 latency);
312}
313
314
315/**
316 * Return the performance data record for the given peer
317 *
318 * @param cp peer to query
319 * @return performance data record for the peer
320 */
321struct GSF_PeerPerformanceData *
322GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
323{
324 return &cp->ppd;
325}
326
327
328/**
329 * Core is ready to transmit to a peer, get the message.
330 *
331 * @param cp which peer to send a message to
332 */
333static void
334peer_transmit (struct GSF_ConnectedPeer *cp);
335
336
337/**
338 * Function called by core upon success or failure of our bandwidth reservation request.
339 *
340 * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
341 * @param peer identifies the peer
342 * @param amount set to the amount that was actually reserved or unreserved;
343 * either the full requested amount or zero (no partial reservations)
344 * @param res_delay if the reservation could not be satisfied (amount was 0), how
345 * long should the client wait until re-trying?
346 */
347static void
348ats_reserve_callback (void *cls,
349 const struct GNUNET_PeerIdentity *peer,
350 int32_t amount,
351 struct GNUNET_TIME_Relative res_delay);
352
353
354/**
355 * If ready (bandwidth reserved), try to schedule transmission via
356 * core for the given handle.
357 *
358 * @param pth transmission handle to schedule
359 */
360static void
361schedule_transmission (struct GSF_PeerTransmitHandle *pth)
362{
363 struct GSF_ConnectedPeer *cp;
364 struct GNUNET_PeerIdentity target;
365
366 cp = pth->cp;
367 GNUNET_assert (0 != cp->ppd.pid);
368 GNUNET_PEER_resolve (cp->ppd.pid, &target);
369
370 if (0 != cp->inc_preference)
371 {
372 GNUNET_ATS_performance_change_preference (GSF_ats,
373 &target,
374 GNUNET_ATS_PREFERENCE_BANDWIDTH,
375 (double) cp->inc_preference,
376 GNUNET_ATS_PREFERENCE_END);
377 cp->inc_preference = 0;
378 }
379
380 if ((GNUNET_YES == pth->is_query) &&
381 (GNUNET_YES != pth->was_reserved))
382 {
383 /* query, need reservation */
384 if (GNUNET_YES != cp->did_reserve)
385 return; /* not ready */
386 cp->did_reserve = GNUNET_NO;
387 /* reservation already done! */
388 pth->was_reserved = GNUNET_YES;
389 cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
390 &target,
391 DBLOCK_SIZE,
392 &ats_reserve_callback,
393 cp);
394 return;
395 }
396 peer_transmit (cp);
397}
398
399
400/**
401 * Core is ready to transmit to a peer, get the message.
402 *
403 * @param cp which peer to send a message to
404 */
405static void
406peer_transmit (struct GSF_ConnectedPeer *cp)
407{
408 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
409 struct GSF_PeerTransmitHandle *pos;
410
411 if (NULL == pth)
412 return;
413 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
414 cp->pth_tail,
415 pth);
416 if (GNUNET_YES == pth->is_query)
417 {
418 cp->ppd.last_request_times[(cp->last_request_times_off++)
419 % MAX_QUEUE_PER_PEER] =
420 GNUNET_TIME_absolute_get ();
421 GNUNET_assert (0 < cp->ppd.pending_queries--);
422 }
423 else if (GNUNET_NO == pth->is_query)
424 {
425 GNUNET_assert (0 < cp->ppd.pending_replies--);
426 }
427 GNUNET_LOAD_update (cp->ppd.transmission_delay,
428 GNUNET_TIME_absolute_get_duration
429 (pth->transmission_request_start_time).rel_value_us);
430 GNUNET_MQ_send (cp->mq,
431 pth->env);
432 GNUNET_free (pth);
433 if (NULL != (pos = cp->pth_head))
434 {
435 GNUNET_assert (pos != pth);
436 schedule_transmission (pos);
437 }
438}
439
440
441/**
442 * (re)try to reserve bandwidth from the given peer.
443 *
444 * @param cls the `struct GSF_ConnectedPeer` to reserve from
445 */
446static void
447retry_reservation (void *cls)
448{
449 struct GSF_ConnectedPeer *cp = cls;
450 struct GNUNET_PeerIdentity target;
451
452 GNUNET_PEER_resolve (cp->ppd.pid, &target);
453 cp->rc_delay_task = NULL;
454 cp->rc =
455 GNUNET_ATS_reserve_bandwidth (GSF_ats,
456 &target,
457 DBLOCK_SIZE,
458 &ats_reserve_callback, cp);
459}
460
461
462/**
463 * Function called by core upon success or failure of our bandwidth reservation request.
464 *
465 * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
466 * @param peer identifies the peer
467 * @param amount set to the amount that was actually reserved or unreserved;
468 * either the full requested amount or zero (no partial reservations)
469 * @param res_delay if the reservation could not be satisfied (amount was 0), how
470 * long should the client wait until re-trying?
471 */
472static void
473ats_reserve_callback (void *cls,
474 const struct GNUNET_PeerIdentity *peer,
475 int32_t amount,
476 struct GNUNET_TIME_Relative res_delay)
477{
478 struct GSF_ConnectedPeer *cp = cls;
479 struct GSF_PeerTransmitHandle *pth;
480
481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482 "Reserved %d bytes / need to wait %s for reservation\n",
483 (int) amount,
484 GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
485 cp->rc = NULL;
486 if (0 == amount)
487 {
488 cp->rc_delay_task =
489 GNUNET_SCHEDULER_add_delayed (res_delay,
490 &retry_reservation,
491 cp);
492 return;
493 }
494 cp->did_reserve = GNUNET_YES;
495 pth = cp->pth_head;
496 if (NULL != pth)
497 {
498 /* reservation success, try transmission now! */
499 peer_transmit (cp);
500 }
501}
502
503
504/**
505 * Function called by PEERSTORE with peer respect record
506 *
507 * @param cls handle to connected peer entry
508 * @param record peerstore record information
509 * @param emsg error message, or NULL if no errors
510 */
511static void
512peer_respect_cb (void *cls,
513 const struct GNUNET_PEERSTORE_Record *record,
514 const char *emsg)
515{
516 struct GSF_ConnectedPeer *cp = cls;
517
518 GNUNET_assert (NULL != cp->respect_iterate_req);
519 if ((NULL != record) &&
520 (sizeof(cp->disk_respect) == record->value_size))
521 {
522 cp->disk_respect = *((uint32_t *) record->value);
523 cp->ppd.respect += *((uint32_t *) record->value);
524 }
525 GSF_push_start_ (cp);
526 if (NULL != record)
527 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
528 cp->respect_iterate_req = NULL;
529}
530
531
532/**
533 * Function called for each pending request whenever a new
534 * peer connects, giving us a chance to decide about submitting
535 * the existing request to the new peer.
536 *
537 * @param cls the `struct GSF_ConnectedPeer` of the new peer
538 * @param key query for the request
539 * @param pr handle to the pending request
540 * @return #GNUNET_YES to continue to iterate
541 */
542static int
543consider_peer_for_forwarding (void *cls,
544 const struct GNUNET_HashCode *key,
545 struct GSF_PendingRequest *pr)
546{
547 struct GSF_ConnectedPeer *cp = cls;
548 struct GNUNET_PeerIdentity pid;
549
550 if (GNUNET_YES !=
551 GSF_pending_request_test_active_ (pr))
552 return GNUNET_YES; /* request is not actually active, skip! */
553 GSF_connected_peer_get_identity_ (cp, &pid);
554 if (GNUNET_YES !=
555 GSF_pending_request_test_target_ (pr, &pid))
556 {
557 GNUNET_STATISTICS_update (GSF_stats,
558 gettext_noop ("# Loopback routes suppressed"),
559 1,
560 GNUNET_NO);
561 return GNUNET_YES;
562 }
563 GSF_plan_add_ (cp, pr);
564 return GNUNET_YES;
565}
566
567
568/**
569 * A peer connected to us. Setup the connected peer
570 * records.
571 *
572 * @param cls NULL
573 * @param peer identity of peer that connected
574 * @param mq message queue for talking to @a peer
575 * @return our internal handle for the peer
576 */
577void *
578GSF_peer_connect_handler (void *cls,
579 const struct GNUNET_PeerIdentity *peer,
580 struct GNUNET_MQ_Handle *mq)
581{
582 struct GSF_ConnectedPeer *cp;
583
584 if (0 ==
585 GNUNET_memcmp (&GSF_my_id,
586 peer))
587 return NULL;
588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589 "Connected to peer %s\n",
590 GNUNET_i2s (peer));
591 cp = GNUNET_new (struct GSF_ConnectedPeer);
592 cp->ppd.pid = GNUNET_PEER_intern (peer);
593 cp->ppd.peer = peer;
594 cp->mq = mq;
595 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
596 cp->rc =
597 GNUNET_ATS_reserve_bandwidth (GSF_ats,
598 peer,
599 DBLOCK_SIZE,
600 &ats_reserve_callback, cp);
601 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
602 GNUNET_YES);
603 GNUNET_break (GNUNET_OK ==
604 GNUNET_CONTAINER_multipeermap_put (cp_map,
605 GSF_connected_peer_get_identity2_ (
606 cp),
607 cp,
608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
609 GNUNET_STATISTICS_set (GSF_stats,
610 gettext_noop ("# peers connected"),
611 GNUNET_CONTAINER_multipeermap_size (cp_map),
612 GNUNET_NO);
613 cp->respect_iterate_req
614 = GNUNET_PEERSTORE_iterate (peerstore,
615 "fs",
616 peer,
617 "respect",
618 &peer_respect_cb,
619 cp);
620 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
621 cp);
622 return cp;
623}
624
625
626/**
627 * It may be time to re-start migrating content to this
628 * peer. Check, and if so, restart migration.
629 *
630 * @param cls the `struct GSF_ConnectedPeer`
631 */
632static void
633revive_migration (void *cls)
634{
635 struct GSF_ConnectedPeer *cp = cls;
636 struct GNUNET_TIME_Relative bt;
637
638 cp->mig_revive_task = NULL;
639 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
640 if (0 != bt.rel_value_us)
641 {
642 /* still time left... */
643 cp->mig_revive_task =
644 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
645 return;
646 }
647 GSF_push_start_ (cp);
648}
649
650
651/**
652 * Get a handle for a connected peer.
653 *
654 * @param peer peer's identity
655 * @return NULL if the peer is not currently connected
656 */
657struct GSF_ConnectedPeer *
658GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
659{
660 if (NULL == cp_map)
661 return NULL;
662 return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
663}
664
665
666/**
667 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
668 *
669 * @param cls closure, the `struct GSF_ConnectedPeer`
670 * @param msm the actual message
671 */
672void
673handle_p2p_migration_stop (void *cls,
674 const struct MigrationStopMessage *msm)
675{
676 struct GSF_ConnectedPeer *cp = cls;
677 struct GNUNET_TIME_Relative bt;
678
679 GNUNET_STATISTICS_update (GSF_stats,
680 gettext_noop ("# migration stop messages received"),
681 1, GNUNET_NO);
682 bt = GNUNET_TIME_relative_ntoh (msm->duration);
683 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
684 _ ("Migration of content to peer `%s' blocked for %s\n"),
685 GNUNET_i2s (cp->ppd.peer),
686 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
687 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
688 if ((NULL == cp->mig_revive_task) &&
689 (NULL == cp->respect_iterate_req))
690 {
691 GSF_push_stop_ (cp);
692 cp->mig_revive_task =
693 GNUNET_SCHEDULER_add_delayed (bt,
694 &revive_migration, cp);
695 }
696}
697
698
699/**
700 * Free resources associated with the given peer request.
701 *
702 * @param peerreq request to free
703 */
704static void
705free_pending_request (struct PeerRequest *peerreq)
706{
707 struct GSF_ConnectedPeer *cp = peerreq->cp;
708 struct GSF_PendingRequestData *prd;
709
710 prd = GSF_pending_request_get_data_ (peerreq->pr);
711 if (NULL != peerreq->kill_task)
712 {
713 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
714 peerreq->kill_task = NULL;
715 }
716 GNUNET_STATISTICS_update (GSF_stats,
717 gettext_noop ("# P2P searches active"),
718 -1,
719 GNUNET_NO);
720 GNUNET_break (GNUNET_YES ==
721 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
722 &prd->query,
723 peerreq));
724 GNUNET_free (peerreq);
725}
726
727
728/**
729 * Cancel all requests associated with the peer.
730 *
731 * @param cls unused
732 * @param query hash code of the request
733 * @param value the `struct GSF_PendingRequest`
734 * @return #GNUNET_YES (continue to iterate)
735 */
736static int
737cancel_pending_request (void *cls,
738 const struct GNUNET_HashCode *query,
739 void *value)
740{
741 struct PeerRequest *peerreq = value;
742 struct GSF_PendingRequest *pr = peerreq->pr;
743
744 free_pending_request (peerreq);
745 GSF_pending_request_cancel_ (pr,
746 GNUNET_NO);
747 return GNUNET_OK;
748}
749
750
751/**
752 * Free the given request.
753 *
754 * @param cls the request to free
755 */
756static void
757peer_request_destroy (void *cls)
758{
759 struct PeerRequest *peerreq = cls;
760 struct GSF_PendingRequest *pr = peerreq->pr;
761 struct GSF_PendingRequestData *prd;
762
763 peerreq->kill_task = NULL;
764 prd = GSF_pending_request_get_data_ (pr);
765 cancel_pending_request (NULL,
766 &prd->query,
767 peerreq);
768}
769
770
771/**
772 * The artificial delay is over, transmit the message now.
773 *
774 * @param cls the `struct GSF_DelayedHandle` with the message
775 */
776static void
777transmit_delayed_now (void *cls)
778{
779 struct GSF_DelayedHandle *dh = cls;
780 struct GSF_ConnectedPeer *cp = dh->cp;
781
782 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
783 cp->delayed_tail,
784 dh);
785 cp->delay_queue_size--;
786 GSF_peer_transmit_ (cp,
787 GNUNET_NO,
788 UINT32_MAX,
789 dh->env);
790 GNUNET_free (dh);
791}
792
793
794/**
795 * Get the randomized delay a response should be subjected to.
796 *
797 * @return desired delay
798 */
799static struct GNUNET_TIME_Relative
800get_randomized_delay ()
801{
802 struct GNUNET_TIME_Relative ret;
803
804 ret =
805 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
806 GNUNET_CRYPTO_random_u32
807 (GNUNET_CRYPTO_QUALITY_WEAK,
808 2 * GSF_avg_latency.rel_value_us + 1));
809#if INSANE_STATISTICS
810 GNUNET_STATISTICS_update (GSF_stats,
811 gettext_noop
812 ("# artificial delays introduced (ms)"),
813 ret.rel_value_us / 1000LL, GNUNET_NO);
814#endif
815 return ret;
816}
817
818
819/**
820 * Handle a reply to a pending request. Also called if a request
821 * expires (then with data == NULL). The handler may be called
822 * many times (depending on the request type), but will not be
823 * called during or after a call to GSF_pending_request_cancel
824 * and will also not be called anymore after a call signalling
825 * expiration.
826 *
827 * @param cls `struct PeerRequest` this is an answer for
828 * @param eval evaluation of the result
829 * @param pr handle to the original pending request
830 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
831 * @param expiration when does @a data expire?
832 * @param last_transmission when did we last transmit a request for this block
833 * @param type type of the block
834 * @param data response data, NULL on request expiration
835 * @param data_len number of bytes in @a data
836 */
837static void
838handle_p2p_reply (void *cls,
839 enum GNUNET_BLOCK_ReplyEvaluationResult eval,
840 struct GSF_PendingRequest *pr,
841 uint32_t reply_anonymity_level,
842 struct GNUNET_TIME_Absolute expiration,
843 struct GNUNET_TIME_Absolute last_transmission,
844 enum GNUNET_BLOCK_Type type,
845 const void *data,
846 size_t data_len)
847{
848 struct PeerRequest *peerreq = cls;
849 struct GSF_ConnectedPeer *cp = peerreq->cp;
850 struct GSF_PendingRequestData *prd;
851 struct GNUNET_MQ_Envelope *env;
852 struct PutMessage *pm;
853 size_t msize;
854
855 GNUNET_assert (data_len + sizeof(struct PutMessage) <
856 GNUNET_MAX_MESSAGE_SIZE);
857 GNUNET_assert (peerreq->pr == pr);
858 prd = GSF_pending_request_get_data_ (pr);
859 if (NULL == data)
860 {
861 free_pending_request (peerreq);
862 return;
863 }
864 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
865 if ( (prd->type != type) &&
866 (GNUNET_BLOCK_TYPE_ANY != prd->type) )
867 {
868 GNUNET_STATISTICS_update (GSF_stats,
869 "# replies dropped due to type mismatch",
870 1, GNUNET_NO);
871 return;
872 }
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874 "Transmitting result for query `%s' to peer\n",
875 GNUNET_h2s (&prd->query));
876 GNUNET_STATISTICS_update (GSF_stats,
877 "# replies received for other peers",
878 1,
879 GNUNET_NO);
880 msize = sizeof(struct PutMessage) + data_len;
881 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
882 {
883 GNUNET_break (0);
884 return;
885 }
886 if ( (UINT32_MAX != reply_anonymity_level) &&
887 (reply_anonymity_level > 1) )
888 {
889 if (reply_anonymity_level - 1 > GSF_cover_content_count)
890 {
891 GNUNET_STATISTICS_update (GSF_stats,
892 "# replies dropped due to insufficient cover traffic",
893 1, GNUNET_NO);
894 return;
895 }
896 GSF_cover_content_count -= (reply_anonymity_level - 1);
897 }
898
899 env = GNUNET_MQ_msg_extra (pm,
900 data_len,
901 GNUNET_MESSAGE_TYPE_FS_PUT);
902 pm->type = htonl (type);
903 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
904 GNUNET_memcpy (&pm[1],
905 data,
906 data_len);
907 if ((UINT32_MAX != reply_anonymity_level) &&
908 (0 != reply_anonymity_level) &&
909 (GNUNET_YES == GSF_enable_randomized_delays))
910 {
911 struct GSF_DelayedHandle *dh;
912
913 dh = GNUNET_new (struct GSF_DelayedHandle);
914 dh->cp = cp;
915 dh->env = env;
916 dh->msize = msize;
917 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
918 cp->delayed_tail,
919 dh);
920 cp->delay_queue_size++;
921 dh->delay_task =
922 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
923 &transmit_delayed_now,
924 dh);
925 }
926 else
927 {
928 GSF_peer_transmit_ (cp,
929 GNUNET_NO,
930 UINT32_MAX,
931 env);
932 }
933 if (GNUNET_BLOCK_REPLY_OK_LAST != eval)
934 return;
935 if (NULL == peerreq->kill_task)
936 {
937 GNUNET_STATISTICS_update (GSF_stats,
938 "# P2P searches destroyed due to ultimate reply",
939 1,
940 GNUNET_NO);
941 peerreq->kill_task =
942 GNUNET_SCHEDULER_add_now (&peer_request_destroy,
943 peerreq);
944 }
945}
946
947
948/**
949 * Increase the peer's respect by a value.
950 *
951 * @param cp which peer to change the respect value on
952 * @param value is the int value by which the
953 * peer's credit is to be increased or decreased
954 * @returns the actual change in respect (positive or negative)
955 */
956static int
957change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
958{
959 if (0 == value)
960 return 0;
961 GNUNET_assert (NULL != cp);
962 if (value > 0)
963 {
964 if (cp->ppd.respect + value < cp->ppd.respect)
965 {
966 value = UINT32_MAX - cp->ppd.respect;
967 cp->ppd.respect = UINT32_MAX;
968 }
969 else
970 cp->ppd.respect += value;
971 }
972 else
973 {
974 if (cp->ppd.respect < -value)
975 {
976 value = -cp->ppd.respect;
977 cp->ppd.respect = 0;
978 }
979 else
980 cp->ppd.respect += value;
981 }
982 return value;
983}
984
985
986/**
987 * We've received a request with the specified priority. Bound it
988 * according to how much we respect the given peer.
989 *
990 * @param prio_in requested priority
991 * @param cp the peer making the request
992 * @return effective priority
993 */
994static int32_t
995bound_priority (uint32_t prio_in,
996 struct GSF_ConnectedPeer *cp)
997{
998#define N ((double) 128.0)
999 uint32_t ret;
1000 double rret;
1001 int ld;
1002
1003 ld = GSF_test_get_load_too_high_ (0);
1004 if (GNUNET_SYSERR == ld)
1005 {
1006#if INSANE_STATISTICS
1007 GNUNET_STATISTICS_update (GSF_stats,
1008 gettext_noop
1009 ("# requests done for free (low load)"), 1,
1010 GNUNET_NO);
1011#endif
1012 return 0; /* excess resources */
1013 }
1014 if (prio_in > INT32_MAX)
1015 prio_in = INT32_MAX;
1016 ret = -change_peer_respect (cp, -(int) prio_in);
1017 if (ret > 0)
1018 {
1019 if (ret > GSF_current_priorities + N)
1020 rret = GSF_current_priorities + N;
1021 else
1022 rret = ret;
1023 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1024 }
1025 if ((GNUNET_YES == ld) && (ret > 0))
1026 {
1027 /* try with charging */
1028 ld = GSF_test_get_load_too_high_ (ret);
1029 }
1030 if (GNUNET_YES == ld)
1031 {
1032 GNUNET_STATISTICS_update (GSF_stats,
1033 gettext_noop
1034 ("# request dropped, priority insufficient"), 1,
1035 GNUNET_NO);
1036 /* undo charge */
1037 change_peer_respect (cp, (int) ret);
1038 return -1; /* not enough resources */
1039 }
1040 else
1041 {
1042 GNUNET_STATISTICS_update (GSF_stats,
1043 gettext_noop
1044 ("# requests done for a price (normal load)"),
1045 1,
1046 GNUNET_NO);
1047 }
1048#undef N
1049 return ret;
1050}
1051
1052
1053/**
1054 * The priority level imposes a bound on the maximum
1055 * value for the ttl that can be requested.
1056 *
1057 * @param ttl_in requested ttl
1058 * @param prio given priority
1059 * @return @a ttl_in if @a ttl_in is below the limit,
1060 * otherwise the ttl-limit for the given @a prio
1061 */
1062static int32_t
1063bound_ttl (int32_t ttl_in,
1064 uint32_t prio)
1065{
1066 unsigned long long allowed;
1067
1068 if (ttl_in <= 0)
1069 return ttl_in;
1070 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1071 if (ttl_in > allowed)
1072 {
1073 if (allowed >= (1 << 30))
1074 return 1 << 30;
1075 return allowed;
1076 }
1077 return ttl_in;
1078}
1079
1080
1081/**
1082 * Closure for #test_exist_cb().
1083 */
1084struct TestExistClosure
1085{
1086 /**
1087 * Priority of the incoming request.
1088 */
1089 int32_t priority;
1090
1091 /**
1092 * Relative TTL of the incoming request.
1093 */
1094 int32_t ttl;
1095
1096 /**
1097 * Type of the incoming request.
1098 */
1099 enum GNUNET_BLOCK_Type type;
1100
1101 /**
1102 * Set to #GNUNET_YES if we are done handling the query.
1103 */
1104 int finished;
1105};
1106
1107
1108/**
1109 * Test if the query already exists. If so, merge it, otherwise
1110 * keep `finished` at #GNUNET_NO.
1111 *
1112 * @param cls our `struct TestExistClosure`
1113 * @param hc the key of the query
1114 * @param value the existing `struct PeerRequest`.
1115 * @return #GNUNET_YES to continue to iterate,
1116 * #GNUNET_NO if we successfully merged
1117 */
1118static int
1119test_exist_cb (void *cls,
1120 const struct GNUNET_HashCode *hc,
1121 void *value)
1122{
1123 struct TestExistClosure *tec = cls;
1124 struct PeerRequest *peerreq = value;
1125 struct GSF_PendingRequest *pr;
1126 struct GSF_PendingRequestData *prd;
1127
1128 pr = peerreq->pr;
1129 prd = GSF_pending_request_get_data_ (pr);
1130 if (prd->type != tec->type)
1131 return GNUNET_YES;
1132 if (prd->ttl.abs_value_us >=
1133 GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1134 {
1135 /* existing request has higher TTL, drop new one! */
1136 prd->priority += tec->priority;
1137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138 "Have existing request with higher TTL, dropping new request.\n");
1139 GNUNET_STATISTICS_update (GSF_stats,
1140 gettext_noop
1141 ("# requests dropped due to higher-TTL request"),
1142 1, GNUNET_NO);
1143 tec->finished = GNUNET_YES;
1144 return GNUNET_NO;
1145 }
1146 /* existing request has lower TTL, drop old one! */
1147 tec->priority += prd->priority;
1148 free_pending_request (peerreq);
1149 GSF_pending_request_cancel_ (pr,
1150 GNUNET_YES);
1151 return GNUNET_NO;
1152}
1153
1154
1155/**
1156 * Handle P2P "QUERY" message. Creates the pending request entry
1157 * and sets up all of the data structures to that we will
1158 * process replies properly. Does not initiate forwarding or
1159 * local database lookups.
1160 *
1161 * @param cls the other peer involved (sender of the message)
1162 * @param gm the GET message
1163 */
1164void
1165handle_p2p_get (void *cls,
1166 const struct GetMessage *gm)
1167{
1168 struct GSF_ConnectedPeer *cps = cls;
1169 struct PeerRequest *peerreq;
1170 struct GSF_PendingRequest *pr;
1171 struct GSF_ConnectedPeer *cp;
1172 const struct GNUNET_PeerIdentity *target;
1173 enum GSF_PendingRequestOptions options;
1174 uint16_t msize;
1175 unsigned int bits;
1176 const struct GNUNET_PeerIdentity *opt;
1177 uint32_t bm;
1178 size_t bfsize;
1179 uint32_t ttl_decrement;
1180 struct TestExistClosure tec;
1181 GNUNET_PEER_Id spid;
1182 const struct GSF_PendingRequestData *prd;
1183
1184 msize = ntohs (gm->header.size);
1185 tec.type = ntohl (gm->type);
1186 bm = ntohl (gm->hash_bitmap);
1187 bits = 0;
1188 while (bm > 0)
1189 {
1190 if (1 == (bm & 1))
1191 bits++;
1192 bm >>= 1;
1193 }
1194 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1195 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct
1196 GNUNET_PeerIdentity);
1197 GNUNET_STATISTICS_update (GSF_stats,
1198 gettext_noop
1199 ("# GET requests received (from other peers)"),
1200 1,
1201 GNUNET_NO);
1202 GSF_cover_query_count++;
1203 bm = ntohl (gm->hash_bitmap);
1204 bits = 0;
1205 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1206 cp = GSF_peer_get_ (&opt[bits++]);
1207 else
1208 cp = cps;
1209 if (NULL == cp)
1210 {
1211 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1212 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1213 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1214 GNUNET_i2s (&opt[bits - 1]));
1215
1216 else
1217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218 "Failed to find peer `%s' in connection set. Dropping query.\n",
1219 GNUNET_i2s (cps->ppd.peer));
1220 GNUNET_STATISTICS_update (GSF_stats,
1221 gettext_noop
1222 (
1223 "# requests dropped due to missing reverse route"),
1224 1,
1225 GNUNET_NO);
1226 return;
1227 }
1228 unsigned int queue_size = GNUNET_MQ_get_length (cp->mq);
1229 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1230 if (queue_size > MAX_QUEUE_PER_PEER)
1231 {
1232 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1233 "Peer `%s' has too many replies queued already. Dropping query.\n",
1234 GNUNET_i2s (cps->ppd.peer));
1235 GNUNET_STATISTICS_update (GSF_stats,
1236 gettext_noop (
1237 "# requests dropped due to full reply queue"),
1238 1,
1239 GNUNET_NO);
1240 return;
1241 }
1242 /* note that we can really only check load here since otherwise
1243 * peers could find out that we are overloaded by not being
1244 * disconnected after sending us a malformed query... */
1245 tec.priority = bound_priority (ntohl (gm->priority),
1246 cps);
1247 if (tec.priority < 0)
1248 {
1249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250 "Dropping query from `%s', this peer is too busy.\n",
1251 GNUNET_i2s (cps->ppd.peer));
1252 return;
1253 }
1254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1256 GNUNET_h2s (&gm->query),
1257 (unsigned int) tec.type,
1258 GNUNET_i2s (cps->ppd.peer),
1259 (unsigned int) bm);
1260 target =
1261 (0 !=
1262 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1263 options = GSF_PRO_DEFAULTS;
1264 spid = 0;
1265 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1
1266 + tec.priority))
1267 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1268 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2
1269 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1270 {
1271 /* don't have BW to send to peer, or would likely take longer than we have for it,
1272 * so at best indirect the query */
1273 tec.priority = 0;
1274 options |= GSF_PRO_FORWARD_ONLY;
1275 spid = GNUNET_PEER_intern (cps->ppd.peer);
1276 GNUNET_assert (0 != spid);
1277 }
1278 tec.ttl = bound_ttl (ntohl (gm->ttl),
1279 tec.priority);
1280 /* decrement ttl (always) */
1281 ttl_decrement =
1282 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1283 TTL_DECREMENT);
1284 if ((tec.ttl < 0) &&
1285 (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1286 {
1287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1289 GNUNET_i2s (cps->ppd.peer),
1290 tec.ttl,
1291 ttl_decrement);
1292 GNUNET_STATISTICS_update (GSF_stats,
1293 gettext_noop
1294 ("# requests dropped due TTL underflow"), 1,
1295 GNUNET_NO);
1296 /* integer underflow => drop (should be very rare)! */
1297 return;
1298 }
1299 tec.ttl -= ttl_decrement;
1300
1301 /* test if the request already exists */
1302 tec.finished = GNUNET_NO;
1303 GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1304 &gm->query,
1305 &test_exist_cb,
1306 &tec);
1307 if (GNUNET_YES == tec.finished)
1308 return; /* merged into existing request, we're done */
1309
1310 peerreq = GNUNET_new (struct PeerRequest);
1311 peerreq->cp = cp;
1312 pr = GSF_pending_request_create_ (options,
1313 tec.type,
1314 &gm->query,
1315 target,
1316 (bfsize > 0)
1317 ? (const char *) &opt[bits]
1318 : NULL,
1319 bfsize,
1320 1 /* anonymity */,
1321 (uint32_t) tec.priority,
1322 tec.ttl,
1323 spid,
1324 GNUNET_PEER_intern (cps->ppd.peer),
1325 NULL, 0, /* replies_seen */
1326 &handle_p2p_reply,
1327 peerreq);
1328 GNUNET_assert (NULL != pr);
1329 prd = GSF_pending_request_get_data_ (pr);
1330 peerreq->pr = pr;
1331 GNUNET_break (GNUNET_OK ==
1332 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1333 &prd->query,
1334 peerreq,
1335 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1336 GNUNET_STATISTICS_update (GSF_stats,
1337 gettext_noop (
1338 "# P2P query messages received and processed"),
1339 1,
1340 GNUNET_NO);
1341 GNUNET_STATISTICS_update (GSF_stats,
1342 gettext_noop ("# P2P searches active"),
1343 1,
1344 GNUNET_NO);
1345 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1346 GSF_local_lookup_ (pr,
1347 &GSF_consider_forwarding,
1348 NULL);
1349}
1350
1351
1352/**
1353 * Transmit a message to the given peer as soon as possible.
1354 * If the peer disconnects before the transmission can happen,
1355 * the callback is invoked with a `NULL` @a buffer.
1356 *
1357 * @param cp target peer
1358 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1359 * @param priority how important is this request?
1360 * @param timeout when does this request timeout
1361 * @param size number of bytes we would like to send to the peer
1362 * @param env message to send
1363 */
1364void
1365GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1366 int is_query,
1367 uint32_t priority,
1368 struct GNUNET_MQ_Envelope *env)
1369{
1370 struct GSF_PeerTransmitHandle *pth;
1371 struct GSF_PeerTransmitHandle *pos;
1372 struct GSF_PeerTransmitHandle *prev;
1373
1374 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1375 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1376 pth->env = env;
1377 pth->is_query = is_query;
1378 pth->priority = priority;
1379 pth->cp = cp;
1380 /* insertion sort (by priority, descending) */
1381 prev = NULL;
1382 pos = cp->pth_head;
1383 while ((NULL != pos) && (pos->priority > priority))
1384 {
1385 prev = pos;
1386 pos = pos->next;
1387 }
1388 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1389 cp->pth_tail,
1390 prev,
1391 pth);
1392 if (GNUNET_YES == is_query)
1393 cp->ppd.pending_queries++;
1394 else if (GNUNET_NO == is_query)
1395 cp->ppd.pending_replies++;
1396 schedule_transmission (pth);
1397}
1398
1399
1400/**
1401 * Report on receiving a reply; update the performance record of the given peer.
1402 *
1403 * @param cp responding peer (will be updated)
1404 * @param request_time time at which the original query was transmitted
1405 * @param request_priority priority of the original request
1406 */
1407void
1408GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1409 struct GNUNET_TIME_Absolute request_time,
1410 uint32_t request_priority)
1411{
1412 struct GNUNET_TIME_Relative delay;
1413
1414 delay = GNUNET_TIME_absolute_get_duration (request_time);
1415 cp->ppd.avg_reply_delay.rel_value_us =
1416 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1)
1417 + delay.rel_value_us) / RUNAVG_DELAY_N;
1418 cp->ppd.avg_priority =
1419 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1)
1420 + request_priority) / RUNAVG_DELAY_N;
1421}
1422
1423
1424/**
1425 * Report on receiving a reply in response to an initiating client.
1426 * Remember that this peer is good for this client.
1427 *
1428 * @param cp responding peer (will be updated)
1429 * @param initiator_client local client on responsible for query
1430 */
1431void
1432GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1433 struct GSF_LocalClient *initiator_client)
1434{
1435 cp->ppd.last_client_replies[cp->last_client_replies_woff++
1436 % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1437}
1438
1439
1440/**
1441 * Report on receiving a reply in response to an initiating peer.
1442 * Remember that this peer is good for this initiating peer.
1443 *
1444 * @param cp responding peer (will be updated)
1445 * @param initiator_peer other peer responsible for query
1446 */
1447void
1448GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1449 const struct GSF_ConnectedPeer *initiator_peer)
1450{
1451 unsigned int woff;
1452
1453 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1454 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1455 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1456 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1457 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1458}
1459
1460
1461/**
1462 * Write peer-respect information to a file - flush the buffer entry!
1463 *
1464 * @param cls unused
1465 * @param key peer identity
1466 * @param value the `struct GSF_ConnectedPeer` to flush
1467 * @return #GNUNET_OK to continue iteration
1468 */
1469static int
1470flush_respect (void *cls,
1471 const struct GNUNET_PeerIdentity *key,
1472 void *value)
1473{
1474 struct GSF_ConnectedPeer *cp = value;
1475 struct GNUNET_PeerIdentity pid;
1476
1477 if (cp->ppd.respect == cp->disk_respect)
1478 return GNUNET_OK; /* unchanged */
1479 GNUNET_assert (0 != cp->ppd.pid);
1480 GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1481 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1482 sizeof(cp->ppd.respect),
1483 GNUNET_TIME_UNIT_FOREVER_ABS,
1484 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1485 NULL,
1486 NULL);
1487 return GNUNET_OK;
1488}
1489
1490
1491/**
1492 * A peer disconnected from us. Tear down the connected peer
1493 * record.
1494 *
1495 * @param cls unused
1496 * @param peer identity of peer that disconnected
1497 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1498 */
1499void
1500GSF_peer_disconnect_handler (void *cls,
1501 const struct GNUNET_PeerIdentity *peer,
1502 void *internal_cls)
1503{
1504 struct GSF_ConnectedPeer *cp = internal_cls;
1505 struct GSF_PeerTransmitHandle *pth;
1506 struct GSF_DelayedHandle *dh;
1507
1508 if (NULL == cp)
1509 return; /* must have been disconnect from core with
1510 * 'peer' == my_id, ignore */
1511 flush_respect (NULL,
1512 peer,
1513 cp);
1514 GNUNET_assert (GNUNET_YES ==
1515 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1516 peer,
1517 cp));
1518 GNUNET_STATISTICS_set (GSF_stats,
1519 gettext_noop ("# peers connected"),
1520 GNUNET_CONTAINER_multipeermap_size (cp_map),
1521 GNUNET_NO);
1522 if (NULL != cp->respect_iterate_req)
1523 {
1524 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1525 cp->respect_iterate_req = NULL;
1526 }
1527 if (NULL != cp->rc)
1528 {
1529 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1530 cp->rc = NULL;
1531 }
1532 if (NULL != cp->rc_delay_task)
1533 {
1534 GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1535 cp->rc_delay_task = NULL;
1536 }
1537 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1538 &cancel_pending_request,
1539 cp);
1540 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1541 cp->request_map = NULL;
1542 GSF_plan_notify_peer_disconnect_ (cp);
1543 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1544 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1545 P2P_SUCCESS_LIST_SIZE);
1546 memset (cp->ppd.last_p2p_replies,
1547 0,
1548 sizeof(cp->ppd.last_p2p_replies));
1549 GSF_push_stop_ (cp);
1550 while (NULL != (pth = cp->pth_head))
1551 {
1552 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1553 cp->pth_tail,
1554 pth);
1555 if (GNUNET_YES == pth->is_query)
1556 GNUNET_assert (0 < cp->ppd.pending_queries--);
1557 else if (GNUNET_NO == pth->is_query)
1558 GNUNET_assert (0 < cp->ppd.pending_replies--);
1559 GNUNET_free (pth);
1560 }
1561 while (NULL != (dh = cp->delayed_head))
1562 {
1563 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1564 cp->delayed_tail,
1565 dh);
1566 GNUNET_MQ_discard (dh->env);
1567 cp->delay_queue_size--;
1568 GNUNET_SCHEDULER_cancel (dh->delay_task);
1569 GNUNET_free (dh);
1570 }
1571 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1572 if (NULL != cp->mig_revive_task)
1573 {
1574 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1575 cp->mig_revive_task = NULL;
1576 }
1577 GNUNET_break (0 == cp->ppd.pending_queries);
1578 GNUNET_break (0 == cp->ppd.pending_replies);
1579 GNUNET_free (cp);
1580}
1581
1582
1583/**
1584 * Closure for #call_iterator().
1585 */
1586struct IterationContext
1587{
1588 /**
1589 * Function to call on each entry.
1590 */
1591 GSF_ConnectedPeerIterator it;
1592
1593 /**
1594 * Closure for @e it.
1595 */
1596 void *it_cls;
1597};
1598
1599
1600/**
1601 * Function that calls the callback for each peer.
1602 *
1603 * @param cls the `struct IterationContext *`
1604 * @param key identity of the peer
1605 * @param value the `struct GSF_ConnectedPeer *`
1606 * @return #GNUNET_YES to continue iteration
1607 */
1608static int
1609call_iterator (void *cls,
1610 const struct GNUNET_PeerIdentity *key,
1611 void *value)
1612{
1613 struct IterationContext *ic = cls;
1614 struct GSF_ConnectedPeer *cp = value;
1615
1616 ic->it (ic->it_cls,
1617 key, cp,
1618 &cp->ppd);
1619 return GNUNET_YES;
1620}
1621
1622
1623/**
1624 * Iterate over all connected peers.
1625 *
1626 * @param it function to call for each peer
1627 * @param it_cls closure for @a it
1628 */
1629void
1630GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1631 void *it_cls)
1632{
1633 struct IterationContext ic;
1634
1635 ic.it = it;
1636 ic.it_cls = it_cls;
1637 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1638 &call_iterator,
1639 &ic);
1640}
1641
1642
1643/**
1644 * Obtain the identity of a connected peer.
1645 *
1646 * @param cp peer to get identity of
1647 * @param id identity to set (written to)
1648 */
1649void
1650GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1651 struct GNUNET_PeerIdentity *id)
1652{
1653 GNUNET_assert (0 != cp->ppd.pid);
1654 GNUNET_PEER_resolve (cp->ppd.pid, id);
1655}
1656
1657
1658/**
1659 * Obtain the identity of a connected peer.
1660 *
1661 * @param cp peer to get identity of
1662 * @return reference to peer identity, valid until peer disconnects (!)
1663 */
1664const struct GNUNET_PeerIdentity *
1665GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1666{
1667 GNUNET_assert (0 != cp->ppd.pid);
1668 return GNUNET_PEER_resolve2 (cp->ppd.pid);
1669}
1670
1671
1672/**
1673 * Ask a peer to stop migrating data to us until the given point
1674 * in time.
1675 *
1676 * @param cp peer to ask
1677 * @param block_time until when to block
1678 */
1679void
1680GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1681 struct GNUNET_TIME_Absolute block_time)
1682{
1683 struct GNUNET_MQ_Envelope *env;
1684 struct MigrationStopMessage *msm;
1685
1686 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1687 {
1688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689 "Migration already blocked for another %s\n",
1690 GNUNET_STRINGS_relative_time_to_string (
1691 GNUNET_TIME_absolute_get_remaining
1692 (cp->
1693 last_migration_block), GNUNET_YES));
1694 return; /* already blocked */
1695 }
1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1697 GNUNET_STRINGS_relative_time_to_string (
1698 GNUNET_TIME_absolute_get_remaining (block_time),
1699 GNUNET_YES));
1700 cp->last_migration_block = block_time;
1701 env = GNUNET_MQ_msg (msm,
1702 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1703 msm->reserved = htonl (0);
1704 msm->duration
1705 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1706 (cp->last_migration_block));
1707 GNUNET_STATISTICS_update (GSF_stats,
1708 gettext_noop ("# migration stop messages sent"),
1709 1,
1710 GNUNET_NO);
1711 GSF_peer_transmit_ (cp,
1712 GNUNET_SYSERR,
1713 UINT32_MAX,
1714 env);
1715}
1716
1717
1718/**
1719 * Notify core about a preference we have for the given peer
1720 * (to allocate more resources towards it). The change will
1721 * be communicated the next time we reserve bandwidth with
1722 * core (not instantly).
1723 *
1724 * @param cp peer to reserve bandwidth from
1725 * @param pref preference change
1726 */
1727void
1728GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1729 uint64_t pref)
1730{
1731 cp->inc_preference += pref;
1732}
1733
1734
1735/**
1736 * Call this method periodically to flush respect information to disk.
1737 *
1738 * @param cls closure, not used
1739 */
1740static void
1741cron_flush_respect (void *cls)
1742{
1743 fr_task = NULL;
1744 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1745 &flush_respect,
1746 NULL);
1747 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1748 GNUNET_SCHEDULER_PRIORITY_HIGH,
1749 &cron_flush_respect,
1750 NULL);
1751}
1752
1753
1754/**
1755 * Initialize peer management subsystem.
1756 */
1757void
1758GSF_connected_peer_init_ ()
1759{
1760 cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1761 peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1762 fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1763 &cron_flush_respect, NULL);
1764}
1765
1766
1767/**
1768 * Shutdown peer management subsystem.
1769 */
1770void
1771GSF_connected_peer_done_ ()
1772{
1773 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1774 &flush_respect,
1775 NULL);
1776 GNUNET_SCHEDULER_cancel (fr_task);
1777 fr_task = NULL;
1778 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1779 cp_map = NULL;
1780 GNUNET_PEERSTORE_disconnect (peerstore,
1781 GNUNET_YES);
1782}
1783
1784
1785/**
1786 * Iterator to remove references to LC entry.
1787 *
1788 * @param cls the `struct GSF_LocalClient *` to look for
1789 * @param key current key code
1790 * @param value value in the hash map (peer entry)
1791 * @return #GNUNET_YES (we should continue to iterate)
1792 */
1793static int
1794clean_local_client (void *cls,
1795 const struct GNUNET_PeerIdentity *key,
1796 void *value)
1797{
1798 const struct GSF_LocalClient *lc = cls;
1799 struct GSF_ConnectedPeer *cp = value;
1800 unsigned int i;
1801
1802 for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1803 if (cp->ppd.last_client_replies[i] == lc)
1804 cp->ppd.last_client_replies[i] = NULL;
1805 return GNUNET_YES;
1806}
1807
1808
1809/**
1810 * Notification that a local client disconnected. Clean up all of our
1811 * references to the given handle.
1812 *
1813 * @param lc handle to the local client (henceforth invalid)
1814 */
1815void
1816GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1817{
1818 if (NULL == cp_map)
1819 return; /* already cleaned up */
1820 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1821 &clean_local_client,
1822 (void *) lc);
1823}
1824
1825
1826/* end of gnunet-service-fs_cp.c */