aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-04 11:31:08 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-04 11:31:08 +0000
commit3ef84f3677066f95a67a3df8b9943cf7892a7894 (patch)
tree516f96330eb166f9e4b940cb20ca00cae96c7570 /src/fs/gnunet-service-fs_cp.c
parentfb199b4553100aa977d4d2f4a108bb0a27a705d2 (diff)
downloadgnunet-3ef84f3677066f95a67a3df8b9943cf7892a7894.tar.gz
gnunet-3ef84f3677066f95a67a3df8b9943cf7892a7894.zip
fixes
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c347
1 files changed, 255 insertions, 92 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index be041c861..e015b954a 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -24,8 +24,11 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_load_lib.h"
27#include "gnunet-service-fs.h" 28#include "gnunet-service-fs.h"
28#include "gnunet-service-fs_cp.h" 29#include "gnunet-service-fs_cp.h"
30#include "gnunet-service-fs_pe.h"
31#include "gnunet-service-fs_pr.h"
29#include "gnunet-service-fs_push.h" 32#include "gnunet-service-fs_push.h"
30 33
31/** 34/**
@@ -46,6 +49,16 @@ struct GSF_PeerTransmitHandle
46{ 49{
47 50
48 /** 51 /**
52 * Kept in a doubly-linked list.
53 */
54 struct GSF_PeerTransmitHandle *next;
55
56 /**
57 * Kept in a doubly-linked list.
58 */
59 struct GSF_PeerTransmitHandle *prev;
60
61 /**
49 * Handle for an active request for transmission to this 62 * Handle for an active request for transmission to this
50 * peer, or NULL (if core queue was full). 63 * peer, or NULL (if core queue was full).
51 */ 64 */
@@ -119,7 +132,7 @@ struct GSF_ConnectedPeer
119 /** 132 /**
120 * Task scheduled to revive migration to this peer. 133 * Task scheduled to revive migration to this peer.
121 */ 134 */
122 struct GNUNET_SCHEDULER_TaskIdentifier mig_revive_task; 135 GNUNET_SCHEDULER_TaskIdentifier mig_revive_task;
123 136
124 /** 137 /**
125 * Messages (replies, queries, content migration) we would like to 138 * Messages (replies, queries, content migration) we would like to
@@ -147,7 +160,7 @@ struct GSF_ConnectedPeer
147 /** 160 /**
148 * Active requests from this neighbour. 161 * Active requests from this neighbour.
149 */ 162 */
150 struct GNUNET_CONTAINER_MulitHashMap *request_map; 163 struct GNUNET_CONTAINER_MultiHashMap *request_map;
151 164
152 /** 165 /**
153 * Increase in traffic preference still to be submitted 166 * Increase in traffic preference still to be submitted
@@ -185,7 +198,6 @@ struct GSF_ConnectedPeer
185 */ 198 */
186static struct GNUNET_CONTAINER_MultiHashMap *cp_map; 199static struct GNUNET_CONTAINER_MultiHashMap *cp_map;
187 200
188
189/** 201/**
190 * Where do we store trust information? 202 * Where do we store trust information?
191 */ 203 */
@@ -247,7 +259,7 @@ update_atsi (struct GSF_ConnectedPeer *cp,
247 struct GNUNET_TIME_Relative latency; 259 struct GNUNET_TIME_Relative latency;
248 260
249 latency = get_latency (atsi); 261 latency = get_latency (atsi);
250 GNUNET_LOAD_value_set_decline (cp->transmission_delay, 262 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
251 latency); 263 latency);
252 /* LATER: merge atsi into cp's performance data (if we ever care...) */ 264 /* LATER: merge atsi into cp's performance data (if we ever care...) */
253} 265}
@@ -302,7 +314,7 @@ peer_transmit_ready_cb (void *cls,
302 GNUNET_assert (0 < cp->ppd.pending_replies--); 314 GNUNET_assert (0 < cp->ppd.pending_replies--);
303 } 315 }
304 GNUNET_LOAD_update (cp->ppd.transmission_delay, 316 GNUNET_LOAD_update (cp->ppd.transmission_delay,
305 GNUNET_TIME_absolute_get_duration (pth->request_start_time).rel_value); 317 GNUNET_TIME_absolute_get_duration (pth->transmission_request_start_time).rel_value);
306 ret = pth->gmc (pth->gmc_cls, 318 ret = pth->gmc (pth->gmc_cls,
307 0, NULL); 319 0, NULL);
308 GNUNET_free (pth); 320 GNUNET_free (pth);
@@ -322,12 +334,13 @@ peer_transmit_ready_cb (void *cls,
322 */ 334 */
323static void 335static void
324core_reserve_callback (void *cls, 336core_reserve_callback (void *cls,
325 const struct GNUNET_PeerIdentity * peer, 337 const struct GNUNET_PeerIdentity *peer,
326 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, 338 struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
327 int amount, 339 int amount,
328 uint64_t preference) 340 uint64_t preference)
329{ 341{
330 struct GSF_ConnectedPeer *cp = cls; 342 struct GSF_ConnectedPeer *cp = cls;
343 struct GSF_PeerTransmitHandle *pth;
331 uint64_t ip; 344 uint64_t ip;
332 345
333 cp->irc = NULL; 346 cp->irc = NULL;
@@ -339,11 +352,11 @@ core_reserve_callback (void *cls,
339 GNUNET_i2s (peer)); 352 GNUNET_i2s (peer));
340 ip = cp->inc_preference; 353 ip = cp->inc_preference;
341 cp->inc_preference = 0; 354 cp->inc_preference = 0;
342 cp->irc = GNUNET_CORE_peer_change_preference (core, 355 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
343 peer, 356 peer,
344 GNUNET_TIME_UNIT_FOREVER_REL, 357 GNUNET_TIME_UNIT_FOREVER_REL,
345 GNUNET_BANDWIDTH_VALUE_MAX, 358 GNUNET_BANDWIDTH_VALUE_MAX,
346 GNUNET_FS_DBLOCK_SIZE, 359 DBLOCK_SIZE,
347 ip, 360 ip,
348 &core_reserve_callback, 361 &core_reserve_callback,
349 cp); 362 cp);
@@ -354,11 +367,11 @@ core_reserve_callback (void *cls,
354 (NULL == pth->cth) ) 367 (NULL == pth->cth) )
355 { 368 {
356 /* reservation success, try transmission now! */ 369 /* reservation success, try transmission now! */
357 pth->cth = GNUNET_CORE_notify_transmit_ready (core, 370 pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
358 priority, 371 pth->priority,
359 GNUNET_TIME_absolute_get_remaining (pth->timeout), 372 GNUNET_TIME_absolute_get_remaining (pth->timeout),
360 &target, 373 peer,
361 size, 374 pth->size,
362 &peer_transmit_ready_cb, 375 &peer_transmit_ready_cb,
363 pth); 376 pth);
364 } 377 }
@@ -380,24 +393,22 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
380 struct GSF_ConnectedPeer *cp; 393 struct GSF_ConnectedPeer *cp;
381 char *fn; 394 char *fn;
382 uint32_t trust; 395 uint32_t trust;
383 struct GNUNET_TIME_Relative latency;
384 396
385 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer)); 397 cp = GNUNET_malloc (sizeof (struct GSF_ConnectedPeer));
386 cp->transmission_delay = GNUNET_LOAD_value_init (latency);
387 cp->ppd.pid = GNUNET_PEER_intern (peer); 398 cp->ppd.pid = GNUNET_PEER_intern (peer);
388 cp->transmission_delay = GNUNET_LOAD_value_init (0); 399 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
389 cp->irc = GNUNET_CORE_peer_change_preference (core, 400 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
390 peer, 401 peer,
391 GNUNET_TIME_UNIT_FOREVER_REL, 402 GNUNET_TIME_UNIT_FOREVER_REL,
392 GNUNET_BANDWIDTH_VALUE_MAX, 403 GNUNET_BANDWIDTH_VALUE_MAX,
393 GNUNET_FS_DBLOCK_SIZE, 404 DBLOCK_SIZE,
394 0, 405 0,
395 &core_reserve_callback, 406 &core_reserve_callback,
396 cp); 407 cp);
397 fn = get_trust_filename (peer); 408 fn = get_trust_filename (peer);
398 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) && 409 if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
399 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust)))) 410 (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
400 cp->disk_trust = cp->trust = ntohl (trust); 411 cp->disk_trust = cp->ppd.trust = ntohl (trust);
401 GNUNET_free (fn); 412 GNUNET_free (fn);
402 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128); 413 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
403 GNUNET_break (GNUNET_OK == 414 GNUNET_break (GNUNET_OK ==
@@ -406,7 +417,6 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
406 cp, 417 cp,
407 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 418 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
408 update_atsi (cp, atsi); 419 update_atsi (cp, atsi);
409 GSF_plan_notify_new_peer_ (cp);
410 GSF_push_start_ (cp); 420 GSF_push_start_ (cp);
411 return cp; 421 return cp;
412} 422}
@@ -499,20 +509,22 @@ copy_reply (void *cls,
499 void *buf) 509 void *buf)
500{ 510{
501 struct PutMessage *pm = cls; 511 struct PutMessage *pm = cls;
512 size_t size;
502 513
503 if (buf != NULL) 514 if (buf != NULL)
504 { 515 {
505 GNUNET_assert (size >= ntohs (pm->header.size)); 516 GNUNET_assert (buf_size >= ntohs (pm->header.size));
506 size = ntohs (pm->header.size); 517 size = ntohs (pm->header.size);
507 memcpy (buf, pm, size); 518 memcpy (buf, pm, size);
508 GNUNET_STATISTICS_update (stats, 519 GNUNET_STATISTICS_update (GSF_stats,
509 gettext_noop ("# replies transmitted to other peers"), 520 gettext_noop ("# replies transmitted to other peers"),
510 1, 521 1,
511 GNUNET_NO); 522 GNUNET_NO);
512 } 523 }
513 else 524 else
514 { 525 {
515 GNUNET_STATISTICS_update (stats, 526 size = 0;
527 GNUNET_STATISTICS_update (GSF_stats,
516 gettext_noop ("# replies dropped"), 528 gettext_noop ("# replies dropped"),
517 1, 529 1,
518 GNUNET_NO); 530 GNUNET_NO);
@@ -549,7 +561,7 @@ handle_p2p_reply (void *cls,
549 int more) 561 int more)
550{ 562{
551 struct GSF_ConnectedPeer *cp = cls; 563 struct GSF_ConnectedPeer *cp = cls;
552 struct GSF_PendingRequest *prd; 564 struct GSF_PendingRequestData *prd;
553 struct PutMessage *pm; 565 struct PutMessage *pm;
554 size_t msize; 566 size_t msize;
555 567
@@ -557,7 +569,7 @@ handle_p2p_reply (void *cls,
557 if (NULL == data) 569 if (NULL == data)
558 { 570 {
559 GNUNET_assert (GNUNET_NO == more); 571 GNUNET_assert (GNUNET_NO == more);
560 GNUNET_STATISTICS_update (stats, 572 GNUNET_STATISTICS_update (GSF_stats,
561 gettext_noop ("# P2P searches active"), 573 gettext_noop ("# P2P searches active"),
562 -1, 574 -1,
563 GNUNET_NO); 575 GNUNET_NO);
@@ -572,7 +584,7 @@ handle_p2p_reply (void *cls,
572 "Transmitting result for query `%s'\n", 584 "Transmitting result for query `%s'\n",
573 GNUNET_h2s (key)); 585 GNUNET_h2s (key));
574#endif 586#endif
575 GNUNET_STATISTICS_update (stats, 587 GNUNET_STATISTICS_update (GSF_stats,
576 gettext_noop ("# replies received for other peers"), 588 gettext_noop ("# replies received for other peers"),
577 1, 589 1,
578 GNUNET_NO); 590 GNUNET_NO);
@@ -593,6 +605,163 @@ handle_p2p_reply (void *cls,
593 605
594 606
595/** 607/**
608 * Test if the DATABASE (GET) load on this peer is too high
609 * to even consider processing the query at
610 * all.
611 *
612 * @return GNUNET_YES if the load is too high to do anything (load high)
613 * GNUNET_NO to process normally (load normal)
614 * GNUNET_SYSERR to process for free (load low)
615 */
616static int
617test_get_load_too_high (uint32_t priority)
618{
619#if FIXME_later
620 double ld;
621
622 ld = GNUNET_LOAD_get_load (datastore_get_load);
623 if (ld < 1)
624 return GNUNET_SYSERR;
625 if (ld <= priority)
626 return GNUNET_NO;
627 return GNUNET_YES;
628#else
629 return GNUNET_SYSERR;
630#endif
631}
632
633
634/**
635 * Increase the host credit by a value.
636 *
637 * @param cp which peer to change the trust value on
638 * @param value is the int value by which the
639 * host credit is to be increased or decreased
640 * @returns the actual change in trust (positive or negative)
641 */
642static int
643change_host_trust (struct GSF_ConnectedPeer *cp, int value)
644{
645 if (value == 0)
646 return 0;
647 GNUNET_assert (cp != NULL);
648 if (value > 0)
649 {
650 if (cp->ppd.trust + value < cp->ppd.trust)
651 {
652 value = UINT32_MAX - cp->ppd.trust;
653 cp->ppd.trust = UINT32_MAX;
654 }
655 else
656 cp->ppd.trust += value;
657 }
658 else
659 {
660 if (cp->ppd.trust < -value)
661 {
662 value = -cp->ppd.trust;
663 cp->ppd.trust = 0;
664 }
665 else
666 cp->ppd.trust += value;
667 }
668 return value;
669}
670
671
672/**
673 * We've received a request with the specified priority. Bound it
674 * according to how much we trust the given peer.
675 *
676 * @param prio_in requested priority
677 * @param cp the peer making the request
678 * @return effective priority
679 */
680static int32_t
681bound_priority (uint32_t prio_in,
682 struct GSF_ConnectedPeer *cp)
683{
684#define N ((double)128.0)
685 uint32_t ret;
686 double rret;
687 int ld;
688
689 ld = test_get_load_too_high (0);
690 if (ld == GNUNET_SYSERR)
691 {
692 GNUNET_STATISTICS_update (GSF_stats,
693 gettext_noop ("# requests done for free (low load)"),
694 1,
695 GNUNET_NO);
696 return 0; /* excess resources */
697 }
698 if (prio_in > INT32_MAX)
699 prio_in = INT32_MAX;
700 ret = - change_host_trust (cp, - (int) prio_in);
701 if (ret > 0)
702 {
703 if (ret > GSF_current_priorities + N)
704 rret = GSF_current_priorities + N;
705 else
706 rret = ret;
707 GSF_current_priorities
708 = (GSF_current_priorities * (N-1) + rret)/N;
709 }
710 if ( (ld == GNUNET_YES) && (ret > 0) )
711 {
712 /* try with charging */
713 ld = test_get_load_too_high (ret);
714 }
715 if (ld == GNUNET_YES)
716 {
717 GNUNET_STATISTICS_update (GSF_stats,
718 gettext_noop ("# request dropped, priority insufficient"),
719 1,
720 GNUNET_NO);
721 /* undo charge */
722 change_host_trust (cp, (int) ret);
723 return -1; /* not enough resources */
724 }
725 else
726 {
727 GNUNET_STATISTICS_update (GSF_stats,
728 gettext_noop ("# requests done for a price (normal load)"),
729 1,
730 GNUNET_NO);
731 }
732#undef N
733 return ret;
734}
735
736
737/**
738 * The priority level imposes a bound on the maximum
739 * value for the ttl that can be requested.
740 *
741 * @param ttl_in requested ttl
742 * @param prio given priority
743 * @return ttl_in if ttl_in is below the limit,
744 * otherwise the ttl-limit for the given priority
745 */
746static int32_t
747bound_ttl (int32_t ttl_in, uint32_t prio)
748{
749 unsigned long long allowed;
750
751 if (ttl_in <= 0)
752 return ttl_in;
753 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
754 if (ttl_in > allowed)
755 {
756 if (allowed >= (1 << 30))
757 return 1 << 30;
758 return allowed;
759 }
760 return ttl_in;
761}
762
763
764/**
596 * Handle P2P "QUERY" message. Creates the pending request entry 765 * Handle P2P "QUERY" message. Creates the pending request entry
597 * and sets up all of the data structures to that we will 766 * and sets up all of the data structures to that we will
598 * process replies properly. Does not initiate forwarding or 767 * process replies properly. Does not initiate forwarding or
@@ -611,10 +780,9 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
611 struct GSF_PendingRequestData *prd; 780 struct GSF_PendingRequestData *prd;
612 struct GSF_ConnectedPeer *cp; 781 struct GSF_ConnectedPeer *cp;
613 struct GSF_ConnectedPeer *cps; 782 struct GSF_ConnectedPeer *cps;
614 GNUNET_HashCode *namespace; 783 const GNUNET_HashCode *namespace;
615 struct GNUNET_PeerIdentity *target; 784 const struct GNUNET_PeerIdentity *target;
616 enum GSF_PendingRequestOptions options; 785 enum GSF_PendingRequestOptions options;
617 struct GNUNET_TIME_Relative timeout;
618 uint16_t msize; 786 uint16_t msize;
619 const struct GetMessage *gm; 787 const struct GetMessage *gm;
620 unsigned int bits; 788 unsigned int bits;
@@ -631,7 +799,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
631 if (msize < sizeof (struct GetMessage)) 799 if (msize < sizeof (struct GetMessage))
632 { 800 {
633 GNUNET_break_op (0); 801 GNUNET_break_op (0);
634 return GNUNET_SYSERR; 802 return NULL;
635 } 803 }
636 gm = (const struct GetMessage*) message; 804 gm = (const struct GetMessage*) message;
637#if DEBUG_FS 805#if DEBUG_FS
@@ -651,7 +819,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
651 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode)) 819 if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
652 { 820 {
653 GNUNET_break_op (0); 821 GNUNET_break_op (0);
654 return GNUNET_SYSERR; 822 return NULL;
655 } 823 }
656 opt = (const GNUNET_HashCode*) &gm[1]; 824 opt = (const GNUNET_HashCode*) &gm[1];
657 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode); 825 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
@@ -659,24 +827,24 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
659 if (0 != ( (bfsize - 1) & bfsize)) 827 if (0 != ( (bfsize - 1) & bfsize))
660 { 828 {
661 GNUNET_break_op (0); 829 GNUNET_break_op (0);
662 return GNUNET_SYSERR; 830 return NULL;
663 } 831 }
664 cover_query_count++; 832 GSF_cover_query_count++;
665 bm = ntohl (gm->hash_bitmap); 833 bm = ntohl (gm->hash_bitmap);
666 bits = 0; 834 bits = 0;
667 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers, 835 cps = GNUNET_CONTAINER_multihashmap_get (cp_map,
668 &other->hashPubKey); 836 &other->hashPubKey);
669 if (NULL == cps) 837 if (NULL == cps)
670 { 838 {
671 /* peer must have just disconnected */ 839 /* peer must have just disconnected */
672 GNUNET_STATISTICS_update (stats, 840 GNUNET_STATISTICS_update (GSF_stats,
673 gettext_noop ("# requests dropped due to initiator not being connected"), 841 gettext_noop ("# requests dropped due to initiator not being connected"),
674 1, 842 1,
675 GNUNET_NO); 843 GNUNET_NO);
676 return GNUNET_SYSERR; 844 return NULL;
677 } 845 }
678 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 846 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
679 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, 847 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
680 &opt[bits++]); 848 &opt[bits++]);
681 else 849 else
682 cp = cps; 850 cp = cps;
@@ -693,11 +861,11 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
693 "Failed to find peer `%4s' in connection set. Dropping query.\n", 861 "Failed to find peer `%4s' in connection set. Dropping query.\n",
694 GNUNET_i2s (other)); 862 GNUNET_i2s (other));
695#endif 863#endif
696 GNUNET_STATISTICS_update (stats, 864 GNUNET_STATISTICS_update (GSF_stats,
697 gettext_noop ("# requests dropped due to missing reverse route"), 865 gettext_noop ("# requests dropped due to missing reverse route"),
698 1, 866 1,
699 GNUNET_NO); 867 GNUNET_NO);
700 return GNUNET_OK; 868 return NULL;
701 } 869 }
702 /* note that we can really only check load here since otherwise 870 /* note that we can really only check load here since otherwise
703 peers could find out that we are overloaded by not being 871 peers could find out that we are overloaded by not being
@@ -710,7 +878,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
710 "Dropping query from `%s', this peer is too busy.\n", 878 "Dropping query from `%s', this peer is too busy.\n",
711 GNUNET_i2s (other)); 879 GNUNET_i2s (other));
712#endif 880#endif
713 return GNUNET_OK; 881 return NULL;
714 } 882 }
715#if DEBUG_FS 883#if DEBUG_FS
716 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 884 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -723,16 +891,16 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
723 namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL; 891 namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
724 target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL; 892 target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
725 options = 0; 893 options = 0;
726 if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) || 894 if ( (GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + priority)) ||
727 (GNUNET_LOAD_get_average (cp->transmission_delay) > 895 (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
728 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) ) 896 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)) )
729 { 897 {
730 /* don't have BW to send to peer, or would likely take longer than we have for it, 898 /* don't have BW to send to peer, or would likely take longer than we have for it,
731 so at best indirect the query */ 899 so at best indirect the query */
732 priority = 0; 900 priority = 0;
733 options |= GSF_PRO_FORWARD_ONLY; 901 options |= GSF_PRO_FORWARD_ONLY;
734 } 902 }
735 ttl = bound_ttl (ntohl (gm->ttl), pr->priority); 903 ttl = bound_ttl (ntohl (gm->ttl), priority);
736 /* decrement ttl (always) */ 904 /* decrement ttl (always) */
737 ttl_decrement = 2 * TTL_DECREMENT + 905 ttl_decrement = 2 * TTL_DECREMENT +
738 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 906 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
@@ -747,12 +915,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
747 ttl, 915 ttl,
748 ttl_decrement); 916 ttl_decrement);
749#endif 917#endif
750 GNUNET_STATISTICS_update (stats, 918 GNUNET_STATISTICS_update (GSF_stats,
751 gettext_noop ("# requests dropped due TTL underflow"), 919 gettext_noop ("# requests dropped due TTL underflow"),
752 1, 920 1,
753 GNUNET_NO); 921 GNUNET_NO);
754 /* integer underflow => drop (should be very rare)! */ 922 /* integer underflow => drop (should be very rare)! */
755 return GNUNET_OK; 923 return NULL;
756 } 924 }
757 ttl -= ttl_decrement; 925 ttl -= ttl_decrement;
758 926
@@ -763,8 +931,8 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
763 { 931 {
764 prd = GSF_pending_request_get_data_ (pr); 932 prd = GSF_pending_request_get_data_ (pr);
765 if ( (prd->type == type) && 933 if ( (prd->type == type) &&
766 ( (type != GNUNET_BLOCK_TYPE_SBLOCK) || 934 ( (type != GNUNET_BLOCK_TYPE_FS_SBLOCK) ||
767 (0 == memcmp (prd->namespace, 935 (0 == memcmp (&prd->namespace,
768 namespace, 936 namespace,
769 sizeof (GNUNET_HashCode))) ) ) 937 sizeof (GNUNET_HashCode))) ) )
770 { 938 {
@@ -777,14 +945,14 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
777 "Have existing request with higher TTL, dropping new request.\n", 945 "Have existing request with higher TTL, dropping new request.\n",
778 GNUNET_i2s (other)); 946 GNUNET_i2s (other));
779#endif 947#endif
780 GNUNET_STATISTICS_update (stats, 948 GNUNET_STATISTICS_update (GSF_stats,
781 gettext_noop ("# requests dropped due to higher-TTL request"), 949 gettext_noop ("# requests dropped due to higher-TTL request"),
782 1, 950 1,
783 GNUNET_NO); 951 GNUNET_NO);
784 return GNUNET_OK; 952 return NULL;
785 } 953 }
786 /* existing request has lower TTL, drop old one! */ 954 /* existing request has lower TTL, drop old one! */
787 pr->priority += prd->priority; 955 priority += prd->priority;
788 GSF_pending_request_cancel_ (pr); 956 GSF_pending_request_cancel_ (pr);
789 GNUNET_assert (GNUNET_YES == 957 GNUNET_assert (GNUNET_YES ==
790 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 958 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
@@ -793,30 +961,30 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
793 } 961 }
794 } 962 }
795 963
796 pr = GSF_pending_request_create (options, 964 pr = GSF_pending_request_create_ (options,
797 type, 965 type,
798 &gm->query, 966 &gm->query,
799 namespace, 967 namespace,
800 target, 968 target,
801 (bf_size > 0) ? (const char*)&opt[bits] : NULL, 969 (bfsize > 0) ? (const char*)&opt[bits] : NULL,
802 bf_size, 970 bfsize,
803 ntohl (gm->filter_mutator), 971 ntohl (gm->filter_mutator),
804 1 /* anonymity */ 972 1 /* anonymity */,
805 (uint32_t) priority, 973 (uint32_t) priority,
806 ttl, 974 ttl,
807 NULL, 0, /* replies_seen */ 975 NULL, 0, /* replies_seen */
808 &handle_p2p_reply, 976 &handle_p2p_reply,
809 cp); 977 cp);
810 GNUNET_break (GNUNET_OK == 978 GNUNET_break (GNUNET_OK ==
811 GNUNET_CONTAINER_multihashmap_put (cp->request_map, 979 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
812 &gm->query, 980 &gm->query,
813 pr, 981 pr,
814 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 982 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
815 GNUNET_STATISTICS_update (stats, 983 GNUNET_STATISTICS_update (GSF_stats,
816 gettext_noop ("# P2P searches received"), 984 gettext_noop ("# P2P searches received"),
817 1, 985 1,
818 GNUNET_NO); 986 GNUNET_NO);
819 GNUNET_STATISTICS_update (stats, 987 GNUNET_STATISTICS_update (GSF_stats,
820 gettext_noop ("# P2P searches active"), 988 gettext_noop ("# P2P searches active"),
821 1, 989 1,
822 GNUNET_NO); 990 GNUNET_NO);
@@ -860,7 +1028,7 @@ peer_transmit_timeout (void *cls,
860 * If the peer disconnects before the transmission can happen, 1028 * If the peer disconnects before the transmission can happen,
861 * the callback is invoked with a 'NULL' buffer. 1029 * the callback is invoked with a 'NULL' buffer.
862 * 1030 *
863 * @param peer target peer 1031 * @param cp target peer
864 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR) 1032 * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
865 * @param priority how important is this request? 1033 * @param priority how important is this request?
866 * @param timeout when does this request timeout (call gmc with error) 1034 * @param timeout when does this request timeout (call gmc with error)
@@ -870,7 +1038,7 @@ peer_transmit_timeout (void *cls,
870 * @return handle to cancel request 1038 * @return handle to cancel request
871 */ 1039 */
872struct GSF_PeerTransmitHandle * 1040struct GSF_PeerTransmitHandle *
873GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer, 1041GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
874 int is_query, 1042 int is_query,
875 uint32_t priority, 1043 uint32_t priority,
876 struct GNUNET_TIME_Relative timeout, 1044 struct GNUNET_TIME_Relative timeout,
@@ -878,7 +1046,6 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
878 GSF_GetMessageCallback gmc, 1046 GSF_GetMessageCallback gmc,
879 void *gmc_cls) 1047 void *gmc_cls)
880{ 1048{
881 struct GSF_ConnectedPeer *cp;
882 struct GSF_PeerTransmitHandle *pth; 1049 struct GSF_PeerTransmitHandle *pth;
883 struct GSF_PeerTransmitHandle *pos; 1050 struct GSF_PeerTransmitHandle *pos;
884 struct GSF_PeerTransmitHandle *prev; 1051 struct GSF_PeerTransmitHandle *prev;
@@ -886,11 +1053,8 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
886 uint64_t ip; 1053 uint64_t ip;
887 int is_ready; 1054 int is_ready;
888 1055
889 cp = GNUNET_CONTAINER_multihashmap_get (cp_map,
890 &peer->hashPubKey);
891 GNUNET_assert (NULL != cp);
892 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle)); 1056 pth = GNUNET_malloc (sizeof (struct GSF_PeerTransmitHandle));
893 pth->transmission_request_start_time = GNUNET_TIME_absolute_now (); 1057 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
894 pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); 1058 pth->timeout = GNUNET_TIME_relative_to_absolute (timeout);
895 pth->gmc = gmc; 1059 pth->gmc = gmc;
896 pth->gmc_cls = gmc_cls; 1060 pth->gmc_cls = gmc_cls;
@@ -908,9 +1072,9 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
908 pos = pos->next; 1072 pos = pos->next;
909 } 1073 }
910 if (prev == NULL) 1074 if (prev == NULL)
911 GNUNET_CONTAINER_DLL_insert_head (cp->pth_head, 1075 GNUNET_CONTAINER_DLL_insert (cp->pth_head,
912 cp->pth_tail, 1076 cp->pth_tail,
913 pth); 1077 pth);
914 else 1078 else
915 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, 1079 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
916 cp->pth_tail, 1080 cp->pth_tail,
@@ -928,11 +1092,11 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
928 is_ready = GNUNET_YES; 1092 is_ready = GNUNET_YES;
929 ip = cp->inc_preference; 1093 ip = cp->inc_preference;
930 cp->inc_preference = 0; 1094 cp->inc_preference = 0;
931 cp->irc = GNUNET_CORE_peer_change_preference (core, 1095 cp->irc = GNUNET_CORE_peer_change_preference (GSF_core,
932 peer, 1096 &target,
933 GNUNET_TIME_UNIT_FOREVER_REL, 1097 GNUNET_TIME_UNIT_FOREVER_REL,
934 GNUNET_BANDWIDTH_VALUE_MAX, 1098 GNUNET_BANDWIDTH_VALUE_MAX,
935 GNUNET_FS_DBLOCK_SIZE, 1099 DBLOCK_SIZE,
936 ip, 1100 ip,
937 &core_reserve_callback, 1101 &core_reserve_callback,
938 cp); 1102 cp);
@@ -956,7 +1120,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
956 } 1120 }
957 if (is_ready) 1121 if (is_ready)
958 { 1122 {
959 pth->cth = GNUNET_CORE_notify_transmit_ready (core, 1123 pth->cth = GNUNET_CORE_notify_transmit_ready (GSF_core,
960 priority, 1124 priority,
961 timeout, 1125 timeout,
962 &target, 1126 &target,
@@ -986,7 +1150,6 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *peer,
986void 1150void
987GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) 1151GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
988{ 1152{
989 struct GSF_PeerTransmitHandle *pth = cls;
990 struct GSF_ConnectedPeer *cp; 1153 struct GSF_ConnectedPeer *cp;
991 1154
992 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK) 1155 if (pth->timeout_task != GNUNET_SCHEDULER_NO_TASK)
@@ -1026,8 +1189,8 @@ GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1026 struct GNUNET_TIME_Relative delay; 1189 struct GNUNET_TIME_Relative delay;
1027 1190
1028 delay = GNUNET_TIME_absolute_get_duration (request_time); 1191 delay = GNUNET_TIME_absolute_get_duration (request_time);
1029 cp->ppd.avg_reply_delay = (cp->ppd.avg_reply_delay * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N; 1192 cp->ppd.avg_reply_delay.rel_value = (cp->ppd.avg_reply_delay.rel_value * (RUNAVG_DELAY_N-1) + delay.rel_value) / RUNAVG_DELAY_N;
1030 cp->ppd.avg_priority = (cp->avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N; 1193 cp->ppd.avg_priority = (cp->ppd.avg_priority * (RUNAVG_DELAY_N-1) + request_priority) / RUNAVG_DELAY_N;
1031} 1194}
1032 1195
1033 1196
@@ -1040,7 +1203,7 @@ GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1040 */ 1203 */
1041void 1204void
1042GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp, 1205GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1043 const struct GSF_LocalClient *initiator_client) 1206 struct GSF_LocalClient *initiator_client)
1044{ 1207{
1045 cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client; 1208 cp->ppd.last_client_replies[cp->last_client_replies_woff++ % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1046} 1209}
@@ -1151,10 +1314,10 @@ GSF_peer_disconnect_handler_ (void *cls,
1151 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE); 1314 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1152 while (NULL != (pth = cp->pth_head)) 1315 while (NULL != (pth = cp->pth_head))
1153 { 1316 {
1154 if (NULL != pth->th) 1317 if (NULL != pth->cth)
1155 { 1318 {
1156 GNUNET_CORE_notify_transmit_ready_cancel (pth->th); 1319 GNUNET_CORE_notify_transmit_ready_cancel (pth->cth);
1157 pth->th = NULL; 1320 pth->cth = NULL;
1158 } 1321 }
1159 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 1322 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1160 cp->pth_tail, 1323 cp->pth_tail,
@@ -1243,7 +1406,7 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1243 struct GNUNET_PeerIdentity *id) 1406 struct GNUNET_PeerIdentity *id)
1244{ 1407{
1245 GNUNET_PEER_resolve (cp->ppd.pid, 1408 GNUNET_PEER_resolve (cp->ppd.pid,
1246 &id); 1409 id);
1247} 1410}
1248 1411
1249 1412
@@ -1320,12 +1483,12 @@ flush_trust (void *cls,
1320 uint32_t trust; 1483 uint32_t trust;
1321 struct GNUNET_PeerIdentity pid; 1484 struct GNUNET_PeerIdentity pid;
1322 1485
1323 if (cp->trust == cp->disk_trust) 1486 if (cp->ppd.trust == cp->disk_trust)
1324 return GNUNET_OK; /* unchanged */ 1487 return GNUNET_OK; /* unchanged */
1325 GNUNET_PEER_resolve (cp->ppd.pid, 1488 GNUNET_PEER_resolve (cp->ppd.pid,
1326 &pid); 1489 &pid);
1327 fn = get_trust_filename (&pid); 1490 fn = get_trust_filename (&pid);
1328 if (cp->trust == 0) 1491 if (cp->ppd.trust == 0)
1329 { 1492 {
1330 if ((0 != UNLINK (fn)) && (errno != ENOENT)) 1493 if ((0 != UNLINK (fn)) && (errno != ENOENT))
1331 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING | 1494 GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
@@ -1333,12 +1496,12 @@ flush_trust (void *cls,
1333 } 1496 }
1334 else 1497 else
1335 { 1498 {
1336 trust = htonl (cp->trust); 1499 trust = htonl (cp->ppd.trust);
1337 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 1500 if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust,
1338 sizeof(uint32_t), 1501 sizeof(uint32_t),
1339 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE 1502 GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1340 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ)) 1503 | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1341 cp->disk_trust = cp->trust; 1504 cp->disk_trust = cp->ppd.trust;
1342 } 1505 }
1343 GNUNET_free (fn); 1506 GNUNET_free (fn);
1344 return GNUNET_OK; 1507 return GNUNET_OK;