aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c1486
1 files changed, 711 insertions, 775 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index d3ea241a5..e637be664 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -43,7 +43,7 @@ struct GSF_PendingRequest
43{ 43{
44 /** 44 /**
45 * Public data for the request. 45 * Public data for the request.
46 */ 46 */
47 struct GSF_PendingRequestData public_data; 47 struct GSF_PendingRequestData public_data;
48 48
49 /** 49 /**
@@ -236,23 +236,19 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
236 GNUNET_HashCode mhash; 236 GNUNET_HashCode mhash;
237 237
238 nsize = compute_bloomfilter_size (pr->replies_seen_count); 238 nsize = compute_bloomfilter_size (pr->replies_seen_count);
239 if ( (pr->bf != NULL) && 239 if ((pr->bf != NULL) &&
240 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) ) 240 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)))
241 return GNUNET_NO; /* size not changed */ 241 return GNUNET_NO; /* size not changed */
242 if (pr->bf != NULL) 242 if (pr->bf != NULL)
243 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 243 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
244 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 244 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
245 UINT32_MAX); 245 UINT32_MAX);
246 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 246 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, nsize, BLOOMFILTER_K);
247 nsize, 247 for (i = 0; i < pr->replies_seen_count; i++)
248 BLOOMFILTER_K); 248 {
249 for (i=0;i<pr->replies_seen_count;i++) 249 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
250 { 250 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
251 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i], 251 }
252 pr->mingle,
253 &mhash);
254 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
255 }
256 return GNUNET_YES; 252 return GNUNET_YES;
257} 253}
258 254
@@ -280,118 +276,115 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
280 */ 276 */
281struct GSF_PendingRequest * 277struct GSF_PendingRequest *
282GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, 278GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
283 enum GNUNET_BLOCK_Type type, 279 enum GNUNET_BLOCK_Type type,
284 const GNUNET_HashCode *query, 280 const GNUNET_HashCode * query,
285 const GNUNET_HashCode *namespace, 281 const GNUNET_HashCode * namespace,
286 const struct GNUNET_PeerIdentity *target, 282 const struct GNUNET_PeerIdentity *target,
287 const char *bf_data, 283 const char *bf_data,
288 size_t bf_size, 284 size_t bf_size,
289 uint32_t mingle, 285 uint32_t mingle,
290 uint32_t anonymity_level, 286 uint32_t anonymity_level,
291 uint32_t priority, 287 uint32_t priority,
292 int32_t ttl, 288 int32_t ttl,
293 GNUNET_PEER_Id sender_pid, 289 GNUNET_PEER_Id sender_pid,
294 const GNUNET_HashCode *replies_seen, 290 const GNUNET_HashCode * replies_seen,
295 unsigned int replies_seen_count, 291 unsigned int replies_seen_count,
296 GSF_PendingRequestReplyHandler rh, 292 GSF_PendingRequestReplyHandler rh, void *rh_cls)
297 void *rh_cls)
298{ 293{
299 struct GSF_PendingRequest *pr; 294 struct GSF_PendingRequest *pr;
300 struct GSF_PendingRequest *dpr; 295 struct GSF_PendingRequest *dpr;
301 296
302#if DEBUG_FS > 1 297#if DEBUG_FS > 1
303 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 298 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
304 "Creating request handle for `%s' of type %d\n", 299 "Creating request handle for `%s' of type %d\n",
305 GNUNET_h2s (query), 300 GNUNET_h2s (query), type);
306 type); 301#endif
307#endif
308 GNUNET_STATISTICS_update (GSF_stats, 302 GNUNET_STATISTICS_update (GSF_stats,
309 gettext_noop ("# Pending requests created"), 303 gettext_noop ("# Pending requests created"),
310 1, 304 1, GNUNET_NO);
311 GNUNET_NO);
312 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest)); 305 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
313 pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 306 pr->local_result_offset =
314 UINT64_MAX); 307 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
315 pr->public_data.query = *query; 308 pr->public_data.query = *query;
316 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type) 309 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
317 { 310 {
318 GNUNET_assert (NULL != namespace); 311 GNUNET_assert (NULL != namespace);
319 pr->public_data.namespace = *namespace; 312 pr->public_data.namespace = *namespace;
320 } 313 }
321 if (NULL != target) 314 if (NULL != target)
322 { 315 {
323 pr->public_data.target = *target; 316 pr->public_data.target = *target;
324 pr->public_data.has_target = GNUNET_YES; 317 pr->public_data.has_target = GNUNET_YES;
325 } 318 }
326 pr->public_data.anonymity_level = anonymity_level; 319 pr->public_data.anonymity_level = anonymity_level;
327 pr->public_data.priority = priority; 320 pr->public_data.priority = priority;
328 pr->public_data.original_priority = priority; 321 pr->public_data.original_priority = priority;
329 pr->public_data.options = options; 322 pr->public_data.options = options;
330 pr->public_data.type = type; 323 pr->public_data.type = type;
331 pr->public_data.start_time = GNUNET_TIME_absolute_get (); 324 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
332 pr->sender_pid = sender_pid; 325 pr->sender_pid = sender_pid;
333 pr->rh = rh; 326 pr->rh = rh;
334 pr->rh_cls = rh_cls; 327 pr->rh_cls = rh_cls;
335 GNUNET_assert ( (sender_pid != 0) || 328 GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
336 (0 == (options & GSF_PRO_FORWARD_ONLY)) );
337 if (ttl >= 0) 329 if (ttl >= 0)
338 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 330 pr->public_data.ttl =
339 (uint32_t) ttl)); 331 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply
332 (GNUNET_TIME_UNIT_SECONDS,
333 (uint32_t) ttl));
340 else 334 else
341 pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time, 335 pr->public_data.ttl =
342 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 336 GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
343 (uint32_t) (- ttl))); 337 GNUNET_TIME_relative_multiply
338 (GNUNET_TIME_UNIT_SECONDS,
339 (uint32_t) (-ttl)));
344 if (replies_seen_count > 0) 340 if (replies_seen_count > 0)
345 { 341 {
346 pr->replies_seen_size = replies_seen_count; 342 pr->replies_seen_size = replies_seen_count;
347 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size); 343 pr->replies_seen =
348 memcpy (pr->replies_seen, 344 GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
349 replies_seen, 345 memcpy (pr->replies_seen, replies_seen,
350 replies_seen_count * sizeof (GNUNET_HashCode)); 346 replies_seen_count * sizeof (GNUNET_HashCode));
351 pr->replies_seen_count = replies_seen_count; 347 pr->replies_seen_count = replies_seen_count;
352 } 348 }
353 if (NULL != bf_data) 349 if (NULL != bf_data)
354 { 350 {
355 pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data, 351 pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
356 bf_size, 352 bf_size, BLOOMFILTER_K);
357 BLOOMFILTER_K); 353 pr->mingle = mingle;
358 pr->mingle = mingle; 354 }
359 } 355 else if ((replies_seen_count > 0) &&
360 else if ( (replies_seen_count > 0) && 356 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
361 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) ) 357 {
362 { 358 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
363 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr)); 359 }
364 }
365 GNUNET_CONTAINER_multihashmap_put (pr_map, 360 GNUNET_CONTAINER_multihashmap_put (pr_map,
366 query, 361 query,
367 pr, 362 pr,
368 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 363 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
369 if (0 != (options & GSF_PRO_REQUEST_EXPIRES)) 364 if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
365 {
366 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
367 pr,
368 pr->public_data.ttl.abs_value);
369 /* make sure we don't track too many requests */
370 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
371 max_pending_requests)
370 { 372 {
371 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, 373 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
372 pr, 374 GNUNET_assert (dpr != NULL);
373 pr->public_data.ttl.abs_value); 375 if (pr == dpr)
374 /* make sure we don't track too many requests */ 376 break; /* let the request live briefly... */
375 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests) 377 dpr->rh (dpr->rh_cls,
376 { 378 GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
377 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); 379 dpr,
378 GNUNET_assert (dpr != NULL); 380 UINT32_MAX,
379 if (pr == dpr) 381 GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_BLOCK_TYPE_ANY, NULL, 0);
380 break; /* let the request live briefly... */ 382 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
381 dpr->rh (dpr->rh_cls,
382 GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
383 dpr,
384 UINT32_MAX,
385 GNUNET_TIME_UNIT_FOREVER_ABS,
386 GNUNET_BLOCK_TYPE_ANY,
387 NULL, 0);
388 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
389 }
390 } 383 }
384 }
391 GNUNET_STATISTICS_update (GSF_stats, 385 GNUNET_STATISTICS_update (GSF_stats,
392 gettext_noop ("# Pending requests active"), 386 gettext_noop ("# Pending requests active"),
393 1, 387 1, GNUNET_NO);
394 GNUNET_NO);
395 return pr; 388 return pr;
396} 389}
397 390
@@ -420,16 +413,15 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
420 */ 413 */
421int 414int
422GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, 415GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
423 struct GSF_PendingRequest *prb) 416 struct GSF_PendingRequest *prb)
424{ 417{
425 if ( (pra->public_data.type != prb->public_data.type) || 418 if ((pra->public_data.type != prb->public_data.type) ||
426 (0 != memcmp (&pra->public_data.query, 419 (0 != memcmp (&pra->public_data.query,
427 &prb->public_data.query, 420 &prb->public_data.query,
428 sizeof (GNUNET_HashCode))) || 421 sizeof (GNUNET_HashCode))) ||
429 ( (pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) && 422 ((pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
430 (0 != memcmp (&pra->public_data.namespace, 423 (0 != memcmp (&pra->public_data.namespace,
431 &prb->public_data.namespace, 424 &prb->public_data.namespace, sizeof (GNUNET_HashCode)))))
432 sizeof (GNUNET_HashCode))) ) )
433 return GNUNET_NO; 425 return GNUNET_NO;
434 return GNUNET_OK; 426 return GNUNET_OK;
435} 427}
@@ -446,57 +438,53 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
446 */ 438 */
447void 439void
448GSF_pending_request_update_ (struct GSF_PendingRequest *pr, 440GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
449 const GNUNET_HashCode *replies_seen, 441 const GNUNET_HashCode * replies_seen,
450 unsigned int replies_seen_count) 442 unsigned int replies_seen_count)
451{ 443{
452 unsigned int i; 444 unsigned int i;
453 GNUNET_HashCode mhash; 445 GNUNET_HashCode mhash;
454 446
455 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) 447 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
456 return; /* integer overflow */ 448 return; /* integer overflow */
457 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) 449 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
450 {
451 /* we're responsible for the BF, full refresh */
452 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
453 GNUNET_array_grow (pr->replies_seen,
454 pr->replies_seen_size,
455 replies_seen_count + pr->replies_seen_count);
456 memcpy (&pr->replies_seen[pr->replies_seen_count],
457 replies_seen, sizeof (GNUNET_HashCode) * replies_seen_count);
458 pr->replies_seen_count += replies_seen_count;
459 if (GNUNET_NO == refresh_bloomfilter (pr))
458 { 460 {
459 /* we're responsible for the BF, full refresh */ 461 /* bf not recalculated, simply extend it with new bits */
460 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) 462 for (i = 0; i < replies_seen_count; i++)
461 GNUNET_array_grow (pr->replies_seen, 463 {
462 pr->replies_seen_size, 464 GNUNET_BLOCK_mingle_hash (&replies_seen[i], pr->mingle, &mhash);
463 replies_seen_count + pr->replies_seen_count); 465 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
464 memcpy (&pr->replies_seen[pr->replies_seen_count], 466 }
465 replies_seen,
466 sizeof (GNUNET_HashCode) * replies_seen_count);
467 pr->replies_seen_count += replies_seen_count;
468 if (GNUNET_NO == refresh_bloomfilter (pr))
469 {
470 /* bf not recalculated, simply extend it with new bits */
471 for (i=0;i<replies_seen_count;i++)
472 {
473 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
474 pr->mingle,
475 &mhash);
476 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
477 }
478 }
479 } 467 }
468 }
480 else 469 else
470 {
471 if (NULL == pr->bf)
472 {
473 /* we're not the initiator, but the initiator did not give us
474 * any bloom-filter, so we need to create one on-the-fly */
475 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
476 UINT32_MAX);
477 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
478 compute_bloomfilter_size
479 (replies_seen_count),
480 BLOOMFILTER_K);
481 }
482 for (i = 0; i < pr->replies_seen_count; i++)
481 { 483 {
482 if (NULL == pr->bf) 484 GNUNET_BLOCK_mingle_hash (&replies_seen[i], pr->mingle, &mhash);
483 { 485 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
484 /* we're not the initiator, but the initiator did not give us
485 any bloom-filter, so we need to create one on-the-fly */
486 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
487 UINT32_MAX);
488 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
489 compute_bloomfilter_size (replies_seen_count),
490 BLOOMFILTER_K);
491 }
492 for (i=0;i<pr->replies_seen_count;i++)
493 {
494 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
495 pr->mingle,
496 &mhash);
497 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
498 }
499 } 486 }
487 }
500} 488}
501 489
502 490
@@ -511,8 +499,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
511 */ 499 */
512size_t 500size_t
513GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr, 501GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
514 size_t buf_size, 502 size_t buf_size, void *buf)
515 void *buf)
516{ 503{
517 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE]; 504 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
518 struct GetMessage *gm; 505 struct GetMessage *gm;
@@ -529,45 +516,44 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
529#if DEBUG_FS 516#if DEBUG_FS
530 if (buf_size > 0) 517 if (buf_size > 0)
531 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
532 "Building request message for `%s' of type %d\n", 519 "Building request message for `%s' of type %d\n",
533 GNUNET_h2s (&pr->public_data.query), 520 GNUNET_h2s (&pr->public_data.query), pr->public_data.type);
534 pr->public_data.type); 521#endif
535#endif
536 k = 0; 522 k = 0;
537 bm = 0; 523 bm = 0;
538 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); 524 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
539 if ( (! do_route) && (pr->sender_pid == 0)) 525 if ((!do_route) && (pr->sender_pid == 0))
540 { 526 {
541 GNUNET_break (0); 527 GNUNET_break (0);
542 do_route = GNUNET_YES; 528 do_route = GNUNET_YES;
543 } 529 }
544 if (! do_route) 530 if (!do_route)
545 { 531 {
546 bm |= GET_MESSAGE_BIT_RETURN_TO; 532 bm |= GET_MESSAGE_BIT_RETURN_TO;
547 k++; 533 k++;
548 } 534 }
549 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) 535 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
550 { 536 {
551 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE; 537 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
552 k++; 538 k++;
553 } 539 }
554 if (GNUNET_YES == pr->public_data.has_target) 540 if (GNUNET_YES == pr->public_data.has_target)
555 { 541 {
556 bm |= GET_MESSAGE_BIT_TRANSMIT_TO; 542 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
557 k++; 543 k++;
558 } 544 }
559 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf); 545 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
560 msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode); 546 msize = sizeof (struct GetMessage) + bf_size + k * sizeof (GNUNET_HashCode);
561 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 547 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
562 if (buf_size < msize) 548 if (buf_size < msize)
563 return msize; 549 return msize;
564 gm = (struct GetMessage*) lbuf; 550 gm = (struct GetMessage *) lbuf;
565 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); 551 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
566 gm->header.size = htons (msize); 552 gm->header.size = htons (msize);
567 gm->type = htonl (pr->public_data.type); 553 gm->type = htonl (pr->public_data.type);
568 if (do_route) 554 if (do_route)
569 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 555 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
570 pr->public_data.priority + 1); 556 pr->public_data.priority + 1);
571 else 557 else
572 prio = 0; 558 prio = 0;
573 pr->public_data.priority -= prio; 559 pr->public_data.priority -= prio;
@@ -575,25 +561,23 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
575 now = GNUNET_TIME_absolute_get (); 561 now = GNUNET_TIME_absolute_get ();
576 ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value); 562 ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
577 gm->ttl = htonl (ttl / 1000); 563 gm->ttl = htonl (ttl / 1000);
578 gm->filter_mutator = htonl(pr->mingle); 564 gm->filter_mutator = htonl (pr->mingle);
579 gm->hash_bitmap = htonl (bm); 565 gm->hash_bitmap = htonl (bm);
580 gm->query = pr->public_data.query; 566 gm->query = pr->public_data.query;
581 ext = (GNUNET_HashCode*) &gm[1]; 567 ext = (GNUNET_HashCode *) & gm[1];
582 k = 0; 568 k = 0;
583 if (! do_route) 569 if (!do_route)
584 GNUNET_PEER_resolve (pr->sender_pid, 570 GNUNET_PEER_resolve (pr->sender_pid,
585 (struct GNUNET_PeerIdentity*) &ext[k++]); 571 (struct GNUNET_PeerIdentity *) &ext[k++]);
586 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) 572 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
587 memcpy (&ext[k++], 573 memcpy (&ext[k++], &pr->public_data.namespace, sizeof (GNUNET_HashCode));
588 &pr->public_data.namespace,
589 sizeof (GNUNET_HashCode));
590 if (GNUNET_YES == pr->public_data.has_target) 574 if (GNUNET_YES == pr->public_data.has_target)
591 ext[k++] = pr->public_data.target.hashPubKey; 575 ext[k++] = pr->public_data.target.hashPubKey;
592 if (pr->bf != NULL) 576 if (pr->bf != NULL)
593 GNUNET_assert (GNUNET_SYSERR != 577 GNUNET_assert (GNUNET_SYSERR !=
594 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, 578 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
595 (char*) &ext[k], 579 (char *) &ext[k],
596 bf_size)); 580 bf_size));
597 memcpy (buf, gm, msize); 581 memcpy (buf, gm, msize);
598 return msize; 582 return msize;
599} 583}
@@ -607,63 +591,57 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
607 * @param value value in the hash map (pending request) 591 * @param value value in the hash map (pending request)
608 * @return GNUNET_YES (we should continue to iterate) 592 * @return GNUNET_YES (we should continue to iterate)
609 */ 593 */
610static int 594static int
611clean_request (void *cls, 595clean_request (void *cls, const GNUNET_HashCode * key, void *value)
612 const GNUNET_HashCode * key,
613 void *value)
614{ 596{
615 struct GSF_PendingRequest *pr = value; 597 struct GSF_PendingRequest *pr = value;
616 GSF_LocalLookupContinuation cont; 598 GSF_LocalLookupContinuation cont;
617 599
618#if DEBUG_FS 600#if DEBUG_FS
619 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 601 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
620 "Cleaning up pending request for `%s'.\n", 602 "Cleaning up pending request for `%s'.\n", GNUNET_h2s (key));
621 GNUNET_h2s (key)); 603#endif
622#endif
623 if (NULL != (cont = pr->llc_cont)) 604 if (NULL != (cont = pr->llc_cont))
624 { 605 {
625 pr->llc_cont = NULL; 606 pr->llc_cont = NULL;
626 cont (pr->llc_cont_cls, 607 cont (pr->llc_cont_cls, pr, pr->local_result);
627 pr, 608 }
628 pr->local_result);
629 }
630 GSF_plan_notify_request_done_ (pr); 609 GSF_plan_notify_request_done_ (pr);
631 GNUNET_free_non_null (pr->replies_seen); 610 GNUNET_free_non_null (pr->replies_seen);
632 if (NULL != pr->bf) 611 if (NULL != pr->bf)
633 { 612 {
634 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 613 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
635 pr->bf = NULL; 614 pr->bf = NULL;
636 } 615 }
637 GNUNET_PEER_change_rc (pr->sender_pid, -1); 616 GNUNET_PEER_change_rc (pr->sender_pid, -1);
638 pr->sender_pid = 0; 617 pr->sender_pid = 0;
639 if (NULL != pr->hnode) 618 if (NULL != pr->hnode)
640 { 619 {
641 GNUNET_CONTAINER_heap_remove_node (pr->hnode); 620 GNUNET_CONTAINER_heap_remove_node (pr->hnode);
642 pr->hnode = NULL; 621 pr->hnode = NULL;
643 } 622 }
644 if (NULL != pr->qe) 623 if (NULL != pr->qe)
645 { 624 {
646 GNUNET_DATASTORE_cancel (pr->qe); 625 GNUNET_DATASTORE_cancel (pr->qe);
647 pr->qe = NULL; 626 pr->qe = NULL;
648 } 627 }
649 if (NULL != pr->gh) 628 if (NULL != pr->gh)
650 { 629 {
651 GNUNET_DHT_get_stop (pr->gh); 630 GNUNET_DHT_get_stop (pr->gh);
652 pr->gh = NULL; 631 pr->gh = NULL;
653 } 632 }
654 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) 633 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
655 { 634 {
656 GNUNET_SCHEDULER_cancel (pr->warn_task); 635 GNUNET_SCHEDULER_cancel (pr->warn_task);
657 pr->warn_task = GNUNET_SCHEDULER_NO_TASK; 636 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
658 } 637 }
659 GNUNET_assert (GNUNET_OK == 638 GNUNET_assert (GNUNET_OK ==
660 GNUNET_CONTAINER_multihashmap_remove (pr_map, 639 GNUNET_CONTAINER_multihashmap_remove (pr_map,
661 &pr->public_data.query, 640 &pr->public_data.query,
662 pr)); 641 pr));
663 GNUNET_STATISTICS_update (GSF_stats, 642 GNUNET_STATISTICS_update (GSF_stats,
664 gettext_noop ("# Pending requests active"), 643 gettext_noop ("# Pending requests active"),
665 -1, 644 -1, GNUNET_NO);
666 GNUNET_NO);
667 GNUNET_free (pr); 645 GNUNET_free (pr);
668 return GNUNET_YES; 646 return GNUNET_YES;
669} 647}
@@ -676,46 +654,43 @@ clean_request (void *cls,
676 * @param full_cleanup fully purge the request 654 * @param full_cleanup fully purge the request
677 */ 655 */
678void 656void
679GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, 657GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
680 int full_cleanup)
681{ 658{
682 GSF_LocalLookupContinuation cont; 659 GSF_LocalLookupContinuation cont;
683 660
684 if (NULL == pr_map) 661 if (NULL == pr_map)
685 return; /* already cleaned up! */ 662 return; /* already cleaned up! */
686 if (GNUNET_YES != full_cleanup) 663 if (GNUNET_YES != full_cleanup)
664 {
665 /* make request inactive (we're no longer interested in more results),
666 * but do NOT remove from our data-structures, we still need it there
667 * to prevent the request from looping */
668 pr->rh = NULL;
669 if (NULL != (cont = pr->llc_cont))
670 {
671 pr->llc_cont = NULL;
672 cont (pr->llc_cont_cls, pr, pr->local_result);
673 }
674 GSF_plan_notify_request_done_ (pr);
675 if (NULL != pr->qe)
676 {
677 GNUNET_DATASTORE_cancel (pr->qe);
678 pr->qe = NULL;
679 }
680 if (NULL != pr->gh)
687 { 681 {
688 /* make request inactive (we're no longer interested in more results), 682 GNUNET_DHT_get_stop (pr->gh);
689 but do NOT remove from our data-structures, we still need it there 683 pr->gh = NULL;
690 to prevent the request from looping */ 684 }
691 pr->rh = NULL; 685 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
692 if (NULL != (cont = pr->llc_cont)) 686 {
693 { 687 GNUNET_SCHEDULER_cancel (pr->warn_task);
694 pr->llc_cont = NULL; 688 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
695 cont (pr->llc_cont_cls,
696 pr,
697 pr->local_result);
698 }
699 GSF_plan_notify_request_done_ (pr);
700 if (NULL != pr->qe)
701 {
702 GNUNET_DATASTORE_cancel (pr->qe);
703 pr->qe = NULL;
704 }
705 if (NULL != pr->gh)
706 {
707 GNUNET_DHT_get_stop (pr->gh);
708 pr->gh = NULL;
709 }
710 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
711 {
712 GNUNET_SCHEDULER_cancel (pr->warn_task);
713 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
714 }
715 return;
716 } 689 }
690 return;
691 }
717 GNUNET_assert (GNUNET_YES == 692 GNUNET_assert (GNUNET_YES ==
718 clean_request (NULL, &pr->public_data.query, pr)); 693 clean_request (NULL, &pr->public_data.query, pr));
719} 694}
720 695
721 696
@@ -726,12 +701,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr,
726 * @param cls closure for it 701 * @param cls closure for it
727 */ 702 */
728void 703void
729GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, 704GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls)
730 void *cls)
731{ 705{
732 GNUNET_CONTAINER_multihashmap_iterate (pr_map, 706 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
733 (GNUNET_CONTAINER_HashMapIterator) it, 707 (GNUNET_CONTAINER_HashMapIterator) it,
734 cls); 708 cls);
735} 709}
736 710
737 711
@@ -798,15 +772,14 @@ struct ProcessReplyClosure
798 */ 772 */
799static void 773static void
800update_request_performance_data (struct ProcessReplyClosure *prq, 774update_request_performance_data (struct ProcessReplyClosure *prq,
801 struct GSF_PendingRequest *pr) 775 struct GSF_PendingRequest *pr)
802{ 776{
803 if (prq->sender == NULL) 777 if (prq->sender == NULL)
804 return; 778 return;
805 GSF_peer_update_performance_ (prq->sender, 779 GSF_peer_update_performance_ (prq->sender,
806 pr->public_data.start_time, 780 pr->public_data.start_time, prq->priority);
807 prq->priority);
808} 781}
809 782
810 783
811/** 784/**
812 * We have received a reply; handle it! 785 * We have received a reply; handle it!
@@ -817,9 +790,7 @@ update_request_performance_data (struct ProcessReplyClosure *prq,
817 * @return GNUNET_YES (we should continue to iterate) 790 * @return GNUNET_YES (we should continue to iterate)
818 */ 791 */
819static int 792static int
820process_reply (void *cls, 793process_reply (void *cls, const GNUNET_HashCode * key, void *value)
821 const GNUNET_HashCode *key,
822 void *value)
823{ 794{
824 struct ProcessReplyClosure *prq = cls; 795 struct ProcessReplyClosure *prq = cls;
825 struct GSF_PendingRequest *pr = value; 796 struct GSF_PendingRequest *pr = value;
@@ -829,88 +800,81 @@ process_reply (void *cls,
829 return GNUNET_YES; 800 return GNUNET_YES;
830#if DEBUG_FS 801#if DEBUG_FS
831 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 802 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832 "Matched result (type %u) for query `%s' with pending request\n", 803 "Matched result (type %u) for query `%s' with pending request\n",
833 (unsigned int) prq->type, 804 (unsigned int) prq->type, GNUNET_h2s (key));
834 GNUNET_h2s (key)); 805#endif
835#endif
836 GNUNET_STATISTICS_update (GSF_stats, 806 GNUNET_STATISTICS_update (GSF_stats,
837 gettext_noop ("# replies received and matched"), 807 gettext_noop ("# replies received and matched"),
838 1, 808 1, GNUNET_NO);
839 GNUNET_NO);
840 prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx, 809 prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
841 prq->type, 810 prq->type,
842 key, 811 key,
843 &pr->bf, 812 &pr->bf,
844 pr->mingle, 813 pr->mingle,
845 &pr->public_data.namespace, 814 &pr->public_data.namespace,
846 (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0, 815 (prq->type ==
847 prq->data, 816 GNUNET_BLOCK_TYPE_FS_SBLOCK) ?
848 prq->size); 817 sizeof (GNUNET_HashCode) : 0, prq->data,
818 prq->size);
849 switch (prq->eval) 819 switch (prq->eval)
850 { 820 {
851 case GNUNET_BLOCK_EVALUATION_OK_MORE: 821 case GNUNET_BLOCK_EVALUATION_OK_MORE:
852 update_request_performance_data (prq, pr); 822 update_request_performance_data (prq, pr);
853 break; 823 break;
854 case GNUNET_BLOCK_EVALUATION_OK_LAST: 824 case GNUNET_BLOCK_EVALUATION_OK_LAST:
855 /* short cut: stop processing early, no BF-update, etc. */ 825 /* short cut: stop processing early, no BF-update, etc. */
856 update_request_performance_data (prq, pr); 826 update_request_performance_data (prq, pr);
857 GNUNET_LOAD_update (GSF_rt_entry_lifetime, 827 GNUNET_LOAD_update (GSF_rt_entry_lifetime,
858 GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value); 828 GNUNET_TIME_absolute_get_duration (pr->
859 /* pass on to other peers / local clients */ 829 public_data.start_time).rel_value);
860 pr->rh (pr->rh_cls, 830 /* pass on to other peers / local clients */
861 prq->eval, 831 pr->rh (pr->rh_cls,
862 pr, 832 prq->eval,
863 prq->anonymity_level, 833 pr,
864 prq->expiration, 834 prq->anonymity_level,
865 prq->type, 835 prq->expiration, prq->type, prq->data, prq->size);
866 prq->data, prq->size); 836 return GNUNET_YES;
867 return GNUNET_YES; 837 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
868 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 838 GNUNET_STATISTICS_update (GSF_stats,
869 GNUNET_STATISTICS_update (GSF_stats, 839 gettext_noop
870 gettext_noop ("# duplicate replies discarded (bloomfilter)"), 840 ("# duplicate replies discarded (bloomfilter)"),
871 1, 841 1, GNUNET_NO);
872 GNUNET_NO);
873#if DEBUG_FS && 0 842#if DEBUG_FS && 0
874 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 843 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
875 "Duplicate response `%s', discarding.\n", 844 "Duplicate response `%s', discarding.\n", GNUNET_h2s (&mhash));
876 GNUNET_h2s (&mhash));
877#endif 845#endif
878 return GNUNET_YES; /* duplicate */ 846 return GNUNET_YES; /* duplicate */
879 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: 847 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
880 return GNUNET_YES; /* wrong namespace */ 848 return GNUNET_YES; /* wrong namespace */
881 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: 849 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
882 GNUNET_break (0); 850 GNUNET_break (0);
883 return GNUNET_YES; 851 return GNUNET_YES;
884 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: 852 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
885 GNUNET_break (0); 853 GNUNET_break (0);
886 return GNUNET_YES; 854 return GNUNET_YES;
887 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: 855 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
888 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 856 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
889 _("Unsupported block type %u\n"), 857 _("Unsupported block type %u\n"), prq->type);
890 prq->type); 858 return GNUNET_NO;
891 return GNUNET_NO; 859 }
892 }
893 /* update bloomfilter */ 860 /* update bloomfilter */
894 GNUNET_CRYPTO_hash (prq->data, 861 GNUNET_CRYPTO_hash (prq->data, prq->size, &chash);
895 prq->size,
896 &chash);
897 GSF_pending_request_update_ (pr, &chash, 1); 862 GSF_pending_request_update_ (pr, &chash, 1);
898 if (NULL == prq->sender) 863 if (NULL == prq->sender)
899 { 864 {
900#if DEBUG_FS 865#if DEBUG_FS
901 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 866 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902 "Found result for query `%s' in local datastore\n", 867 "Found result for query `%s' in local datastore\n",
903 GNUNET_h2s (key)); 868 GNUNET_h2s (key));
904#endif 869#endif
905 GNUNET_STATISTICS_update (GSF_stats, 870 GNUNET_STATISTICS_update (GSF_stats,
906 gettext_noop ("# results found locally"), 871 gettext_noop ("# results found locally"),
907 1, 872 1, GNUNET_NO);
908 GNUNET_NO); 873 }
909 }
910 else 874 else
911 { 875 {
912 GSF_dht_lookup_ (pr); 876 GSF_dht_lookup_ (pr);
913 } 877 }
914 prq->priority += pr->public_data.original_priority; 878 prq->priority += pr->public_data.original_priority;
915 pr->public_data.priority = 0; 879 pr->public_data.priority = 0;
916 pr->public_data.original_priority = 0; 880 pr->public_data.original_priority = 0;
@@ -918,12 +882,10 @@ process_reply (void *cls,
918 prq->request_found = GNUNET_YES; 882 prq->request_found = GNUNET_YES;
919 /* finally, pass on to other peer / local client */ 883 /* finally, pass on to other peer / local client */
920 pr->rh (pr->rh_cls, 884 pr->rh (pr->rh_cls,
921 prq->eval, 885 prq->eval,
922 pr, 886 pr,
923 prq->anonymity_level, 887 prq->anonymity_level,
924 prq->expiration, 888 prq->expiration, prq->type, prq->data, prq->size);
925 prq->type,
926 prq->data, prq->size);
927 return GNUNET_YES; 889 return GNUNET_YES;
928} 890}
929 891
@@ -960,52 +922,50 @@ struct PutMigrationContext
960 * @param success GNUNET_SYSERR on failure 922 * @param success GNUNET_SYSERR on failure
961 * @param msg NULL on success, otherwise an error message 923 * @param msg NULL on success, otherwise an error message
962 */ 924 */
963static void 925static void
964put_migration_continuation (void *cls, 926put_migration_continuation (void *cls, int success, const char *msg)
965 int success,
966 const char *msg)
967{ 927{
968 struct PutMigrationContext *pmc = cls; 928 struct PutMigrationContext *pmc = cls;
969 struct GNUNET_TIME_Relative delay; 929 struct GNUNET_TIME_Relative delay;
970 struct GNUNET_TIME_Relative block_time; 930 struct GNUNET_TIME_Relative block_time;
971 struct GSF_ConnectedPeer *cp; 931 struct GSF_ConnectedPeer *cp;
972 struct GSF_PeerPerformanceData *ppd; 932 struct GSF_PeerPerformanceData *ppd;
973 933
974 delay = GNUNET_TIME_absolute_get_duration (pmc->start); 934 delay = GNUNET_TIME_absolute_get_duration (pmc->start);
975 cp = GSF_peer_get_ (&pmc->origin); 935 cp = GSF_peer_get_ (&pmc->origin);
976 if ( (GNUNET_OK != success) && 936 if ((GNUNET_OK != success) && (GNUNET_NO == pmc->requested))
977 (GNUNET_NO == pmc->requested) ) 937 {
938 /* block migration for a bit... */
939 if (NULL != cp)
978 { 940 {
979 /* block migration for a bit... */ 941 ppd = GSF_get_peer_performance_data_ (cp);
980 if (NULL != cp) 942 ppd->migration_duplication++;
981 { 943 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
982 ppd = GSF_get_peer_performance_data_ (cp); 944 5 *
983 ppd->migration_duplication++; 945 ppd->migration_duplication +
984 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 946 GNUNET_CRYPTO_random_u32
985 5 * ppd->migration_duplication + 947 (GNUNET_CRYPTO_QUALITY_WEAK,
986 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5)); 948 5));
987 GSF_block_peer_migration_ (cp, block_time); 949 GSF_block_peer_migration_ (cp, block_time);
988 }
989 } 950 }
951 }
990 else 952 else
953 {
954 if (NULL != cp)
991 { 955 {
992 if (NULL != cp) 956 ppd = GSF_get_peer_performance_data_ (cp);
993 { 957 ppd->migration_duplication = 0; /* reset counter */
994 ppd = GSF_get_peer_performance_data_ (cp);
995 ppd->migration_duplication = 0; /* reset counter */
996 }
997 } 958 }
959 }
998 GNUNET_free (pmc); 960 GNUNET_free (pmc);
999 /* FIXME: should we really update the load value on failure? */ 961 /* FIXME: should we really update the load value on failure? */
1000 if (NULL != datastore_put_load) 962 if (NULL != datastore_put_load)
1001 GNUNET_LOAD_update (datastore_put_load, 963 GNUNET_LOAD_update (datastore_put_load, delay.rel_value);
1002 delay.rel_value);
1003 if (GNUNET_OK == success) 964 if (GNUNET_OK == success)
1004 return; 965 return;
1005 GNUNET_STATISTICS_update (GSF_stats, 966 GNUNET_STATISTICS_update (GSF_stats,
1006 gettext_noop ("# Datastore `PUT' failures"), 967 gettext_noop ("# Datastore `PUT' failures"),
1007 1, 968 1, GNUNET_NO);
1008 GNUNET_NO);
1009} 969}
1010 970
1011 971
@@ -1025,14 +985,14 @@ test_put_load_too_high (uint32_t priority)
1025 if (NULL == datastore_put_load) 985 if (NULL == datastore_put_load)
1026 return GNUNET_NO; 986 return GNUNET_NO;
1027 if (GNUNET_LOAD_get_average (datastore_put_load) < 50) 987 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1028 return GNUNET_NO; /* very fast */ 988 return GNUNET_NO; /* very fast */
1029 ld = GNUNET_LOAD_get_load (datastore_put_load); 989 ld = GNUNET_LOAD_get_load (datastore_put_load);
1030 if (ld < 2.0 * (1 + priority)) 990 if (ld < 2.0 * (1 + priority))
1031 return GNUNET_NO; 991 return GNUNET_NO;
1032 GNUNET_STATISTICS_update (GSF_stats, 992 GNUNET_STATISTICS_update (GSF_stats,
1033 gettext_noop ("# storage requests dropped due to high load"), 993 gettext_noop
1034 1, 994 ("# storage requests dropped due to high load"), 1,
1035 GNUNET_NO); 995 GNUNET_NO);
1036 return GNUNET_YES; 996 return GNUNET_YES;
1037} 997}
1038 998
@@ -1054,54 +1014,49 @@ test_put_load_too_high (uint32_t priority)
1054 */ 1014 */
1055static void 1015static void
1056handle_dht_reply (void *cls, 1016handle_dht_reply (void *cls,
1057 struct GNUNET_TIME_Absolute exp, 1017 struct GNUNET_TIME_Absolute exp,
1058 const GNUNET_HashCode *key, 1018 const GNUNET_HashCode * key,
1059 const struct GNUNET_PeerIdentity * const *get_path, 1019 const struct GNUNET_PeerIdentity *const *get_path,
1060 const struct GNUNET_PeerIdentity * const *put_path, 1020 const struct GNUNET_PeerIdentity *const *put_path,
1061 enum GNUNET_BLOCK_Type type, 1021 enum GNUNET_BLOCK_Type type, size_t size, const void *data)
1062 size_t size,
1063 const void *data)
1064{ 1022{
1065 struct GSF_PendingRequest *pr = cls; 1023 struct GSF_PendingRequest *pr = cls;
1066 struct ProcessReplyClosure prq; 1024 struct ProcessReplyClosure prq;
1067 struct PutMigrationContext *pmc; 1025 struct PutMigrationContext *pmc;
1068 1026
1069 GNUNET_STATISTICS_update (GSF_stats, 1027 GNUNET_STATISTICS_update (GSF_stats,
1070 gettext_noop ("# Replies received from DHT"), 1028 gettext_noop ("# Replies received from DHT"),
1071 1, 1029 1, GNUNET_NO);
1072 GNUNET_NO);
1073 memset (&prq, 0, sizeof (prq)); 1030 memset (&prq, 0, sizeof (prq));
1074 prq.data = data; 1031 prq.data = data;
1075 prq.expiration = exp; 1032 prq.expiration = exp;
1076 prq.size = size; 1033 prq.size = size;
1077 prq.type = type; 1034 prq.type = type;
1078 process_reply (&prq, key, pr); 1035 process_reply (&prq, key, pr);
1079 if ( (GNUNET_YES == active_to_migration) && 1036 if ((GNUNET_YES == active_to_migration) &&
1080 (GNUNET_NO == test_put_load_too_high (prq.priority)) ) 1037 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1081 { 1038 {
1082#if DEBUG_FS 1039#if DEBUG_FS
1083 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084 "Replicating result for query `%s' with priority %u\n", 1041 "Replicating result for query `%s' with priority %u\n",
1085 GNUNET_h2s (key), 1042 GNUNET_h2s (key), prq.priority);
1086 prq.priority);
1087#endif 1043#endif
1088 pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); 1044 pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
1089 pmc->start = GNUNET_TIME_absolute_get (); 1045 pmc->start = GNUNET_TIME_absolute_get ();
1090 pmc->requested = GNUNET_YES; 1046 pmc->requested = GNUNET_YES;
1091 if (NULL == 1047 if (NULL ==
1092 GNUNET_DATASTORE_put (GSF_dsh, 1048 GNUNET_DATASTORE_put (GSF_dsh,
1093 0, key, size, data, 1049 0, key, size, data,
1094 type, prq.priority, 1 /* anonymity */, 1050 type, prq.priority, 1 /* anonymity */ ,
1095 0 /* replication */, 1051 0 /* replication */ ,
1096 exp, 1052 exp,
1097 1 + prq.priority, MAX_DATASTORE_QUEUE, 1053 1 + prq.priority, MAX_DATASTORE_QUEUE,
1098 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 1054 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1099 &put_migration_continuation, 1055 &put_migration_continuation, pmc))
1100 pmc)) 1056 {
1101 { 1057 put_migration_continuation (pmc, GNUNET_NO, NULL);
1102 put_migration_continuation (pmc, GNUNET_NO, NULL);
1103 }
1104 } 1058 }
1059 }
1105} 1060}
1106 1061
1107 1062
@@ -1121,38 +1076,34 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1121 if (0 != pr->public_data.anonymity_level) 1076 if (0 != pr->public_data.anonymity_level)
1122 return; 1077 return;
1123 if (NULL != pr->gh) 1078 if (NULL != pr->gh)
1124 { 1079 {
1125 GNUNET_DHT_get_stop (pr->gh); 1080 GNUNET_DHT_get_stop (pr->gh);
1126 pr->gh = NULL; 1081 pr->gh = NULL;
1127 } 1082 }
1128 xquery = NULL; 1083 xquery = NULL;
1129 xquery_size = 0; 1084 xquery_size = 0;
1130 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type) 1085 if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
1131 { 1086 {
1132 xquery = buf; 1087 xquery = buf;
1133 memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode)); 1088 memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
1134 xquery_size = sizeof (GNUNET_HashCode); 1089 xquery_size = sizeof (GNUNET_HashCode);
1135 } 1090 }
1136 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY)) 1091 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1137 { 1092 {
1138 GNUNET_assert (0 != pr->sender_pid); 1093 GNUNET_assert (0 != pr->sender_pid);
1139 GNUNET_PEER_resolve (pr->sender_pid, 1094 GNUNET_PEER_resolve (pr->sender_pid, &pi);
1140 &pi); 1095 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
1141 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); 1096 xquery_size += sizeof (struct GNUNET_PeerIdentity);
1142 xquery_size += sizeof (struct GNUNET_PeerIdentity); 1097 }
1143 }
1144 pr->gh = GNUNET_DHT_get_start (GSF_dht, 1098 pr->gh = GNUNET_DHT_get_start (GSF_dht,
1145 GNUNET_TIME_UNIT_FOREVER_REL, 1099 GNUNET_TIME_UNIT_FOREVER_REL,
1146 pr->public_data.type, 1100 pr->public_data.type,
1147 &pr->public_data.query, 1101 &pr->public_data.query,
1148 DEFAULT_GET_REPLICATION, 1102 DEFAULT_GET_REPLICATION,
1149 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1103 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1150 pr->bf, 1104 pr->bf,
1151 pr->mingle, 1105 pr->mingle,
1152 xquery, 1106 xquery, xquery_size, &handle_dht_reply, pr);
1153 xquery_size,
1154 &handle_dht_reply,
1155 pr);
1156} 1107}
1157 1108
1158 1109
@@ -1163,17 +1114,17 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1163 * @param tc task context 1114 * @param tc task context
1164 */ 1115 */
1165static void 1116static void
1166warn_delay_task (void *cls, 1117warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1167 const struct GNUNET_SCHEDULER_TaskContext *tc)
1168{ 1118{
1169 struct GSF_PendingRequest *pr = cls; 1119 struct GSF_PendingRequest *pr = cls;
1170 1120
1171 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1121 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1172 _("Datastore lookup already took %llu ms!\n"), 1122 _("Datastore lookup already took %llu ms!\n"),
1173 (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); 1123 (unsigned long long)
1174 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1124 GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value);
1175 &warn_delay_task, 1125 pr->warn_task =
1176 pr); 1126 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
1127 pr);
1177} 1128}
1178 1129
1179 1130
@@ -1184,17 +1135,17 @@ warn_delay_task (void *cls,
1184 * @param tc task context 1135 * @param tc task context
1185 */ 1136 */
1186static void 1137static void
1187odc_warn_delay_task (void *cls, 1138odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1188 const struct GNUNET_SCHEDULER_TaskContext *tc)
1189{ 1139{
1190 struct GSF_PendingRequest *pr = cls; 1140 struct GSF_PendingRequest *pr = cls;
1191 1141
1192 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1142 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1193 _("On-demand lookup already took %llu ms!\n"), 1143 _("On-demand lookup already took %llu ms!\n"),
1194 (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value); 1144 (unsigned long long)
1195 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1145 GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value);
1196 &odc_warn_delay_task, 1146 pr->warn_task =
1197 pr); 1147 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1148 &odc_warn_delay_task, pr);
1198} 1149}
1199 1150
1200 1151
@@ -1217,14 +1168,13 @@ odc_warn_delay_task (void *cls,
1217 */ 1168 */
1218static void 1169static void
1219process_local_reply (void *cls, 1170process_local_reply (void *cls,
1220 const GNUNET_HashCode *key, 1171 const GNUNET_HashCode * key,
1221 size_t size, 1172 size_t size,
1222 const void *data, 1173 const void *data,
1223 enum GNUNET_BLOCK_Type type, 1174 enum GNUNET_BLOCK_Type type,
1224 uint32_t priority, 1175 uint32_t priority,
1225 uint32_t anonymity, 1176 uint32_t anonymity,
1226 struct GNUNET_TIME_Absolute expiration, 1177 struct GNUNET_TIME_Absolute expiration, uint64_t uid)
1227 uint64_t uid)
1228{ 1178{
1229 struct GSF_PendingRequest *pr = cls; 1179 struct GSF_PendingRequest *pr = cls;
1230 GSF_LocalLookupContinuation cont; 1180 GSF_LocalLookupContinuation cont;
@@ -1235,236 +1185,228 @@ process_local_reply (void *cls,
1235 GNUNET_SCHEDULER_cancel (pr->warn_task); 1185 GNUNET_SCHEDULER_cancel (pr->warn_task);
1236 pr->warn_task = GNUNET_SCHEDULER_NO_TASK; 1186 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
1237 if (NULL != pr->qe) 1187 if (NULL != pr->qe)
1188 {
1189 pr->qe = NULL;
1190 if (NULL == key)
1238 { 1191 {
1239 pr->qe = NULL; 1192 GNUNET_STATISTICS_update (GSF_stats,
1240 if (NULL == key) 1193 gettext_noop
1241 { 1194 ("# Datastore lookups concluded (no results)"),
1242 GNUNET_STATISTICS_update (GSF_stats, 1195 1, GNUNET_NO);
1243 gettext_noop ("# Datastore lookups concluded (no results)"),
1244 1,
1245 GNUNET_NO);
1246 }
1247 if (GNUNET_NO == pr->have_first_uid)
1248 {
1249 pr->first_uid = uid;
1250 pr->have_first_uid = 1;
1251 }
1252 else
1253 {
1254 if ( (uid == pr->first_uid) && (key != NULL) )
1255 {
1256 GNUNET_STATISTICS_update (GSF_stats,
1257 gettext_noop ("# Datastore lookups concluded (seen all)"),
1258 1,
1259 GNUNET_NO);
1260 key = NULL; /* all replies seen! */
1261 }
1262 pr->have_first_uid++;
1263 if ( (pr->have_first_uid > MAX_RESULTS) && (key != NULL) )
1264 {
1265 GNUNET_STATISTICS_update (GSF_stats,
1266 gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1267 1,
1268 GNUNET_NO);
1269 key = NULL; /* all replies seen! */
1270 }
1271 }
1272 } 1196 }
1273 if (NULL == key) 1197 if (GNUNET_NO == pr->have_first_uid)
1198 {
1199 pr->first_uid = uid;
1200 pr->have_first_uid = 1;
1201 }
1202 else
1274 { 1203 {
1204 if ((uid == pr->first_uid) && (key != NULL))
1205 {
1206 GNUNET_STATISTICS_update (GSF_stats,
1207 gettext_noop
1208 ("# Datastore lookups concluded (seen all)"),
1209 1, GNUNET_NO);
1210 key = NULL; /* all replies seen! */
1211 }
1212 pr->have_first_uid++;
1213 if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL))
1214 {
1215 GNUNET_STATISTICS_update (GSF_stats,
1216 gettext_noop
1217 ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1218 1, GNUNET_NO);
1219 key = NULL; /* all replies seen! */
1220 }
1221 }
1222 }
1223 if (NULL == key)
1224 {
1275#if DEBUG_FS > 1 1225#if DEBUG_FS > 1
1276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1226 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1277 "No further local responses available.\n"); 1227 "No further local responses available.\n");
1278#endif 1228#endif
1279 if ( (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) || 1229 if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
1280 (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK) ) 1230 (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK))
1281 GNUNET_STATISTICS_update (GSF_stats, 1231 GNUNET_STATISTICS_update (GSF_stats,
1282 gettext_noop ("# requested DBLOCK or IBLOCK not found"), 1232 gettext_noop
1283 1, 1233 ("# requested DBLOCK or IBLOCK not found"), 1,
1284 GNUNET_NO); 1234 GNUNET_NO);
1285 goto check_error_and_continue; 1235 goto check_error_and_continue;
1286 } 1236 }
1287#if DEBUG_FS 1237#if DEBUG_FS
1288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1289 "Received reply for `%s' of type %d with UID %llu from datastore.\n", 1239 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1290 GNUNET_h2s (key), 1240 GNUNET_h2s (key), type, (unsigned long long) uid);
1291 type,
1292 (unsigned long long) uid);
1293#endif 1241#endif
1294 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1242 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1295 { 1243 {
1296#if DEBUG_FS > 1 1244#if DEBUG_FS > 1
1297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1245 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298 "Found ONDEMAND block, performing on-demand encoding\n"); 1246 "Found ONDEMAND block, performing on-demand encoding\n");
1299#endif 1247#endif
1248 GNUNET_STATISTICS_update (GSF_stats,
1249 gettext_noop
1250 ("# on-demand blocks matched requests"), 1,
1251 GNUNET_NO);
1252 pr->qe_start = GNUNET_TIME_absolute_get ();
1253 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1254 &odc_warn_delay_task, pr);
1255 if (GNUNET_OK ==
1256 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1257 anonymity, expiration, uid,
1258 &process_local_reply, pr))
1259 {
1300 GNUNET_STATISTICS_update (GSF_stats, 1260 GNUNET_STATISTICS_update (GSF_stats,
1301 gettext_noop ("# on-demand blocks matched requests"), 1261 gettext_noop
1302 1, 1262 ("# on-demand lookups performed successfully"),
1303 GNUNET_NO); 1263 1, GNUNET_NO);
1304 pr->qe_start = GNUNET_TIME_absolute_get (); 1264 return; /* we're done */
1305 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1265 }
1306 &odc_warn_delay_task, 1266 GNUNET_STATISTICS_update (GSF_stats,
1307 pr); 1267 gettext_noop ("# on-demand lookups failed"),
1308 if (GNUNET_OK == 1268 1, GNUNET_NO);
1309 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 1269 GNUNET_SCHEDULER_cancel (pr->warn_task);
1310 anonymity, expiration, uid, 1270 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1311 &process_local_reply, 1271 &warn_delay_task, pr);
1312 pr)) 1272 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1313 { 1273 pr->local_result_offset - 1,
1314 GNUNET_STATISTICS_update (GSF_stats, 1274 &pr->public_data.query,
1315 gettext_noop ("# on-demand lookups performed successfully"), 1275 pr->public_data.type ==
1316 1, 1276 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1317 GNUNET_NO); 1277 GNUNET_BLOCK_TYPE_ANY : pr->
1318 return; /* we're done */ 1278 public_data.type,
1319 } 1279 (0 !=
1280 (GSF_PRO_PRIORITY_UNLIMITED &
1281 pr->public_data.
1282 options)) ? UINT_MAX : 1
1283 /* queue priority */ ,
1284 (0 !=
1285 (GSF_PRO_PRIORITY_UNLIMITED &
1286 pr->public_data.options)) ? UINT_MAX :
1287 1
1288 /* max queue size */ ,
1289 GNUNET_TIME_UNIT_FOREVER_REL,
1290 &process_local_reply, pr);
1291 if (NULL != pr->qe)
1292 {
1320 GNUNET_STATISTICS_update (GSF_stats, 1293 GNUNET_STATISTICS_update (GSF_stats,
1321 gettext_noop ("# on-demand lookups failed"), 1294 gettext_noop
1322 1, 1295 ("# Datastore lookups concluded (error queueing)"),
1323 GNUNET_NO); 1296 1, GNUNET_NO);
1324 GNUNET_SCHEDULER_cancel (pr->warn_task); 1297 return; /* we're done */
1325 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1326 &warn_delay_task,
1327 pr);
1328 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1329 pr->local_result_offset - 1,
1330 &pr->public_data.query,
1331 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1332 ? GNUNET_BLOCK_TYPE_ANY
1333 : pr->public_data.type,
1334 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1335 ? UINT_MAX
1336 : 1 /* queue priority */,
1337 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1338 ? UINT_MAX
1339 : 1 /* max queue size */,
1340 GNUNET_TIME_UNIT_FOREVER_REL,
1341 &process_local_reply,
1342 pr);
1343 if (NULL != pr->qe)
1344 {
1345 GNUNET_STATISTICS_update (GSF_stats,
1346 gettext_noop ("# Datastore lookups concluded (error queueing)"),
1347 1,
1348 GNUNET_NO);
1349 return; /* we're done */
1350 }
1351 goto check_error_and_continue;
1352 } 1298 }
1299 goto check_error_and_continue;
1300 }
1353 old_rf = pr->public_data.results_found; 1301 old_rf = pr->public_data.results_found;
1354 memset (&prq, 0, sizeof (prq)); 1302 memset (&prq, 0, sizeof (prq));
1355 prq.data = data; 1303 prq.data = data;
1356 prq.expiration = expiration; 1304 prq.expiration = expiration;
1357 prq.size = size; 1305 prq.size = size;
1358 if (GNUNET_OK != 1306 if (GNUNET_OK !=
1359 GNUNET_BLOCK_get_key (GSF_block_ctx, 1307 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1360 type, 1308 {
1361 data, 1309 GNUNET_break (0);
1362 size, 1310 GNUNET_DATASTORE_remove (GSF_dsh,
1363 &query)) 1311 key,
1312 size, data,
1313 -1, -1, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
1314 pr->qe_start = GNUNET_TIME_absolute_get ();
1315 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1316 &warn_delay_task, pr);
1317 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1318 pr->local_result_offset - 1,
1319 &pr->public_data.query,
1320 pr->public_data.type ==
1321 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1322 GNUNET_BLOCK_TYPE_ANY : pr->
1323 public_data.type,
1324 (0 !=
1325 (GSF_PRO_PRIORITY_UNLIMITED &
1326 pr->public_data.
1327 options)) ? UINT_MAX : 1
1328 /* queue priority */ ,
1329 (0 !=
1330 (GSF_PRO_PRIORITY_UNLIMITED &
1331 pr->public_data.options)) ? UINT_MAX :
1332 1
1333 /* max queue size */ ,
1334 GNUNET_TIME_UNIT_FOREVER_REL,
1335 &process_local_reply, pr);
1336 if (pr->qe == NULL)
1364 { 1337 {
1365 GNUNET_break (0); 1338 GNUNET_STATISTICS_update (GSF_stats,
1366 GNUNET_DATASTORE_remove (GSF_dsh, 1339 gettext_noop
1367 key, 1340 ("# Datastore lookups concluded (error queueing)"),
1368 size, data, 1341 1, GNUNET_NO);
1369 -1, -1, 1342 goto check_error_and_continue;
1370 GNUNET_TIME_UNIT_FOREVER_REL,
1371 NULL, NULL);
1372 pr->qe_start = GNUNET_TIME_absolute_get ();
1373 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1374 &warn_delay_task,
1375 pr);
1376 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1377 pr->local_result_offset - 1,
1378 &pr->public_data.query,
1379 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK
1380 ? GNUNET_BLOCK_TYPE_ANY
1381 : pr->public_data.type,
1382 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1383 ? UINT_MAX
1384 : 1 /* queue priority */,
1385 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1386 ? UINT_MAX
1387 : 1 /* max queue size */,
1388 GNUNET_TIME_UNIT_FOREVER_REL,
1389 &process_local_reply,
1390 pr);
1391 if (pr->qe == NULL)
1392 {
1393 GNUNET_STATISTICS_update (GSF_stats,
1394 gettext_noop ("# Datastore lookups concluded (error queueing)"),
1395 1,
1396 GNUNET_NO);
1397 goto check_error_and_continue;
1398 }
1399 return;
1400 } 1343 }
1344 return;
1345 }
1401 prq.type = type; 1346 prq.type = type;
1402 prq.priority = priority; 1347 prq.priority = priority;
1403 prq.request_found = GNUNET_NO; 1348 prq.request_found = GNUNET_NO;
1404 prq.anonymity_level = anonymity; 1349 prq.anonymity_level = anonymity;
1405 if ( (old_rf == 0) && 1350 if ((old_rf == 0) && (pr->public_data.results_found == 0))
1406 (pr->public_data.results_found == 0) )
1407 GSF_update_datastore_delay_ (pr->public_data.start_time); 1351 GSF_update_datastore_delay_ (pr->public_data.start_time);
1408 process_reply (&prq, key, pr); 1352 process_reply (&prq, key, pr);
1409 pr->local_result = prq.eval; 1353 pr->local_result = prq.eval;
1410 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST) 1354 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1411 { 1355 {
1412 GNUNET_STATISTICS_update (GSF_stats, 1356 GNUNET_STATISTICS_update (GSF_stats,
1413 gettext_noop ("# Datastore lookups concluded (found ultimate result)"), 1357 gettext_noop
1414 1, 1358 ("# Datastore lookups concluded (found ultimate result)"),
1415 GNUNET_NO); 1359 1, GNUNET_NO);
1416 goto check_error_and_continue; 1360 goto check_error_and_continue;
1417 } 1361 }
1418 if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1362 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1419 ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) || 1363 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1420 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) ) 1364 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1421 { 1365 {
1422#if DEBUG_FS > 2 1366#if DEBUG_FS > 2
1423 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1367 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1424 "Load too high, done with request\n");
1425#endif 1368#endif
1426 GNUNET_STATISTICS_update (GSF_stats, 1369 GNUNET_STATISTICS_update (GSF_stats,
1427 gettext_noop ("# Datastore lookups concluded (load too high)"), 1370 gettext_noop
1428 1, 1371 ("# Datastore lookups concluded (load too high)"),
1429 GNUNET_NO); 1372 1, GNUNET_NO);
1430 goto check_error_and_continue; 1373 goto check_error_and_continue;
1431 } 1374 }
1432 pr->qe_start = GNUNET_TIME_absolute_get (); 1375 pr->qe_start = GNUNET_TIME_absolute_get ();
1433 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1376 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1434 &warn_delay_task, 1377 &warn_delay_task, pr);
1435 pr);
1436 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1378 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1437 pr->local_result_offset++, 1379 pr->local_result_offset++,
1438 &pr->public_data.query, 1380 &pr->public_data.query,
1439 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 1381 pr->public_data.type ==
1440 ? GNUNET_BLOCK_TYPE_ANY 1382 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1441 : pr->public_data.type, 1383 GNUNET_BLOCK_TYPE_ANY : pr->
1442 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1384 public_data.type,
1443 ? UINT_MAX 1385 (0 !=
1444 : 1 /* queue priority */, 1386 (GSF_PRO_PRIORITY_UNLIMITED &
1445 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1387 pr->public_data.options)) ? UINT_MAX : 1
1446 ? UINT_MAX 1388 /* queue priority */ ,
1447 : 1 /* max queue size */, 1389 (0 !=
1448 GNUNET_TIME_UNIT_FOREVER_REL, 1390 (GSF_PRO_PRIORITY_UNLIMITED &
1449 &process_local_reply, 1391 pr->public_data.options)) ? UINT_MAX : 1
1450 pr); 1392 /* max queue size */ ,
1393 GNUNET_TIME_UNIT_FOREVER_REL,
1394 &process_local_reply, pr);
1451 /* check if we successfully queued another datastore request; 1395 /* check if we successfully queued another datastore request;
1452 if so, return, otherwise call our continuation (if we have 1396 * if so, return, otherwise call our continuation (if we have
1453 any) */ 1397 * any) */
1454 check_error_and_continue: 1398check_error_and_continue:
1455 if (NULL != pr->qe) 1399 if (NULL != pr->qe)
1456 return; 1400 return;
1457 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task) 1401 if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
1458 { 1402 {
1459 GNUNET_SCHEDULER_cancel (pr->warn_task); 1403 GNUNET_SCHEDULER_cancel (pr->warn_task);
1460 pr->warn_task = GNUNET_SCHEDULER_NO_TASK; 1404 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
1461 } 1405 }
1462 if (NULL == (cont = pr->llc_cont)) 1406 if (NULL == (cont = pr->llc_cont))
1463 return; /* no continuation */ 1407 return; /* no continuation */
1464 pr->llc_cont = NULL; 1408 pr->llc_cont = NULL;
1465 cont (pr->llc_cont_cls, 1409 cont (pr->llc_cont_cls, pr, pr->local_result);
1466 pr,
1467 pr->local_result);
1468} 1410}
1469 1411
1470 1412
@@ -1477,8 +1419,7 @@ process_local_reply (void *cls,
1477 */ 1419 */
1478void 1420void
1479GSF_local_lookup_ (struct GSF_PendingRequest *pr, 1421GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1480 GSF_LocalLookupContinuation cont, 1422 GSF_LocalLookupContinuation cont, void *cont_cls)
1481 void *cont_cls)
1482{ 1423{
1483 GNUNET_assert (NULL == pr->gh); 1424 GNUNET_assert (NULL == pr->gh);
1484 GNUNET_assert (NULL == pr->llc_cont); 1425 GNUNET_assert (NULL == pr->llc_cont);
@@ -1486,41 +1427,41 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1486 pr->llc_cont_cls = cont_cls; 1427 pr->llc_cont_cls = cont_cls;
1487 pr->qe_start = GNUNET_TIME_absolute_get (); 1428 pr->qe_start = GNUNET_TIME_absolute_get ();
1488 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1429 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1489 &warn_delay_task, 1430 &warn_delay_task, pr);
1490 pr);
1491 GNUNET_STATISTICS_update (GSF_stats, 1431 GNUNET_STATISTICS_update (GSF_stats,
1492 gettext_noop ("# Datastore lookups initiated"), 1432 gettext_noop ("# Datastore lookups initiated"),
1493 1, 1433 1, GNUNET_NO);
1494 GNUNET_NO);
1495 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1434 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1496 pr->local_result_offset++, 1435 pr->local_result_offset++,
1497 &pr->public_data.query, 1436 &pr->public_data.query,
1498 pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 1437 pr->public_data.type ==
1499 ? GNUNET_BLOCK_TYPE_ANY 1438 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1500 : pr->public_data.type, 1439 GNUNET_BLOCK_TYPE_ANY : pr->
1501 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1440 public_data.type,
1502 ? UINT_MAX 1441 (0 !=
1503 : 1 /* queue priority */, 1442 (GSF_PRO_PRIORITY_UNLIMITED &
1504 (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) 1443 pr->public_data.options)) ? UINT_MAX : 1
1505 ? UINT_MAX 1444 /* queue priority */ ,
1506 : 1 /* max queue size */, 1445 (0 !=
1507 GNUNET_TIME_UNIT_FOREVER_REL, 1446 (GSF_PRO_PRIORITY_UNLIMITED &
1508 &process_local_reply, 1447 pr->public_data.options)) ? UINT_MAX : 1
1509 pr); 1448 /* max queue size */ ,
1449 GNUNET_TIME_UNIT_FOREVER_REL,
1450 &process_local_reply, pr);
1510 if (NULL != pr->qe) 1451 if (NULL != pr->qe)
1511 { 1452 {
1512 GNUNET_STATISTICS_update (GSF_stats, 1453 GNUNET_STATISTICS_update (GSF_stats,
1513 gettext_noop ("# Datastore lookups concluded (error queueing)"), 1454 gettext_noop
1514 1, 1455 ("# Datastore lookups concluded (error queueing)"),
1515 GNUNET_NO); 1456 1, GNUNET_NO);
1516 1457
1517 return; 1458 return;
1518 } 1459 }
1519 GNUNET_SCHEDULER_cancel (pr->warn_task); 1460 GNUNET_SCHEDULER_cancel (pr->warn_task);
1520 pr->warn_task = GNUNET_SCHEDULER_NO_TASK; 1461 pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
1521 pr->llc_cont = NULL; 1462 pr->llc_cont = NULL;
1522 if (NULL != cont) 1463 if (NULL != cont)
1523 cont (cont_cls, pr, pr->local_result); 1464 cont (cont_cls, pr, pr->local_result);
1524} 1465}
1525 1466
1526 1467
@@ -1540,7 +1481,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1540 */ 1481 */
1541int 1482int
1542GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, 1483GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1543 const struct GNUNET_MessageHeader *message) 1484 const struct GNUNET_MessageHeader *message)
1544{ 1485{
1545 const struct PutMessage *put; 1486 const struct PutMessage *put;
1546 uint16_t msize; 1487 uint16_t msize;
@@ -1549,38 +1490,33 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1549 struct GNUNET_TIME_Absolute expiration; 1490 struct GNUNET_TIME_Absolute expiration;
1550 GNUNET_HashCode query; 1491 GNUNET_HashCode query;
1551 struct ProcessReplyClosure prq; 1492 struct ProcessReplyClosure prq;
1552 struct GNUNET_TIME_Relative block_time; 1493 struct GNUNET_TIME_Relative block_time;
1553 double putl; 1494 double putl;
1554 struct PutMigrationContext *pmc; 1495 struct PutMigrationContext *pmc;
1555 1496
1556 msize = ntohs (message->size); 1497 msize = ntohs (message->size);
1557 if (msize < sizeof (struct PutMessage)) 1498 if (msize < sizeof (struct PutMessage))
1558 { 1499 {
1559 GNUNET_break_op(0); 1500 GNUNET_break_op (0);
1560 return GNUNET_SYSERR; 1501 return GNUNET_SYSERR;
1561 } 1502 }
1562 put = (const struct PutMessage*) message; 1503 put = (const struct PutMessage *) message;
1563 dsize = msize - sizeof (struct PutMessage); 1504 dsize = msize - sizeof (struct PutMessage);
1564 type = ntohl (put->type); 1505 type = ntohl (put->type);
1565 expiration = GNUNET_TIME_absolute_ntoh (put->expiration); 1506 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1566 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND) 1507 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1567 return GNUNET_SYSERR; 1508 return GNUNET_SYSERR;
1568 if (GNUNET_OK != 1509 if (GNUNET_OK !=
1569 GNUNET_BLOCK_get_key (GSF_block_ctx, 1510 GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query))
1570 type, 1511 {
1571 &put[1], 1512 GNUNET_break_op (0);
1572 dsize, 1513 return GNUNET_SYSERR;
1573 &query)) 1514 }
1574 {
1575 GNUNET_break_op (0);
1576 return GNUNET_SYSERR;
1577 }
1578 GNUNET_STATISTICS_update (GSF_stats, 1515 GNUNET_STATISTICS_update (GSF_stats,
1579 gettext_noop ("# GAP PUT messages received"), 1516 gettext_noop ("# GAP PUT messages received"),
1580 1, 1517 1, GNUNET_NO);
1581 GNUNET_NO);
1582 /* now, lookup 'query' */ 1518 /* now, lookup 'query' */
1583 prq.data = (const void*) &put[1]; 1519 prq.data = (const void *) &put[1];
1584 if (NULL != cp) 1520 if (NULL != cp)
1585 prq.sender = cp; 1521 prq.sender = cp;
1586 else 1522 else
@@ -1592,66 +1528,66 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1592 prq.anonymity_level = UINT32_MAX; 1528 prq.anonymity_level = UINT32_MAX;
1593 prq.request_found = GNUNET_NO; 1529 prq.request_found = GNUNET_NO;
1594 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, 1530 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
1595 &query, 1531 &query, &process_reply, &prq);
1596 &process_reply,
1597 &prq);
1598 if (NULL != cp) 1532 if (NULL != cp)
1599 { 1533 {
1600 GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority); 1534 GSF_connected_peer_change_preference_ (cp,
1601 GSF_get_peer_performance_data_ (cp)->trust += prq.priority; 1535 CONTENT_BANDWIDTH_VALUE +
1602 } 1536 1000 * prq.priority);
1603 if ( (GNUNET_YES == active_to_migration) && 1537 GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
1604 (GNUNET_NO == test_put_load_too_high (prq.priority)) ) 1538 }
1605 { 1539 if ((GNUNET_YES == active_to_migration) &&
1540 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1541 {
1606#if DEBUG_FS 1542#if DEBUG_FS
1607 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1543 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1608 "Replicating result for query `%s' with priority %u\n", 1544 "Replicating result for query `%s' with priority %u\n",
1609 GNUNET_h2s (&query), 1545 GNUNET_h2s (&query), prq.priority);
1610 prq.priority);
1611#endif 1546#endif
1612 pmc = GNUNET_malloc (sizeof (struct PutMigrationContext)); 1547 pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
1613 pmc->start = GNUNET_TIME_absolute_get (); 1548 pmc->start = GNUNET_TIME_absolute_get ();
1614 pmc->requested = prq.request_found; 1549 pmc->requested = prq.request_found;
1615 GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid); 1550 GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid);
1616 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid, 1551 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1617 &pmc->origin); 1552 &pmc->origin);
1618 if (NULL == 1553 if (NULL ==
1619 GNUNET_DATASTORE_put (GSF_dsh, 1554 GNUNET_DATASTORE_put (GSF_dsh,
1620 0, &query, dsize, &put[1], 1555 0, &query, dsize, &put[1],
1621 type, prq.priority, 1 /* anonymity */, 1556 type, prq.priority, 1 /* anonymity */ ,
1622 0 /* replication */, 1557 0 /* replication */ ,
1623 expiration, 1558 expiration,
1624 1 + prq.priority, MAX_DATASTORE_QUEUE, 1559 1 + prq.priority, MAX_DATASTORE_QUEUE,
1625 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 1560 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1626 &put_migration_continuation, 1561 &put_migration_continuation, pmc))
1627 pmc)) 1562 {
1628 { 1563 put_migration_continuation (pmc, GNUNET_NO, NULL);
1629 put_migration_continuation (pmc, GNUNET_NO, NULL);
1630 }
1631 } 1564 }
1565 }
1632 else 1566 else
1633 { 1567 {
1634#if DEBUG_FS 1568#if DEBUG_FS
1635 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636 "Choosing not to keep content `%s' (%d/%d)\n", 1570 "Choosing not to keep content `%s' (%d/%d)\n",
1637 GNUNET_h2s (&query), 1571 GNUNET_h2s (&query),
1638 active_to_migration, 1572 active_to_migration, test_put_load_too_high (prq.priority));
1639 test_put_load_too_high (prq.priority));
1640#endif 1573#endif
1641 } 1574 }
1642 putl = GNUNET_LOAD_get_load (datastore_put_load); 1575 putl = GNUNET_LOAD_get_load (datastore_put_load);
1643 if ( (NULL != (cp = prq.sender)) && 1576 if ((NULL != (cp = prq.sender)) &&
1644 (GNUNET_NO == prq.request_found) && 1577 (GNUNET_NO == prq.request_found) &&
1645 ( (GNUNET_YES != active_to_migration) || 1578 ((GNUNET_YES != active_to_migration) ||
1646 (putl > 2.5 * (1 + prq.priority)) ) ) 1579 (putl > 2.5 * (1 + prq.priority))))
1647 { 1580 {
1648 if (GNUNET_YES != active_to_migration) 1581 if (GNUNET_YES != active_to_migration)
1649 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); 1582 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1650 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 1583 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1651 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1584 5000 +
1652 (unsigned int) (60000 * putl * putl))); 1585 GNUNET_CRYPTO_random_u32
1653 GSF_block_peer_migration_ (cp, block_time); 1586 (GNUNET_CRYPTO_QUALITY_WEAK,
1654 } 1587 (unsigned int) (60000 * putl *
1588 putl)));
1589 GSF_block_peer_migration_ (cp, block_time);
1590 }
1655 return GNUNET_OK; 1591 return GNUNET_OK;
1656} 1592}
1657 1593
@@ -1664,20 +1600,22 @@ GSF_pending_request_init_ ()
1664{ 1600{
1665 if (GNUNET_OK != 1601 if (GNUNET_OK !=
1666 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, 1602 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1667 "fs", 1603 "fs",
1668 "MAX_PENDING_REQUESTS", 1604 "MAX_PENDING_REQUESTS",
1669 &max_pending_requests)) 1605 &max_pending_requests))
1670 { 1606 {
1671 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1607 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1672 _("Configuration fails to specify `%s', assuming default value."), 1608 _
1673 "MAX_PENDING_REQUESTS"); 1609 ("Configuration fails to specify `%s', assuming default value."),
1674 } 1610 "MAX_PENDING_REQUESTS");
1611 }
1675 active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, 1612 active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
1676 "FS", 1613 "FS",
1677 "CONTENT_CACHING"); 1614 "CONTENT_CACHING");
1678 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 1615 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1679 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024); 1616 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
1680 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1617 requests_by_expiration_heap =
1618 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1681} 1619}
1682 1620
1683 1621
@@ -1687,9 +1625,7 @@ GSF_pending_request_init_ ()
1687void 1625void
1688GSF_pending_request_done_ () 1626GSF_pending_request_done_ ()
1689{ 1627{
1690 GNUNET_CONTAINER_multihashmap_iterate (pr_map, 1628 GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL);
1691 &clean_request,
1692 NULL);
1693 GNUNET_CONTAINER_multihashmap_destroy (pr_map); 1629 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1694 pr_map = NULL; 1630 pr_map = NULL;
1695 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); 1631 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);