aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2011-03-04 12:39:07 +0000
committerChristian Grothoff <christian@grothoff.org>2011-03-04 12:39:07 +0000
commit8111b29a93408e1befe322c87b468d6a86a8ae66 (patch)
tree363488aebad790bc17d7b05ceb1d53c53c4ddcb0 /src/fs/gnunet-service-fs_pr.c
parent8ef6a251c80c2a57e470e5928c44954b2d582484 (diff)
downloadgnunet-8111b29a93408e1befe322c87b468d6a86a8ae66.tar.gz
gnunet-8111b29a93408e1befe322c87b468d6a86a8ae66.zip
fixes
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c153
1 files changed, 91 insertions, 62 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index aca63ac94..3c291cfc9 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -24,6 +24,8 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 */ 25 */
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_load_lib.h"
28#include "gnunet-service-fs_cp.h"
27#include "gnunet-service-fs_pr.h" 29#include "gnunet-service-fs_pr.h"
28 30
29 31
@@ -111,6 +113,14 @@ static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
111 113
112 114
113/** 115/**
116 * Maximum number of requests (from other peers, overall) that we're
117 * willing to have pending at any given point in time. Can be changed
118 * via the configuration file (32k is just the default).
119 */
120static unsigned long long max_pending_requests = (32 * 1024);
121
122
123/**
114 * How many bytes should a bloomfilter be if we have already seen 124 * How many bytes should a bloomfilter be if we have already seen
115 * entry_count responses? Note that BLOOMFILTER_K gives us the number 125 * entry_count responses? Note that BLOOMFILTER_K gives us the number
116 * of bits set per entry. Furthermore, we should not re-size the 126 * of bits set per entry. Furthermore, we should not re-size the
@@ -157,8 +167,8 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
157 size_t nsize; 167 size_t nsize;
158 GNUNET_HashCode mhash; 168 GNUNET_HashCode mhash;
159 169
160 nsize = compute_bloomfilter_size (pr->replies_seen_off); 170 nsize = compute_bloomfilter_size (pr->replies_seen_count);
161 if ( (bf != NULL) && 171 if ( (pr->bf != NULL) &&
162 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) 172 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
163 return GNUNET_NO; /* size not changed */ 173 return GNUNET_NO; /* size not changed */
164 if (pr->bf != NULL) 174 if (pr->bf != NULL)
@@ -221,7 +231,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
221 231
222 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); 232 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
223 pr->public_data.query = *query; 233 pr->public_data.query = *query;
224 if (GNUNET_BLOCK_TYPE_SBLOCK == type) 234 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
225 { 235 {
226 GNUNET_assert (NULL != namespace); 236 GNUNET_assert (NULL != namespace);
227 pr->public_data.namespace = *namespace; 237 pr->public_data.namespace = *namespace;
@@ -229,9 +239,9 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
229 if (NULL != target) 239 if (NULL != target)
230 { 240 {
231 pr->public_data.target = *target; 241 pr->public_data.target = *target;
232 pr->has_target = GNUNET_YES; 242 pr->public_data.has_target = GNUNET_YES;
233 } 243 }
234 pr->public_data.anonymity_level = anonymity_data; 244 pr->public_data.anonymity_level = anonymity_level;
235 pr->public_data.priority = priority; 245 pr->public_data.priority = priority;
236 pr->public_data.original_priority = priority; 246 pr->public_data.original_priority = priority;
237 pr->public_data.options = options; 247 pr->public_data.options = options;
@@ -240,19 +250,19 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
240 pr->rh = rh; 250 pr->rh = rh;
241 pr->rh_cls = rh_cls; 251 pr->rh_cls = rh_cls;
242 if (ttl >= 0) 252 if (ttl >= 0)
243 pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 253 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
244 (uint32_t) ttl)); 254 (uint32_t) ttl));
245 else 255 else
246 pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, 256 pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
247 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 257 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
248 (uint32_t) (- ttl))); 258 (uint32_t) (- ttl)));
249 if (replies_seen_count > 0) 259 if (replies_seen_count > 0)
250 { 260 {
251 pr->replies_seen_size = replies_seen_count; 261 pr->replies_seen_size = replies_seen_count;
252 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); 262 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
253 memcpy (pr->replies_seen, 263 memcpy (pr->replies_seen,
254 replies_seen, 264 replies_seen,
255 replies_seen_count * sizeof (struct GNUNET_HashCode)); 265 replies_seen_count * sizeof (GNUNET_HashCode));
256 pr->replies_seen_count = replies_seen_count; 266 pr->replies_seen_count = replies_seen_count;
257 } 267 }
258 if (NULL != bf_data) 268 if (NULL != bf_data)
@@ -275,7 +285,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
275 { 285 {
276 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, 286 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
277 pr, 287 pr,
278 pr->ttl.abs_value); 288 pr->public_data.ttl.abs_value);
279 /* make sure we don't track too many requests */ 289 /* make sure we don't track too many requests */
280 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) 290 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
281 { 291 {
@@ -326,7 +336,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
326 336
327 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) 337 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
328 return; /* integer overflow */ 338 return; /* integer overflow */
329 if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) 339 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
330 { 340 {
331 /* we're responsible for the BF, full refresh */ 341 /* we're responsible for the BF, full refresh */
332 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) 342 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
@@ -336,7 +346,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
336 memcpy (&pr->replies_seen[pr->replies_seen_count], 346 memcpy (&pr->replies_seen[pr->replies_seen_count],
337 replies_seen, 347 replies_seen,
338 sizeof (GNUNET_HashCode) * replies_seen_count); 348 sizeof (GNUNET_HashCode) * replies_seen_count);
339 pr->replies_seen_count += replies_seen; 349 pr->replies_seen_count += replies_seen_count;
340 if (GNUNET_NO == refresh_bloomfilter (pr)) 350 if (GNUNET_NO == refresh_bloomfilter (pr))
341 { 351 {
342 /* bf not recalculated, simply extend it with new bits */ 352 /* bf not recalculated, simply extend it with new bits */
@@ -357,8 +367,8 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
357 any bloom-filter, so we need to create one on-the-fly */ 367 any bloom-filter, so we need to create one on-the-fly */
358 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 368 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
359 UINT32_MAX); 369 UINT32_MAX);
360 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count), 370 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
361 pr->mingle, 371 compute_bloomfilter_size (replies_seen_count),
362 BLOOMFILTER_K); 372 BLOOMFILTER_K);
363 } 373 }
364 for (i=0;i<pr->replies_seen_count;i++) 374 for (i=0;i<pr->replies_seen_count;i++)
@@ -388,16 +398,16 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
388 size_t buf_size, 398 size_t buf_size,
389 void *buf) 399 void *buf)
390{ 400{
391 struct PendingMessage *pm;
392 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; 401 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
393 struct GetMessage *gm; 402 struct GetMessage *gm;
394 GNUNET_HashCode *ext; 403 GNUNET_HashCode *ext;
395 size_t msize; 404 size_t msize;
396 unsigned int k; 405 unsigned int k;
397 int no_route;
398 uint32_t bm; 406 uint32_t bm;
399 uint32_t prio; 407 uint32_t prio;
400 size_t bf_size; 408 size_t bf_size;
409 struct GNUNET_TIME_Absolute now;
410 int64_t ttl;
401 411
402 k = 0; 412 k = 0;
403 bm = 0; 413 bm = 0;
@@ -406,12 +416,12 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
406 bm |= GET_MESSAGE_BIT_RETURN_TO; 416 bm |= GET_MESSAGE_BIT_RETURN_TO;
407 k++; 417 k++;
408 } 418 }
409 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) 419 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
410 { 420 {
411 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; 421 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
412 k++; 422 k++;
413 } 423 }
414 if (GNUNET_YES == pr->has_target) 424 if (GNUNET_YES == pr->public_data.has_target)
415 { 425 {
416 bm |= GET_MESSAGE_BIT_TRANSMIT_TO; 426 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
417 k++; 427 k++;
@@ -424,7 +434,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
424 gm = (struct GetMessage*) lbuf; 434 gm = (struct GetMessage*) lbuf;
425 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); 435 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
426 gm->header.size = htons (msize); 436 gm->header.size = htons (msize);
427 gm->type = htonl (pr->type); 437 gm->type = htonl (pr->public_data.type);
428 if (GNUNET_YES == do_route) 438 if (GNUNET_YES == do_route)
429 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 439 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
430 pr->public_data.priority + 1); 440 pr->public_data.priority + 1);
@@ -432,18 +442,24 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
432 prio = 0; 442 prio = 0;
433 pr->public_data.priority -= prio; 443 pr->public_data.priority -= prio;
434 gm->priority = htonl (prio); 444 gm->priority = htonl (prio);
435 gm->ttl = htonl (pr->ttl); 445 now = GNUNET_TIME_absolute_get ();
446 ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
447 gm->ttl = htonl (ttl / 1000);
436 gm->filter_mutator = htonl(pr->mingle); 448 gm->filter_mutator = htonl(pr->mingle);
437 gm->hash_bitmap = htonl (bm); 449 gm->hash_bitmap = htonl (bm);
438 gm->query = pr->query; 450 gm->query = pr->public_data.query;
439 ext = (GNUNET_HashCode*) &gm[1]; 451 ext = (GNUNET_HashCode*) &gm[1];
440 k = 0; 452 k = 0;
441 if (GNUNET_YES != do_route) 453 if (GNUNET_YES != do_route)
442 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); 454 GNUNET_PEER_resolve (pr->cp->pid,
443 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) 455 (struct GNUNET_PeerIdentity*) &ext[k++]);
444 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); 456 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
445 if (GNUNET_YES == pr->has_target) 457 memcpy (&ext[k++],
446 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); 458 &pr->public_data.namespace,
459 sizeof (GNUNET_HashCode));
460 if (GNUNET_YES == pr->public_data.has_target)
461 GNUNET_PEER_resolve (pr->public_data.target_pid,
462 (struct GNUNET_PeerIdentity*) &ext[k++]);
447 if (pr->bf != NULL) 463 if (pr->bf != NULL)
448 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, 464 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
449 (char*) &ext[k], 465 (char*) &ext[k],
@@ -582,13 +598,10 @@ static void
582update_request_performance_data (struct ProcessReplyClosure *prq, 598update_request_performance_data (struct ProcessReplyClosure *prq,
583 struct GSF_PendingRequest *pr) 599 struct GSF_PendingRequest *pr)
584{ 600{
585 unsigned int i;
586 struct GNUNET_TIME_Relative cur_delay;
587
588 if (prq->sender == NULL) 601 if (prq->sender == NULL)
589 return; 602 return;
590 GSF_peer_update_performance_ (prq->sender, 603 GSF_peer_update_performance_ (prq->sender,
591 pr->start_time, 604 pr->public_data.start_time,
592 prq->priority); 605 prq->priority);
593} 606}
594 607
@@ -608,12 +621,6 @@ process_reply (void *cls,
608{ 621{
609 struct ProcessReplyClosure *prq = cls; 622 struct ProcessReplyClosure *prq = cls;
610 struct GSF_PendingRequest *pr = value; 623 struct GSF_PendingRequest *pr = value;
611 struct PendingMessage *reply;
612 struct ClientResponseMessage *creply;
613 struct ClientList *cl;
614 struct PutMessage *pm;
615 struct ConnectedPeer *cp;
616 size_t msize;
617 GNUNET_HashCode chash; 624 GNUNET_HashCode chash;
618 625
619#if DEBUG_FS 626#if DEBUG_FS
@@ -622,16 +629,17 @@ process_reply (void *cls,
622 (unsigned int) prq->type, 629 (unsigned int) prq->type,
623 GNUNET_h2s (key)); 630 GNUNET_h2s (key));
624#endif 631#endif
625 GNUNET_STATISTICS_update (stats, 632 GNUNET_STATISTICS_update (GSF_stats,
626 gettext_noop ("# replies received and matched"), 633 gettext_noop ("# replies received and matched"),
627 1, 634 1,
628 GNUNET_NO); 635 GNUNET_NO);
629 prq->eval = GNUNET_BLOCK_evaluate (block_ctx, 636 prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
630 prq->type, 637 prq->type,
631 key, 638 key,
632 &pr->bf, 639 &pr->bf,
633 pr->mingle, 640 pr->mingle,
634 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, 641 &pr->public_data.namespace,
642 (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0,
635 prq->data, 643 prq->data,
636 prq->size); 644 prq->size);
637 switch (prq->eval) 645 switch (prq->eval)
@@ -642,15 +650,19 @@ process_reply (void *cls,
642 case GNUNET_BLOCK_EVALUATION_OK_LAST: 650 case GNUNET_BLOCK_EVALUATION_OK_LAST:
643 /* short cut: stop processing early, no BF-update, etc. */ 651 /* short cut: stop processing early, no BF-update, etc. */
644 update_request_performance_data (prq, pr); 652 update_request_performance_data (prq, pr);
645 GNUNET_LOAD_update (rt_entry_lifetime, 653 GNUNET_LOAD_update (GSF_rt_entry_lifetime,
646 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); 654 GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value);
647 /* pass on to other peers / local clients */ 655 /* pass on to other peers / local clients */
648 pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO); 656 pr->rh (pr->rh_cls,
657 pr,
658 prq->expiration,
659 prq->data, prq->size,
660 GNUNET_NO);
649 /* destroy request, we're done */ 661 /* destroy request, we're done */
650 GSF_pending_request_cancel_ (pr); 662 GSF_pending_request_cancel_ (pr);
651 return GNUNET_YES; 663 return GNUNET_YES;
652 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 664 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
653 GNUNET_STATISTICS_update (stats, 665 GNUNET_STATISTICS_update (GSF_stats,
654 gettext_noop ("# duplicate replies discarded (bloomfilter)"), 666 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
655 1, 667 1,
656 GNUNET_NO); 668 GNUNET_NO);
@@ -686,18 +698,22 @@ process_reply (void *cls,
686 "Found result for query `%s' in local datastore\n", 698 "Found result for query `%s' in local datastore\n",
687 GNUNET_h2s (key)); 699 GNUNET_h2s (key));
688#endif 700#endif
689 GNUNET_STATISTICS_update (stats, 701 GNUNET_STATISTICS_update (GSF_stats,
690 gettext_noop ("# results found locally"), 702 gettext_noop ("# results found locally"),
691 1, 703 1,
692 GNUNET_NO); 704 GNUNET_NO);
693 } 705 }
694 prq->priority += pr->public_data.original_priority; 706 prq->priority += pr->public_data.original_priority;
695 pr->public_data.remaining_priority = 0; 707 pr->public_data.priority = 0;
696 pr->public_data.original_priority = 0; 708 pr->public_data.original_priority = 0;
697 pr->public_data.results_found++; 709 pr->public_data.results_found++;
698 prq->request_found = GNUNET_YES; 710 prq->request_found = GNUNET_YES;
699 /* finally, pass on to other peer / local client */ 711 /* finally, pass on to other peer / local client */
700 pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES); 712 pr->rh (pr->rh_cls,
713 pr,
714 prq->expiration,
715 prq->data, prq->size,
716 GNUNET_YES);
701 return GNUNET_YES; 717 return GNUNET_YES;
702} 718}
703 719
@@ -725,7 +741,7 @@ put_migration_continuation (void *cls,
725 delay.rel_value); 741 delay.rel_value);
726 if (GNUNET_OK == success) 742 if (GNUNET_OK == success)
727 return; 743 return;
728 GNUNET_STATISTICS_update (stats, 744 GNUNET_STATISTICS_update (GSF_stats,
729 gettext_noop ("# datastore 'put' failures"), 745 gettext_noop ("# datastore 'put' failures"),
730 1, 746 1,
731 GNUNET_NO); 747 GNUNET_NO);
@@ -750,7 +766,7 @@ test_put_load_too_high (uint32_t priority)
750 ld = GNUNET_LOAD_get_load (datastore_put_load); 766 ld = GNUNET_LOAD_get_load (datastore_put_load);
751 if (ld < 2.0 * (1 + priority)) 767 if (ld < 2.0 * (1 + priority))
752 return GNUNET_NO; 768 return GNUNET_NO;
753 GNUNET_STATISTICS_update (stats, 769 GNUNET_STATISTICS_update (GSF_stats,
754 gettext_noop ("# storage requests dropped due to high load"), 770 gettext_noop ("# storage requests dropped due to high load"),
755 1, 771 1,
756 GNUNET_NO); 772 GNUNET_NO);
@@ -776,7 +792,7 @@ test_put_load_too_high (uint32_t priority)
776void 792void
777GSF_handle_dht_reply_ (void *cls, 793GSF_handle_dht_reply_ (void *cls,
778 struct GNUNET_TIME_Absolute exp, 794 struct GNUNET_TIME_Absolute exp,
779 const GNUNET_HashCode * key, 795 const GNUNET_HashCode *key,
780 const struct GNUNET_PeerIdentity * const *get_path, 796 const struct GNUNET_PeerIdentity * const *get_path,
781 const struct GNUNET_PeerIdentity * const *put_path, 797 const struct GNUNET_PeerIdentity * const *put_path,
782 enum GNUNET_BLOCK_Type type, 798 enum GNUNET_BLOCK_Type type,
@@ -785,6 +801,7 @@ GSF_handle_dht_reply_ (void *cls,
785{ 801{
786 struct GSF_PendingRequest *pr = cls; 802 struct GSF_PendingRequest *pr = cls;
787 struct ProcessReplyClosure prq; 803 struct ProcessReplyClosure prq;
804 struct GNUNET_TIME_Absolute *start;
788 805
789 memset (&prq, 0, sizeof (prq)); 806 memset (&prq, 0, sizeof (prq));
790 prq.data = data; 807 prq.data = data;
@@ -803,10 +820,10 @@ GSF_handle_dht_reply_ (void *cls,
803#endif 820#endif
804 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); 821 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
805 *start = GNUNET_TIME_absolute_get (); 822 *start = GNUNET_TIME_absolute_get ();
806 GNUNET_DATASTORE_put (dsh, 823 GNUNET_DATASTORE_put (GSF_dsh,
807 0, &query, dsize, &put[1], 824 0, key, size, data,
808 type, prq.priority, 1 /* anonymity */, 825 type, prq.priority, 1 /* anonymity */,
809 expiration, 826 exp,
810 1 + prq.priority, MAX_DATASTORE_QUEUE, 827 1 + prq.priority, MAX_DATASTORE_QUEUE,
811 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 828 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
812 &put_migration_continuation, 829 &put_migration_continuation,
@@ -856,7 +873,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
856 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 873 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
857 return GNUNET_SYSERR; 874 return GNUNET_SYSERR;
858 if (GNUNET_OK != 875 if (GNUNET_OK !=
859 GNUNET_BLOCK_get_key (block_ctx, 876 GNUNET_BLOCK_get_key (GSF_block_ctx,
860 type, 877 type,
861 &put[1], 878 &put[1],
862 dsize, 879 dsize,
@@ -878,14 +895,14 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
878 prq.anonymity_level = 1; 895 prq.anonymity_level = 1;
879 prq.finished = GNUNET_NO; 896 prq.finished = GNUNET_NO;
880 prq.request_found = GNUNET_NO; 897 prq.request_found = GNUNET_NO;
881 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, 898 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
882 &query, 899 &query,
883 &process_reply, 900 &process_reply,
884 &prq); 901 &prq);
885 if (NULL != cp) 902 if (NULL != cp)
886 { 903 {
887 GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); 904 GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
888 GSF_get_peer_performance_data (cp)->trust += prq.priority; 905 GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
889 } 906 }
890 if ( (GNUNET_YES == active_to_migration) && 907 if ( (GNUNET_YES == active_to_migration) &&
891 (GNUNET_NO == test_put_load_too_high (prq.priority)) ) 908 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
@@ -898,7 +915,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
898#endif 915#endif
899 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); 916 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
900 *start = GNUNET_TIME_absolute_get (); 917 *start = GNUNET_TIME_absolute_get ();
901 GNUNET_DATASTORE_put (dsh, 918 GNUNET_DATASTORE_put (GSF_dsh,
902 0, &query, dsize, &put[1], 919 0, &query, dsize, &put[1],
903 type, prq.priority, 1 /* anonymity */, 920 type, prq.priority, 1 /* anonymity */,
904 expiration, 921 expiration,
@@ -918,7 +935,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
918 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 935 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
919 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 936 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
920 (unsigned int) (60000 * putl * putl))); 937 (unsigned int) (60000 * putl * putl)));
921 GSF_block_peer_migration (cp, block_time); 938 GSF_block_peer_migration_ (cp, block_time);
922 } 939 }
923 return GNUNET_OK; 940 return GNUNET_OK;
924} 941}
@@ -926,10 +943,22 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
926 943
927/** 944/**
928 * Setup the subsystem. 945 * Setup the subsystem.
946 *
947 * @param cfg configuration to use
929 */ 948 */
930void 949void
931GSF_pending_request_init_ () 950GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
932{ 951{
952 if (GNUNET_OK !=
953 GNUNET_CONFIGURATION_get_value_number (cfg,
954 "fs",
955 "MAX_PENDING_REQUESTS",
956 &max_pending_requests))
957 {
958 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
959 _("Configuration fails to specify `%s', assuming default value."),
960 "MAX_PENDING_REQUESTS");
961 }
933 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); 962 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
934 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 963 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
935} 964}