aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c419
1 files changed, 194 insertions, 225 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index e637be664..7b6290bbb 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -241,8 +241,8 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
241 return GNUNET_NO; /* size not changed */ 241 return GNUNET_NO; /* size not changed */
242 if (pr->bf != NULL) 242 if (pr->bf != NULL)
243 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 243 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
244 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 244 pr->mingle =
245 UINT32_MAX); 245 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
246 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, nsize, BLOOMFILTER_K); 246 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, nsize, BLOOMFILTER_K);
247 for (i = 0; i < pr->replies_seen_count; i++) 247 for (i = 0; i < pr->replies_seen_count; i++)
248 { 248 {
@@ -280,12 +280,9 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
280 const GNUNET_HashCode * query, 280 const GNUNET_HashCode * query,
281 const GNUNET_HashCode * namespace, 281 const GNUNET_HashCode * namespace,
282 const struct GNUNET_PeerIdentity *target, 282 const struct GNUNET_PeerIdentity *target,
283 const char *bf_data, 283 const char *bf_data, size_t bf_size,
284 size_t bf_size, 284 uint32_t mingle, uint32_t anonymity_level,
285 uint32_t mingle, 285 uint32_t priority, int32_t ttl,
286 uint32_t anonymity_level,
287 uint32_t priority,
288 int32_t ttl,
289 GNUNET_PEER_Id sender_pid, 286 GNUNET_PEER_Id sender_pid,
290 const GNUNET_HashCode * replies_seen, 287 const GNUNET_HashCode * replies_seen,
291 unsigned int replies_seen_count, 288 unsigned int replies_seen_count,
@@ -300,8 +297,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
300 GNUNET_h2s (query), type); 297 GNUNET_h2s (query), type);
301#endif 298#endif
302 GNUNET_STATISTICS_update (GSF_stats, 299 GNUNET_STATISTICS_update (GSF_stats,
303 gettext_noop ("# Pending requests created"), 300 gettext_noop ("# Pending requests created"), 1,
304 1, GNUNET_NO); 301 GNUNET_NO);
305 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); 302 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
306 pr->local_result_offset = 303 pr->local_result_offset =
307 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); 304 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
@@ -348,8 +345,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
348 } 345 }
349 if (NULL != bf_data) 346 if (NULL != bf_data)
350 { 347 {
351 pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data, 348 pr->bf =
352 bf_size, BLOOMFILTER_K); 349 GNUNET_CONTAINER_bloomfilter_init (bf_data, bf_size, BLOOMFILTER_K);
353 pr->mingle = mingle; 350 pr->mingle = mingle;
354 } 351 }
355 else if ((replies_seen_count > 0) && 352 else if ((replies_seen_count > 0) &&
@@ -357,15 +354,13 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
357 { 354 {
358 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr)); 355 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
359 } 356 }
360 GNUNET_CONTAINER_multihashmap_put (pr_map, 357 GNUNET_CONTAINER_multihashmap_put (pr_map, query, pr,
361 query,
362 pr,
363 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 358 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
364 if (0 != (options & GSF_PRO_REQUEST_EXPIRES)) 359 if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
365 { 360 {
366 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, 361 pr->hnode =
367 pr, 362 GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr,
368 pr->public_data.ttl.abs_value); 363 pr->public_data.ttl.abs_value);
369 /* make sure we don't track too many requests */ 364 /* make sure we don't track too many requests */
370 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > 365 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
371 max_pending_requests) 366 max_pending_requests)
@@ -374,17 +369,15 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
374 GNUNET_assert (dpr != NULL); 369 GNUNET_assert (dpr != NULL);
375 if (pr == dpr) 370 if (pr == dpr)
376 break; /* let the request live briefly... */ 371 break; /* let the request live briefly... */
377 dpr->rh (dpr->rh_cls, 372 dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr,
378 GNUNET_BLOCK_EVALUATION_REQUEST_VALID, 373 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY,
379 dpr, 374 NULL, 0);
380 UINT32_MAX,
381 GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY, NULL, 0);
382 GSF_pending_request_cancel_ (dpr, GNUNET_YES); 375 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
383 } 376 }
384 } 377 }
385 GNUNET_STATISTICS_update (GSF_stats, 378 GNUNET_STATISTICS_update (GSF_stats,
386 gettext_noop ("# Pending requests active"), 379 gettext_noop ("# Pending requests active"), 1,
387 1, GNUNET_NO); 380 GNUNET_NO);
388 return pr; 381 return pr;
389} 382}
390 383
@@ -416,12 +409,13 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
416 struct GSF_PendingRequest *prb) 409 struct GSF_PendingRequest *prb)
417{ 410{
418 if ((pra->public_data.type != prb->public_data.type) || 411 if ((pra->public_data.type != prb->public_data.type) ||
419 (0 != memcmp (&pra->public_data.query, 412 (0 !=
420 &prb->public_data.query, 413 memcmp (&pra->public_data.query, &prb->public_data.query,
421 sizeof (GNUNET_HashCode))) || 414 sizeof (GNUNET_HashCode))) ||
422 ((pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && 415 ((pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
423 (0 != memcmp (&pra->public_data.namespace, 416 (0 !=
424 &prb->public_data.namespace, sizeof (GNUNET_HashCode))))) 417 memcmp (&pra->public_data.namespace, &prb->public_data.namespace,
418 sizeof (GNUNET_HashCode)))))
425 return GNUNET_NO; 419 return GNUNET_NO;
426 return GNUNET_OK; 420 return GNUNET_OK;
427} 421}
@@ -450,11 +444,10 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
450 { 444 {
451 /* we're responsible for the BF, full refresh */ 445 /* we're responsible for the BF, full refresh */
452 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) 446 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
453 GNUNET_array_grow (pr->replies_seen, 447 GNUNET_array_grow (pr->replies_seen, pr->replies_seen_size,
454 pr->replies_seen_size,
455 replies_seen_count + pr->replies_seen_count); 448 replies_seen_count + pr->replies_seen_count);
456 memcpy (&pr->replies_seen[pr->replies_seen_count], 449 memcpy (&pr->replies_seen[pr->replies_seen_count], replies_seen,
457 replies_seen, sizeof (GNUNET_HashCode) * replies_seen_count); 450 sizeof (GNUNET_HashCode) * replies_seen_count);
458 pr->replies_seen_count += replies_seen_count; 451 pr->replies_seen_count += replies_seen_count;
459 if (GNUNET_NO == refresh_bloomfilter (pr)) 452 if (GNUNET_NO == refresh_bloomfilter (pr))
460 { 453 {
@@ -472,12 +465,13 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
472 { 465 {
473 /* we're not the initiator, but the initiator did not give us 466 /* we're not the initiator, but the initiator did not give us
474 * any bloom-filter, so we need to create one on-the-fly */ 467 * any bloom-filter, so we need to create one on-the-fly */
475 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 468 pr->mingle =
476 UINT32_MAX); 469 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
477 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 470 pr->bf =
478 compute_bloomfilter_size 471 GNUNET_CONTAINER_bloomfilter_init (NULL,
479 (replies_seen_count), 472 compute_bloomfilter_size
480 BLOOMFILTER_K); 473 (replies_seen_count),
474 BLOOMFILTER_K);
481 } 475 }
482 for (i = 0; i < pr->replies_seen_count; i++) 476 for (i = 0; i < pr->replies_seen_count; i++)
483 { 477 {
@@ -552,8 +546,9 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
552 gm->header.size = htons (msize); 546 gm->header.size = htons (msize);
553 gm->type = htonl (pr->public_data.type); 547 gm->type = htonl (pr->public_data.type);
554 if (do_route) 548 if (do_route)
555 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 549 prio =
556 pr->public_data.priority + 1); 550 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
551 pr->public_data.priority + 1);
557 else 552 else
558 prio = 0; 553 prio = 0;
559 pr->public_data.priority -= prio; 554 pr->public_data.priority -= prio;
@@ -640,8 +635,8 @@ clean_request (void *cls, const GNUNET_HashCode * key, void *value)
640 &pr->public_data.query, 635 &pr->public_data.query,
641 pr)); 636 pr));
642 GNUNET_STATISTICS_update (GSF_stats, 637 GNUNET_STATISTICS_update (GSF_stats,
643 gettext_noop ("# Pending requests active"), 638 gettext_noop ("# Pending requests active"), -1,
644 -1, GNUNET_NO); 639 GNUNET_NO);
645 GNUNET_free (pr); 640 GNUNET_free (pr);
646 return GNUNET_YES; 641 return GNUNET_YES;
647} 642}
@@ -776,8 +771,8 @@ update_request_performance_data (struct ProcessReplyClosure *prq,
776{ 771{
777 if (prq->sender == NULL) 772 if (prq->sender == NULL)
778 return; 773 return;
779 GSF_peer_update_performance_ (prq->sender, 774 GSF_peer_update_performance_ (prq->sender, pr->public_data.start_time,
780 pr->public_data.start_time, prq->priority); 775 prq->priority);
781} 776}
782 777
783 778
@@ -804,18 +799,15 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
804 (unsigned int) prq->type, GNUNET_h2s (key)); 799 (unsigned int) prq->type, GNUNET_h2s (key));
805#endif 800#endif
806 GNUNET_STATISTICS_update (GSF_stats, 801 GNUNET_STATISTICS_update (GSF_stats,
807 gettext_noop ("# replies received and matched"), 802 gettext_noop ("# replies received and matched"), 1,
808 1, GNUNET_NO); 803 GNUNET_NO);
809 prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx, 804 prq->eval =
810 prq->type, 805 GNUNET_BLOCK_evaluate (GSF_block_ctx, prq->type, key, &pr->bf, pr->mingle,
811 key, 806 &pr->public_data.namespace,
812 &pr->bf, 807 (prq->type ==
813 pr->mingle, 808 GNUNET_BLOCK_TYPE_FS_SBLOCK) ?
814 &pr->public_data.namespace, 809 sizeof (GNUNET_HashCode) : 0, prq->data,
815 (prq->type == 810 prq->size);
816 GNUNET_BLOCK_TYPE_FS_SBLOCK) ?
817 sizeof (GNUNET_HashCode) : 0, prq->data,
818 prq->size);
819 switch (prq->eval) 811 switch (prq->eval)
820 { 812 {
821 case GNUNET_BLOCK_EVALUATION_OK_MORE: 813 case GNUNET_BLOCK_EVALUATION_OK_MORE:
@@ -825,14 +817,12 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
825 /* short cut: stop processing early, no BF-update, etc. */ 817 /* short cut: stop processing early, no BF-update, etc. */
826 update_request_performance_data (prq, pr); 818 update_request_performance_data (prq, pr);
827 GNUNET_LOAD_update (GSF_rt_entry_lifetime, 819 GNUNET_LOAD_update (GSF_rt_entry_lifetime,
828 GNUNET_TIME_absolute_get_duration (pr-> 820 GNUNET_TIME_absolute_get_duration (pr->public_data.
829 public_data.start_time).rel_value); 821 start_time).
822 rel_value);
830 /* pass on to other peers / local clients */ 823 /* pass on to other peers / local clients */
831 pr->rh (pr->rh_cls, 824 pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
832 prq->eval, 825 prq->type, prq->data, prq->size);
833 pr,
834 prq->anonymity_level,
835 prq->expiration, prq->type, prq->data, prq->size);
836 return GNUNET_YES; 826 return GNUNET_YES;
837 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 827 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
838 GNUNET_STATISTICS_update (GSF_stats, 828 GNUNET_STATISTICS_update (GSF_stats,
@@ -853,8 +843,8 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
853 GNUNET_break (0); 843 GNUNET_break (0);
854 return GNUNET_YES; 844 return GNUNET_YES;
855 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: 845 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
856 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 846 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Unsupported block type %u\n"),
857 _("Unsupported block type %u\n"), prq->type); 847 prq->type);
858 return GNUNET_NO; 848 return GNUNET_NO;
859 } 849 }
860 /* update bloomfilter */ 850 /* update bloomfilter */
@@ -868,8 +858,8 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
868 GNUNET_h2s (key)); 858 GNUNET_h2s (key));
869#endif 859#endif
870 GNUNET_STATISTICS_update (GSF_stats, 860 GNUNET_STATISTICS_update (GSF_stats,
871 gettext_noop ("# results found locally"), 861 gettext_noop ("# results found locally"), 1,
872 1, GNUNET_NO); 862 GNUNET_NO);
873 } 863 }
874 else 864 else
875 { 865 {
@@ -881,11 +871,8 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
881 pr->public_data.results_found++; 871 pr->public_data.results_found++;
882 prq->request_found = GNUNET_YES; 872 prq->request_found = GNUNET_YES;
883 /* finally, pass on to other peer / local client */ 873 /* finally, pass on to other peer / local client */
884 pr->rh (pr->rh_cls, 874 pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
885 prq->eval, 875 prq->type, prq->data, prq->size);
886 pr,
887 prq->anonymity_level,
888 prq->expiration, prq->type, prq->data, prq->size);
889 return GNUNET_YES; 876 return GNUNET_YES;
890} 877}
891 878
@@ -940,12 +927,11 @@ put_migration_continuation (void *cls, int success, const char *msg)
940 { 927 {
941 ppd = GSF_get_peer_performance_data_ (cp); 928 ppd = GSF_get_peer_performance_data_ (cp);
942 ppd->migration_duplication++; 929 ppd->migration_duplication++;
943 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 930 block_time =
944 5 * 931 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
945 ppd->migration_duplication + 932 5 * ppd->migration_duplication +
946 GNUNET_CRYPTO_random_u32 933 GNUNET_CRYPTO_random_u32
947 (GNUNET_CRYPTO_QUALITY_WEAK, 934 (GNUNET_CRYPTO_QUALITY_WEAK, 5));
948 5));
949 GSF_block_peer_migration_ (cp, block_time); 935 GSF_block_peer_migration_ (cp, block_time);
950 } 936 }
951 } 937 }
@@ -964,8 +950,8 @@ put_migration_continuation (void *cls, int success, const char *msg)
964 if (GNUNET_OK == success) 950 if (GNUNET_OK == success)
965 return; 951 return;
966 GNUNET_STATISTICS_update (GSF_stats, 952 GNUNET_STATISTICS_update (GSF_stats,
967 gettext_noop ("# Datastore `PUT' failures"), 953 gettext_noop ("# Datastore `PUT' failures"), 1,
968 1, GNUNET_NO); 954 GNUNET_NO);
969} 955}
970 956
971 957
@@ -1013,8 +999,7 @@ test_put_load_too_high (uint32_t priority)
1013 * @param data pointer to the result data 999 * @param data pointer to the result data
1014 */ 1000 */
1015static void 1001static void
1016handle_dht_reply (void *cls, 1002handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp,
1017 struct GNUNET_TIME_Absolute exp,
1018 const GNUNET_HashCode * key, 1003 const GNUNET_HashCode * key,
1019 const struct GNUNET_PeerIdentity *const *get_path, 1004 const struct GNUNET_PeerIdentity *const *get_path,
1020 const struct GNUNET_PeerIdentity *const *put_path, 1005 const struct GNUNET_PeerIdentity *const *put_path,
@@ -1025,8 +1010,8 @@ handle_dht_reply (void *cls,
1025 struct PutMigrationContext *pmc; 1010 struct PutMigrationContext *pmc;
1026 1011
1027 GNUNET_STATISTICS_update (GSF_stats, 1012 GNUNET_STATISTICS_update (GSF_stats,
1028 gettext_noop ("# Replies received from DHT"), 1013 gettext_noop ("# Replies received from DHT"), 1,
1029 1, GNUNET_NO); 1014 GNUNET_NO);
1030 memset (&prq, 0, sizeof (prq)); 1015 memset (&prq, 0, sizeof (prq));
1031 prq.data = data; 1016 prq.data = data;
1032 prq.expiration = exp; 1017 prq.expiration = exp;
@@ -1045,12 +1030,10 @@ handle_dht_reply (void *cls,
1045 pmc->start = GNUNET_TIME_absolute_get (); 1030 pmc->start = GNUNET_TIME_absolute_get ();
1046 pmc->requested = GNUNET_YES; 1031 pmc->requested = GNUNET_YES;
1047 if (NULL == 1032 if (NULL ==
1048 GNUNET_DATASTORE_put (GSF_dsh, 1033 GNUNET_DATASTORE_put (GSF_dsh, 0, key, size, data, type, prq.priority,
1049 0, key, size, data, 1034 1 /* anonymity */ ,
1050 type, prq.priority, 1 /* anonymity */ ,
1051 0 /* replication */ , 1035 0 /* replication */ ,
1052 exp, 1036 exp, 1 + prq.priority, MAX_DATASTORE_QUEUE,
1053 1 + prq.priority, MAX_DATASTORE_QUEUE,
1054 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 1037 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1055 &put_migration_continuation, pmc)) 1038 &put_migration_continuation, pmc))
1056 { 1039 {
@@ -1095,15 +1078,13 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1095 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); 1078 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
1096 xquery_size += sizeof (struct GNUNET_PeerIdentity); 1079 xquery_size += sizeof (struct GNUNET_PeerIdentity);
1097 } 1080 }
1098 pr->gh = GNUNET_DHT_get_start (GSF_dht, 1081 pr->gh =
1099 GNUNET_TIME_UNIT_FOREVER_REL, 1082 GNUNET_DHT_get_start (GSF_dht, GNUNET_TIME_UNIT_FOREVER_REL,
1100 pr->public_data.type, 1083 pr->public_data.type, &pr->public_data.query,
1101 &pr->public_data.query, 1084 DEFAULT_GET_REPLICATION,
1102 DEFAULT_GET_REPLICATION, 1085 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, pr->bf,
1103 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1086 pr->mingle, xquery, xquery_size, &handle_dht_reply,
1104 pr->bf, 1087 pr);
1105 pr->mingle,
1106 xquery, xquery_size, &handle_dht_reply, pr);
1107} 1088}
1108 1089
1109 1090
@@ -1120,8 +1101,9 @@ warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1120 1101
1121 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1102 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1122 _("Datastore lookup already took %llu ms!\n"), 1103 _("Datastore lookup already took %llu ms!\n"),
1123 (unsigned long long) 1104 (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->
1124 GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); 1105 qe_start).
1106 rel_value);
1125 pr->warn_task = 1107 pr->warn_task =
1126 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task, 1108 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
1127 pr); 1109 pr);
@@ -1141,8 +1123,9 @@ odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1141 1123
1142 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1124 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1143 _("On-demand lookup already took %llu ms!\n"), 1125 _("On-demand lookup already took %llu ms!\n"),
1144 (unsigned long long) 1126 (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->
1145 GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); 1127 qe_start).
1128 rel_value);
1146 pr->warn_task = 1129 pr->warn_task =
1147 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1130 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1148 &odc_warn_delay_task, pr); 1131 &odc_warn_delay_task, pr);
@@ -1167,13 +1150,9 @@ odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1167 * maybe 0 if no unique identifier is available 1150 * maybe 0 if no unique identifier is available
1168 */ 1151 */
1169static void 1152static void
1170process_local_reply (void *cls, 1153process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
1171 const GNUNET_HashCode * key, 1154 const void *data, enum GNUNET_BLOCK_Type type,
1172 size_t size, 1155 uint32_t priority, uint32_t anonymity,
1173 const void *data,
1174 enum GNUNET_BLOCK_Type type,
1175 uint32_t priority,
1176 uint32_t anonymity,
1177 struct GNUNET_TIME_Absolute expiration, uint64_t uid) 1156 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1178{ 1157{
1179 struct GSF_PendingRequest *pr = cls; 1158 struct GSF_PendingRequest *pr = cls;
@@ -1250,8 +1229,9 @@ process_local_reply (void *cls,
1250 ("# on-demand blocks matched requests"), 1, 1229 ("# on-demand blocks matched requests"), 1,
1251 GNUNET_NO); 1230 GNUNET_NO);
1252 pr->qe_start = GNUNET_TIME_absolute_get (); 1231 pr->qe_start = GNUNET_TIME_absolute_get ();
1253 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1232 pr->warn_task =
1254 &odc_warn_delay_task, pr); 1233 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1234 &odc_warn_delay_task, pr);
1255 if (GNUNET_OK == 1235 if (GNUNET_OK ==
1256 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 1236 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1257 anonymity, expiration, uid, 1237 anonymity, expiration, uid,
@@ -1264,30 +1244,28 @@ process_local_reply (void *cls,
1264 return; /* we're done */ 1244 return; /* we're done */
1265 } 1245 }
1266 GNUNET_STATISTICS_update (GSF_stats, 1246 GNUNET_STATISTICS_update (GSF_stats,
1267 gettext_noop ("# on-demand lookups failed"), 1247 gettext_noop ("# on-demand lookups failed"), 1,
1268 1, GNUNET_NO); 1248 GNUNET_NO);
1269 GNUNET_SCHEDULER_cancel (pr->warn_task); 1249 GNUNET_SCHEDULER_cancel (pr->warn_task);
1270 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1250 pr->warn_task =
1271 &warn_delay_task, pr); 1251 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1272 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1252 &warn_delay_task, pr);
1273 pr->local_result_offset - 1, 1253 pr->qe =
1274 &pr->public_data.query, 1254 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1275 pr->public_data.type == 1255 &pr->public_data.query,
1276 GNUNET_BLOCK_TYPE_FS_DBLOCK ? 1256 pr->public_data.type ==
1277 GNUNET_BLOCK_TYPE_ANY : pr-> 1257 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1278 public_data.type, 1258 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1279 (0 != 1259 (0 !=
1280 (GSF_PRO_PRIORITY_UNLIMITED & 1260 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1281 pr->public_data. 1261 public_data.options)) ? UINT_MAX : 1
1282 options)) ? UINT_MAX : 1 1262 /* queue priority */ ,
1283 /* queue priority */ , 1263 (0 !=
1284 (0 != 1264 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1285 (GSF_PRO_PRIORITY_UNLIMITED & 1265 public_data.options)) ? UINT_MAX : 1
1286 pr->public_data.options)) ? UINT_MAX : 1266 /* max queue size */ ,
1287 1 1267 GNUNET_TIME_UNIT_FOREVER_REL,
1288 /* max queue size */ , 1268 &process_local_reply, pr);
1289 GNUNET_TIME_UNIT_FOREVER_REL,
1290 &process_local_reply, pr);
1291 if (NULL != pr->qe) 1269 if (NULL != pr->qe)
1292 { 1270 {
1293 GNUNET_STATISTICS_update (GSF_stats, 1271 GNUNET_STATISTICS_update (GSF_stats,
@@ -1307,32 +1285,28 @@ process_local_reply (void *cls,
1307 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query)) 1285 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1308 { 1286 {
1309 GNUNET_break (0); 1287 GNUNET_break (0);
1310 GNUNET_DATASTORE_remove (GSF_dsh, 1288 GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1,
1311 key, 1289 GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
1312 size, data,
1313 -1, -1, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
1314 pr->qe_start = GNUNET_TIME_absolute_get (); 1290 pr->qe_start = GNUNET_TIME_absolute_get ();
1315 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1291 pr->warn_task =
1316 &warn_delay_task, pr); 1292 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1317 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1293 &warn_delay_task, pr);
1318 pr->local_result_offset - 1, 1294 pr->qe =
1319 &pr->public_data.query, 1295 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1320 pr->public_data.type == 1296 &pr->public_data.query,
1321 GNUNET_BLOCK_TYPE_FS_DBLOCK ? 1297 pr->public_data.type ==
1322 GNUNET_BLOCK_TYPE_ANY : pr-> 1298 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1323 public_data.type, 1299 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1324 (0 != 1300 (0 !=
1325 (GSF_PRO_PRIORITY_UNLIMITED & 1301 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1326 pr->public_data. 1302 public_data.options)) ? UINT_MAX : 1
1327 options)) ? UINT_MAX : 1 1303 /* queue priority */ ,
1328 /* queue priority */ , 1304 (0 !=
1329 (0 != 1305 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1330 (GSF_PRO_PRIORITY_UNLIMITED & 1306 public_data.options)) ? UINT_MAX : 1
1331 pr->public_data.options)) ? UINT_MAX : 1307 /* max queue size */ ,
1332 1 1308 GNUNET_TIME_UNIT_FOREVER_REL,
1333 /* max queue size */ , 1309 &process_local_reply, pr);
1334 GNUNET_TIME_UNIT_FOREVER_REL,
1335 &process_local_reply, pr);
1336 if (pr->qe == NULL) 1310 if (pr->qe == NULL)
1337 { 1311 {
1338 GNUNET_STATISTICS_update (GSF_stats, 1312 GNUNET_STATISTICS_update (GSF_stats,
@@ -1373,25 +1347,25 @@ process_local_reply (void *cls,
1373 goto check_error_and_continue; 1347 goto check_error_and_continue;
1374 } 1348 }
1375 pr->qe_start = GNUNET_TIME_absolute_get (); 1349 pr->qe_start = GNUNET_TIME_absolute_get ();
1376 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1350 pr->warn_task =
1377 &warn_delay_task, pr); 1351 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
1378 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1352 pr);
1379 pr->local_result_offset++, 1353 pr->qe =
1380 &pr->public_data.query, 1354 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
1381 pr->public_data.type == 1355 &pr->public_data.query,
1382 GNUNET_BLOCK_TYPE_FS_DBLOCK ? 1356 pr->public_data.type ==
1383 GNUNET_BLOCK_TYPE_ANY : pr-> 1357 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1384 public_data.type, 1358 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1385 (0 != 1359 (0 !=
1386 (GSF_PRO_PRIORITY_UNLIMITED & 1360 (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.
1387 pr->public_data.options)) ? UINT_MAX : 1 1361 options)) ? UINT_MAX : 1
1388 /* queue priority */ , 1362 /* queue priority */ ,
1389 (0 != 1363 (0 !=
1390 (GSF_PRO_PRIORITY_UNLIMITED & 1364 (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.
1391 pr->public_data.options)) ? UINT_MAX : 1 1365 options)) ? UINT_MAX : 1
1392 /* max queue size */ , 1366 /* max queue size */ ,
1393 GNUNET_TIME_UNIT_FOREVER_REL, 1367 GNUNET_TIME_UNIT_FOREVER_REL,
1394 &process_local_reply, pr); 1368 &process_local_reply, pr);
1395 /* check if we successfully queued another datastore request; 1369 /* check if we successfully queued another datastore request;
1396 * if so, return, otherwise call our continuation (if we have 1370 * if so, return, otherwise call our continuation (if we have
1397 * any) */ 1371 * any) */
@@ -1426,28 +1400,28 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1426 pr->llc_cont = cont; 1400 pr->llc_cont = cont;
1427 pr->llc_cont_cls = cont_cls; 1401 pr->llc_cont_cls = cont_cls;
1428 pr->qe_start = GNUNET_TIME_absolute_get (); 1402 pr->qe_start = GNUNET_TIME_absolute_get ();
1429 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1403 pr->warn_task =
1430 &warn_delay_task, pr); 1404 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
1405 pr);
1431 GNUNET_STATISTICS_update (GSF_stats, 1406 GNUNET_STATISTICS_update (GSF_stats,
1432 gettext_noop ("# Datastore lookups initiated"), 1407 gettext_noop ("# Datastore lookups initiated"), 1,
1433 1, GNUNET_NO); 1408 GNUNET_NO);
1434 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1409 pr->qe =
1435 pr->local_result_offset++, 1410 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
1436 &pr->public_data.query, 1411 &pr->public_data.query,
1437 pr->public_data.type == 1412 pr->public_data.type ==
1438 GNUNET_BLOCK_TYPE_FS_DBLOCK ? 1413 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1439 GNUNET_BLOCK_TYPE_ANY : pr-> 1414 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1440 public_data.type, 1415 (0 !=
1441 (0 != 1416 (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.
1442 (GSF_PRO_PRIORITY_UNLIMITED & 1417 options)) ? UINT_MAX : 1
1443 pr->public_data.options)) ? UINT_MAX : 1 1418 /* queue priority */ ,
1444 /* queue priority */ , 1419 (0 !=
1445 (0 != 1420 (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.
1446 (GSF_PRO_PRIORITY_UNLIMITED & 1421 options)) ? UINT_MAX : 1
1447 pr->public_data.options)) ? UINT_MAX : 1 1422 /* max queue size */ ,
1448 /* max queue size */ , 1423 GNUNET_TIME_UNIT_FOREVER_REL,
1449 GNUNET_TIME_UNIT_FOREVER_REL, 1424 &process_local_reply, pr);
1450 &process_local_reply, pr);
1451 if (NULL != pr->qe) 1425 if (NULL != pr->qe)
1452 { 1426 {
1453 GNUNET_STATISTICS_update (GSF_stats, 1427 GNUNET_STATISTICS_update (GSF_stats,
@@ -1513,8 +1487,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1513 return GNUNET_SYSERR; 1487 return GNUNET_SYSERR;
1514 } 1488 }
1515 GNUNET_STATISTICS_update (GSF_stats, 1489 GNUNET_STATISTICS_update (GSF_stats,
1516 gettext_noop ("# GAP PUT messages received"), 1490 gettext_noop ("# GAP PUT messages received"), 1,
1517 1, GNUNET_NO); 1491 GNUNET_NO);
1518 /* now, lookup 'query' */ 1492 /* now, lookup 'query' */
1519 prq.data = (const void *) &put[1]; 1493 prq.data = (const void *) &put[1];
1520 if (NULL != cp) 1494 if (NULL != cp)
@@ -1527,8 +1501,8 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1527 prq.priority = 0; 1501 prq.priority = 0;
1528 prq.anonymity_level = UINT32_MAX; 1502 prq.anonymity_level = UINT32_MAX;
1529 prq.request_found = GNUNET_NO; 1503 prq.request_found = GNUNET_NO;
1530 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, 1504 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, &query, &process_reply,
1531 &query, &process_reply, &prq); 1505 &prq);
1532 if (NULL != cp) 1506 if (NULL != cp)
1533 { 1507 {
1534 GSF_connected_peer_change_preference_ (cp, 1508 GSF_connected_peer_change_preference_ (cp,
@@ -1551,12 +1525,10 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1551 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, 1525 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1552 &pmc->origin); 1526 &pmc->origin);
1553 if (NULL == 1527 if (NULL ==
1554 GNUNET_DATASTORE_put (GSF_dsh, 1528 GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type,
1555 0, &query, dsize, &put[1], 1529 prq.priority, 1 /* anonymity */ ,
1556 type, prq.priority, 1 /* anonymity */ ,
1557 0 /* replication */ , 1530 0 /* replication */ ,
1558 expiration, 1531 expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE,
1559 1 + prq.priority, MAX_DATASTORE_QUEUE,
1560 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 1532 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1561 &put_migration_continuation, pmc)) 1533 &put_migration_continuation, pmc))
1562 { 1534 {
@@ -1568,24 +1540,23 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1568#if DEBUG_FS 1540#if DEBUG_FS
1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1541 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570 "Choosing not to keep content `%s' (%d/%d)\n", 1542 "Choosing not to keep content `%s' (%d/%d)\n",
1571 GNUNET_h2s (&query), 1543 GNUNET_h2s (&query), active_to_migration,
1572 active_to_migration, test_put_load_too_high (prq.priority)); 1544 test_put_load_too_high (prq.priority));
1573#endif 1545#endif
1574 } 1546 }
1575 putl = GNUNET_LOAD_get_load (datastore_put_load); 1547 putl = GNUNET_LOAD_get_load (datastore_put_load);
1576 if ((NULL != (cp = prq.sender)) && 1548 if ((NULL != (cp = prq.sender)) && (GNUNET_NO == prq.request_found) &&
1577 (GNUNET_NO == prq.request_found) &&
1578 ((GNUNET_YES != active_to_migration) || 1549 ((GNUNET_YES != active_to_migration) ||
1579 (putl > 2.5 * (1 + prq.priority)))) 1550 (putl > 2.5 * (1 + prq.priority))))
1580 { 1551 {
1581 if (GNUNET_YES != active_to_migration) 1552 if (GNUNET_YES != active_to_migration)
1582 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); 1553 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1583 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1554 block_time =
1584 5000 + 1555 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1585 GNUNET_CRYPTO_random_u32 1556 5000 +
1586 (GNUNET_CRYPTO_QUALITY_WEAK, 1557 GNUNET_CRYPTO_random_u32
1587 (unsigned int) (60000 * putl * 1558 (GNUNET_CRYPTO_QUALITY_WEAK,
1588 putl))); 1559 (unsigned int) (60000 * putl * putl)));
1589 GSF_block_peer_migration_ (cp, block_time); 1560 GSF_block_peer_migration_ (cp, block_time);
1590 } 1561 }
1591 return GNUNET_OK; 1562 return GNUNET_OK;
@@ -1599,8 +1570,7 @@ void
1599GSF_pending_request_init_ () 1570GSF_pending_request_init_ ()
1600{ 1571{
1601 if (GNUNET_OK != 1572 if (GNUNET_OK !=
1602 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, 1573 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
1603 "fs",
1604 "MAX_PENDING_REQUESTS", 1574 "MAX_PENDING_REQUESTS",
1605 &max_pending_requests)) 1575 &max_pending_requests))
1606 { 1576 {
@@ -1609,9 +1579,8 @@ GSF_pending_request_init_ ()
1609 ("Configuration fails to specify `%s', assuming default value."), 1579 ("Configuration fails to specify `%s', assuming default value."),
1610 "MAX_PENDING_REQUESTS"); 1580 "MAX_PENDING_REQUESTS");
1611 } 1581 }
1612 active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, 1582 active_to_migration =
1613 "FS", 1583 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1614 "CONTENT_CACHING");
1615 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 1584 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1616 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); 1585 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
1617 requests_by_expiration_heap = 1586 requests_by_expiration_heap =