diff options
author | Christian Grothoff <christian@grothoff.org> | 2011-03-04 12:39:07 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2011-03-04 12:39:07 +0000 |
commit | 8111b29a93408e1befe322c87b468d6a86a8ae66 (patch) | |
tree | 363488aebad790bc17d7b05ceb1d53c53c4ddcb0 /src/fs/gnunet-service-fs_pr.c | |
parent | 8ef6a251c80c2a57e470e5928c44954b2d582484 (diff) | |
download | gnunet-8111b29a93408e1befe322c87b468d6a86a8ae66.tar.gz gnunet-8111b29a93408e1befe322c87b468d6a86a8ae66.zip |
fixes
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 153 |
1 files changed, 91 insertions, 62 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index aca63ac94..3c291cfc9 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -24,6 +24,8 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | */ | 25 | */ |
26 | #include "platform.h" | 26 | #include "platform.h" |
27 | #include "gnunet_load_lib.h" | ||
28 | #include "gnunet-service-fs_cp.h" | ||
27 | #include "gnunet-service-fs_pr.h" | 29 | #include "gnunet-service-fs_pr.h" |
28 | 30 | ||
29 | 31 | ||
@@ -111,6 +113,14 @@ static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap; | |||
111 | 113 | ||
112 | 114 | ||
113 | /** | 115 | /** |
116 | * Maximum number of requests (from other peers, overall) that we're | ||
117 | * willing to have pending at any given point in time. Can be changed | ||
118 | * via the configuration file (32k is just the default). | ||
119 | */ | ||
120 | static unsigned long long max_pending_requests = (32 * 1024); | ||
121 | |||
122 | |||
123 | /** | ||
114 | * How many bytes should a bloomfilter be if we have already seen | 124 | * How many bytes should a bloomfilter be if we have already seen |
115 | * entry_count responses? Note that BLOOMFILTER_K gives us the number | 125 | * entry_count responses? Note that BLOOMFILTER_K gives us the number |
116 | * of bits set per entry. Furthermore, we should not re-size the | 126 | * of bits set per entry. Furthermore, we should not re-size the |
@@ -157,8 +167,8 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr) | |||
157 | size_t nsize; | 167 | size_t nsize; |
158 | GNUNET_HashCode mhash; | 168 | GNUNET_HashCode mhash; |
159 | 169 | ||
160 | nsize = compute_bloomfilter_size (pr->replies_seen_off); | 170 | nsize = compute_bloomfilter_size (pr->replies_seen_count); |
161 | if ( (bf != NULL) && | 171 | if ( (pr->bf != NULL) && |
162 | (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) | 172 | (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) |
163 | return GNUNET_NO; /* size not changed */ | 173 | return GNUNET_NO; /* size not changed */ |
164 | if (pr->bf != NULL) | 174 | if (pr->bf != NULL) |
@@ -221,7 +231,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, | |||
221 | 231 | ||
222 | pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); | 232 | pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); |
223 | pr->public_data.query = *query; | 233 | pr->public_data.query = *query; |
224 | if (GNUNET_BLOCK_TYPE_SBLOCK == type) | 234 | if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) |
225 | { | 235 | { |
226 | GNUNET_assert (NULL != namespace); | 236 | GNUNET_assert (NULL != namespace); |
227 | pr->public_data.namespace = *namespace; | 237 | pr->public_data.namespace = *namespace; |
@@ -229,9 +239,9 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, | |||
229 | if (NULL != target) | 239 | if (NULL != target) |
230 | { | 240 | { |
231 | pr->public_data.target = *target; | 241 | pr->public_data.target = *target; |
232 | pr->has_target = GNUNET_YES; | 242 | pr->public_data.has_target = GNUNET_YES; |
233 | } | 243 | } |
234 | pr->public_data.anonymity_level = anonymity_data; | 244 | pr->public_data.anonymity_level = anonymity_level; |
235 | pr->public_data.priority = priority; | 245 | pr->public_data.priority = priority; |
236 | pr->public_data.original_priority = priority; | 246 | pr->public_data.original_priority = priority; |
237 | pr->public_data.options = options; | 247 | pr->public_data.options = options; |
@@ -240,19 +250,19 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, | |||
240 | pr->rh = rh; | 250 | pr->rh = rh; |
241 | pr->rh_cls = rh_cls; | 251 | pr->rh_cls = rh_cls; |
242 | if (ttl >= 0) | 252 | if (ttl >= 0) |
243 | pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | 253 | pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
244 | (uint32_t) ttl)); | 254 | (uint32_t) ttl)); |
245 | else | 255 | else |
246 | pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, | 256 | pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, |
247 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, | 257 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, |
248 | (uint32_t) (- ttl))); | 258 | (uint32_t) (- ttl))); |
249 | if (replies_seen_count > 0) | 259 | if (replies_seen_count > 0) |
250 | { | 260 | { |
251 | pr->replies_seen_size = replies_seen_count; | 261 | pr->replies_seen_size = replies_seen_count; |
252 | pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); | 262 | pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); |
253 | memcpy (pr->replies_seen, | 263 | memcpy (pr->replies_seen, |
254 | replies_seen, | 264 | replies_seen, |
255 | replies_seen_count * sizeof (struct GNUNET_HashCode)); | 265 | replies_seen_count * sizeof (GNUNET_HashCode)); |
256 | pr->replies_seen_count = replies_seen_count; | 266 | pr->replies_seen_count = replies_seen_count; |
257 | } | 267 | } |
258 | if (NULL != bf_data) | 268 | if (NULL != bf_data) |
@@ -275,7 +285,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, | |||
275 | { | 285 | { |
276 | pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, | 286 | pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, |
277 | pr, | 287 | pr, |
278 | pr->ttl.abs_value); | 288 | pr->public_data.ttl.abs_value); |
279 | /* make sure we don't track too many requests */ | 289 | /* make sure we don't track too many requests */ |
280 | while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) | 290 | while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) |
281 | { | 291 | { |
@@ -326,7 +336,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, | |||
326 | 336 | ||
327 | if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) | 337 | if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) |
328 | return; /* integer overflow */ | 338 | return; /* integer overflow */ |
329 | if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) | 339 | if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) |
330 | { | 340 | { |
331 | /* we're responsible for the BF, full refresh */ | 341 | /* we're responsible for the BF, full refresh */ |
332 | if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) | 342 | if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) |
@@ -336,7 +346,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, | |||
336 | memcpy (&pr->replies_seen[pr->replies_seen_count], | 346 | memcpy (&pr->replies_seen[pr->replies_seen_count], |
337 | replies_seen, | 347 | replies_seen, |
338 | sizeof (GNUNET_HashCode) * replies_seen_count); | 348 | sizeof (GNUNET_HashCode) * replies_seen_count); |
339 | pr->replies_seen_count += replies_seen; | 349 | pr->replies_seen_count += replies_seen_count; |
340 | if (GNUNET_NO == refresh_bloomfilter (pr)) | 350 | if (GNUNET_NO == refresh_bloomfilter (pr)) |
341 | { | 351 | { |
342 | /* bf not recalculated, simply extend it with new bits */ | 352 | /* bf not recalculated, simply extend it with new bits */ |
@@ -357,8 +367,8 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr, | |||
357 | any bloom-filter, so we need to create one on-the-fly */ | 367 | any bloom-filter, so we need to create one on-the-fly */ |
358 | pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 368 | pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
359 | UINT32_MAX); | 369 | UINT32_MAX); |
360 | pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count), | 370 | pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, |
361 | pr->mingle, | 371 | compute_bloomfilter_size (replies_seen_count), |
362 | BLOOMFILTER_K); | 372 | BLOOMFILTER_K); |
363 | } | 373 | } |
364 | for (i=0;i<pr->replies_seen_count;i++) | 374 | for (i=0;i<pr->replies_seen_count;i++) |
@@ -388,16 +398,16 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
388 | size_t buf_size, | 398 | size_t buf_size, |
389 | void *buf) | 399 | void *buf) |
390 | { | 400 | { |
391 | struct PendingMessage *pm; | ||
392 | char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; | 401 | char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; |
393 | struct GetMessage *gm; | 402 | struct GetMessage *gm; |
394 | GNUNET_HashCode *ext; | 403 | GNUNET_HashCode *ext; |
395 | size_t msize; | 404 | size_t msize; |
396 | unsigned int k; | 405 | unsigned int k; |
397 | int no_route; | ||
398 | uint32_t bm; | 406 | uint32_t bm; |
399 | uint32_t prio; | 407 | uint32_t prio; |
400 | size_t bf_size; | 408 | size_t bf_size; |
409 | struct GNUNET_TIME_Absolute now; | ||
410 | int64_t ttl; | ||
401 | 411 | ||
402 | k = 0; | 412 | k = 0; |
403 | bm = 0; | 413 | bm = 0; |
@@ -406,12 +416,12 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
406 | bm |= GET_MESSAGE_BIT_RETURN_TO; | 416 | bm |= GET_MESSAGE_BIT_RETURN_TO; |
407 | k++; | 417 | k++; |
408 | } | 418 | } |
409 | if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) | 419 | if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) |
410 | { | 420 | { |
411 | bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; | 421 | bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; |
412 | k++; | 422 | k++; |
413 | } | 423 | } |
414 | if (GNUNET_YES == pr->has_target) | 424 | if (GNUNET_YES == pr->public_data.has_target) |
415 | { | 425 | { |
416 | bm |= GET_MESSAGE_BIT_TRANSMIT_TO; | 426 | bm |= GET_MESSAGE_BIT_TRANSMIT_TO; |
417 | k++; | 427 | k++; |
@@ -424,7 +434,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
424 | gm = (struct GetMessage*) lbuf; | 434 | gm = (struct GetMessage*) lbuf; |
425 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); | 435 | gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); |
426 | gm->header.size = htons (msize); | 436 | gm->header.size = htons (msize); |
427 | gm->type = htonl (pr->type); | 437 | gm->type = htonl (pr->public_data.type); |
428 | if (GNUNET_YES == do_route) | 438 | if (GNUNET_YES == do_route) |
429 | prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 439 | prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
430 | pr->public_data.priority + 1); | 440 | pr->public_data.priority + 1); |
@@ -432,18 +442,24 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, | |||
432 | prio = 0; | 442 | prio = 0; |
433 | pr->public_data.priority -= prio; | 443 | pr->public_data.priority -= prio; |
434 | gm->priority = htonl (prio); | 444 | gm->priority = htonl (prio); |
435 | gm->ttl = htonl (pr->ttl); | 445 | now = GNUNET_TIME_absolute_get (); |
446 | ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value); | ||
447 | gm->ttl = htonl (ttl / 1000); | ||
436 | gm->filter_mutator = htonl(pr->mingle); | 448 | gm->filter_mutator = htonl(pr->mingle); |
437 | gm->hash_bitmap = htonl (bm); | 449 | gm->hash_bitmap = htonl (bm); |
438 | gm->query = pr->query; | 450 | gm->query = pr->public_data.query; |
439 | ext = (GNUNET_HashCode*) &gm[1]; | 451 | ext = (GNUNET_HashCode*) &gm[1]; |
440 | k = 0; | 452 | k = 0; |
441 | if (GNUNET_YES != do_route) | 453 | if (GNUNET_YES != do_route) |
442 | GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]); | 454 | GNUNET_PEER_resolve (pr->cp->pid, |
443 | if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type) | 455 | (struct GNUNET_PeerIdentity*) &ext[k++]); |
444 | memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode)); | 456 | if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) |
445 | if (GNUNET_YES == pr->has_target) | 457 | memcpy (&ext[k++], |
446 | GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]); | 458 | &pr->public_data.namespace, |
459 | sizeof (GNUNET_HashCode)); | ||
460 | if (GNUNET_YES == pr->public_data.has_target) | ||
461 | GNUNET_PEER_resolve (pr->public_data.target_pid, | ||
462 | (struct GNUNET_PeerIdentity*) &ext[k++]); | ||
447 | if (pr->bf != NULL) | 463 | if (pr->bf != NULL) |
448 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, | 464 | GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, |
449 | (char*) &ext[k], | 465 | (char*) &ext[k], |
@@ -582,13 +598,10 @@ static void | |||
582 | update_request_performance_data (struct ProcessReplyClosure *prq, | 598 | update_request_performance_data (struct ProcessReplyClosure *prq, |
583 | struct GSF_PendingRequest *pr) | 599 | struct GSF_PendingRequest *pr) |
584 | { | 600 | { |
585 | unsigned int i; | ||
586 | struct GNUNET_TIME_Relative cur_delay; | ||
587 | |||
588 | if (prq->sender == NULL) | 601 | if (prq->sender == NULL) |
589 | return; | 602 | return; |
590 | GSF_peer_update_performance_ (prq->sender, | 603 | GSF_peer_update_performance_ (prq->sender, |
591 | pr->start_time, | 604 | pr->public_data.start_time, |
592 | prq->priority); | 605 | prq->priority); |
593 | } | 606 | } |
594 | 607 | ||
@@ -608,12 +621,6 @@ process_reply (void *cls, | |||
608 | { | 621 | { |
609 | struct ProcessReplyClosure *prq = cls; | 622 | struct ProcessReplyClosure *prq = cls; |
610 | struct GSF_PendingRequest *pr = value; | 623 | struct GSF_PendingRequest *pr = value; |
611 | struct PendingMessage *reply; | ||
612 | struct ClientResponseMessage *creply; | ||
613 | struct ClientList *cl; | ||
614 | struct PutMessage *pm; | ||
615 | struct ConnectedPeer *cp; | ||
616 | size_t msize; | ||
617 | GNUNET_HashCode chash; | 624 | GNUNET_HashCode chash; |
618 | 625 | ||
619 | #if DEBUG_FS | 626 | #if DEBUG_FS |
@@ -622,16 +629,17 @@ process_reply (void *cls, | |||
622 | (unsigned int) prq->type, | 629 | (unsigned int) prq->type, |
623 | GNUNET_h2s (key)); | 630 | GNUNET_h2s (key)); |
624 | #endif | 631 | #endif |
625 | GNUNET_STATISTICS_update (stats, | 632 | GNUNET_STATISTICS_update (GSF_stats, |
626 | gettext_noop ("# replies received and matched"), | 633 | gettext_noop ("# replies received and matched"), |
627 | 1, | 634 | 1, |
628 | GNUNET_NO); | 635 | GNUNET_NO); |
629 | prq->eval = GNUNET_BLOCK_evaluate (block_ctx, | 636 | prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx, |
630 | prq->type, | 637 | prq->type, |
631 | key, | 638 | key, |
632 | &pr->bf, | 639 | &pr->bf, |
633 | pr->mingle, | 640 | pr->mingle, |
634 | pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0, | 641 | &pr->public_data.namespace, |
642 | (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0, | ||
635 | prq->data, | 643 | prq->data, |
636 | prq->size); | 644 | prq->size); |
637 | switch (prq->eval) | 645 | switch (prq->eval) |
@@ -642,15 +650,19 @@ process_reply (void *cls, | |||
642 | case GNUNET_BLOCK_EVALUATION_OK_LAST: | 650 | case GNUNET_BLOCK_EVALUATION_OK_LAST: |
643 | /* short cut: stop processing early, no BF-update, etc. */ | 651 | /* short cut: stop processing early, no BF-update, etc. */ |
644 | update_request_performance_data (prq, pr); | 652 | update_request_performance_data (prq, pr); |
645 | GNUNET_LOAD_update (rt_entry_lifetime, | 653 | GNUNET_LOAD_update (GSF_rt_entry_lifetime, |
646 | GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value); | 654 | GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value); |
647 | /* pass on to other peers / local clients */ | 655 | /* pass on to other peers / local clients */ |
648 | pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO); | 656 | pr->rh (pr->rh_cls, |
657 | pr, | ||
658 | prq->expiration, | ||
659 | prq->data, prq->size, | ||
660 | GNUNET_NO); | ||
649 | /* destroy request, we're done */ | 661 | /* destroy request, we're done */ |
650 | GSF_pending_request_cancel_ (pr); | 662 | GSF_pending_request_cancel_ (pr); |
651 | return GNUNET_YES; | 663 | return GNUNET_YES; |
652 | case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: | 664 | case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: |
653 | GNUNET_STATISTICS_update (stats, | 665 | GNUNET_STATISTICS_update (GSF_stats, |
654 | gettext_noop ("# duplicate replies discarded (bloomfilter)"), | 666 | gettext_noop ("# duplicate replies discarded (bloomfilter)"), |
655 | 1, | 667 | 1, |
656 | GNUNET_NO); | 668 | GNUNET_NO); |
@@ -686,18 +698,22 @@ process_reply (void *cls, | |||
686 | "Found result for query `%s' in local datastore\n", | 698 | "Found result for query `%s' in local datastore\n", |
687 | GNUNET_h2s (key)); | 699 | GNUNET_h2s (key)); |
688 | #endif | 700 | #endif |
689 | GNUNET_STATISTICS_update (stats, | 701 | GNUNET_STATISTICS_update (GSF_stats, |
690 | gettext_noop ("# results found locally"), | 702 | gettext_noop ("# results found locally"), |
691 | 1, | 703 | 1, |
692 | GNUNET_NO); | 704 | GNUNET_NO); |
693 | } | 705 | } |
694 | prq->priority += pr->public_data.original_priority; | 706 | prq->priority += pr->public_data.original_priority; |
695 | pr->public_data.remaining_priority = 0; | 707 | pr->public_data.priority = 0; |
696 | pr->public_data.original_priority = 0; | 708 | pr->public_data.original_priority = 0; |
697 | pr->public_data.results_found++; | 709 | pr->public_data.results_found++; |
698 | prq->request_found = GNUNET_YES; | 710 | prq->request_found = GNUNET_YES; |
699 | /* finally, pass on to other peer / local client */ | 711 | /* finally, pass on to other peer / local client */ |
700 | pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES); | 712 | pr->rh (pr->rh_cls, |
713 | pr, | ||
714 | prq->expiration, | ||
715 | prq->data, prq->size, | ||
716 | GNUNET_YES); | ||
701 | return GNUNET_YES; | 717 | return GNUNET_YES; |
702 | } | 718 | } |
703 | 719 | ||
@@ -725,7 +741,7 @@ put_migration_continuation (void *cls, | |||
725 | delay.rel_value); | 741 | delay.rel_value); |
726 | if (GNUNET_OK == success) | 742 | if (GNUNET_OK == success) |
727 | return; | 743 | return; |
728 | GNUNET_STATISTICS_update (stats, | 744 | GNUNET_STATISTICS_update (GSF_stats, |
729 | gettext_noop ("# datastore 'put' failures"), | 745 | gettext_noop ("# datastore 'put' failures"), |
730 | 1, | 746 | 1, |
731 | GNUNET_NO); | 747 | GNUNET_NO); |
@@ -750,7 +766,7 @@ test_put_load_too_high (uint32_t priority) | |||
750 | ld = GNUNET_LOAD_get_load (datastore_put_load); | 766 | ld = GNUNET_LOAD_get_load (datastore_put_load); |
751 | if (ld < 2.0 * (1 + priority)) | 767 | if (ld < 2.0 * (1 + priority)) |
752 | return GNUNET_NO; | 768 | return GNUNET_NO; |
753 | GNUNET_STATISTICS_update (stats, | 769 | GNUNET_STATISTICS_update (GSF_stats, |
754 | gettext_noop ("# storage requests dropped due to high load"), | 770 | gettext_noop ("# storage requests dropped due to high load"), |
755 | 1, | 771 | 1, |
756 | GNUNET_NO); | 772 | GNUNET_NO); |
@@ -776,7 +792,7 @@ test_put_load_too_high (uint32_t priority) | |||
776 | void | 792 | void |
777 | GSF_handle_dht_reply_ (void *cls, | 793 | GSF_handle_dht_reply_ (void *cls, |
778 | struct GNUNET_TIME_Absolute exp, | 794 | struct GNUNET_TIME_Absolute exp, |
779 | const GNUNET_HashCode * key, | 795 | const GNUNET_HashCode *key, |
780 | const struct GNUNET_PeerIdentity * const *get_path, | 796 | const struct GNUNET_PeerIdentity * const *get_path, |
781 | const struct GNUNET_PeerIdentity * const *put_path, | 797 | const struct GNUNET_PeerIdentity * const *put_path, |
782 | enum GNUNET_BLOCK_Type type, | 798 | enum GNUNET_BLOCK_Type type, |
@@ -785,6 +801,7 @@ GSF_handle_dht_reply_ (void *cls, | |||
785 | { | 801 | { |
786 | struct GSF_PendingRequest *pr = cls; | 802 | struct GSF_PendingRequest *pr = cls; |
787 | struct ProcessReplyClosure prq; | 803 | struct ProcessReplyClosure prq; |
804 | struct GNUNET_TIME_Absolute *start; | ||
788 | 805 | ||
789 | memset (&prq, 0, sizeof (prq)); | 806 | memset (&prq, 0, sizeof (prq)); |
790 | prq.data = data; | 807 | prq.data = data; |
@@ -803,10 +820,10 @@ GSF_handle_dht_reply_ (void *cls, | |||
803 | #endif | 820 | #endif |
804 | start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); | 821 | start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); |
805 | *start = GNUNET_TIME_absolute_get (); | 822 | *start = GNUNET_TIME_absolute_get (); |
806 | GNUNET_DATASTORE_put (dsh, | 823 | GNUNET_DATASTORE_put (GSF_dsh, |
807 | 0, &query, dsize, &put[1], | 824 | 0, key, size, data, |
808 | type, prq.priority, 1 /* anonymity */, | 825 | type, prq.priority, 1 /* anonymity */, |
809 | expiration, | 826 | exp, |
810 | 1 + prq.priority, MAX_DATASTORE_QUEUE, | 827 | 1 + prq.priority, MAX_DATASTORE_QUEUE, |
811 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | 828 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, |
812 | &put_migration_continuation, | 829 | &put_migration_continuation, |
@@ -856,7 +873,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
856 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) | 873 | if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) |
857 | return GNUNET_SYSERR; | 874 | return GNUNET_SYSERR; |
858 | if (GNUNET_OK != | 875 | if (GNUNET_OK != |
859 | GNUNET_BLOCK_get_key (block_ctx, | 876 | GNUNET_BLOCK_get_key (GSF_block_ctx, |
860 | type, | 877 | type, |
861 | &put[1], | 878 | &put[1], |
862 | dsize, | 879 | dsize, |
@@ -878,14 +895,14 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
878 | prq.anonymity_level = 1; | 895 | prq.anonymity_level = 1; |
879 | prq.finished = GNUNET_NO; | 896 | prq.finished = GNUNET_NO; |
880 | prq.request_found = GNUNET_NO; | 897 | prq.request_found = GNUNET_NO; |
881 | GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map, | 898 | GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, |
882 | &query, | 899 | &query, |
883 | &process_reply, | 900 | &process_reply, |
884 | &prq); | 901 | &prq); |
885 | if (NULL != cp) | 902 | if (NULL != cp) |
886 | { | 903 | { |
887 | GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); | 904 | GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); |
888 | GSF_get_peer_performance_data (cp)->trust += prq.priority; | 905 | GSF_get_peer_performance_data_ (cp)->trust += prq.priority; |
889 | } | 906 | } |
890 | if ( (GNUNET_YES == active_to_migration) && | 907 | if ( (GNUNET_YES == active_to_migration) && |
891 | (GNUNET_NO == test_put_load_too_high (prq.priority)) ) | 908 | (GNUNET_NO == test_put_load_too_high (prq.priority)) ) |
@@ -898,7 +915,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
898 | #endif | 915 | #endif |
899 | start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); | 916 | start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute)); |
900 | *start = GNUNET_TIME_absolute_get (); | 917 | *start = GNUNET_TIME_absolute_get (); |
901 | GNUNET_DATASTORE_put (dsh, | 918 | GNUNET_DATASTORE_put (GSF_dsh, |
902 | 0, &query, dsize, &put[1], | 919 | 0, &query, dsize, &put[1], |
903 | type, prq.priority, 1 /* anonymity */, | 920 | type, prq.priority, 1 /* anonymity */, |
904 | expiration, | 921 | expiration, |
@@ -918,7 +935,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
918 | block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, | 935 | block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, |
919 | 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 936 | 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
920 | (unsigned int) (60000 * putl * putl))); | 937 | (unsigned int) (60000 * putl * putl))); |
921 | GSF_block_peer_migration (cp, block_time); | 938 | GSF_block_peer_migration_ (cp, block_time); |
922 | } | 939 | } |
923 | return GNUNET_OK; | 940 | return GNUNET_OK; |
924 | } | 941 | } |
@@ -926,10 +943,22 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
926 | 943 | ||
927 | /** | 944 | /** |
928 | * Setup the subsystem. | 945 | * Setup the subsystem. |
946 | * | ||
947 | * @param cfg configuration to use | ||
929 | */ | 948 | */ |
930 | void | 949 | void |
931 | GSF_pending_request_init_ () | 950 | GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg) |
932 | { | 951 | { |
952 | if (GNUNET_OK != | ||
953 | GNUNET_CONFIGURATION_get_value_number (cfg, | ||
954 | "fs", | ||
955 | "MAX_PENDING_REQUESTS", | ||
956 | &max_pending_requests)) | ||
957 | { | ||
958 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
959 | _("Configuration fails to specify `%s', assuming default value."), | ||
960 | "MAX_PENDING_REQUESTS"); | ||
961 | } | ||
933 | pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); | 962 | pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); |
934 | requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); | 963 | requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); |
935 | } | 964 | } |