aboutsummaryrefslogtreecommitdiff
path: root/src/service/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/service/fs/gnunet-service-fs_cp.c1667
1 files changed, 1667 insertions, 0 deletions
diff --git a/src/service/fs/gnunet-service-fs_cp.c b/src/service/fs/gnunet-service-fs_cp.c
new file mode 100644
index 000000000..df934feaf
--- /dev/null
+++ b/src/service/fs/gnunet-service-fs_cp.c
@@ -0,0 +1,1667 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file fs/gnunet-service-fs_cp.c
22 * @brief API to handle 'connected peers'
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "gnunet_util_lib.h"
27#include "gnunet_load_lib.h"
28#include "gnunet-service-fs.h"
29#include "gnunet-service-fs_cp.h"
30#include "gnunet-service-fs_pe.h"
31#include "gnunet-service-fs_pr.h"
32#include "gnunet-service-fs_push.h"
33#include "gnunet_peerstore_service.h"
34
35
36/**
37 * Ratio for moving average delay calculation. The previous
38 * average goes in with a factor of (n-1) into the calculation.
39 * Must be > 0.
40 */
41#define RUNAVG_DELAY_N 16
42
43/**
44 * How often do we flush respect values to disk?
45 */
46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
47 GNUNET_TIME_UNIT_MINUTES, 5)
48
49/**
50 * After how long do we discard a reply?
51 */
52#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
53 2)
54
55/**
56 * Collect an instance number of statistics? May cause excessive IPC.
57 */
58#define INSANE_STATISTICS GNUNET_NO
59
60
61/**
62 * Handle to cancel a transmission request.
63 */
64struct GSF_PeerTransmitHandle
65{
66 /**
67 * Kept in a doubly-linked list.
68 */
69 struct GSF_PeerTransmitHandle *next;
70
71 /**
72 * Kept in a doubly-linked list.
73 */
74 struct GSF_PeerTransmitHandle *prev;
75
76 /**
77 * Time when this transmission request was issued.
78 */
79 struct GNUNET_TIME_Absolute transmission_request_start_time;
80
81 /**
82 * Envelope with the actual message.
83 */
84 struct GNUNET_MQ_Envelope *env;
85
86 /**
87 * Peer this request targets.
88 */
89 struct GSF_ConnectedPeer *cp;
90
91 /**
92 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
93 */
94 int is_query;
95
96 /**
97 * Priority of this request.
98 */
99 uint32_t priority;
100};
101
102
103/**
104 * Handle for an entry in our delay list.
105 */
106struct GSF_DelayedHandle
107{
108 /**
109 * Kept in a doubly-linked list.
110 */
111 struct GSF_DelayedHandle *next;
112
113 /**
114 * Kept in a doubly-linked list.
115 */
116 struct GSF_DelayedHandle *prev;
117
118 /**
119 * Peer this transmission belongs to.
120 */
121 struct GSF_ConnectedPeer *cp;
122
123 /**
124 * Envelope of the message that was delayed.
125 */
126 struct GNUNET_MQ_Envelope *env;
127
128 /**
129 * Task for the delay.
130 */
131 struct GNUNET_SCHEDULER_Task *delay_task;
132
133 /**
134 * Size of the message.
135 */
136 size_t msize;
137};
138
139
140/**
141 * Information per peer and request.
142 */
143struct PeerRequest
144{
145 /**
146 * Handle to generic request (generic: from peer or local client).
147 */
148 struct GSF_PendingRequest *pr;
149
150 /**
151 * Which specific peer issued this request?
152 */
153 struct GSF_ConnectedPeer *cp;
154
155 /**
156 * Task for asynchronous stopping of this request.
157 */
158 struct GNUNET_SCHEDULER_Task *kill_task;
159};
160
161
162/**
163 * A connected peer.
164 */
165struct GSF_ConnectedPeer
166{
167 /**
168 * Performance data for this peer.
169 */
170 struct GSF_PeerPerformanceData ppd;
171
172 /**
173 * Time until when we blocked this peer from migrating
174 * data to us.
175 */
176 struct GNUNET_TIME_Absolute last_migration_block;
177
178 /**
179 * Task scheduled to revive migration to this peer.
180 */
181 struct GNUNET_SCHEDULER_Task *mig_revive_task;
182
183 /**
184 * Messages (replies, queries, content migration) we would like to
185 * send to this peer in the near future. Sorted by priority, head.
186 */
187 struct GSF_PeerTransmitHandle *pth_head;
188
189 /**
190 * Messages (replies, queries, content migration) we would like to
191 * send to this peer in the near future. Sorted by priority, tail.
192 */
193 struct GSF_PeerTransmitHandle *pth_tail;
194
195 /**
196 * Messages (replies, queries, content migration) we would like to
197 * send to this peer in the near future. Sorted by priority, head.
198 */
199 struct GSF_DelayedHandle *delayed_head;
200
201 /**
202 * Messages (replies, queries, content migration) we would like to
203 * send to this peer in the near future. Sorted by priority, tail.
204 */
205 struct GSF_DelayedHandle *delayed_tail;
206
207 /**
208 * Task scheduled if we need to retry bandwidth reservation later.
209 */
210 struct GNUNET_SCHEDULER_Task *rc_delay_task;
211
212 /**
213 * Active requests from this neighbour, map of query to `struct PeerRequest`.
214 */
215 struct GNUNET_CONTAINER_MultiHashMap *request_map;
216
217 /**
218 * Handle for an active request for transmission to this
219 * peer.
220 */
221 struct GNUNET_MQ_Handle *mq;
222
223 /**
224 * Increase in traffic preference still to be submitted
225 * to the core service for this peer.
226 */
227 uint64_t inc_preference;
228
229 /**
230 * Number of entries in @e delayed_head DLL.
231 */
232 unsigned int delay_queue_size;
233
234 /**
235 * Respect rating for this peer on disk.
236 */
237 uint32_t disk_respect;
238
239 /**
240 * Which offset in @e last_p2p_replies will be updated next?
241 * (we go round-robin).
242 */
243 unsigned int last_p2p_replies_woff;
244
245 /**
246 * Which offset in @e last_client_replies will be updated next?
247 * (we go round-robin).
248 */
249 unsigned int last_client_replies_woff;
250
251 /**
252 * Current offset into @e last_request_times ring buffer.
253 */
254 unsigned int last_request_times_off;
255
256 /**
257 * Handle to the PEERSTORE iterate request for peer respect value
258 */
259 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
260};
261
262
263/**
264 * Map from peer identities to `struct GSF_ConnectPeer` entries.
265 */
266static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
267
268/**
269 * Handle to peerstore service.
270 */
271static struct GNUNET_PEERSTORE_Handle *peerstore;
272
273/**
274 * Task used to flush respect values to disk.
275 */
276static struct GNUNET_SCHEDULER_Task *fr_task;
277
278
279/**
280 * Update the latency information kept for the given peer.
281 *
282 * @param id peer record to update
283 * @param latency current latency value
284 */
285void
286GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
287 struct GNUNET_TIME_Relative latency)
288{
289 struct GSF_ConnectedPeer *cp;
290
291 cp = GSF_peer_get_ (id);
292 if (NULL == cp)
293 return; /* we're not yet connected at the core level, ignore */
294 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
295 latency);
296}
297
298
299/**
300 * Return the performance data record for the given peer
301 *
302 * @param cp peer to query
303 * @return performance data record for the peer
304 */
305struct GSF_PeerPerformanceData *
306GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
307{
308 return &cp->ppd;
309}
310
311
312/**
313 * Core is ready to transmit to a peer, get the message.
314 *
315 * @param cp which peer to send a message to
316 */
317static void
318peer_transmit (struct GSF_ConnectedPeer *cp);
319
320
321/**
322 * If ready (bandwidth reserved), try to schedule transmission via
323 * core for the given handle.
324 *
325 * @param pth transmission handle to schedule
326 */
327static void
328schedule_transmission (struct GSF_PeerTransmitHandle *pth)
329{
330 struct GSF_ConnectedPeer *cp;
331 struct GNUNET_PeerIdentity target;
332
333 cp = pth->cp;
334 GNUNET_assert (0 != cp->ppd.pid);
335 GNUNET_PEER_resolve (cp->ppd.pid, &target);
336
337 peer_transmit (cp);
338}
339
340
341/**
342 * Core is ready to transmit to a peer, get the message.
343 *
344 * @param cp which peer to send a message to
345 */
346static void
347peer_transmit (struct GSF_ConnectedPeer *cp)
348{
349 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
350 struct GSF_PeerTransmitHandle *pos;
351
352 if (NULL == pth)
353 return;
354 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
355 cp->pth_tail,
356 pth);
357 if (GNUNET_YES == pth->is_query)
358 {
359 cp->ppd.last_request_times[(cp->last_request_times_off++)
360 % MAX_QUEUE_PER_PEER] =
361 GNUNET_TIME_absolute_get ();
362 GNUNET_assert (0 < cp->ppd.pending_queries--);
363 }
364 else if (GNUNET_NO == pth->is_query)
365 {
366 GNUNET_assert (0 < cp->ppd.pending_replies--);
367 }
368 GNUNET_LOAD_update (cp->ppd.transmission_delay,
369 GNUNET_TIME_absolute_get_duration
370 (pth->transmission_request_start_time).rel_value_us);
371 GNUNET_MQ_send (cp->mq,
372 pth->env);
373 GNUNET_free (pth);
374 if (NULL != (pos = cp->pth_head))
375 {
376 GNUNET_assert (pos != pth);
377 schedule_transmission (pos);
378 }
379}
380
381
382/**
383 * Function called by PEERSTORE with peer respect record
384 *
385 * @param cls handle to connected peer entry
386 * @param record peerstore record information
387 * @param emsg error message, or NULL if no errors
388 */
389static void
390peer_respect_cb (void *cls,
391 const struct GNUNET_PEERSTORE_Record *record,
392 const char *emsg)
393{
394 struct GSF_ConnectedPeer *cp = cls;
395
396 GNUNET_assert (NULL != cp->respect_iterate_req);
397 if ((NULL == record) && (NULL == emsg))
398 {
399 cp->respect_iterate_req = NULL;
400 return;
401 }
402 if ((NULL != record) &&
403 (sizeof(cp->disk_respect) == record->value_size))
404 {
405 cp->disk_respect = *((uint32_t *) record->value);
406 cp->ppd.respect += *((uint32_t *) record->value);
407 }
408 GSF_push_start_ (cp);
409 if (NULL != record)
410 {
411 GNUNET_PEERSTORE_iteration_stop (cp->respect_iterate_req);
412 cp->respect_iterate_req = NULL;
413 return;
414 }
415 GNUNET_PEERSTORE_iteration_next (cp->respect_iterate_req, 1);
416}
417
418
419/**
420 * Function called for each pending request whenever a new
421 * peer connects, giving us a chance to decide about submitting
422 * the existing request to the new peer.
423 *
424 * @param cls the `struct GSF_ConnectedPeer` of the new peer
425 * @param key query for the request
426 * @param pr handle to the pending request
427 * @return #GNUNET_YES to continue to iterate
428 */
429static int
430consider_peer_for_forwarding (void *cls,
431 const struct GNUNET_HashCode *key,
432 struct GSF_PendingRequest *pr)
433{
434 struct GSF_ConnectedPeer *cp = cls;
435 struct GNUNET_PeerIdentity pid;
436
437 if (GNUNET_YES !=
438 GSF_pending_request_test_active_ (pr))
439 return GNUNET_YES; /* request is not actually active, skip! */
440 GSF_connected_peer_get_identity_ (cp, &pid);
441 if (GNUNET_YES !=
442 GSF_pending_request_test_target_ (pr, &pid))
443 {
444 GNUNET_STATISTICS_update (GSF_stats,
445 gettext_noop ("# Loopback routes suppressed"),
446 1,
447 GNUNET_NO);
448 return GNUNET_YES;
449 }
450 GSF_plan_add_ (cp, pr);
451 return GNUNET_YES;
452}
453
454
455void *
456GSF_peer_connect_handler (void *cls,
457 const struct GNUNET_PeerIdentity *peer,
458 struct GNUNET_MQ_Handle *mq)
459{
460 struct GSF_ConnectedPeer *cp;
461
462 if (0 ==
463 GNUNET_memcmp (&GSF_my_id,
464 peer))
465 return NULL;
466 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
467 "Connected to peer %s\n",
468 GNUNET_i2s (peer));
469 cp = GNUNET_new (struct GSF_ConnectedPeer);
470 cp->ppd.pid = GNUNET_PEER_intern (peer);
471 cp->ppd.peer = peer;
472 cp->mq = mq;
473 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
474
475 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
476 GNUNET_YES);
477 GNUNET_break (GNUNET_OK ==
478 GNUNET_CONTAINER_multipeermap_put (cp_map,
479 GSF_connected_peer_get_identity2_ (
480 cp),
481 cp,
482 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
483 GNUNET_STATISTICS_set (GSF_stats,
484 gettext_noop ("# peers connected"),
485 GNUNET_CONTAINER_multipeermap_size (cp_map),
486 GNUNET_NO);
487 cp->respect_iterate_req
488 = GNUNET_PEERSTORE_iteration_start (peerstore,
489 "fs",
490 peer,
491 "respect",
492 &peer_respect_cb,
493 cp);
494 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
495 cp);
496 return cp;
497}
498
499
500/**
501 * It may be time to re-start migrating content to this
502 * peer. Check, and if so, restart migration.
503 *
504 * @param cls the `struct GSF_ConnectedPeer`
505 */
506static void
507revive_migration (void *cls)
508{
509 struct GSF_ConnectedPeer *cp = cls;
510 struct GNUNET_TIME_Relative bt;
511
512 cp->mig_revive_task = NULL;
513 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
514 if (0 != bt.rel_value_us)
515 {
516 /* still time left... */
517 cp->mig_revive_task =
518 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
519 return;
520 }
521 GSF_push_start_ (cp);
522}
523
524
525struct GSF_ConnectedPeer *
526GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
527{
528 if (NULL == cp_map)
529 return NULL;
530 return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
531}
532
533
534/**
535 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
536 *
537 * @param cls closure, the `struct GSF_ConnectedPeer`
538 * @param msm the actual message
539 */
540void
541handle_p2p_migration_stop (void *cls,
542 const struct MigrationStopMessage *msm)
543{
544 struct GSF_ConnectedPeer *cp = cls;
545 struct GNUNET_TIME_Relative bt;
546
547 GNUNET_STATISTICS_update (GSF_stats,
548 gettext_noop ("# migration stop messages received"),
549 1, GNUNET_NO);
550 bt = GNUNET_TIME_relative_ntoh (msm->duration);
551 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
552 _ ("Migration of content to peer `%s' blocked for %s\n"),
553 GNUNET_i2s (cp->ppd.peer),
554 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
555 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
556 if ((NULL == cp->mig_revive_task) &&
557 (NULL == cp->respect_iterate_req))
558 {
559 GSF_push_stop_ (cp);
560 cp->mig_revive_task =
561 GNUNET_SCHEDULER_add_delayed (bt,
562 &revive_migration, cp);
563 }
564}
565
566
567/**
568 * Free resources associated with the given peer request.
569 *
570 * @param peerreq request to free
571 */
572static void
573free_pending_request (struct PeerRequest *peerreq)
574{
575 struct GSF_ConnectedPeer *cp = peerreq->cp;
576 struct GSF_PendingRequestData *prd;
577
578 prd = GSF_pending_request_get_data_ (peerreq->pr);
579 if (NULL != peerreq->kill_task)
580 {
581 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
582 peerreq->kill_task = NULL;
583 }
584 GNUNET_STATISTICS_update (GSF_stats,
585 gettext_noop ("# P2P searches active"),
586 -1,
587 GNUNET_NO);
588 GNUNET_break (GNUNET_YES ==
589 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
590 &prd->query,
591 peerreq));
592 GNUNET_free (peerreq);
593}
594
595
596/**
597 * Cancel all requests associated with the peer.
598 *
599 * @param cls unused
600 * @param query hash code of the request
601 * @param value the `struct GSF_PendingRequest`
602 * @return #GNUNET_YES (continue to iterate)
603 */
604static int
605cancel_pending_request (void *cls,
606 const struct GNUNET_HashCode *query,
607 void *value)
608{
609 struct PeerRequest *peerreq = value;
610 struct GSF_PendingRequest *pr = peerreq->pr;
611
612 free_pending_request (peerreq);
613 GSF_pending_request_cancel_ (pr,
614 GNUNET_NO);
615 return GNUNET_OK;
616}
617
618
619/**
620 * Free the given request.
621 *
622 * @param cls the request to free
623 */
624static void
625peer_request_destroy (void *cls)
626{
627 struct PeerRequest *peerreq = cls;
628 struct GSF_PendingRequest *pr = peerreq->pr;
629 struct GSF_PendingRequestData *prd;
630
631 peerreq->kill_task = NULL;
632 prd = GSF_pending_request_get_data_ (pr);
633 cancel_pending_request (NULL,
634 &prd->query,
635 peerreq);
636}
637
638
639/**
640 * The artificial delay is over, transmit the message now.
641 *
642 * @param cls the `struct GSF_DelayedHandle` with the message
643 */
644static void
645transmit_delayed_now (void *cls)
646{
647 struct GSF_DelayedHandle *dh = cls;
648 struct GSF_ConnectedPeer *cp = dh->cp;
649
650 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
651 cp->delayed_tail,
652 dh);
653 cp->delay_queue_size--;
654 GSF_peer_transmit_ (cp,
655 GNUNET_NO,
656 UINT32_MAX,
657 dh->env);
658 GNUNET_free (dh);
659}
660
661
662/**
663 * Get the randomized delay a response should be subjected to.
664 *
665 * @return desired delay
666 */
667static struct GNUNET_TIME_Relative
668get_randomized_delay ()
669{
670 struct GNUNET_TIME_Relative ret;
671
672 ret =
673 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
674 GNUNET_CRYPTO_random_u32
675 (GNUNET_CRYPTO_QUALITY_WEAK,
676 2 * GSF_avg_latency.rel_value_us + 1));
677#if INSANE_STATISTICS
678 GNUNET_STATISTICS_update (GSF_stats,
679 gettext_noop
680 ("# artificial delays introduced (ms)"),
681 ret.rel_value_us / 1000LL, GNUNET_NO);
682#endif
683 return ret;
684}
685
686
687/**
688 * Handle a reply to a pending request. Also called if a request
689 * expires (then with data == NULL). The handler may be called
690 * many times (depending on the request type), but will not be
691 * called during or after a call to GSF_pending_request_cancel
692 * and will also not be called anymore after a call signalling
693 * expiration.
694 *
695 * @param cls `struct PeerRequest` this is an answer for
696 * @param eval evaluation of the result
697 * @param pr handle to the original pending request
698 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
699 * @param expiration when does @a data expire?
700 * @param last_transmission when did we last transmit a request for this block
701 * @param type type of the block
702 * @param data response data, NULL on request expiration
703 * @param data_len number of bytes in @a data
704 */
705static void
706handle_p2p_reply (void *cls,
707 enum GNUNET_BLOCK_ReplyEvaluationResult eval,
708 struct GSF_PendingRequest *pr,
709 uint32_t reply_anonymity_level,
710 struct GNUNET_TIME_Absolute expiration,
711 struct GNUNET_TIME_Absolute last_transmission,
712 enum GNUNET_BLOCK_Type type,
713 const void *data,
714 size_t data_len)
715{
716 struct PeerRequest *peerreq = cls;
717 struct GSF_ConnectedPeer *cp = peerreq->cp;
718 struct GSF_PendingRequestData *prd;
719 struct GNUNET_MQ_Envelope *env;
720 struct PutMessage *pm;
721 size_t msize;
722
723 GNUNET_assert (data_len + sizeof(struct PutMessage) <
724 GNUNET_MAX_MESSAGE_SIZE);
725 GNUNET_assert (peerreq->pr == pr);
726 prd = GSF_pending_request_get_data_ (pr);
727 if (NULL == data)
728 {
729 free_pending_request (peerreq);
730 return;
731 }
732 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
733 if ( (prd->type != type) &&
734 (GNUNET_BLOCK_TYPE_ANY != prd->type) )
735 {
736 GNUNET_STATISTICS_update (GSF_stats,
737 "# replies dropped due to type mismatch",
738 1, GNUNET_NO);
739 return;
740 }
741 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
742 "Transmitting result for query `%s' to peer\n",
743 GNUNET_h2s (&prd->query));
744 GNUNET_STATISTICS_update (GSF_stats,
745 "# replies received for other peers",
746 1,
747 GNUNET_NO);
748 msize = sizeof(struct PutMessage) + data_len;
749 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
750 {
751 GNUNET_break (0);
752 return;
753 }
754 if ( (UINT32_MAX != reply_anonymity_level) &&
755 (reply_anonymity_level > 1) )
756 {
757 if (reply_anonymity_level - 1 > GSF_cover_content_count)
758 {
759 GNUNET_STATISTICS_update (GSF_stats,
760 "# replies dropped due to insufficient cover traffic",
761 1, GNUNET_NO);
762 return;
763 }
764 GSF_cover_content_count -= (reply_anonymity_level - 1);
765 }
766
767 env = GNUNET_MQ_msg_extra (pm,
768 data_len,
769 GNUNET_MESSAGE_TYPE_FS_PUT);
770 pm->type = htonl (type);
771 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
772 GNUNET_memcpy (&pm[1],
773 data,
774 data_len);
775 if ((UINT32_MAX != reply_anonymity_level) &&
776 (0 != reply_anonymity_level) &&
777 (GNUNET_YES == GSF_enable_randomized_delays))
778 {
779 struct GSF_DelayedHandle *dh;
780
781 dh = GNUNET_new (struct GSF_DelayedHandle);
782 dh->cp = cp;
783 dh->env = env;
784 dh->msize = msize;
785 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
786 cp->delayed_tail,
787 dh);
788 cp->delay_queue_size++;
789 dh->delay_task =
790 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
791 &transmit_delayed_now,
792 dh);
793 }
794 else
795 {
796 GSF_peer_transmit_ (cp,
797 GNUNET_NO,
798 UINT32_MAX,
799 env);
800 }
801 if (GNUNET_BLOCK_REPLY_OK_LAST != eval)
802 return;
803 if (NULL == peerreq->kill_task)
804 {
805 GNUNET_STATISTICS_update (GSF_stats,
806 "# P2P searches destroyed due to ultimate reply",
807 1,
808 GNUNET_NO);
809 peerreq->kill_task =
810 GNUNET_SCHEDULER_add_now (&peer_request_destroy,
811 peerreq);
812 }
813}
814
815
816/**
817 * Increase the peer's respect by a value.
818 *
819 * @param cp which peer to change the respect value on
820 * @param value is the int value by which the
821 * peer's credit is to be increased or decreased
822 * @returns the actual change in respect (positive or negative)
823 */
824static int
825change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
826{
827 if (0 == value)
828 return 0;
829 GNUNET_assert (NULL != cp);
830 if (value > 0)
831 {
832 if (cp->ppd.respect + value < cp->ppd.respect)
833 {
834 value = UINT32_MAX - cp->ppd.respect;
835 cp->ppd.respect = UINT32_MAX;
836 }
837 else
838 cp->ppd.respect += value;
839 }
840 else
841 {
842 if (cp->ppd.respect < -value)
843 {
844 value = -cp->ppd.respect;
845 cp->ppd.respect = 0;
846 }
847 else
848 cp->ppd.respect += value;
849 }
850 return value;
851}
852
853
854/**
855 * We've received a request with the specified priority. Bound it
856 * according to how much we respect the given peer.
857 *
858 * @param prio_in requested priority
859 * @param cp the peer making the request
860 * @return effective priority
861 */
862static int32_t
863bound_priority (uint32_t prio_in,
864 struct GSF_ConnectedPeer *cp)
865{
866#define N ((double) 128.0)
867 uint32_t ret;
868 double rret;
869 int ld;
870
871 ld = GSF_test_get_load_too_high_ (0);
872 if (GNUNET_SYSERR == ld)
873 {
874#if INSANE_STATISTICS
875 GNUNET_STATISTICS_update (GSF_stats,
876 gettext_noop
877 ("# requests done for free (low load)"), 1,
878 GNUNET_NO);
879#endif
880 return 0; /* excess resources */
881 }
882 if (prio_in > INT32_MAX)
883 prio_in = INT32_MAX;
884 ret = -change_peer_respect (cp, -(int) prio_in);
885 if (ret > 0)
886 {
887 if (ret > GSF_current_priorities + N)
888 rret = GSF_current_priorities + N;
889 else
890 rret = ret;
891 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
892 }
893 if ((GNUNET_YES == ld) && (ret > 0))
894 {
895 /* try with charging */
896 ld = GSF_test_get_load_too_high_ (ret);
897 }
898 if (GNUNET_YES == ld)
899 {
900 GNUNET_STATISTICS_update (GSF_stats,
901 gettext_noop
902 ("# request dropped, priority insufficient"), 1,
903 GNUNET_NO);
904 /* undo charge */
905 change_peer_respect (cp, (int) ret);
906 return -1; /* not enough resources */
907 }
908 else
909 {
910 GNUNET_STATISTICS_update (GSF_stats,
911 gettext_noop
912 ("# requests done for a price (normal load)"),
913 1,
914 GNUNET_NO);
915 }
916#undef N
917 return ret;
918}
919
920
921/**
922 * The priority level imposes a bound on the maximum
923 * value for the ttl that can be requested.
924 *
925 * @param ttl_in requested ttl
926 * @param prio given priority
927 * @return @a ttl_in if @a ttl_in is below the limit,
928 * otherwise the ttl-limit for the given @a prio
929 */
930static int32_t
931bound_ttl (int32_t ttl_in,
932 uint32_t prio)
933{
934 unsigned long long allowed;
935
936 if (ttl_in <= 0)
937 return ttl_in;
938 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
939 if (ttl_in > allowed)
940 {
941 if (allowed >= (1 << 30))
942 return 1 << 30;
943 return allowed;
944 }
945 return ttl_in;
946}
947
948
949/**
950 * Closure for #test_exist_cb().
951 */
952struct TestExistClosure
953{
954 /**
955 * Priority of the incoming request.
956 */
957 int32_t priority;
958
959 /**
960 * Relative TTL of the incoming request.
961 */
962 int32_t ttl;
963
964 /**
965 * Type of the incoming request.
966 */
967 enum GNUNET_BLOCK_Type type;
968
969 /**
970 * Set to #GNUNET_YES if we are done handling the query.
971 */
972 int finished;
973};
974
975
976/**
977 * Test if the query already exists. If so, merge it, otherwise
978 * keep `finished` at #GNUNET_NO.
979 *
980 * @param cls our `struct TestExistClosure`
981 * @param hc the key of the query
982 * @param value the existing `struct PeerRequest`.
983 * @return #GNUNET_YES to continue to iterate,
984 * #GNUNET_NO if we successfully merged
985 */
986static int
987test_exist_cb (void *cls,
988 const struct GNUNET_HashCode *hc,
989 void *value)
990{
991 struct TestExistClosure *tec = cls;
992 struct PeerRequest *peerreq = value;
993 struct GSF_PendingRequest *pr;
994 struct GSF_PendingRequestData *prd;
995
996 pr = peerreq->pr;
997 prd = GSF_pending_request_get_data_ (pr);
998 if (prd->type != tec->type)
999 return GNUNET_YES;
1000 if (prd->ttl.abs_value_us >=
1001 GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1002 {
1003 /* existing request has higher TTL, drop new one! */
1004 prd->priority += tec->priority;
1005 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1006 "Have existing request with higher TTL, dropping new request.\n");
1007 GNUNET_STATISTICS_update (GSF_stats,
1008 gettext_noop
1009 ("# requests dropped due to higher-TTL request"),
1010 1, GNUNET_NO);
1011 tec->finished = GNUNET_YES;
1012 return GNUNET_NO;
1013 }
1014 /* existing request has lower TTL, drop old one! */
1015 tec->priority += prd->priority;
1016 free_pending_request (peerreq);
1017 GSF_pending_request_cancel_ (pr,
1018 GNUNET_YES);
1019 return GNUNET_NO;
1020}
1021
1022
1023/**
1024 * Handle P2P "QUERY" message. Creates the pending request entry
1025 * and sets up all of the data structures to that we will
1026 * process replies properly. Does not initiate forwarding or
1027 * local database lookups.
1028 *
1029 * @param cls the other peer involved (sender of the message)
1030 * @param gm the GET message
1031 */
1032void
1033handle_p2p_get (void *cls,
1034 const struct GetMessage *gm)
1035{
1036 struct GSF_ConnectedPeer *cps = cls;
1037 struct PeerRequest *peerreq;
1038 struct GSF_PendingRequest *pr;
1039 struct GSF_ConnectedPeer *cp;
1040 const struct GNUNET_PeerIdentity *target;
1041 enum GSF_PendingRequestOptions options;
1042 uint16_t msize;
1043 unsigned int bits;
1044 const struct GNUNET_PeerIdentity *opt;
1045 uint32_t bm;
1046 size_t bfsize;
1047 uint32_t ttl_decrement;
1048 struct TestExistClosure tec;
1049 GNUNET_PEER_Id spid;
1050 const struct GSF_PendingRequestData *prd;
1051
1052 msize = ntohs (gm->header.size);
1053 tec.type = ntohl (gm->type);
1054 bm = ntohl (gm->hash_bitmap);
1055 bits = 0;
1056 while (bm > 0)
1057 {
1058 if (1 == (bm & 1))
1059 bits++;
1060 bm >>= 1;
1061 }
1062 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1063 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct
1064 GNUNET_PeerIdentity);
1065 GNUNET_STATISTICS_update (GSF_stats,
1066 gettext_noop
1067 ("# GET requests received (from other peers)"),
1068 1,
1069 GNUNET_NO);
1070 GSF_cover_query_count++;
1071 bm = ntohl (gm->hash_bitmap);
1072 bits = 0;
1073 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1074 cp = GSF_peer_get_ (&opt[bits++]);
1075 else
1076 cp = cps;
1077 if (NULL == cp)
1078 {
1079 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1080 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1082 GNUNET_i2s (&opt[bits - 1]));
1083
1084 else
1085 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086 "Failed to find peer `%s' in connection set. Dropping query.\n",
1087 GNUNET_i2s (cps->ppd.peer));
1088 GNUNET_STATISTICS_update (GSF_stats,
1089 gettext_noop
1090 (
1091 "# requests dropped due to missing reverse route"),
1092 1,
1093 GNUNET_NO);
1094 return;
1095 }
1096 unsigned int queue_size = GNUNET_MQ_get_length (cp->mq);
1097 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1098 if (queue_size > MAX_QUEUE_PER_PEER)
1099 {
1100 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1101 "Peer `%s' has too many replies queued already. Dropping query.\n",
1102 GNUNET_i2s (cps->ppd.peer));
1103 GNUNET_STATISTICS_update (GSF_stats,
1104 gettext_noop (
1105 "# requests dropped due to full reply queue"),
1106 1,
1107 GNUNET_NO);
1108 return;
1109 }
1110 /* note that we can really only check load here since otherwise
1111 * peers could find out that we are overloaded by not being
1112 * disconnected after sending us a malformed query... */
1113 tec.priority = bound_priority (ntohl (gm->priority),
1114 cps);
1115 if (tec.priority < 0)
1116 {
1117 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118 "Dropping query from `%s', this peer is too busy.\n",
1119 GNUNET_i2s (cps->ppd.peer));
1120 return;
1121 }
1122 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1123 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1124 GNUNET_h2s (&gm->query),
1125 (unsigned int) tec.type,
1126 GNUNET_i2s (cps->ppd.peer),
1127 (unsigned int) bm);
1128 target =
1129 (0 !=
1130 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1131 options = GSF_PRO_DEFAULTS;
1132 spid = 0;
1133 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1
1134 + tec.priority))
1135 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1136 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2
1137 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1138 {
1139 /* don't have BW to send to peer, or would likely take longer than we have for it,
1140 * so at best indirect the query */
1141 tec.priority = 0;
1142 options |= GSF_PRO_FORWARD_ONLY;
1143 spid = GNUNET_PEER_intern (cps->ppd.peer);
1144 GNUNET_assert (0 != spid);
1145 }
1146 tec.ttl = bound_ttl (ntohl (gm->ttl),
1147 tec.priority);
1148 /* decrement ttl (always) */
1149 ttl_decrement =
1150 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1151 TTL_DECREMENT);
1152 if ((tec.ttl < 0) &&
1153 (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1154 {
1155 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1156 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1157 GNUNET_i2s (cps->ppd.peer),
1158 tec.ttl,
1159 ttl_decrement);
1160 GNUNET_STATISTICS_update (GSF_stats,
1161 gettext_noop
1162 ("# requests dropped due TTL underflow"), 1,
1163 GNUNET_NO);
1164 /* integer underflow => drop (should be very rare)! */
1165 return;
1166 }
1167 tec.ttl -= ttl_decrement;
1168
1169 /* test if the request already exists */
1170 tec.finished = GNUNET_NO;
1171 GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1172 &gm->query,
1173 &test_exist_cb,
1174 &tec);
1175 if (GNUNET_YES == tec.finished)
1176 return; /* merged into existing request, we're done */
1177
1178 peerreq = GNUNET_new (struct PeerRequest);
1179 peerreq->cp = cp;
1180 pr = GSF_pending_request_create_ (options,
1181 tec.type,
1182 &gm->query,
1183 target,
1184 (bfsize > 0)
1185 ? (const char *) &opt[bits]
1186 : NULL,
1187 bfsize,
1188 1 /* anonymity */,
1189 (uint32_t) tec.priority,
1190 tec.ttl,
1191 spid,
1192 GNUNET_PEER_intern (cps->ppd.peer),
1193 NULL, 0, /* replies_seen */
1194 &handle_p2p_reply,
1195 peerreq);
1196 GNUNET_assert (NULL != pr);
1197 prd = GSF_pending_request_get_data_ (pr);
1198 peerreq->pr = pr;
1199 GNUNET_break (GNUNET_OK ==
1200 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1201 &prd->query,
1202 peerreq,
1203 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1204 GNUNET_STATISTICS_update (GSF_stats,
1205 gettext_noop (
1206 "# P2P query messages received and processed"),
1207 1,
1208 GNUNET_NO);
1209 GNUNET_STATISTICS_update (GSF_stats,
1210 gettext_noop ("# P2P searches active"),
1211 1,
1212 GNUNET_NO);
1213 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1214 GSF_local_lookup_ (pr,
1215 &GSF_consider_forwarding,
1216 NULL);
1217}
1218
1219
1220/**
1221 * Transmit a message to the given peer as soon as possible.
1222 * If the peer disconnects before the transmission can happen,
1223 * the callback is invoked with a `NULL` @a buffer.
1224 *
1225 * @param cp target peer
1226 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1227 * @param priority how important is this request?
1228 * @param env message to send
1229 */
1230void
1231GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1232 int is_query,
1233 uint32_t priority,
1234 struct GNUNET_MQ_Envelope *env)
1235{
1236 struct GSF_PeerTransmitHandle *pth;
1237 struct GSF_PeerTransmitHandle *pos;
1238 struct GSF_PeerTransmitHandle *prev;
1239
1240 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1241 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1242 pth->env = env;
1243 pth->is_query = is_query;
1244 pth->priority = priority;
1245 pth->cp = cp;
1246 /* insertion sort (by priority, descending) */
1247 prev = NULL;
1248 pos = cp->pth_head;
1249 while ((NULL != pos) && (pos->priority > priority))
1250 {
1251 prev = pos;
1252 pos = pos->next;
1253 }
1254 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1255 cp->pth_tail,
1256 prev,
1257 pth);
1258 if (GNUNET_YES == is_query)
1259 cp->ppd.pending_queries++;
1260 else if (GNUNET_NO == is_query)
1261 cp->ppd.pending_replies++;
1262 schedule_transmission (pth);
1263}
1264
1265
1266/**
1267 * Report on receiving a reply; update the performance record of the given peer.
1268 *
1269 * @param cp responding peer (will be updated)
1270 * @param request_time time at which the original query was transmitted
1271 * @param request_priority priority of the original request
1272 */
1273void
1274GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1275 struct GNUNET_TIME_Absolute request_time,
1276 uint32_t request_priority)
1277{
1278 struct GNUNET_TIME_Relative delay;
1279
1280 delay = GNUNET_TIME_absolute_get_duration (request_time);
1281 cp->ppd.avg_reply_delay.rel_value_us =
1282 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1)
1283 + delay.rel_value_us) / RUNAVG_DELAY_N;
1284 cp->ppd.avg_priority =
1285 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1)
1286 + request_priority) / RUNAVG_DELAY_N;
1287}
1288
1289
1290/**
1291 * Report on receiving a reply in response to an initiating client.
1292 * Remember that this peer is good for this client.
1293 *
1294 * @param cp responding peer (will be updated)
1295 * @param initiator_client local client on responsible for query
1296 */
1297void
1298GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1299 struct GSF_LocalClient *initiator_client)
1300{
1301 cp->ppd.last_client_replies[cp->last_client_replies_woff++
1302 % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1303}
1304
1305
1306/**
1307 * Report on receiving a reply in response to an initiating peer.
1308 * Remember that this peer is good for this initiating peer.
1309 *
1310 * @param cp responding peer (will be updated)
1311 * @param initiator_peer other peer responsible for query
1312 */
1313void
1314GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1315 const struct GSF_ConnectedPeer *initiator_peer)
1316{
1317 unsigned int woff;
1318
1319 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1320 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1321 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1322 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1323 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1324}
1325
1326
1327/**
1328 * Write peer-respect information to a file - flush the buffer entry!
1329 *
1330 * @param cls unused
1331 * @param key peer identity
1332 * @param value the `struct GSF_ConnectedPeer` to flush
1333 * @return #GNUNET_OK to continue iteration
1334 */
1335static int
1336flush_respect (void *cls,
1337 const struct GNUNET_PeerIdentity *key,
1338 void *value)
1339{
1340 struct GSF_ConnectedPeer *cp = value;
1341 struct GNUNET_PeerIdentity pid;
1342
1343 if (cp->ppd.respect == cp->disk_respect)
1344 return GNUNET_OK; /* unchanged */
1345 GNUNET_assert (0 != cp->ppd.pid);
1346 GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1347 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1348 sizeof(cp->ppd.respect),
1349 GNUNET_TIME_UNIT_FOREVER_ABS,
1350 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1351 NULL,
1352 NULL);
1353 return GNUNET_OK;
1354}
1355
1356
1357void
1358GSF_peer_disconnect_handler (void *cls,
1359 const struct GNUNET_PeerIdentity *peer,
1360 void *internal_cls)
1361{
1362 struct GSF_ConnectedPeer *cp = internal_cls;
1363 struct GSF_PeerTransmitHandle *pth;
1364 struct GSF_DelayedHandle *dh;
1365
1366 if (NULL == cp)
1367 return; /* must have been disconnect from core with
1368 * 'peer' == my_id, ignore */
1369 flush_respect (NULL,
1370 peer,
1371 cp);
1372 GNUNET_assert (GNUNET_YES ==
1373 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1374 peer,
1375 cp));
1376 GNUNET_STATISTICS_set (GSF_stats,
1377 gettext_noop ("# peers connected"),
1378 GNUNET_CONTAINER_multipeermap_size (cp_map),
1379 GNUNET_NO);
1380 if (NULL != cp->respect_iterate_req)
1381 {
1382 GNUNET_PEERSTORE_iteration_stop (cp->respect_iterate_req);
1383 cp->respect_iterate_req = NULL;
1384 }
1385 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1386 &cancel_pending_request,
1387 cp);
1388 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1389 cp->request_map = NULL;
1390 GSF_plan_notify_peer_disconnect_ (cp);
1391 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1392 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1393 P2P_SUCCESS_LIST_SIZE);
1394 memset (cp->ppd.last_p2p_replies,
1395 0,
1396 sizeof(cp->ppd.last_p2p_replies));
1397 GSF_push_stop_ (cp);
1398 while (NULL != (pth = cp->pth_head))
1399 {
1400 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1401 cp->pth_tail,
1402 pth);
1403 if (GNUNET_YES == pth->is_query)
1404 GNUNET_assert (0 < cp->ppd.pending_queries--);
1405 else if (GNUNET_NO == pth->is_query)
1406 GNUNET_assert (0 < cp->ppd.pending_replies--);
1407 GNUNET_free (pth);
1408 }
1409 while (NULL != (dh = cp->delayed_head))
1410 {
1411 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1412 cp->delayed_tail,
1413 dh);
1414 GNUNET_MQ_discard (dh->env);
1415 cp->delay_queue_size--;
1416 GNUNET_SCHEDULER_cancel (dh->delay_task);
1417 GNUNET_free (dh);
1418 }
1419 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1420 if (NULL != cp->mig_revive_task)
1421 {
1422 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1423 cp->mig_revive_task = NULL;
1424 }
1425 GNUNET_break (0 == cp->ppd.pending_queries);
1426 GNUNET_break (0 == cp->ppd.pending_replies);
1427 GNUNET_free (cp);
1428}
1429
1430
1431/**
1432 * Closure for #call_iterator().
1433 */
1434struct IterationContext
1435{
1436 /**
1437 * Function to call on each entry.
1438 */
1439 GSF_ConnectedPeerIterator it;
1440
1441 /**
1442 * Closure for @e it.
1443 */
1444 void *it_cls;
1445};
1446
1447
1448/**
1449 * Function that calls the callback for each peer.
1450 *
1451 * @param cls the `struct IterationContext *`
1452 * @param key identity of the peer
1453 * @param value the `struct GSF_ConnectedPeer *`
1454 * @return #GNUNET_YES to continue iteration
1455 */
1456static int
1457call_iterator (void *cls,
1458 const struct GNUNET_PeerIdentity *key,
1459 void *value)
1460{
1461 struct IterationContext *ic = cls;
1462 struct GSF_ConnectedPeer *cp = value;
1463
1464 ic->it (ic->it_cls,
1465 key, cp,
1466 &cp->ppd);
1467 return GNUNET_YES;
1468}
1469
1470
1471void
1472GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1473 void *it_cls)
1474{
1475 struct IterationContext ic;
1476
1477 ic.it = it;
1478 ic.it_cls = it_cls;
1479 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1480 &call_iterator,
1481 &ic);
1482}
1483
1484
1485/**
1486 * Obtain the identity of a connected peer.
1487 *
1488 * @param cp peer to get identity of
1489 * @param id identity to set (written to)
1490 */
1491void
1492GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1493 struct GNUNET_PeerIdentity *id)
1494{
1495 GNUNET_assert (0 != cp->ppd.pid);
1496 GNUNET_PEER_resolve (cp->ppd.pid, id);
1497}
1498
1499
1500/**
1501 * Obtain the identity of a connected peer.
1502 *
1503 * @param cp peer to get identity of
1504 * @return reference to peer identity, valid until peer disconnects (!)
1505 */
1506const struct GNUNET_PeerIdentity *
1507GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1508{
1509 GNUNET_assert (0 != cp->ppd.pid);
1510 return GNUNET_PEER_resolve2 (cp->ppd.pid);
1511}
1512
1513
1514/**
1515 * Ask a peer to stop migrating data to us until the given point
1516 * in time.
1517 *
1518 * @param cp peer to ask
1519 * @param block_time until when to block
1520 */
1521void
1522GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1523 struct GNUNET_TIME_Absolute block_time)
1524{
1525 struct GNUNET_MQ_Envelope *env;
1526 struct MigrationStopMessage *msm;
1527
1528 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1529 {
1530 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531 "Migration already blocked for another %s\n",
1532 GNUNET_STRINGS_relative_time_to_string (
1533 GNUNET_TIME_absolute_get_remaining
1534 (cp->
1535 last_migration_block), GNUNET_YES));
1536 return; /* already blocked */
1537 }
1538 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1539 GNUNET_STRINGS_relative_time_to_string (
1540 GNUNET_TIME_absolute_get_remaining (block_time),
1541 GNUNET_YES));
1542 cp->last_migration_block = block_time;
1543 env = GNUNET_MQ_msg (msm,
1544 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1545 msm->reserved = htonl (0);
1546 msm->duration
1547 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1548 (cp->last_migration_block));
1549 GNUNET_STATISTICS_update (GSF_stats,
1550 gettext_noop ("# migration stop messages sent"),
1551 1,
1552 GNUNET_NO);
1553 GSF_peer_transmit_ (cp,
1554 GNUNET_SYSERR,
1555 UINT32_MAX,
1556 env);
1557}
1558
1559
1560/**
1561 * Notify core about a preference we have for the given peer
1562 * (to allocate more resources towards it). The change will
1563 * be communicated the next time we reserve bandwidth with
1564 * core (not instantly).
1565 *
1566 * @param cp peer to reserve bandwidth from
1567 * @param pref preference change
1568 */
1569void
1570GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1571 uint64_t pref)
1572{
1573 cp->inc_preference += pref;
1574}
1575
1576
1577/**
1578 * Call this method periodically to flush respect information to disk.
1579 *
1580 * @param cls closure, not used
1581 */
1582static void
1583cron_flush_respect (void *cls)
1584{
1585 fr_task = NULL;
1586 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1587 &flush_respect,
1588 NULL);
1589 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1590 GNUNET_SCHEDULER_PRIORITY_HIGH,
1591 &cron_flush_respect,
1592 NULL);
1593}
1594
1595
1596/**
1597 * Initialize peer management subsystem.
1598 */
1599void
1600GSF_connected_peer_init_ ()
1601{
1602 cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1603 peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1604 fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1605 &cron_flush_respect, NULL);
1606}
1607
1608
1609/**
1610 * Shutdown peer management subsystem.
1611 */
1612void
1613GSF_connected_peer_done_ ()
1614{
1615 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1616 &flush_respect,
1617 NULL);
1618 GNUNET_SCHEDULER_cancel (fr_task);
1619 fr_task = NULL;
1620 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1621 cp_map = NULL;
1622 GNUNET_PEERSTORE_disconnect (peerstore);
1623}
1624
1625
1626/**
1627 * Iterator to remove references to LC entry.
1628 *
1629 * @param cls the `struct GSF_LocalClient *` to look for
1630 * @param key current key code
1631 * @param value value in the hash map (peer entry)
1632 * @return #GNUNET_YES (we should continue to iterate)
1633 */
1634static int
1635clean_local_client (void *cls,
1636 const struct GNUNET_PeerIdentity *key,
1637 void *value)
1638{
1639 const struct GSF_LocalClient *lc = cls;
1640 struct GSF_ConnectedPeer *cp = value;
1641 unsigned int i;
1642
1643 for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1644 if (cp->ppd.last_client_replies[i] == lc)
1645 cp->ppd.last_client_replies[i] = NULL;
1646 return GNUNET_YES;
1647}
1648
1649
1650/**
1651 * Notification that a local client disconnected. Clean up all of our
1652 * references to the given handle.
1653 *
1654 * @param lc handle to the local client (henceforth invalid)
1655 */
1656void
1657GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1658{
1659 if (NULL == cp_map)
1660 return; /* already cleaned up */
1661 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1662 &clean_local_client,
1663 (void *) lc);
1664}
1665
1666
1667/* end of gnunet-service-fs_cp.c */