aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c1827
1 files changed, 0 insertions, 1827 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
deleted file mode 100644
index 5476aa2be..000000000
--- a/src/fs/gnunet-service-fs_cp.c
+++ /dev/null
@@ -1,1827 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file fs/gnunet-service-fs_cp.c
22 * @brief API to handle 'connected peers'
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_load_lib.h"
28#include "gnunet-service-fs.h"
29#include "gnunet-service-fs_cp.h"
30#include "gnunet-service-fs_pe.h"
31#include "gnunet-service-fs_pr.h"
32#include "gnunet-service-fs_push.h"
33#include "gnunet_peerstore_service.h"
34
35
36/**
37 * Ratio for moving average delay calculation. The previous
38 * average goes in with a factor of (n-1) into the calculation.
39 * Must be > 0.
40 */
41#define RUNAVG_DELAY_N 16
42
43/**
44 * How often do we flush respect values to disk?
45 */
46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
47 GNUNET_TIME_UNIT_MINUTES, 5)
48
49/**
50 * After how long do we discard a reply?
51 */
52#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
53 2)
54
55/**
56 * Collect an instance number of statistics? May cause excessive IPC.
57 */
58#define INSANE_STATISTICS GNUNET_NO
59
60
61/**
62 * Handle to cancel a transmission request.
63 */
64struct GSF_PeerTransmitHandle
65{
66 /**
67 * Kept in a doubly-linked list.
68 */
69 struct GSF_PeerTransmitHandle *next;
70
71 /**
72 * Kept in a doubly-linked list.
73 */
74 struct GSF_PeerTransmitHandle *prev;
75
76 /**
77 * Time when this transmission request was issued.
78 */
79 struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81 /**
82 * Envelope with the actual message.
83 */
84 struct GNUNET_MQ_Envelope *env;
85
86 /**
87 * Peer this request targets.
88 */
89 struct GSF_ConnectedPeer *cp;
90
91 /**
92 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
93 */
94 int is_query;
95
96 /**
97 * Did we get a reservation already?
98 */
99 int was_reserved;
100
101 /**
102 * Priority of this request.
103 */
104 uint32_t priority;
105};
106
107
108/**
109 * Handle for an entry in our delay list.
110 */
111struct GSF_DelayedHandle
112{
113 /**
114 * Kept in a doubly-linked list.
115 */
116 struct GSF_DelayedHandle *next;
117
118 /**
119 * Kept in a doubly-linked list.
120 */
121 struct GSF_DelayedHandle *prev;
122
123 /**
124 * Peer this transmission belongs to.
125 */
126 struct GSF_ConnectedPeer *cp;
127
128 /**
129 * Envelope of the message that was delayed.
130 */
131 struct GNUNET_MQ_Envelope *env;
132
133 /**
134 * Task for the delay.
135 */
136 struct GNUNET_SCHEDULER_Task *delay_task;
137
138 /**
139 * Size of the message.
140 */
141 size_t msize;
142};
143
144
145/**
146 * Information per peer and request.
147 */
148struct PeerRequest
149{
150 /**
151 * Handle to generic request (generic: from peer or local client).
152 */
153 struct GSF_PendingRequest *pr;
154
155 /**
156 * Which specific peer issued this request?
157 */
158 struct GSF_ConnectedPeer *cp;
159
160 /**
161 * Task for asynchronous stopping of this request.
162 */
163 struct GNUNET_SCHEDULER_Task *kill_task;
164};
165
166
167/**
168 * A connected peer.
169 */
170struct GSF_ConnectedPeer
171{
172 /**
173 * Performance data for this peer.
174 */
175 struct GSF_PeerPerformanceData ppd;
176
177 /**
178 * Time until when we blocked this peer from migrating
179 * data to us.
180 */
181 struct GNUNET_TIME_Absolute last_migration_block;
182
183 /**
184 * Task scheduled to revive migration to this peer.
185 */
186 struct GNUNET_SCHEDULER_Task *mig_revive_task;
187
188 /**
189 * Messages (replies, queries, content migration) we would like to
190 * send to this peer in the near future. Sorted by priority, head.
191 */
192 struct GSF_PeerTransmitHandle *pth_head;
193
194 /**
195 * Messages (replies, queries, content migration) we would like to
196 * send to this peer in the near future. Sorted by priority, tail.
197 */
198 struct GSF_PeerTransmitHandle *pth_tail;
199
200 /**
201 * Messages (replies, queries, content migration) we would like to
202 * send to this peer in the near future. Sorted by priority, head.
203 */
204 struct GSF_DelayedHandle *delayed_head;
205
206 /**
207 * Messages (replies, queries, content migration) we would like to
208 * send to this peer in the near future. Sorted by priority, tail.
209 */
210 struct GSF_DelayedHandle *delayed_tail;
211
212 /**
213 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
214 */
215 struct GNUNET_ATS_ReservationContext *rc;
216
217 /**
218 * Task scheduled if we need to retry bandwidth reservation later.
219 */
220 struct GNUNET_SCHEDULER_Task *rc_delay_task;
221
222 /**
223 * Active requests from this neighbour, map of query to `struct PeerRequest`.
224 */
225 struct GNUNET_CONTAINER_MultiHashMap *request_map;
226
227 /**
228 * Handle for an active request for transmission to this
229 * peer.
230 */
231 struct GNUNET_MQ_Handle *mq;
232
233 /**
234 * Increase in traffic preference still to be submitted
235 * to the core service for this peer.
236 */
237 uint64_t inc_preference;
238
239 /**
240 * Number of entries in @e delayed_head DLL.
241 */
242 unsigned int delay_queue_size;
243
244 /**
245 * Respect rating for this peer on disk.
246 */
247 uint32_t disk_respect;
248
249 /**
250 * Which offset in @e last_p2p_replies will be updated next?
251 * (we go round-robin).
252 */
253 unsigned int last_p2p_replies_woff;
254
255 /**
256 * Which offset in @e last_client_replies will be updated next?
257 * (we go round-robin).
258 */
259 unsigned int last_client_replies_woff;
260
261 /**
262 * Current offset into @e last_request_times ring buffer.
263 */
264 unsigned int last_request_times_off;
265
266 /**
267 * #GNUNET_YES if we did successfully reserve 32k bandwidth,
268 * #GNUNET_NO if not.
269 */
270 int did_reserve;
271
272 /**
273 * Handle to the PEERSTORE iterate request for peer respect value
274 */
275 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
276};
277
278
279/**
280 * Map from peer identities to `struct GSF_ConnectPeer` entries.
281 */
282static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
283
284/**
285 * Handle to peerstore service.
286 */
287static struct GNUNET_PEERSTORE_Handle *peerstore;
288
289/**
290 * Task used to flush respect values to disk.
291 */
292static struct GNUNET_SCHEDULER_Task *fr_task;
293
294
295/**
296 * Update the latency information kept for the given peer.
297 *
298 * @param id peer record to update
299 * @param latency current latency value
300 */
301void
302GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
303 struct GNUNET_TIME_Relative latency)
304{
305 struct GSF_ConnectedPeer *cp;
306
307 cp = GSF_peer_get_ (id);
308 if (NULL == cp)
309 return; /* we're not yet connected at the core level, ignore */
310 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
311 latency);
312}
313
314
315/**
316 * Return the performance data record for the given peer
317 *
318 * @param cp peer to query
319 * @return performance data record for the peer
320 */
321struct GSF_PeerPerformanceData *
322GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
323{
324 return &cp->ppd;
325}
326
327
328/**
329 * Core is ready to transmit to a peer, get the message.
330 *
331 * @param cp which peer to send a message to
332 */
333static void
334peer_transmit (struct GSF_ConnectedPeer *cp);
335
336
337/**
338 * Function called by core upon success or failure of our bandwidth reservation request.
339 *
340 * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
341 * @param peer identifies the peer
342 * @param amount set to the amount that was actually reserved or unreserved;
343 * either the full requested amount or zero (no partial reservations)
344 * @param res_delay if the reservation could not be satisfied (amount was 0), how
345 * long should the client wait until re-trying?
346 */
347static void
348ats_reserve_callback (void *cls,
349 const struct GNUNET_PeerIdentity *peer,
350 int32_t amount,
351 struct GNUNET_TIME_Relative res_delay);
352
353
354/**
355 * If ready (bandwidth reserved), try to schedule transmission via
356 * core for the given handle.
357 *
358 * @param pth transmission handle to schedule
359 */
360static void
361schedule_transmission (struct GSF_PeerTransmitHandle *pth)
362{
363 struct GSF_ConnectedPeer *cp;
364 struct GNUNET_PeerIdentity target;
365
366 cp = pth->cp;
367 GNUNET_assert (0 != cp->ppd.pid);
368 GNUNET_PEER_resolve (cp->ppd.pid, &target);
369
370 if (0 != cp->inc_preference)
371 {
372 GNUNET_ATS_performance_change_preference (GSF_ats,
373 &target,
374 GNUNET_ATS_PREFERENCE_BANDWIDTH,
375 (double) cp->inc_preference,
376 GNUNET_ATS_PREFERENCE_END);
377 cp->inc_preference = 0;
378 }
379
380 if ((GNUNET_YES == pth->is_query) &&
381 (GNUNET_YES != pth->was_reserved))
382 {
383 /* query, need reservation */
384 if (GNUNET_YES != cp->did_reserve)
385 return; /* not ready */
386 cp->did_reserve = GNUNET_NO;
387 /* reservation already done! */
388 pth->was_reserved = GNUNET_YES;
389 cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
390 &target,
391 DBLOCK_SIZE,
392 &ats_reserve_callback,
393 cp);
394 return;
395 }
396 peer_transmit (cp);
397}
398
399
400/**
401 * Core is ready to transmit to a peer, get the message.
402 *
403 * @param cp which peer to send a message to
404 */
405static void
406peer_transmit (struct GSF_ConnectedPeer *cp)
407{
408 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
409 struct GSF_PeerTransmitHandle *pos;
410
411 if (NULL == pth)
412 return;
413 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
414 cp->pth_tail,
415 pth);
416 if (GNUNET_YES == pth->is_query)
417 {
418 cp->ppd.last_request_times[(cp->last_request_times_off++)
419 % MAX_QUEUE_PER_PEER] =
420 GNUNET_TIME_absolute_get ();
421 GNUNET_assert (0 < cp->ppd.pending_queries--);
422 }
423 else if (GNUNET_NO == pth->is_query)
424 {
425 GNUNET_assert (0 < cp->ppd.pending_replies--);
426 }
427 GNUNET_LOAD_update (cp->ppd.transmission_delay,
428 GNUNET_TIME_absolute_get_duration
429 (pth->transmission_request_start_time).rel_value_us);
430 GNUNET_MQ_send (cp->mq,
431 pth->env);
432 GNUNET_free (pth);
433 if (NULL != (pos = cp->pth_head))
434 {
435 GNUNET_assert (pos != pth);
436 schedule_transmission (pos);
437 }
438}
439
440
441/**
442 * (re)try to reserve bandwidth from the given peer.
443 *
444 * @param cls the `struct GSF_ConnectedPeer` to reserve from
445 */
446static void
447retry_reservation (void *cls)
448{
449 struct GSF_ConnectedPeer *cp = cls;
450 struct GNUNET_PeerIdentity target;
451
452 GNUNET_PEER_resolve (cp->ppd.pid, &target);
453 cp->rc_delay_task = NULL;
454 cp->rc =
455 GNUNET_ATS_reserve_bandwidth (GSF_ats,
456 &target,
457 DBLOCK_SIZE,
458 &ats_reserve_callback, cp);
459}
460
461
462/**
463 * Function called by core upon success or failure of our bandwidth reservation request.
464 *
465 * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
466 * @param peer identifies the peer
467 * @param amount set to the amount that was actually reserved or unreserved;
468 * either the full requested amount or zero (no partial reservations)
469 * @param res_delay if the reservation could not be satisfied (amount was 0), how
470 * long should the client wait until re-trying?
471 */
472static void
473ats_reserve_callback (void *cls,
474 const struct GNUNET_PeerIdentity *peer,
475 int32_t amount,
476 struct GNUNET_TIME_Relative res_delay)
477{
478 struct GSF_ConnectedPeer *cp = cls;
479 struct GSF_PeerTransmitHandle *pth;
480
481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482 "Reserved %d bytes / need to wait %s for reservation\n",
483 (int) amount,
484 GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
485 cp->rc = NULL;
486 if (0 == amount)
487 {
488 cp->rc_delay_task =
489 GNUNET_SCHEDULER_add_delayed (res_delay,
490 &retry_reservation,
491 cp);
492 return;
493 }
494 cp->did_reserve = GNUNET_YES;
495 pth = cp->pth_head;
496 if (NULL != pth)
497 {
498 /* reservation success, try transmission now! */
499 peer_transmit (cp);
500 }
501}
502
503
504/**
505 * Function called by PEERSTORE with peer respect record
506 *
507 * @param cls handle to connected peer entry
508 * @param record peerstore record information
509 * @param emsg error message, or NULL if no errors
510 */
511static void
512peer_respect_cb (void *cls,
513 const struct GNUNET_PEERSTORE_Record *record,
514 const char *emsg)
515{
516 struct GSF_ConnectedPeer *cp = cls;
517
518 GNUNET_assert (NULL != cp->respect_iterate_req);
519 if ((NULL != record) &&
520 (sizeof(cp->disk_respect) == record->value_size))
521 {
522 cp->disk_respect = *((uint32_t *) record->value);
523 cp->ppd.respect += *((uint32_t *) record->value);
524 }
525 GSF_push_start_ (cp);
526 if (NULL != record)
527 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
528 cp->respect_iterate_req = NULL;
529}
530
531
532/**
533 * Function called for each pending request whenever a new
534 * peer connects, giving us a chance to decide about submitting
535 * the existing request to the new peer.
536 *
537 * @param cls the `struct GSF_ConnectedPeer` of the new peer
538 * @param key query for the request
539 * @param pr handle to the pending request
540 * @return #GNUNET_YES to continue to iterate
541 */
542static int
543consider_peer_for_forwarding (void *cls,
544 const struct GNUNET_HashCode *key,
545 struct GSF_PendingRequest *pr)
546{
547 struct GSF_ConnectedPeer *cp = cls;
548 struct GNUNET_PeerIdentity pid;
549
550 if (GNUNET_YES !=
551 GSF_pending_request_test_active_ (pr))
552 return GNUNET_YES; /* request is not actually active, skip! */
553 GSF_connected_peer_get_identity_ (cp, &pid);
554 if (GNUNET_YES !=
555 GSF_pending_request_test_target_ (pr, &pid))
556 {
557 GNUNET_STATISTICS_update (GSF_stats,
558 gettext_noop ("# Loopback routes suppressed"),
559 1,
560 GNUNET_NO);
561 return GNUNET_YES;
562 }
563 GSF_plan_add_ (cp, pr);
564 return GNUNET_YES;
565}
566
567
568/**
569 * A peer connected to us. Setup the connected peer
570 * records.
571 *
572 * @param cls NULL
573 * @param peer identity of peer that connected
574 * @param mq message queue for talking to @a peer
575 * @return our internal handle for the peer
576 */
577void *
578GSF_peer_connect_handler (void *cls,
579 const struct GNUNET_PeerIdentity *peer,
580 struct GNUNET_MQ_Handle *mq)
581{
582 struct GSF_ConnectedPeer *cp;
583
584 if (0 ==
585 GNUNET_memcmp (&GSF_my_id,
586 peer))
587 return NULL;
588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
589 "Connected to peer %s\n",
590 GNUNET_i2s (peer));
591 cp = GNUNET_new (struct GSF_ConnectedPeer);
592 cp->ppd.pid = GNUNET_PEER_intern (peer);
593 cp->ppd.peer = peer;
594 cp->mq = mq;
595 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
596 cp->rc =
597 GNUNET_ATS_reserve_bandwidth (GSF_ats,
598 peer,
599 DBLOCK_SIZE,
600 &ats_reserve_callback, cp);
601 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
602 GNUNET_YES);
603 GNUNET_break (GNUNET_OK ==
604 GNUNET_CONTAINER_multipeermap_put (cp_map,
605 GSF_connected_peer_get_identity2_ (
606 cp),
607 cp,
608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
609 GNUNET_STATISTICS_set (GSF_stats,
610 gettext_noop ("# peers connected"),
611 GNUNET_CONTAINER_multipeermap_size (cp_map),
612 GNUNET_NO);
613 cp->respect_iterate_req
614 = GNUNET_PEERSTORE_iterate (peerstore,
615 "fs",
616 peer,
617 "respect",
618 &peer_respect_cb,
619 cp);
620 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
621 cp);
622 return cp;
623}
624
625
626/**
627 * It may be time to re-start migrating content to this
628 * peer. Check, and if so, restart migration.
629 *
630 * @param cls the `struct GSF_ConnectedPeer`
631 */
632static void
633revive_migration (void *cls)
634{
635 struct GSF_ConnectedPeer *cp = cls;
636 struct GNUNET_TIME_Relative bt;
637
638 cp->mig_revive_task = NULL;
639 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
640 if (0 != bt.rel_value_us)
641 {
642 /* still time left... */
643 cp->mig_revive_task =
644 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
645 return;
646 }
647 GSF_push_start_ (cp);
648}
649
650
651/**
652 * Get a handle for a connected peer.
653 *
654 * @param peer peer's identity
655 * @return NULL if the peer is not currently connected
656 */
657struct GSF_ConnectedPeer *
658GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
659{
660 if (NULL == cp_map)
661 return NULL;
662 return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
663}
664
665
666/**
667 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
668 *
669 * @param cls closure, the `struct GSF_ConnectedPeer`
670 * @param msm the actual message
671 */
672void
673handle_p2p_migration_stop (void *cls,
674 const struct MigrationStopMessage *msm)
675{
676 struct GSF_ConnectedPeer *cp = cls;
677 struct GNUNET_TIME_Relative bt;
678
679 GNUNET_STATISTICS_update (GSF_stats,
680 gettext_noop ("# migration stop messages received"),
681 1, GNUNET_NO);
682 bt = GNUNET_TIME_relative_ntoh (msm->duration);
683 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
684 _ ("Migration of content to peer `%s' blocked for %s\n"),
685 GNUNET_i2s (cp->ppd.peer),
686 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
687 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
688 if ((NULL == cp->mig_revive_task) &&
689 (NULL == cp->respect_iterate_req))
690 {
691 GSF_push_stop_ (cp);
692 cp->mig_revive_task =
693 GNUNET_SCHEDULER_add_delayed (bt,
694 &revive_migration, cp);
695 }
696}
697
698
699/**
700 * Free resources associated with the given peer request.
701 *
702 * @param peerreq request to free
703 */
704static void
705free_pending_request (struct PeerRequest *peerreq)
706{
707 struct GSF_ConnectedPeer *cp = peerreq->cp;
708 struct GSF_PendingRequestData *prd;
709
710 prd = GSF_pending_request_get_data_ (peerreq->pr);
711 if (NULL != peerreq->kill_task)
712 {
713 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
714 peerreq->kill_task = NULL;
715 }
716 GNUNET_STATISTICS_update (GSF_stats,
717 gettext_noop ("# P2P searches active"),
718 -1,
719 GNUNET_NO);
720 GNUNET_break (GNUNET_YES ==
721 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
722 &prd->query,
723 peerreq));
724 GNUNET_free (peerreq);
725}
726
727
728/**
729 * Cancel all requests associated with the peer.
730 *
731 * @param cls unused
732 * @param query hash code of the request
733 * @param value the `struct GSF_PendingRequest`
734 * @return #GNUNET_YES (continue to iterate)
735 */
736static int
737cancel_pending_request (void *cls,
738 const struct GNUNET_HashCode *query,
739 void *value)
740{
741 struct PeerRequest *peerreq = value;
742 struct GSF_PendingRequest *pr = peerreq->pr;
743
744 free_pending_request (peerreq);
745 GSF_pending_request_cancel_ (pr,
746 GNUNET_NO);
747 return GNUNET_OK;
748}
749
750
751/**
752 * Free the given request.
753 *
754 * @param cls the request to free
755 */
756static void
757peer_request_destroy (void *cls)
758{
759 struct PeerRequest *peerreq = cls;
760 struct GSF_PendingRequest *pr = peerreq->pr;
761 struct GSF_PendingRequestData *prd;
762
763 peerreq->kill_task = NULL;
764 prd = GSF_pending_request_get_data_ (pr);
765 cancel_pending_request (NULL,
766 &prd->query,
767 peerreq);
768}
769
770
771/**
772 * The artificial delay is over, transmit the message now.
773 *
774 * @param cls the `struct GSF_DelayedHandle` with the message
775 */
776static void
777transmit_delayed_now (void *cls)
778{
779 struct GSF_DelayedHandle *dh = cls;
780 struct GSF_ConnectedPeer *cp = dh->cp;
781
782 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
783 cp->delayed_tail,
784 dh);
785 cp->delay_queue_size--;
786 GSF_peer_transmit_ (cp,
787 GNUNET_NO,
788 UINT32_MAX,
789 dh->env);
790 GNUNET_free (dh);
791}
792
793
794/**
795 * Get the randomized delay a response should be subjected to.
796 *
797 * @return desired delay
798 */
799static struct GNUNET_TIME_Relative
800get_randomized_delay ()
801{
802 struct GNUNET_TIME_Relative ret;
803
804 ret =
805 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
806 GNUNET_CRYPTO_random_u32
807 (GNUNET_CRYPTO_QUALITY_WEAK,
808 2 * GSF_avg_latency.rel_value_us + 1));
809#if INSANE_STATISTICS
810 GNUNET_STATISTICS_update (GSF_stats,
811 gettext_noop
812 ("# artificial delays introduced (ms)"),
813 ret.rel_value_us / 1000LL, GNUNET_NO);
814#endif
815 return ret;
816}
817
818
819/**
820 * Handle a reply to a pending request. Also called if a request
821 * expires (then with data == NULL). The handler may be called
822 * many times (depending on the request type), but will not be
823 * called during or after a call to GSF_pending_request_cancel
824 * and will also not be called anymore after a call signalling
825 * expiration.
826 *
827 * @param cls `struct PeerRequest` this is an answer for
828 * @param eval evaluation of the result
829 * @param pr handle to the original pending request
830 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
831 * @param expiration when does @a data expire?
832 * @param last_transmission when did we last transmit a request for this block
833 * @param type type of the block
834 * @param data response data, NULL on request expiration
835 * @param data_len number of bytes in @a data
836 */
837static void
838handle_p2p_reply (void *cls,
839 enum GNUNET_BLOCK_ReplyEvaluationResult eval,
840 struct GSF_PendingRequest *pr,
841 uint32_t reply_anonymity_level,
842 struct GNUNET_TIME_Absolute expiration,
843 struct GNUNET_TIME_Absolute last_transmission,
844 enum GNUNET_BLOCK_Type type,
845 const void *data,
846 size_t data_len)
847{
848 struct PeerRequest *peerreq = cls;
849 struct GSF_ConnectedPeer *cp = peerreq->cp;
850 struct GSF_PendingRequestData *prd;
851 struct GNUNET_MQ_Envelope *env;
852 struct PutMessage *pm;
853 size_t msize;
854
855 GNUNET_assert (data_len + sizeof(struct PutMessage) <
856 GNUNET_MAX_MESSAGE_SIZE);
857 GNUNET_assert (peerreq->pr == pr);
858 prd = GSF_pending_request_get_data_ (pr);
859 if (NULL == data)
860 {
861 free_pending_request (peerreq);
862 return;
863 }
864 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
865 if ( (prd->type != type) &&
866 (GNUNET_BLOCK_TYPE_ANY != prd->type) )
867 {
868 GNUNET_STATISTICS_update (GSF_stats,
869 "# replies dropped due to type mismatch",
870 1, GNUNET_NO);
871 return;
872 }
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874 "Transmitting result for query `%s' to peer\n",
875 GNUNET_h2s (&prd->query));
876 GNUNET_STATISTICS_update (GSF_stats,
877 "# replies received for other peers",
878 1,
879 GNUNET_NO);
880 msize = sizeof(struct PutMessage) + data_len;
881 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
882 {
883 GNUNET_break (0);
884 return;
885 }
886 if ( (UINT32_MAX != reply_anonymity_level) &&
887 (reply_anonymity_level > 1) )
888 {
889 if (reply_anonymity_level - 1 > GSF_cover_content_count)
890 {
891 GNUNET_STATISTICS_update (GSF_stats,
892 "# replies dropped due to insufficient cover traffic",
893 1, GNUNET_NO);
894 return;
895 }
896 GSF_cover_content_count -= (reply_anonymity_level - 1);
897 }
898
899 env = GNUNET_MQ_msg_extra (pm,
900 data_len,
901 GNUNET_MESSAGE_TYPE_FS_PUT);
902 pm->type = htonl (type);
903 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
904 GNUNET_memcpy (&pm[1],
905 data,
906 data_len);
907 if ((UINT32_MAX != reply_anonymity_level) &&
908 (0 != reply_anonymity_level) &&
909 (GNUNET_YES == GSF_enable_randomized_delays))
910 {
911 struct GSF_DelayedHandle *dh;
912
913 dh = GNUNET_new (struct GSF_DelayedHandle);
914 dh->cp = cp;
915 dh->env = env;
916 dh->msize = msize;
917 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
918 cp->delayed_tail,
919 dh);
920 cp->delay_queue_size++;
921 dh->delay_task =
922 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
923 &transmit_delayed_now,
924 dh);
925 }
926 else
927 {
928 GSF_peer_transmit_ (cp,
929 GNUNET_NO,
930 UINT32_MAX,
931 env);
932 }
933 if (GNUNET_BLOCK_REPLY_OK_LAST != eval)
934 return;
935 if (NULL == peerreq->kill_task)
936 {
937 GNUNET_STATISTICS_update (GSF_stats,
938 "# P2P searches destroyed due to ultimate reply",
939 1,
940 GNUNET_NO);
941 peerreq->kill_task =
942 GNUNET_SCHEDULER_add_now (&peer_request_destroy,
943 peerreq);
944 }
945}
946
947
948/**
949 * Increase the peer's respect by a value.
950 *
951 * @param cp which peer to change the respect value on
952 * @param value is the int value by which the
953 * peer's credit is to be increased or decreased
954 * @returns the actual change in respect (positive or negative)
955 */
956static int
957change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
958{
959 if (0 == value)
960 return 0;
961 GNUNET_assert (NULL != cp);
962 if (value > 0)
963 {
964 if (cp->ppd.respect + value < cp->ppd.respect)
965 {
966 value = UINT32_MAX - cp->ppd.respect;
967 cp->ppd.respect = UINT32_MAX;
968 }
969 else
970 cp->ppd.respect += value;
971 }
972 else
973 {
974 if (cp->ppd.respect < -value)
975 {
976 value = -cp->ppd.respect;
977 cp->ppd.respect = 0;
978 }
979 else
980 cp->ppd.respect += value;
981 }
982 return value;
983}
984
985
986/**
987 * We've received a request with the specified priority. Bound it
988 * according to how much we respect the given peer.
989 *
990 * @param prio_in requested priority
991 * @param cp the peer making the request
992 * @return effective priority
993 */
994static int32_t
995bound_priority (uint32_t prio_in,
996 struct GSF_ConnectedPeer *cp)
997{
998#define N ((double) 128.0)
999 uint32_t ret;
1000 double rret;
1001 int ld;
1002
1003 ld = GSF_test_get_load_too_high_ (0);
1004 if (GNUNET_SYSERR == ld)
1005 {
1006#if INSANE_STATISTICS
1007 GNUNET_STATISTICS_update (GSF_stats,
1008 gettext_noop
1009 ("# requests done for free (low load)"), 1,
1010 GNUNET_NO);
1011#endif
1012 return 0; /* excess resources */
1013 }
1014 if (prio_in > INT32_MAX)
1015 prio_in = INT32_MAX;
1016 ret = -change_peer_respect (cp, -(int) prio_in);
1017 if (ret > 0)
1018 {
1019 if (ret > GSF_current_priorities + N)
1020 rret = GSF_current_priorities + N;
1021 else
1022 rret = ret;
1023 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1024 }
1025 if ((GNUNET_YES == ld) && (ret > 0))
1026 {
1027 /* try with charging */
1028 ld = GSF_test_get_load_too_high_ (ret);
1029 }
1030 if (GNUNET_YES == ld)
1031 {
1032 GNUNET_STATISTICS_update (GSF_stats,
1033 gettext_noop
1034 ("# request dropped, priority insufficient"), 1,
1035 GNUNET_NO);
1036 /* undo charge */
1037 change_peer_respect (cp, (int) ret);
1038 return -1; /* not enough resources */
1039 }
1040 else
1041 {
1042 GNUNET_STATISTICS_update (GSF_stats,
1043 gettext_noop
1044 ("# requests done for a price (normal load)"),
1045 1,
1046 GNUNET_NO);
1047 }
1048#undef N
1049 return ret;
1050}
1051
1052
1053/**
1054 * The priority level imposes a bound on the maximum
1055 * value for the ttl that can be requested.
1056 *
1057 * @param ttl_in requested ttl
1058 * @param prio given priority
1059 * @return @a ttl_in if @a ttl_in is below the limit,
1060 * otherwise the ttl-limit for the given @a prio
1061 */
1062static int32_t
1063bound_ttl (int32_t ttl_in,
1064 uint32_t prio)
1065{
1066 unsigned long long allowed;
1067
1068 if (ttl_in <= 0)
1069 return ttl_in;
1070 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1071 if (ttl_in > allowed)
1072 {
1073 if (allowed >= (1 << 30))
1074 return 1 << 30;
1075 return allowed;
1076 }
1077 return ttl_in;
1078}
1079
1080
1081/**
1082 * Closure for #test_exist_cb().
1083 */
1084struct TestExistClosure
1085{
1086 /**
1087 * Priority of the incoming request.
1088 */
1089 int32_t priority;
1090
1091 /**
1092 * Relative TTL of the incoming request.
1093 */
1094 int32_t ttl;
1095
1096 /**
1097 * Type of the incoming request.
1098 */
1099 enum GNUNET_BLOCK_Type type;
1100
1101 /**
1102 * Set to #GNUNET_YES if we are done handling the query.
1103 */
1104 int finished;
1105};
1106
1107
1108/**
1109 * Test if the query already exists. If so, merge it, otherwise
1110 * keep `finished` at #GNUNET_NO.
1111 *
1112 * @param cls our `struct TestExistClosure`
1113 * @param hc the key of the query
1114 * @param value the existing `struct PeerRequest`.
1115 * @return #GNUNET_YES to continue to iterate,
1116 * #GNUNET_NO if we successfully merged
1117 */
1118static int
1119test_exist_cb (void *cls,
1120 const struct GNUNET_HashCode *hc,
1121 void *value)
1122{
1123 struct TestExistClosure *tec = cls;
1124 struct PeerRequest *peerreq = value;
1125 struct GSF_PendingRequest *pr;
1126 struct GSF_PendingRequestData *prd;
1127
1128 pr = peerreq->pr;
1129 prd = GSF_pending_request_get_data_ (pr);
1130 if (prd->type != tec->type)
1131 return GNUNET_YES;
1132 if (prd->ttl.abs_value_us >=
1133 GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1134 {
1135 /* existing request has higher TTL, drop new one! */
1136 prd->priority += tec->priority;
1137 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1138 "Have existing request with higher TTL, dropping new request.\n");
1139 GNUNET_STATISTICS_update (GSF_stats,
1140 gettext_noop
1141 ("# requests dropped due to higher-TTL request"),
1142 1, GNUNET_NO);
1143 tec->finished = GNUNET_YES;
1144 return GNUNET_NO;
1145 }
1146 /* existing request has lower TTL, drop old one! */
1147 tec->priority += prd->priority;
1148 free_pending_request (peerreq);
1149 GSF_pending_request_cancel_ (pr,
1150 GNUNET_YES);
1151 return GNUNET_NO;
1152}
1153
1154
1155/**
1156 * Handle P2P "QUERY" message. Creates the pending request entry
1157 * and sets up all of the data structures to that we will
1158 * process replies properly. Does not initiate forwarding or
1159 * local database lookups.
1160 *
1161 * @param cls the other peer involved (sender of the message)
1162 * @param gm the GET message
1163 */
1164void
1165handle_p2p_get (void *cls,
1166 const struct GetMessage *gm)
1167{
1168 struct GSF_ConnectedPeer *cps = cls;
1169 struct PeerRequest *peerreq;
1170 struct GSF_PendingRequest *pr;
1171 struct GSF_ConnectedPeer *cp;
1172 const struct GNUNET_PeerIdentity *target;
1173 enum GSF_PendingRequestOptions options;
1174 uint16_t msize;
1175 unsigned int bits;
1176 const struct GNUNET_PeerIdentity *opt;
1177 uint32_t bm;
1178 size_t bfsize;
1179 uint32_t ttl_decrement;
1180 struct TestExistClosure tec;
1181 GNUNET_PEER_Id spid;
1182 const struct GSF_PendingRequestData *prd;
1183
1184 msize = ntohs (gm->header.size);
1185 tec.type = ntohl (gm->type);
1186 bm = ntohl (gm->hash_bitmap);
1187 bits = 0;
1188 while (bm > 0)
1189 {
1190 if (1 == (bm & 1))
1191 bits++;
1192 bm >>= 1;
1193 }
1194 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1195 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct
1196 GNUNET_PeerIdentity);
1197 GNUNET_STATISTICS_update (GSF_stats,
1198 gettext_noop
1199 ("# GET requests received (from other peers)"),
1200 1,
1201 GNUNET_NO);
1202 GSF_cover_query_count++;
1203 bm = ntohl (gm->hash_bitmap);
1204 bits = 0;
1205 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1206 cp = GSF_peer_get_ (&opt[bits++]);
1207 else
1208 cp = cps;
1209 if (NULL == cp)
1210 {
1211 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1212 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1213 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1214 GNUNET_i2s (&opt[bits - 1]));
1215
1216 else
1217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218 "Failed to find peer `%s' in connection set. Dropping query.\n",
1219 GNUNET_i2s (cps->ppd.peer));
1220 GNUNET_STATISTICS_update (GSF_stats,
1221 gettext_noop
1222 (
1223 "# requests dropped due to missing reverse route"),
1224 1,
1225 GNUNET_NO);
1226 return;
1227 }
1228 unsigned int queue_size = GNUNET_MQ_get_length (cp->mq);
1229 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1230 if (queue_size > MAX_QUEUE_PER_PEER)
1231 {
1232 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1233 "Peer `%s' has too many replies queued already. Dropping query.\n",
1234 GNUNET_i2s (cps->ppd.peer));
1235 GNUNET_STATISTICS_update (GSF_stats,
1236 gettext_noop (
1237 "# requests dropped due to full reply queue"),
1238 1,
1239 GNUNET_NO);
1240 return;
1241 }
1242 /* note that we can really only check load here since otherwise
1243 * peers could find out that we are overloaded by not being
1244 * disconnected after sending us a malformed query... */
1245 tec.priority = bound_priority (ntohl (gm->priority),
1246 cps);
1247 if (tec.priority < 0)
1248 {
1249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250 "Dropping query from `%s', this peer is too busy.\n",
1251 GNUNET_i2s (cps->ppd.peer));
1252 return;
1253 }
1254 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1256 GNUNET_h2s (&gm->query),
1257 (unsigned int) tec.type,
1258 GNUNET_i2s (cps->ppd.peer),
1259 (unsigned int) bm);
1260 target =
1261 (0 !=
1262 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1263 options = GSF_PRO_DEFAULTS;
1264 spid = 0;
1265 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1
1266 + tec.priority))
1267 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1268 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2
1269 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1270 {
1271 /* don't have BW to send to peer, or would likely take longer than we have for it,
1272 * so at best indirect the query */
1273 tec.priority = 0;
1274 options |= GSF_PRO_FORWARD_ONLY;
1275 spid = GNUNET_PEER_intern (cps->ppd.peer);
1276 GNUNET_assert (0 != spid);
1277 }
1278 tec.ttl = bound_ttl (ntohl (gm->ttl),
1279 tec.priority);
1280 /* decrement ttl (always) */
1281 ttl_decrement =
1282 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1283 TTL_DECREMENT);
1284 if ((tec.ttl < 0) &&
1285 (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1286 {
1287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1289 GNUNET_i2s (cps->ppd.peer),
1290 tec.ttl,
1291 ttl_decrement);
1292 GNUNET_STATISTICS_update (GSF_stats,
1293 gettext_noop
1294 ("# requests dropped due TTL underflow"), 1,
1295 GNUNET_NO);
1296 /* integer underflow => drop (should be very rare)! */
1297 return;
1298 }
1299 tec.ttl -= ttl_decrement;
1300
1301 /* test if the request already exists */
1302 tec.finished = GNUNET_NO;
1303 GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1304 &gm->query,
1305 &test_exist_cb,
1306 &tec);
1307 if (GNUNET_YES == tec.finished)
1308 return; /* merged into existing request, we're done */
1309
1310 peerreq = GNUNET_new (struct PeerRequest);
1311 peerreq->cp = cp;
1312 pr = GSF_pending_request_create_ (options,
1313 tec.type,
1314 &gm->query,
1315 target,
1316 (bfsize > 0)
1317 ? (const char *) &opt[bits]
1318 : NULL,
1319 bfsize,
1320 ntohl (gm->filter_mutator),
1321 1 /* anonymity */,
1322 (uint32_t) tec.priority,
1323 tec.ttl,
1324 spid,
1325 GNUNET_PEER_intern (cps->ppd.peer),
1326 NULL, 0, /* replies_seen */
1327 &handle_p2p_reply,
1328 peerreq);
1329 GNUNET_assert (NULL != pr);
1330 prd = GSF_pending_request_get_data_ (pr);
1331 peerreq->pr = pr;
1332 GNUNET_break (GNUNET_OK ==
1333 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1334 &prd->query,
1335 peerreq,
1336 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1337 GNUNET_STATISTICS_update (GSF_stats,
1338 gettext_noop (
1339 "# P2P query messages received and processed"),
1340 1,
1341 GNUNET_NO);
1342 GNUNET_STATISTICS_update (GSF_stats,
1343 gettext_noop ("# P2P searches active"),
1344 1,
1345 GNUNET_NO);
1346 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1347 GSF_local_lookup_ (pr,
1348 &GSF_consider_forwarding,
1349 NULL);
1350}
1351
1352
1353/**
1354 * Transmit a message to the given peer as soon as possible.
1355 * If the peer disconnects before the transmission can happen,
1356 * the callback is invoked with a `NULL` @a buffer.
1357 *
1358 * @param cp target peer
1359 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1360 * @param priority how important is this request?
1361 * @param timeout when does this request timeout
1362 * @param size number of bytes we would like to send to the peer
1363 * @param env message to send
1364 */
1365void
1366GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1367 int is_query,
1368 uint32_t priority,
1369 struct GNUNET_MQ_Envelope *env)
1370{
1371 struct GSF_PeerTransmitHandle *pth;
1372 struct GSF_PeerTransmitHandle *pos;
1373 struct GSF_PeerTransmitHandle *prev;
1374
1375 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1376 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1377 pth->env = env;
1378 pth->is_query = is_query;
1379 pth->priority = priority;
1380 pth->cp = cp;
1381 /* insertion sort (by priority, descending) */
1382 prev = NULL;
1383 pos = cp->pth_head;
1384 while ((NULL != pos) && (pos->priority > priority))
1385 {
1386 prev = pos;
1387 pos = pos->next;
1388 }
1389 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1390 cp->pth_tail,
1391 prev,
1392 pth);
1393 if (GNUNET_YES == is_query)
1394 cp->ppd.pending_queries++;
1395 else if (GNUNET_NO == is_query)
1396 cp->ppd.pending_replies++;
1397 schedule_transmission (pth);
1398}
1399
1400
1401/**
1402 * Report on receiving a reply; update the performance record of the given peer.
1403 *
1404 * @param cp responding peer (will be updated)
1405 * @param request_time time at which the original query was transmitted
1406 * @param request_priority priority of the original request
1407 */
1408void
1409GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1410 struct GNUNET_TIME_Absolute request_time,
1411 uint32_t request_priority)
1412{
1413 struct GNUNET_TIME_Relative delay;
1414
1415 delay = GNUNET_TIME_absolute_get_duration (request_time);
1416 cp->ppd.avg_reply_delay.rel_value_us =
1417 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1)
1418 + delay.rel_value_us) / RUNAVG_DELAY_N;
1419 cp->ppd.avg_priority =
1420 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1)
1421 + request_priority) / RUNAVG_DELAY_N;
1422}
1423
1424
1425/**
1426 * Report on receiving a reply in response to an initiating client.
1427 * Remember that this peer is good for this client.
1428 *
1429 * @param cp responding peer (will be updated)
1430 * @param initiator_client local client on responsible for query
1431 */
1432void
1433GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1434 struct GSF_LocalClient *initiator_client)
1435{
1436 cp->ppd.last_client_replies[cp->last_client_replies_woff++
1437 % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1438}
1439
1440
1441/**
1442 * Report on receiving a reply in response to an initiating peer.
1443 * Remember that this peer is good for this initiating peer.
1444 *
1445 * @param cp responding peer (will be updated)
1446 * @param initiator_peer other peer responsible for query
1447 */
1448void
1449GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1450 const struct GSF_ConnectedPeer *initiator_peer)
1451{
1452 unsigned int woff;
1453
1454 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1455 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1456 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1457 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1458 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1459}
1460
1461
1462/**
1463 * Write peer-respect information to a file - flush the buffer entry!
1464 *
1465 * @param cls unused
1466 * @param key peer identity
1467 * @param value the `struct GSF_ConnectedPeer` to flush
1468 * @return #GNUNET_OK to continue iteration
1469 */
1470static int
1471flush_respect (void *cls,
1472 const struct GNUNET_PeerIdentity *key,
1473 void *value)
1474{
1475 struct GSF_ConnectedPeer *cp = value;
1476 struct GNUNET_PeerIdentity pid;
1477
1478 if (cp->ppd.respect == cp->disk_respect)
1479 return GNUNET_OK; /* unchanged */
1480 GNUNET_assert (0 != cp->ppd.pid);
1481 GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1482 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1483 sizeof(cp->ppd.respect),
1484 GNUNET_TIME_UNIT_FOREVER_ABS,
1485 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1486 NULL,
1487 NULL);
1488 return GNUNET_OK;
1489}
1490
1491
1492/**
1493 * A peer disconnected from us. Tear down the connected peer
1494 * record.
1495 *
1496 * @param cls unused
1497 * @param peer identity of peer that disconnected
1498 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1499 */
1500void
1501GSF_peer_disconnect_handler (void *cls,
1502 const struct GNUNET_PeerIdentity *peer,
1503 void *internal_cls)
1504{
1505 struct GSF_ConnectedPeer *cp = internal_cls;
1506 struct GSF_PeerTransmitHandle *pth;
1507 struct GSF_DelayedHandle *dh;
1508
1509 if (NULL == cp)
1510 return; /* must have been disconnect from core with
1511 * 'peer' == my_id, ignore */
1512 flush_respect (NULL,
1513 peer,
1514 cp);
1515 GNUNET_assert (GNUNET_YES ==
1516 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1517 peer,
1518 cp));
1519 GNUNET_STATISTICS_set (GSF_stats,
1520 gettext_noop ("# peers connected"),
1521 GNUNET_CONTAINER_multipeermap_size (cp_map),
1522 GNUNET_NO);
1523 if (NULL != cp->respect_iterate_req)
1524 {
1525 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1526 cp->respect_iterate_req = NULL;
1527 }
1528 if (NULL != cp->rc)
1529 {
1530 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1531 cp->rc = NULL;
1532 }
1533 if (NULL != cp->rc_delay_task)
1534 {
1535 GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1536 cp->rc_delay_task = NULL;
1537 }
1538 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1539 &cancel_pending_request,
1540 cp);
1541 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1542 cp->request_map = NULL;
1543 GSF_plan_notify_peer_disconnect_ (cp);
1544 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1545 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1546 P2P_SUCCESS_LIST_SIZE);
1547 memset (cp->ppd.last_p2p_replies,
1548 0,
1549 sizeof(cp->ppd.last_p2p_replies));
1550 GSF_push_stop_ (cp);
1551 while (NULL != (pth = cp->pth_head))
1552 {
1553 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1554 cp->pth_tail,
1555 pth);
1556 if (GNUNET_YES == pth->is_query)
1557 GNUNET_assert (0 < cp->ppd.pending_queries--);
1558 else if (GNUNET_NO == pth->is_query)
1559 GNUNET_assert (0 < cp->ppd.pending_replies--);
1560 GNUNET_free (pth);
1561 }
1562 while (NULL != (dh = cp->delayed_head))
1563 {
1564 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1565 cp->delayed_tail,
1566 dh);
1567 GNUNET_MQ_discard (dh->env);
1568 cp->delay_queue_size--;
1569 GNUNET_SCHEDULER_cancel (dh->delay_task);
1570 GNUNET_free (dh);
1571 }
1572 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1573 if (NULL != cp->mig_revive_task)
1574 {
1575 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1576 cp->mig_revive_task = NULL;
1577 }
1578 GNUNET_break (0 == cp->ppd.pending_queries);
1579 GNUNET_break (0 == cp->ppd.pending_replies);
1580 GNUNET_free (cp);
1581}
1582
1583
1584/**
1585 * Closure for #call_iterator().
1586 */
1587struct IterationContext
1588{
1589 /**
1590 * Function to call on each entry.
1591 */
1592 GSF_ConnectedPeerIterator it;
1593
1594 /**
1595 * Closure for @e it.
1596 */
1597 void *it_cls;
1598};
1599
1600
1601/**
1602 * Function that calls the callback for each peer.
1603 *
1604 * @param cls the `struct IterationContext *`
1605 * @param key identity of the peer
1606 * @param value the `struct GSF_ConnectedPeer *`
1607 * @return #GNUNET_YES to continue iteration
1608 */
1609static int
1610call_iterator (void *cls,
1611 const struct GNUNET_PeerIdentity *key,
1612 void *value)
1613{
1614 struct IterationContext *ic = cls;
1615 struct GSF_ConnectedPeer *cp = value;
1616
1617 ic->it (ic->it_cls,
1618 key, cp,
1619 &cp->ppd);
1620 return GNUNET_YES;
1621}
1622
1623
1624/**
1625 * Iterate over all connected peers.
1626 *
1627 * @param it function to call for each peer
1628 * @param it_cls closure for @a it
1629 */
1630void
1631GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1632 void *it_cls)
1633{
1634 struct IterationContext ic;
1635
1636 ic.it = it;
1637 ic.it_cls = it_cls;
1638 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1639 &call_iterator,
1640 &ic);
1641}
1642
1643
1644/**
1645 * Obtain the identity of a connected peer.
1646 *
1647 * @param cp peer to get identity of
1648 * @param id identity to set (written to)
1649 */
1650void
1651GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1652 struct GNUNET_PeerIdentity *id)
1653{
1654 GNUNET_assert (0 != cp->ppd.pid);
1655 GNUNET_PEER_resolve (cp->ppd.pid, id);
1656}
1657
1658
1659/**
1660 * Obtain the identity of a connected peer.
1661 *
1662 * @param cp peer to get identity of
1663 * @return reference to peer identity, valid until peer disconnects (!)
1664 */
1665const struct GNUNET_PeerIdentity *
1666GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1667{
1668 GNUNET_assert (0 != cp->ppd.pid);
1669 return GNUNET_PEER_resolve2 (cp->ppd.pid);
1670}
1671
1672
1673/**
1674 * Ask a peer to stop migrating data to us until the given point
1675 * in time.
1676 *
1677 * @param cp peer to ask
1678 * @param block_time until when to block
1679 */
1680void
1681GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1682 struct GNUNET_TIME_Absolute block_time)
1683{
1684 struct GNUNET_MQ_Envelope *env;
1685 struct MigrationStopMessage *msm;
1686
1687 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1688 {
1689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690 "Migration already blocked for another %s\n",
1691 GNUNET_STRINGS_relative_time_to_string (
1692 GNUNET_TIME_absolute_get_remaining
1693 (cp->
1694 last_migration_block), GNUNET_YES));
1695 return; /* already blocked */
1696 }
1697 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1698 GNUNET_STRINGS_relative_time_to_string (
1699 GNUNET_TIME_absolute_get_remaining (block_time),
1700 GNUNET_YES));
1701 cp->last_migration_block = block_time;
1702 env = GNUNET_MQ_msg (msm,
1703 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1704 msm->reserved = htonl (0);
1705 msm->duration
1706 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1707 (cp->last_migration_block));
1708 GNUNET_STATISTICS_update (GSF_stats,
1709 gettext_noop ("# migration stop messages sent"),
1710 1,
1711 GNUNET_NO);
1712 GSF_peer_transmit_ (cp,
1713 GNUNET_SYSERR,
1714 UINT32_MAX,
1715 env);
1716}
1717
1718
1719/**
1720 * Notify core about a preference we have for the given peer
1721 * (to allocate more resources towards it). The change will
1722 * be communicated the next time we reserve bandwidth with
1723 * core (not instantly).
1724 *
1725 * @param cp peer to reserve bandwidth from
1726 * @param pref preference change
1727 */
1728void
1729GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1730 uint64_t pref)
1731{
1732 cp->inc_preference += pref;
1733}
1734
1735
1736/**
1737 * Call this method periodically to flush respect information to disk.
1738 *
1739 * @param cls closure, not used
1740 */
1741static void
1742cron_flush_respect (void *cls)
1743{
1744 fr_task = NULL;
1745 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1746 &flush_respect,
1747 NULL);
1748 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1749 GNUNET_SCHEDULER_PRIORITY_HIGH,
1750 &cron_flush_respect,
1751 NULL);
1752}
1753
1754
1755/**
1756 * Initialize peer management subsystem.
1757 */
1758void
1759GSF_connected_peer_init_ ()
1760{
1761 cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1762 peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1763 fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1764 &cron_flush_respect, NULL);
1765}
1766
1767
1768/**
1769 * Shutdown peer management subsystem.
1770 */
1771void
1772GSF_connected_peer_done_ ()
1773{
1774 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1775 &flush_respect,
1776 NULL);
1777 GNUNET_SCHEDULER_cancel (fr_task);
1778 fr_task = NULL;
1779 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1780 cp_map = NULL;
1781 GNUNET_PEERSTORE_disconnect (peerstore,
1782 GNUNET_YES);
1783}
1784
1785
1786/**
1787 * Iterator to remove references to LC entry.
1788 *
1789 * @param cls the `struct GSF_LocalClient *` to look for
1790 * @param key current key code
1791 * @param value value in the hash map (peer entry)
1792 * @return #GNUNET_YES (we should continue to iterate)
1793 */
1794static int
1795clean_local_client (void *cls,
1796 const struct GNUNET_PeerIdentity *key,
1797 void *value)
1798{
1799 const struct GSF_LocalClient *lc = cls;
1800 struct GSF_ConnectedPeer *cp = value;
1801 unsigned int i;
1802
1803 for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1804 if (cp->ppd.last_client_replies[i] == lc)
1805 cp->ppd.last_client_replies[i] = NULL;
1806 return GNUNET_YES;
1807}
1808
1809
1810/**
1811 * Notification that a local client disconnected. Clean up all of our
1812 * references to the given handle.
1813 *
1814 * @param lc handle to the local client (henceforth invalid)
1815 */
1816void
1817GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1818{
1819 if (NULL == cp_map)
1820 return; /* already cleaned up */
1821 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1822 &clean_local_client,
1823 (void *) lc);
1824}
1825
1826
1827/* end of gnunet-service-fs_cp.c */