summaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c1840
1 files changed, 922 insertions, 918 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 90587cfa0..9c558674b 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -74,7 +74,8 @@
74/** 74/**
75 * An active request. 75 * An active request.
76 */ 76 */
77struct GSF_PendingRequest { 77struct GSF_PendingRequest
78{
78 /** 79 /**
79 * Public data for the request. 80 * Public data for the request.
80 */ 81 */
@@ -246,31 +247,32 @@ static unsigned long long max_pending_requests = (32 * 1024);
246 * @param pr request for which the BF is to be recomputed 247 * @param pr request for which the BF is to be recomputed
247 */ 248 */
248static void 249static void
249refresh_bloomfilter(enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr) 250refresh_bloomfilter (enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
250{ 251{
251 if (NULL != pr->bg) 252 if (NULL != pr->bg)
252 { 253 {
253 GNUNET_BLOCK_group_destroy(pr->bg); 254 GNUNET_BLOCK_group_destroy (pr->bg);
254 pr->bg = NULL; 255 pr->bg = NULL;
255 } 256 }
256 if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type) 257 if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
257 return; /* no need */ 258 return; /* no need */
258 pr->bg = 259 pr->bg =
259 GNUNET_BLOCK_group_create(GSF_block_ctx, 260 GNUNET_BLOCK_group_create (GSF_block_ctx,
260 type, 261 type,
261 GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 262 GNUNET_CRYPTO_random_u32 (
262 UINT32_MAX), 263 GNUNET_CRYPTO_QUALITY_WEAK,
263 NULL, 264 UINT32_MAX),
264 0, 265 NULL,
265 "seen-set-size", 266 0,
266 pr->replies_seen_count, 267 "seen-set-size",
267 NULL); 268 pr->replies_seen_count,
269 NULL);
268 if (NULL == pr->bg) 270 if (NULL == pr->bg)
269 return; 271 return;
270 GNUNET_break(GNUNET_OK == 272 GNUNET_break (GNUNET_OK ==
271 GNUNET_BLOCK_group_set_seen(pr->bg, 273 GNUNET_BLOCK_group_set_seen (pr->bg,
272 pr->replies_seen, 274 pr->replies_seen,
273 pr->replies_seen_count)); 275 pr->replies_seen_count));
274} 276}
275 277
276 278
@@ -296,129 +298,129 @@ refresh_bloomfilter(enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
296 * @return handle for the new pending request 298 * @return handle for the new pending request
297 */ 299 */
298struct GSF_PendingRequest * 300struct GSF_PendingRequest *
299GSF_pending_request_create_(enum GSF_PendingRequestOptions options, 301GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
300 enum GNUNET_BLOCK_Type type, 302 enum GNUNET_BLOCK_Type type,
301 const struct GNUNET_HashCode *query, 303 const struct GNUNET_HashCode *query,
302 const struct GNUNET_PeerIdentity *target, 304 const struct GNUNET_PeerIdentity *target,
303 const char *bf_data, 305 const char *bf_data,
304 size_t bf_size, 306 size_t bf_size,
305 uint32_t mingle, 307 uint32_t mingle,
306 uint32_t anonymity_level, 308 uint32_t anonymity_level,
307 uint32_t priority, 309 uint32_t priority,
308 int32_t ttl, 310 int32_t ttl,
309 GNUNET_PEER_Id sender_pid, 311 GNUNET_PEER_Id sender_pid,
310 GNUNET_PEER_Id origin_pid, 312 GNUNET_PEER_Id origin_pid,
311 const struct GNUNET_HashCode *replies_seen, 313 const struct GNUNET_HashCode *replies_seen,
312 unsigned int replies_seen_count, 314 unsigned int replies_seen_count,
313 GSF_PendingRequestReplyHandler rh, 315 GSF_PendingRequestReplyHandler rh,
314 void *rh_cls) 316 void *rh_cls)
315{ 317{
316 struct GSF_PendingRequest *pr; 318 struct GSF_PendingRequest *pr;
317 struct GSF_PendingRequest *dpr; 319 struct GSF_PendingRequest *dpr;
318 size_t extra; 320 size_t extra;
319 struct GNUNET_HashCode *eptr; 321 struct GNUNET_HashCode *eptr;
320 322
321 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
322 "Creating request handle for `%s' of type %d\n", 324 "Creating request handle for `%s' of type %d\n",
323 GNUNET_h2s(query), 325 GNUNET_h2s (query),
324 type); 326 type);
325#if INSANE_STATISTICS 327#if INSANE_STATISTICS
326 GNUNET_STATISTICS_update(GSF_stats, 328 GNUNET_STATISTICS_update (GSF_stats,
327 gettext_noop("# Pending requests created"), 329 gettext_noop ("# Pending requests created"),
328 1, 330 1,
329 GNUNET_NO); 331 GNUNET_NO);
330#endif 332#endif
331 extra = 0; 333 extra = 0;
332 if (NULL != target) 334 if (NULL != target)
333 extra += sizeof(struct GNUNET_PeerIdentity); 335 extra += sizeof(struct GNUNET_PeerIdentity);
334 pr = GNUNET_malloc(sizeof(struct GSF_PendingRequest) + extra); 336 pr = GNUNET_malloc (sizeof(struct GSF_PendingRequest) + extra);
335 pr->public_data.query = *query; 337 pr->public_data.query = *query;
336 eptr = (struct GNUNET_HashCode *)&pr[1]; 338 eptr = (struct GNUNET_HashCode *) &pr[1];
337 if (NULL != target) 339 if (NULL != target)
338 { 340 {
339 pr->public_data.target = (struct GNUNET_PeerIdentity *)eptr; 341 pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
340 GNUNET_memcpy(eptr, target, sizeof(struct GNUNET_PeerIdentity)); 342 GNUNET_memcpy (eptr, target, sizeof(struct GNUNET_PeerIdentity));
341 } 343 }
342 pr->public_data.anonymity_level = anonymity_level; 344 pr->public_data.anonymity_level = anonymity_level;
343 pr->public_data.priority = priority; 345 pr->public_data.priority = priority;
344 pr->public_data.original_priority = priority; 346 pr->public_data.original_priority = priority;
345 pr->public_data.options = options; 347 pr->public_data.options = options;
346 pr->public_data.type = type; 348 pr->public_data.type = type;
347 pr->public_data.start_time = GNUNET_TIME_absolute_get(); 349 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
348 pr->sender_pid = sender_pid; 350 pr->sender_pid = sender_pid;
349 pr->origin_pid = origin_pid; 351 pr->origin_pid = origin_pid;
350 pr->rh = rh; 352 pr->rh = rh;
351 pr->rh_cls = rh_cls; 353 pr->rh_cls = rh_cls;
352 GNUNET_assert((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY))); 354 GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
353 if (ttl >= 0) 355 if (ttl >= 0)
354 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute( 356 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (
355 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (uint32_t)ttl)); 357 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, (uint32_t) ttl));
356 else 358 else
357 pr->public_data.ttl = GNUNET_TIME_absolute_subtract( 359 pr->public_data.ttl = GNUNET_TIME_absolute_subtract (
358 pr->public_data.start_time, 360 pr->public_data.start_time,
359 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 361 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
360 (uint32_t)(-ttl))); 362 (uint32_t) (-ttl)));
361 if (replies_seen_count > 0) 363 if (replies_seen_count > 0)
362 { 364 {
363 pr->replies_seen_size = replies_seen_count; 365 pr->replies_seen_size = replies_seen_count;
364 pr->replies_seen = 366 pr->replies_seen =
365 GNUNET_new_array(pr->replies_seen_size, struct GNUNET_HashCode); 367 GNUNET_new_array (pr->replies_seen_size, struct GNUNET_HashCode);
366 GNUNET_memcpy(pr->replies_seen, 368 GNUNET_memcpy (pr->replies_seen,
367 replies_seen, 369 replies_seen,
368 replies_seen_count * sizeof(struct GNUNET_HashCode)); 370 replies_seen_count * sizeof(struct GNUNET_HashCode));
369 pr->replies_seen_count = replies_seen_count; 371 pr->replies_seen_count = replies_seen_count;
370 } 372 }
371 if ((NULL != bf_data) && 373 if ((NULL != bf_data) &&
372 (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type)) 374 (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type))
373 { 375 {
374 pr->bg = GNUNET_BLOCK_group_create(GSF_block_ctx, 376 pr->bg = GNUNET_BLOCK_group_create (GSF_block_ctx,
375 pr->public_data.type, 377 pr->public_data.type,
376 mingle, 378 mingle,
377 bf_data, 379 bf_data,
378 bf_size, 380 bf_size,
379 "seen-set-size", 381 "seen-set-size",
380 0, 382 0,
381 NULL); 383 NULL);
382 } 384 }
383 else if ((replies_seen_count > 0) && 385 else if ((replies_seen_count > 0) &&
384 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))) 386 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
385 { 387 {
386 refresh_bloomfilter(pr->public_data.type, pr); 388 refresh_bloomfilter (pr->public_data.type, pr);
387 } 389 }
388 GNUNET_CONTAINER_multihashmap_put(pr_map, 390 GNUNET_CONTAINER_multihashmap_put (pr_map,
389 &pr->public_data.query, 391 &pr->public_data.query,
390 pr, 392 pr,
391 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 393 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
392 if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES)) 394 if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
395 {
396 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
397 pr,
398 pr->public_data.ttl.abs_value_us);
399 /* make sure we don't track too many requests */
400 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
401 max_pending_requests)
393 { 402 {
394 pr->hnode = GNUNET_CONTAINER_heap_insert(requests_by_expiration_heap, 403 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
395 pr, 404 GNUNET_assert (NULL != dpr);
396 pr->public_data.ttl.abs_value_us); 405 if (pr == dpr)
397 /* make sure we don't track too many requests */ 406 break; /* let the request live briefly... */
398 while (GNUNET_CONTAINER_heap_get_size(requests_by_expiration_heap) > 407 if (NULL != dpr->rh)
399 max_pending_requests) 408 dpr->rh (dpr->rh_cls,
400 { 409 GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
401 dpr = GNUNET_CONTAINER_heap_peek(requests_by_expiration_heap); 410 dpr,
402 GNUNET_assert(NULL != dpr); 411 UINT32_MAX,
403 if (pr == dpr) 412 GNUNET_TIME_UNIT_FOREVER_ABS,
404 break; /* let the request live briefly... */ 413 GNUNET_TIME_UNIT_FOREVER_ABS,
405 if (NULL != dpr->rh) 414 GNUNET_BLOCK_TYPE_ANY,
406 dpr->rh(dpr->rh_cls, 415 NULL,
407 GNUNET_BLOCK_EVALUATION_REQUEST_VALID, 416 0);
408 dpr, 417 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
409 UINT32_MAX,
410 GNUNET_TIME_UNIT_FOREVER_ABS,
411 GNUNET_TIME_UNIT_FOREVER_ABS,
412 GNUNET_BLOCK_TYPE_ANY,
413 NULL,
414 0);
415 GSF_pending_request_cancel_(dpr, GNUNET_YES);
416 }
417 } 418 }
418 GNUNET_STATISTICS_update(GSF_stats, 419 }
419 gettext_noop("# Pending requests active"), 420 GNUNET_STATISTICS_update (GSF_stats,
420 1, 421 gettext_noop ("# Pending requests active"),
421 GNUNET_NO); 422 1,
423 GNUNET_NO);
422 return pr; 424 return pr;
423} 425}
424 426
@@ -429,7 +431,7 @@ GSF_pending_request_create_(enum GSF_PendingRequestOptions options,
429 * @return associated public data 431 * @return associated public data
430 */ 432 */
431struct GSF_PendingRequestData * 433struct GSF_PendingRequestData *
432GSF_pending_request_get_data_(struct GSF_PendingRequest *pr) 434GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
433{ 435{
434 return &pr->public_data; 436 return &pr->public_data;
435} 437}
@@ -445,13 +447,13 @@ GSF_pending_request_get_data_(struct GSF_PendingRequest *pr)
445 * @return #GNUNET_OK if the requests are compatible 447 * @return #GNUNET_OK if the requests are compatible
446 */ 448 */
447int 449int
448GSF_pending_request_is_compatible_(struct GSF_PendingRequest *pra, 450GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
449 struct GSF_PendingRequest *prb) 451 struct GSF_PendingRequest *prb)
450{ 452{
451 if ((pra->public_data.type != prb->public_data.type) || 453 if ((pra->public_data.type != prb->public_data.type) ||
452 (0 != memcmp(&pra->public_data.query, 454 (0 != memcmp (&pra->public_data.query,
453 &prb->public_data.query, 455 &prb->public_data.query,
454 sizeof(struct GNUNET_HashCode)))) 456 sizeof(struct GNUNET_HashCode))))
455 return GNUNET_NO; 457 return GNUNET_NO;
456 return GNUNET_OK; 458 return GNUNET_OK;
457} 459}
@@ -466,45 +468,45 @@ GSF_pending_request_is_compatible_(struct GSF_PendingRequest *pra,
466 * @param replies_seen_count size of the replies_seen array 468 * @param replies_seen_count size of the replies_seen array
467 */ 469 */
468void 470void
469GSF_pending_request_update_(struct GSF_PendingRequest *pr, 471GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
470 const struct GNUNET_HashCode *replies_seen, 472 const struct GNUNET_HashCode *replies_seen,
471 unsigned int replies_seen_count) 473 unsigned int replies_seen_count)
472{ 474{
473 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) 475 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
474 return; /* integer overflow */ 476 return; /* integer overflow */
475 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) 477 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
478 {
479 /* we're responsible for the BF, full refresh */
480 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
481 GNUNET_array_grow (pr->replies_seen,
482 pr->replies_seen_size,
483 replies_seen_count + pr->replies_seen_count);
484 GNUNET_memcpy (&pr->replies_seen[pr->replies_seen_count],
485 replies_seen,
486 sizeof(struct GNUNET_HashCode) * replies_seen_count);
487 pr->replies_seen_count += replies_seen_count;
488 refresh_bloomfilter (pr->public_data.type, pr);
489 }
490 else
491 {
492 if (NULL == pr->bg)
476 { 493 {
477 /* we're responsible for the BF, full refresh */ 494 /* we're not the initiator, but the initiator did not give us
478 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size) 495 * any bloom-filter, so we need to create one on-the-fly */
479 GNUNET_array_grow(pr->replies_seen, 496 refresh_bloomfilter (pr->public_data.type, pr);
480 pr->replies_seen_size,
481 replies_seen_count + pr->replies_seen_count);
482 GNUNET_memcpy(&pr->replies_seen[pr->replies_seen_count],
483 replies_seen,
484 sizeof(struct GNUNET_HashCode) * replies_seen_count);
485 pr->replies_seen_count += replies_seen_count;
486 refresh_bloomfilter(pr->public_data.type, pr);
487 } 497 }
488 else 498 else
489 { 499 {
490 if (NULL == pr->bg) 500 GNUNET_break (GNUNET_OK ==
491 { 501 GNUNET_BLOCK_group_set_seen (pr->bg,
492 /* we're not the initiator, but the initiator did not give us 502 replies_seen,
493 * any bloom-filter, so we need to create one on-the-fly */ 503 pr->replies_seen_count));
494 refresh_bloomfilter(pr->public_data.type, pr);
495 }
496 else
497 {
498 GNUNET_break(GNUNET_OK ==
499 GNUNET_BLOCK_group_set_seen(pr->bg,
500 replies_seen,
501 pr->replies_seen_count));
502 }
503 } 504 }
505 }
504 if (NULL != pr->gh) 506 if (NULL != pr->gh)
505 GNUNET_DHT_get_filter_known_results(pr->gh, 507 GNUNET_DHT_get_filter_known_results (pr->gh,
506 replies_seen_count, 508 replies_seen_count,
507 replies_seen); 509 replies_seen);
508} 510}
509 511
510 512
@@ -516,7 +518,7 @@ GSF_pending_request_update_(struct GSF_PendingRequest *pr,
516 * @return envelope with the request message 518 * @return envelope with the request message
517 */ 519 */
518struct GNUNET_MQ_Envelope * 520struct GNUNET_MQ_Envelope *
519GSF_pending_request_get_message_(struct GSF_PendingRequest *pr) 521GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
520{ 522{
521 struct GNUNET_MQ_Envelope *env; 523 struct GNUNET_MQ_Envelope *env;
522 struct GetMessage *gm; 524 struct GetMessage *gm;
@@ -531,61 +533,61 @@ GSF_pending_request_get_message_(struct GSF_PendingRequest *pr)
531 void *bf_data; 533 void *bf_data;
532 uint32_t bf_nonce; 534 uint32_t bf_nonce;
533 535
534 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 536 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
535 "Building request message for `%s' of type %d\n", 537 "Building request message for `%s' of type %d\n",
536 GNUNET_h2s(&pr->public_data.query), 538 GNUNET_h2s (&pr->public_data.query),
537 pr->public_data.type); 539 pr->public_data.type);
538 k = 0; 540 k = 0;
539 bm = 0; 541 bm = 0;
540 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); 542 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
541 if ((!do_route) && (pr->sender_pid == 0)) 543 if ((! do_route) && (pr->sender_pid == 0))
542 { 544 {
543 GNUNET_break(0); 545 GNUNET_break (0);
544 do_route = GNUNET_YES; 546 do_route = GNUNET_YES;
545 } 547 }
546 if (!do_route) 548 if (! do_route)
547 { 549 {
548 bm |= GET_MESSAGE_BIT_RETURN_TO; 550 bm |= GET_MESSAGE_BIT_RETURN_TO;
549 k++; 551 k++;
550 } 552 }
551 if (NULL != pr->public_data.target) 553 if (NULL != pr->public_data.target)
552 { 554 {
553 bm |= GET_MESSAGE_BIT_TRANSMIT_TO; 555 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
554 k++; 556 k++;
555 } 557 }
556 if (GNUNET_OK != 558 if (GNUNET_OK !=
557 GNUNET_BLOCK_group_serialize(pr->bg, &bf_nonce, &bf_data, &bf_size)) 559 GNUNET_BLOCK_group_serialize (pr->bg, &bf_nonce, &bf_data, &bf_size))
558 { 560 {
559 bf_size = 0; 561 bf_size = 0;
560 bf_data = NULL; 562 bf_data = NULL;
561 } 563 }
562 env = GNUNET_MQ_msg_extra(gm, 564 env = GNUNET_MQ_msg_extra (gm,
563 bf_size + k * sizeof(struct GNUNET_PeerIdentity), 565 bf_size + k * sizeof(struct GNUNET_PeerIdentity),
564 GNUNET_MESSAGE_TYPE_FS_GET); 566 GNUNET_MESSAGE_TYPE_FS_GET);
565 gm->type = htonl(pr->public_data.type); 567 gm->type = htonl (pr->public_data.type);
566 if (do_route) 568 if (do_route)
567 prio = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 569 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
568 pr->public_data.priority + 1); 570 pr->public_data.priority + 1);
569 else 571 else
570 prio = 0; 572 prio = 0;
571 pr->public_data.priority -= prio; 573 pr->public_data.priority -= prio;
572 pr->public_data.num_transmissions++; 574 pr->public_data.num_transmissions++;
573 pr->public_data.respect_offered += prio; 575 pr->public_data.respect_offered += prio;
574 gm->priority = htonl(prio); 576 gm->priority = htonl (prio);
575 now = GNUNET_TIME_absolute_get(); 577 now = GNUNET_TIME_absolute_get ();
576 ttl = (int64_t)(pr->public_data.ttl.abs_value_us - now.abs_value_us); 578 ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
577 gm->ttl = htonl(ttl / 1000LL / 1000LL); 579 gm->ttl = htonl (ttl / 1000LL / 1000LL);
578 gm->filter_mutator = htonl(bf_nonce); 580 gm->filter_mutator = htonl (bf_nonce);
579 gm->hash_bitmap = htonl(bm); 581 gm->hash_bitmap = htonl (bm);
580 gm->query = pr->public_data.query; 582 gm->query = pr->public_data.query;
581 ext = (struct GNUNET_PeerIdentity *)&gm[1]; 583 ext = (struct GNUNET_PeerIdentity *) &gm[1];
582 k = 0; 584 k = 0;
583 if (!do_route) 585 if (! do_route)
584 GNUNET_PEER_resolve(pr->sender_pid, &ext[k++]); 586 GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]);
585 if (NULL != pr->public_data.target) 587 if (NULL != pr->public_data.target)
586 ext[k++] = *pr->public_data.target; 588 ext[k++] = *pr->public_data.target;
587 GNUNET_memcpy(&ext[k], bf_data, bf_size); 589 GNUNET_memcpy (&ext[k], bf_data, bf_size);
588 GNUNET_free_non_null(bf_data); 590 GNUNET_free_non_null (bf_data);
589 return env; 591 return env;
590} 592}
591 593
@@ -599,61 +601,61 @@ GSF_pending_request_get_message_(struct GSF_PendingRequest *pr)
599 * @return #GNUNET_YES (we should continue to iterate) 601 * @return #GNUNET_YES (we should continue to iterate)
600 */ 602 */
601static int 603static int
602clean_request(void *cls, const struct GNUNET_HashCode *key, void *value) 604clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
603{ 605{
604 struct GSF_PendingRequest *pr = value; 606 struct GSF_PendingRequest *pr = value;
605 GSF_LocalLookupContinuation cont; 607 GSF_LocalLookupContinuation cont;
606 608
607 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 609 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
608 "Cleaning up pending request for `%s'.\n", 610 "Cleaning up pending request for `%s'.\n",
609 GNUNET_h2s(key)); 611 GNUNET_h2s (key));
610 if (NULL != pr->cadet_request) 612 if (NULL != pr->cadet_request)
611 { 613 {
612 pr->cadet_retry_count = CADET_RETRY_MAX; 614 pr->cadet_retry_count = CADET_RETRY_MAX;
613 GSF_cadet_query_cancel(pr->cadet_request); 615 GSF_cadet_query_cancel (pr->cadet_request);
614 pr->cadet_request = NULL; 616 pr->cadet_request = NULL;
615 } 617 }
616 if (NULL != (cont = pr->llc_cont)) 618 if (NULL != (cont = pr->llc_cont))
617 { 619 {
618 pr->llc_cont = NULL; 620 pr->llc_cont = NULL;
619 cont(pr->llc_cont_cls, pr, pr->local_result); 621 cont (pr->llc_cont_cls, pr, pr->local_result);
620 } 622 }
621 GSF_plan_notify_request_done_(pr); 623 GSF_plan_notify_request_done_ (pr);
622 GNUNET_free_non_null(pr->replies_seen); 624 GNUNET_free_non_null (pr->replies_seen);
623 GNUNET_BLOCK_group_destroy(pr->bg); 625 GNUNET_BLOCK_group_destroy (pr->bg);
624 pr->bg = NULL; 626 pr->bg = NULL;
625 GNUNET_PEER_change_rc(pr->sender_pid, -1); 627 GNUNET_PEER_change_rc (pr->sender_pid, -1);
626 pr->sender_pid = 0; 628 pr->sender_pid = 0;
627 GNUNET_PEER_change_rc(pr->origin_pid, -1); 629 GNUNET_PEER_change_rc (pr->origin_pid, -1);
628 pr->origin_pid = 0; 630 pr->origin_pid = 0;
629 if (NULL != pr->hnode) 631 if (NULL != pr->hnode)
630 { 632 {
631 GNUNET_CONTAINER_heap_remove_node(pr->hnode); 633 GNUNET_CONTAINER_heap_remove_node (pr->hnode);
632 pr->hnode = NULL; 634 pr->hnode = NULL;
633 } 635 }
634 if (NULL != pr->qe) 636 if (NULL != pr->qe)
635 { 637 {
636 GNUNET_DATASTORE_cancel(pr->qe); 638 GNUNET_DATASTORE_cancel (pr->qe);
637 pr->qe = NULL; 639 pr->qe = NULL;
638 } 640 }
639 if (NULL != pr->gh) 641 if (NULL != pr->gh)
640 { 642 {
641 GNUNET_DHT_get_stop(pr->gh); 643 GNUNET_DHT_get_stop (pr->gh);
642 pr->gh = NULL; 644 pr->gh = NULL;
643 } 645 }
644 if (NULL != pr->warn_task) 646 if (NULL != pr->warn_task)
645 { 647 {
646 GNUNET_SCHEDULER_cancel(pr->warn_task); 648 GNUNET_SCHEDULER_cancel (pr->warn_task);
647 pr->warn_task = NULL; 649 pr->warn_task = NULL;
648 } 650 }
649 GNUNET_assert( 651 GNUNET_assert (
650 GNUNET_OK == 652 GNUNET_OK ==
651 GNUNET_CONTAINER_multihashmap_remove(pr_map, &pr->public_data.query, pr)); 653 GNUNET_CONTAINER_multihashmap_remove (pr_map, &pr->public_data.query, pr));
652 GNUNET_STATISTICS_update(GSF_stats, 654 GNUNET_STATISTICS_update (GSF_stats,
653 gettext_noop("# Pending requests active"), 655 gettext_noop ("# Pending requests active"),
654 -1, 656 -1,
655 GNUNET_NO); 657 GNUNET_NO);
656 GNUNET_free(pr); 658 GNUNET_free (pr);
657 return GNUNET_YES; 659 return GNUNET_YES;
658} 660}
659 661
@@ -665,49 +667,49 @@ clean_request(void *cls, const struct GNUNET_HashCode *key, void *value)
665 * @param full_cleanup fully purge the request 667 * @param full_cleanup fully purge the request
666 */ 668 */
667void 669void
668GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup) 670GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
669{ 671{
670 GSF_LocalLookupContinuation cont; 672 GSF_LocalLookupContinuation cont;
671 673
672 if (NULL == pr_map) 674 if (NULL == pr_map)
673 return; /* already cleaned up! */ 675 return; /* already cleaned up! */
674 if (GNUNET_NO == full_cleanup) 676 if (GNUNET_NO == full_cleanup)
677 {
678 /* make request inactive (we're no longer interested in more results),
679 * but do NOT remove from our data-structures, we still need it there
680 * to prevent the request from looping */
681 pr->rh = NULL;
682 if (NULL != pr->cadet_request)
683 {
684 pr->cadet_retry_count = CADET_RETRY_MAX;
685 GSF_cadet_query_cancel (pr->cadet_request);
686 pr->cadet_request = NULL;
687 }
688 if (NULL != (cont = pr->llc_cont))
689 {
690 pr->llc_cont = NULL;
691 cont (pr->llc_cont_cls, pr, pr->local_result);
692 }
693 GSF_plan_notify_request_done_ (pr);
694 if (NULL != pr->qe)
695 {
696 GNUNET_DATASTORE_cancel (pr->qe);
697 pr->qe = NULL;
698 }
699 if (NULL != pr->gh)
675 { 700 {
676 /* make request inactive (we're no longer interested in more results), 701 GNUNET_DHT_get_stop (pr->gh);
677 * but do NOT remove from our data-structures, we still need it there 702 pr->gh = NULL;
678 * to prevent the request from looping */ 703 }
679 pr->rh = NULL; 704 if (NULL != pr->warn_task)
680 if (NULL != pr->cadet_request) 705 {
681 { 706 GNUNET_SCHEDULER_cancel (pr->warn_task);
682 pr->cadet_retry_count = CADET_RETRY_MAX; 707 pr->warn_task = NULL;
683 GSF_cadet_query_cancel(pr->cadet_request);
684 pr->cadet_request = NULL;
685 }
686 if (NULL != (cont = pr->llc_cont))
687 {
688 pr->llc_cont = NULL;
689 cont(pr->llc_cont_cls, pr, pr->local_result);
690 }
691 GSF_plan_notify_request_done_(pr);
692 if (NULL != pr->qe)
693 {
694 GNUNET_DATASTORE_cancel(pr->qe);
695 pr->qe = NULL;
696 }
697 if (NULL != pr->gh)
698 {
699 GNUNET_DHT_get_stop(pr->gh);
700 pr->gh = NULL;
701 }
702 if (NULL != pr->warn_task)
703 {
704 GNUNET_SCHEDULER_cancel(pr->warn_task);
705 pr->warn_task = NULL;
706 }
707 return;
708 } 708 }
709 GNUNET_assert(GNUNET_YES == 709 return;
710 clean_request(NULL, &pr->public_data.query, pr)); 710 }
711 GNUNET_assert (GNUNET_YES ==
712 clean_request (NULL, &pr->public_data.query, pr));
711} 713}
712 714
713 715
@@ -718,11 +720,11 @@ GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup)
718 * @param cls closure for @a it 720 * @param cls closure for @a it
719 */ 721 */
720void 722void
721GSF_iterate_pending_requests_(GSF_PendingRequestIterator it, void *cls) 723GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls)
722{ 724{
723 GNUNET_CONTAINER_multihashmap_iterate( 725 GNUNET_CONTAINER_multihashmap_iterate (
724 pr_map, 726 pr_map,
725 (GNUNET_CONTAINER_MulitHashMapIteratorCallback)it, 727 (GNUNET_CONTAINER_MulitHashMapIteratorCallback) it,
726 cls); 728 cls);
727} 729}
728 730
@@ -730,7 +732,8 @@ GSF_iterate_pending_requests_(GSF_PendingRequestIterator it, void *cls)
730/** 732/**
731 * Closure for process_reply() function. 733 * Closure for process_reply() function.
732 */ 734 */
733struct ProcessReplyClosure { 735struct ProcessReplyClosure
736{
734 /** 737 /**
735 * The data for the reply. 738 * The data for the reply.
736 */ 739 */
@@ -791,14 +794,14 @@ struct ProcessReplyClosure {
791 * @param pr request that was satisfied 794 * @param pr request that was satisfied
792 */ 795 */
793static void 796static void
794update_request_performance_data(struct ProcessReplyClosure *prq, 797update_request_performance_data (struct ProcessReplyClosure *prq,
795 struct GSF_PendingRequest *pr) 798 struct GSF_PendingRequest *pr)
796{ 799{
797 if (prq->sender == NULL) 800 if (prq->sender == NULL)
798 return; 801 return;
799 GSF_peer_update_performance_(prq->sender, 802 GSF_peer_update_performance_ (prq->sender,
800 pr->public_data.start_time, 803 pr->public_data.start_time,
801 prq->priority); 804 prq->priority);
802} 805}
803 806
804 807
@@ -811,7 +814,7 @@ update_request_performance_data(struct ProcessReplyClosure *prq,
811 * @return #GNUNET_YES (we should continue to iterate) 814 * @return #GNUNET_YES (we should continue to iterate)
812 */ 815 */
813static int 816static int
814process_reply(void *cls, const struct GNUNET_HashCode *key, void *value) 817process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
815{ 818{
816 struct ProcessReplyClosure *prq = cls; 819 struct ProcessReplyClosure *prq = cls;
817 struct GSF_PendingRequest *pr = value; 820 struct GSF_PendingRequest *pr = value;
@@ -820,128 +823,128 @@ process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
820 823
821 if (NULL == pr->rh) 824 if (NULL == pr->rh)
822 return GNUNET_YES; 825 return GNUNET_YES;
823 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 826 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
824 "Matched result (type %u) for query `%s' with pending request\n", 827 "Matched result (type %u) for query `%s' with pending request\n",
825 (unsigned int)prq->type, 828 (unsigned int) prq->type,
826 GNUNET_h2s(key)); 829 GNUNET_h2s (key));
827 GNUNET_STATISTICS_update(GSF_stats, 830 GNUNET_STATISTICS_update (GSF_stats,
828 gettext_noop("# replies received and matched"), 831 gettext_noop ("# replies received and matched"),
829 1, 832 1,
830 GNUNET_NO); 833 GNUNET_NO);
831 prq->eval = GNUNET_BLOCK_evaluate(GSF_block_ctx, 834 prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
832 prq->type, 835 prq->type,
833 pr->bg, 836 pr->bg,
834 prq->eo, 837 prq->eo,
835 key, 838 key,
836 NULL, 839 NULL,
837 0, 840 0,
838 prq->data, 841 prq->data,
839 prq->size); 842 prq->size);
840 switch (prq->eval) 843 switch (prq->eval)
841 { 844 {
842 case GNUNET_BLOCK_EVALUATION_OK_MORE: 845 case GNUNET_BLOCK_EVALUATION_OK_MORE:
843 update_request_performance_data(prq, pr); 846 update_request_performance_data (prq, pr);
844 break; 847 break;
845 848
846 case GNUNET_BLOCK_EVALUATION_OK_LAST: 849 case GNUNET_BLOCK_EVALUATION_OK_LAST:
847 /* short cut: stop processing early, no BF-update, etc. */ 850 /* short cut: stop processing early, no BF-update, etc. */
848 update_request_performance_data(prq, pr); 851 update_request_performance_data (prq, pr);
849 GNUNET_LOAD_update(GSF_rt_entry_lifetime, 852 GNUNET_LOAD_update (GSF_rt_entry_lifetime,
850 GNUNET_TIME_absolute_get_duration( 853 GNUNET_TIME_absolute_get_duration (
851 pr->public_data.start_time) 854 pr->public_data.start_time)
852 .rel_value_us); 855 .rel_value_us);
853 if (GNUNET_YES != 856 if (GNUNET_YES !=
854 GSF_request_plan_reference_get_last_transmission_(pr->public_data 857 GSF_request_plan_reference_get_last_transmission_ (pr->public_data
855 .pr_head, 858 .pr_head,
856 prq->sender, 859 prq->sender,
857 &last_transmission)) 860 &last_transmission))
858 last_transmission.abs_value_us = 861 last_transmission.abs_value_us =
859 GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us; 862 GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
860 /* pass on to other peers / local clients */ 863 /* pass on to other peers / local clients */
861 pr->rh(pr->rh_cls, 864 pr->rh (pr->rh_cls,
862 prq->eval, 865 prq->eval,
863 pr, 866 pr,
864 prq->anonymity_level, 867 prq->anonymity_level,
865 prq->expiration, 868 prq->expiration,
866 last_transmission, 869 last_transmission,
867 prq->type, 870 prq->type,
868 prq->data, 871 prq->data,
869 prq->size); 872 prq->size);
870 return GNUNET_YES; 873 return GNUNET_YES;
871 874
872 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 875 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
873#if INSANE_STATISTICS 876#if INSANE_STATISTICS
874 GNUNET_STATISTICS_update(GSF_stats, 877 GNUNET_STATISTICS_update (GSF_stats,
875 gettext_noop( 878 gettext_noop (
876 "# duplicate replies discarded (bloomfilter)"), 879 "# duplicate replies discarded (bloomfilter)"),
877 1, 880 1,
878 GNUNET_NO); 881 GNUNET_NO);
879#endif 882#endif
880 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n"); 883 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n");
881 return GNUNET_YES; /* duplicate */ 884 return GNUNET_YES; /* duplicate */
882 885
883 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT: 886 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
884 GNUNET_STATISTICS_update(GSF_stats, 887 GNUNET_STATISTICS_update (GSF_stats,
885 gettext_noop("# irrelevant replies discarded"), 888 gettext_noop ("# irrelevant replies discarded"),
886 1, 889 1,
887 GNUNET_NO); 890 GNUNET_NO);
888 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n"); 891 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n");
889 return GNUNET_YES; 892 return GNUNET_YES;
890 893
891 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: 894 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
892 return GNUNET_YES; /* wrong namespace */ 895 return GNUNET_YES; /* wrong namespace */
893 896
894 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: 897 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
895 GNUNET_break(0); 898 GNUNET_break (0);
896 return GNUNET_YES; 899 return GNUNET_YES;
897 900
898 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: 901 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
899 GNUNET_break(0); 902 GNUNET_break (0);
900 return GNUNET_YES; 903 return GNUNET_YES;
901 904
902 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: 905 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
903 GNUNET_log(GNUNET_ERROR_TYPE_ERROR, 906 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
904 _("Unsupported block type %u\n"), 907 _ ("Unsupported block type %u\n"),
905 prq->type); 908 prq->type);
906 return GNUNET_NO; 909 return GNUNET_NO;
907 } 910 }
908 /* update bloomfilter */ 911 /* update bloomfilter */
909 GNUNET_CRYPTO_hash(prq->data, prq->size, &chash); 912 GNUNET_CRYPTO_hash (prq->data, prq->size, &chash);
910 GSF_pending_request_update_(pr, &chash, 1); 913 GSF_pending_request_update_ (pr, &chash, 1);
911 if (NULL == prq->sender) 914 if (NULL == prq->sender)
912 { 915 {
913 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 916 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914 "Found result for query `%s' in local datastore\n", 917 "Found result for query `%s' in local datastore\n",
915 GNUNET_h2s(key)); 918 GNUNET_h2s (key));
916 GNUNET_STATISTICS_update(GSF_stats, 919 GNUNET_STATISTICS_update (GSF_stats,
917 gettext_noop("# results found locally"), 920 gettext_noop ("# results found locally"),
918 1, 921 1,
919 GNUNET_NO); 922 GNUNET_NO);
920 } 923 }
921 else 924 else
922 { 925 {
923 GSF_dht_lookup_(pr); 926 GSF_dht_lookup_ (pr);
924 } 927 }
925 prq->priority += pr->public_data.original_priority; 928 prq->priority += pr->public_data.original_priority;
926 pr->public_data.priority = 0; 929 pr->public_data.priority = 0;
927 pr->public_data.original_priority = 0; 930 pr->public_data.original_priority = 0;
928 pr->public_data.results_found++; 931 pr->public_data.results_found++;
929 prq->request_found = GNUNET_YES; 932 prq->request_found = GNUNET_YES;
930 /* finally, pass on to other peer / local client */ 933 /* finally, pass on to other peer / local client */
931 if (!GSF_request_plan_reference_get_last_transmission_(pr->public_data 934 if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data
932 .pr_head, 935 .pr_head,
933 prq->sender, 936 prq->sender,
934 &last_transmission)) 937 &last_transmission))
935 last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us; 938 last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
936 pr->rh(pr->rh_cls, 939 pr->rh (pr->rh_cls,
937 prq->eval, 940 prq->eval,
938 pr, 941 pr,
939 prq->anonymity_level, 942 prq->anonymity_level,
940 prq->expiration, 943 prq->expiration,
941 last_transmission, 944 last_transmission,
942 prq->type, 945 prq->type,
943 prq->data, 946 prq->data,
944 prq->size); 947 prq->size);
945 return GNUNET_YES; 948 return GNUNET_YES;
946} 949}
947 950
@@ -949,7 +952,8 @@ process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
949/** 952/**
950 * Context for put_migration_continuation(). 953 * Context for put_migration_continuation().
951 */ 954 */
952struct PutMigrationContext { 955struct PutMigrationContext
956{
953 /** 957 /**
954 * Start time for the operation. 958 * Start time for the operation.
955 */ 959 */
@@ -978,10 +982,10 @@ struct PutMigrationContext {
978 * @param msg NULL on success, otherwise an error message 982 * @param msg NULL on success, otherwise an error message
979 */ 983 */
980static void 984static void
981put_migration_continuation(void *cls, 985put_migration_continuation (void *cls,
982 int success, 986 int success,
983 struct GNUNET_TIME_Absolute min_expiration, 987 struct GNUNET_TIME_Absolute min_expiration,
984 const char *msg) 988 const char *msg)
985{ 989{
986 struct PutMigrationContext *pmc = cls; 990 struct PutMigrationContext *pmc = cls;
987 struct GSF_ConnectedPeer *cp; 991 struct GSF_ConnectedPeer *cp;
@@ -989,67 +993,67 @@ put_migration_continuation(void *cls,
989 struct GSF_PeerPerformanceData *ppd; 993 struct GSF_PeerPerformanceData *ppd;
990 994
991 if (NULL != datastore_put_load) 995 if (NULL != datastore_put_load)
996 {
997 if (GNUNET_SYSERR != success)
998 {
999 GNUNET_LOAD_update (datastore_put_load,
1000 GNUNET_TIME_absolute_get_duration (pmc->start)
1001 .rel_value_us);
1002 }
1003 else
992 { 1004 {
993 if (GNUNET_SYSERR != success) 1005 /* on queue failure / timeout, increase the put load dramatically */
994 { 1006 GNUNET_LOAD_update (datastore_put_load,
995 GNUNET_LOAD_update(datastore_put_load, 1007 GNUNET_TIME_UNIT_MINUTES.rel_value_us);
996 GNUNET_TIME_absolute_get_duration(pmc->start)
997 .rel_value_us);
998 }
999 else
1000 {
1001 /* on queue failure / timeout, increase the put load dramatically */
1002 GNUNET_LOAD_update(datastore_put_load,
1003 GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1004 }
1005 } 1008 }
1006 cp = GSF_peer_get_(&pmc->origin); 1009 }
1010 cp = GSF_peer_get_ (&pmc->origin);
1007 if (GNUNET_OK == success) 1011 if (GNUNET_OK == success)
1012 {
1013 if (NULL != cp)
1008 { 1014 {
1009 if (NULL != cp) 1015 ppd = GSF_get_peer_performance_data_ (cp);
1010 { 1016 ppd->migration_delay.rel_value_us /= 2;
1011 ppd = GSF_get_peer_performance_data_(cp);
1012 ppd->migration_delay.rel_value_us /= 2;
1013 }
1014 GNUNET_free(pmc);
1015 return;
1016 } 1017 }
1018 GNUNET_free (pmc);
1019 return;
1020 }
1017 if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp)) 1021 if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
1022 {
1023 ppd = GSF_get_peer_performance_data_ (cp);
1024 if (min_expiration.abs_value_us > 0)
1018 { 1025 {
1019 ppd = GSF_get_peer_performance_data_(cp); 1026 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020 if (min_expiration.abs_value_us > 0) 1027 "Asking to stop migration for %s because datastore is full\n",
1021 { 1028 GNUNET_STRINGS_relative_time_to_string (
1022 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1029 GNUNET_TIME_absolute_get_remaining (min_expiration),
1023 "Asking to stop migration for %s because datastore is full\n", 1030 GNUNET_YES));
1024 GNUNET_STRINGS_relative_time_to_string( 1031 GSF_block_peer_migration_ (cp, min_expiration);
1025 GNUNET_TIME_absolute_get_remaining(min_expiration),
1026 GNUNET_YES));
1027 GSF_block_peer_migration_(cp, min_expiration);
1028 }
1029 else
1030 {
1031 ppd->migration_delay = GNUNET_TIME_relative_max(GNUNET_TIME_UNIT_SECONDS,
1032 ppd->migration_delay);
1033 ppd->migration_delay =
1034 GNUNET_TIME_relative_min(GNUNET_TIME_UNIT_HOURS, ppd->migration_delay);
1035 mig_pause.rel_value_us =
1036 GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
1037 ppd->migration_delay.rel_value_us);
1038 ppd->migration_delay =
1039 GNUNET_TIME_relative_saturating_multiply(ppd->migration_delay, 2);
1040 GNUNET_log(
1041 GNUNET_ERROR_TYPE_DEBUG,
1042 "Replicated content already exists locally, asking to stop migration for %s\n",
1043 GNUNET_STRINGS_relative_time_to_string(mig_pause, GNUNET_YES));
1044 GSF_block_peer_migration_(cp,
1045 GNUNET_TIME_relative_to_absolute(mig_pause));
1046 }
1047 } 1032 }
1048 GNUNET_free(pmc); 1033 else
1049 GNUNET_STATISTICS_update(GSF_stats, 1034 {
1050 gettext_noop("# Datastore `PUT' failures"), 1035 ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS,
1051 1, 1036 ppd->migration_delay);
1052 GNUNET_NO); 1037 ppd->migration_delay =
1038 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, ppd->migration_delay);
1039 mig_pause.rel_value_us =
1040 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1041 ppd->migration_delay.rel_value_us);
1042 ppd->migration_delay =
1043 GNUNET_TIME_relative_saturating_multiply (ppd->migration_delay, 2);
1044 GNUNET_log (
1045 GNUNET_ERROR_TYPE_DEBUG,
1046 "Replicated content already exists locally, asking to stop migration for %s\n",
1047 GNUNET_STRINGS_relative_time_to_string (mig_pause, GNUNET_YES));
1048 GSF_block_peer_migration_ (cp,
1049 GNUNET_TIME_relative_to_absolute (mig_pause));
1050 }
1051 }
1052 GNUNET_free (pmc);
1053 GNUNET_STATISTICS_update (GSF_stats,
1054 gettext_noop ("# Datastore `PUT' failures"),
1055 1,
1056 GNUNET_NO);
1053} 1057}
1054 1058
1055 1059
@@ -1063,22 +1067,22 @@ put_migration_continuation(void *cls,
1063 * #GNUNET_NO to process normally (load normal or low) 1067 * #GNUNET_NO to process normally (load normal or low)
1064 */ 1068 */
1065static int 1069static int
1066test_put_load_too_high(uint32_t priority) 1070test_put_load_too_high (uint32_t priority)
1067{ 1071{
1068 double ld; 1072 double ld;
1069 1073
1070 if (NULL == datastore_put_load) 1074 if (NULL == datastore_put_load)
1071 return GNUNET_NO; 1075 return GNUNET_NO;
1072 if (GNUNET_LOAD_get_average(datastore_put_load) < 50) 1076 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1073 return GNUNET_NO; /* very fast */ 1077 return GNUNET_NO; /* very fast */
1074 ld = GNUNET_LOAD_get_load(datastore_put_load); 1078 ld = GNUNET_LOAD_get_load (datastore_put_load);
1075 if (ld < 2.0 * (1 + priority)) 1079 if (ld < 2.0 * (1 + priority))
1076 return GNUNET_NO; 1080 return GNUNET_NO;
1077 GNUNET_STATISTICS_update(GSF_stats, 1081 GNUNET_STATISTICS_update (GSF_stats,
1078 gettext_noop( 1082 gettext_noop (
1079 "# storage requests dropped due to high load"), 1083 "# storage requests dropped due to high load"),
1080 1, 1084 1,
1081 GNUNET_NO); 1085 GNUNET_NO);
1082 return GNUNET_YES; 1086 return GNUNET_YES;
1083} 1087}
1084 1088
@@ -1099,67 +1103,67 @@ test_put_load_too_high(uint32_t priority)
1099 * @param data pointer to the result data 1103 * @param data pointer to the result data
1100 */ 1104 */
1101static void 1105static void
1102handle_dht_reply(void *cls, 1106handle_dht_reply (void *cls,
1103 struct GNUNET_TIME_Absolute exp, 1107 struct GNUNET_TIME_Absolute exp,
1104 const struct GNUNET_HashCode *key, 1108 const struct GNUNET_HashCode *key,
1105 const struct GNUNET_PeerIdentity *get_path, 1109 const struct GNUNET_PeerIdentity *get_path,
1106 unsigned int get_path_length, 1110 unsigned int get_path_length,
1107 const struct GNUNET_PeerIdentity *put_path, 1111 const struct GNUNET_PeerIdentity *put_path,
1108 unsigned int put_path_length, 1112 unsigned int put_path_length,
1109 enum GNUNET_BLOCK_Type type, 1113 enum GNUNET_BLOCK_Type type,
1110 size_t size, 1114 size_t size,
1111 const void *data) 1115 const void *data)
1112{ 1116{
1113 struct GSF_PendingRequest *pr = cls; 1117 struct GSF_PendingRequest *pr = cls;
1114 struct ProcessReplyClosure prq; 1118 struct ProcessReplyClosure prq;
1115 struct PutMigrationContext *pmc; 1119 struct PutMigrationContext *pmc;
1116 1120
1117 GNUNET_STATISTICS_update(GSF_stats, 1121 GNUNET_STATISTICS_update (GSF_stats,
1118 gettext_noop("# Replies received from DHT"), 1122 gettext_noop ("# Replies received from DHT"),
1119 1, 1123 1,
1120 GNUNET_NO); 1124 GNUNET_NO);
1121 memset(&prq, 0, sizeof(prq)); 1125 memset (&prq, 0, sizeof(prq));
1122 prq.data = data; 1126 prq.data = data;
1123 prq.expiration = exp; 1127 prq.expiration = exp;
1124 /* do not allow migrated content to live longer than 1 year */ 1128 /* do not allow migrated content to live longer than 1 year */
1125 prq.expiration = GNUNET_TIME_absolute_min(GNUNET_TIME_relative_to_absolute( 1129 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1126 GNUNET_TIME_UNIT_YEARS), 1130 GNUNET_TIME_UNIT_YEARS),
1127 prq.expiration); 1131 prq.expiration);
1128 prq.size = size; 1132 prq.size = size;
1129 prq.type = type; 1133 prq.type = type;
1130 prq.eo = GNUNET_BLOCK_EO_NONE; 1134 prq.eo = GNUNET_BLOCK_EO_NONE;
1131 process_reply(&prq, key, pr); 1135 process_reply (&prq, key, pr);
1132 if ((GNUNET_YES == active_to_migration) && 1136 if ((GNUNET_YES == active_to_migration) &&
1133 (GNUNET_NO == test_put_load_too_high(prq.priority))) 1137 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1138 {
1139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140 "Replicating result for query `%s' with priority %u\n",
1141 GNUNET_h2s (key),
1142 prq.priority);
1143 pmc = GNUNET_new (struct PutMigrationContext);
1144 pmc->start = GNUNET_TIME_absolute_get ();
1145 pmc->requested = GNUNET_YES;
1146 if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1147 0,
1148 key,
1149 size,
1150 data,
1151 type,
1152 prq.priority,
1153 1 /* anonymity */,
1154 0 /* replication */,
1155 exp,
1156 1 + prq.priority,
1157 MAX_DATASTORE_QUEUE,
1158 &put_migration_continuation,
1159 pmc))
1134 { 1160 {
1135 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1161 put_migration_continuation (pmc,
1136 "Replicating result for query `%s' with priority %u\n", 1162 GNUNET_SYSERR,
1137 GNUNET_h2s(key), 1163 GNUNET_TIME_UNIT_ZERO_ABS,
1138 prq.priority); 1164 NULL);
1139 pmc = GNUNET_new(struct PutMigrationContext);
1140 pmc->start = GNUNET_TIME_absolute_get();
1141 pmc->requested = GNUNET_YES;
1142 if (NULL == GNUNET_DATASTORE_put(GSF_dsh,
1143 0,
1144 key,
1145 size,
1146 data,
1147 type,
1148 prq.priority,
1149 1 /* anonymity */,
1150 0 /* replication */,
1151 exp,
1152 1 + prq.priority,
1153 MAX_DATASTORE_QUEUE,
1154 &put_migration_continuation,
1155 pmc))
1156 {
1157 put_migration_continuation(pmc,
1158 GNUNET_SYSERR,
1159 GNUNET_TIME_UNIT_ZERO_ABS,
1160 NULL);
1161 }
1162 } 1165 }
1166 }
1163} 1167}
1164 1168
1165 1169
@@ -1169,7 +1173,7 @@ handle_dht_reply(void *cls,
1169 * @param pr the pending request to process 1173 * @param pr the pending request to process
1170 */ 1174 */
1171void 1175void
1172GSF_dht_lookup_(struct GSF_PendingRequest *pr) 1176GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1173{ 1177{
1174 const void *xquery; 1178 const void *xquery;
1175 size_t xquery_size; 1179 size_t xquery_size;
@@ -1179,32 +1183,32 @@ GSF_dht_lookup_(struct GSF_PendingRequest *pr)
1179 if (0 != pr->public_data.anonymity_level) 1183 if (0 != pr->public_data.anonymity_level)
1180 return; 1184 return;
1181 if (NULL != pr->gh) 1185 if (NULL != pr->gh)
1182 { 1186 {
1183 GNUNET_DHT_get_stop(pr->gh); 1187 GNUNET_DHT_get_stop (pr->gh);
1184 pr->gh = NULL; 1188 pr->gh = NULL;
1185 } 1189 }
1186 xquery = NULL; 1190 xquery = NULL;
1187 xquery_size = 0; 1191 xquery_size = 0;
1188 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY)) 1192 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1189 { 1193 {
1190 GNUNET_assert(0 != pr->sender_pid); 1194 GNUNET_assert (0 != pr->sender_pid);
1191 GNUNET_PEER_resolve(pr->sender_pid, &pi); 1195 GNUNET_PEER_resolve (pr->sender_pid, &pi);
1192 GNUNET_memcpy(&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity)); 1196 GNUNET_memcpy (&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity));
1193 xquery_size += sizeof(struct GNUNET_PeerIdentity); 1197 xquery_size += sizeof(struct GNUNET_PeerIdentity);
1194 } 1198 }
1195 pr->gh = GNUNET_DHT_get_start(GSF_dht, 1199 pr->gh = GNUNET_DHT_get_start (GSF_dht,
1196 pr->public_data.type, 1200 pr->public_data.type,
1197 &pr->public_data.query, 1201 &pr->public_data.query,
1198 DHT_GET_REPLICATION, 1202 DHT_GET_REPLICATION,
1199 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1203 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1200 xquery, 1204 xquery,
1201 xquery_size, 1205 xquery_size,
1202 &handle_dht_reply, 1206 &handle_dht_reply,
1203 pr); 1207 pr);
1204 if ((NULL != pr->gh) && (0 != pr->replies_seen_count)) 1208 if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1205 GNUNET_DHT_get_filter_known_results(pr->gh, 1209 GNUNET_DHT_get_filter_known_results (pr->gh,
1206 pr->replies_seen_count, 1210 pr->replies_seen_count,
1207 pr->replies_seen); 1211 pr->replies_seen);
1208} 1212}
1209 1213
1210 1214
@@ -1218,11 +1222,11 @@ GSF_dht_lookup_(struct GSF_PendingRequest *pr)
1218 * @param data reply block data, NULL on error 1222 * @param data reply block data, NULL on error
1219 */ 1223 */
1220static void 1224static void
1221cadet_reply_proc(void *cls, 1225cadet_reply_proc (void *cls,
1222 enum GNUNET_BLOCK_Type type, 1226 enum GNUNET_BLOCK_Type type,
1223 struct GNUNET_TIME_Absolute expiration, 1227 struct GNUNET_TIME_Absolute expiration,
1224 size_t data_size, 1228 size_t data_size,
1225 const void *data) 1229 const void *data)
1226{ 1230{
1227 struct GSF_PendingRequest *pr = cls; 1231 struct GSF_PendingRequest *pr = cls;
1228 struct ProcessReplyClosure prq; 1232 struct ProcessReplyClosure prq;
@@ -1230,46 +1234,46 @@ cadet_reply_proc(void *cls,
1230 1234
1231 pr->cadet_request = NULL; 1235 pr->cadet_request = NULL;
1232 if (GNUNET_BLOCK_TYPE_ANY == type) 1236 if (GNUNET_BLOCK_TYPE_ANY == type)
1233 { 1237 {
1234 GNUNET_break(NULL == data); 1238 GNUNET_break (NULL == data);
1235 GNUNET_break(0 == data_size); 1239 GNUNET_break (0 == data_size);
1236 pr->cadet_retry_count++; 1240 pr->cadet_retry_count++;
1237 if (pr->cadet_retry_count >= CADET_RETRY_MAX) 1241 if (pr->cadet_retry_count >= CADET_RETRY_MAX)
1238 return; /* give up on cadet */ 1242 return; /* give up on cadet */
1239 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n"); 1243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1240 /* retry -- without delay, as this is non-anonymous 1244 /* retry -- without delay, as this is non-anonymous
1241 and cadet/cadet connect will take some time anyway */ 1245 and cadet/cadet connect will take some time anyway */
1242 pr->cadet_request = GSF_cadet_query(pr->public_data.target, 1246 pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1243 &pr->public_data.query, 1247 &pr->public_data.query,
1244 pr->public_data.type, 1248 pr->public_data.type,
1245 &cadet_reply_proc, 1249 &cadet_reply_proc,
1246 pr); 1250 pr);
1247 return; 1251 return;
1248 } 1252 }
1249 if (GNUNET_YES != 1253 if (GNUNET_YES !=
1250 GNUNET_BLOCK_get_key(GSF_block_ctx, type, data, data_size, &query)) 1254 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, data_size, &query))
1251 { 1255 {
1252 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1253 "Failed to derive key for block of type %d\n", 1257 "Failed to derive key for block of type %d\n",
1254 (int)type); 1258 (int) type);
1255 GNUNET_break_op(0); 1259 GNUNET_break_op (0);
1256 return; 1260 return;
1257 } 1261 }
1258 GNUNET_STATISTICS_update(GSF_stats, 1262 GNUNET_STATISTICS_update (GSF_stats,
1259 gettext_noop("# Replies received from CADET"), 1263 gettext_noop ("# Replies received from CADET"),
1260 1, 1264 1,
1261 GNUNET_NO); 1265 GNUNET_NO);
1262 memset(&prq, 0, sizeof(prq)); 1266 memset (&prq, 0, sizeof(prq));
1263 prq.data = data; 1267 prq.data = data;
1264 prq.expiration = expiration; 1268 prq.expiration = expiration;
1265 /* do not allow migrated content to live longer than 1 year */ 1269 /* do not allow migrated content to live longer than 1 year */
1266 prq.expiration = GNUNET_TIME_absolute_min(GNUNET_TIME_relative_to_absolute( 1270 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1267 GNUNET_TIME_UNIT_YEARS), 1271 GNUNET_TIME_UNIT_YEARS),
1268 prq.expiration); 1272 prq.expiration);
1269 prq.size = data_size; 1273 prq.size = data_size;
1270 prq.type = type; 1274 prq.type = type;
1271 prq.eo = GNUNET_BLOCK_EO_NONE; 1275 prq.eo = GNUNET_BLOCK_EO_NONE;
1272 process_reply(&prq, &query, pr); 1276 process_reply (&prq, &query, pr);
1273} 1277}
1274 1278
1275 1279
@@ -1279,23 +1283,23 @@ cadet_reply_proc(void *cls,
1279 * @param pr the pending request to process 1283 * @param pr the pending request to process
1280 */ 1284 */
1281void 1285void
1282GSF_cadet_lookup_(struct GSF_PendingRequest *pr) 1286GSF_cadet_lookup_ (struct GSF_PendingRequest *pr)
1283{ 1287{
1284 if (0 != pr->public_data.anonymity_level) 1288 if (0 != pr->public_data.anonymity_level)
1285 return; 1289 return;
1286 if (0 == pr->public_data.target) 1290 if (0 == pr->public_data.target)
1287 { 1291 {
1288 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1292 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1289 "Cannot do cadet-based download, target peer not known\n"); 1293 "Cannot do cadet-based download, target peer not known\n");
1290 return; 1294 return;
1291 } 1295 }
1292 if (NULL != pr->cadet_request) 1296 if (NULL != pr->cadet_request)
1293 return; 1297 return;
1294 pr->cadet_request = GSF_cadet_query(pr->public_data.target, 1298 pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1295 &pr->public_data.query, 1299 &pr->public_data.query,
1296 pr->public_data.type, 1300 pr->public_data.type,
1297 &cadet_reply_proc, 1301 &cadet_reply_proc,
1298 pr); 1302 pr);
1299} 1303}
1300 1304
1301 1305
@@ -1305,18 +1309,18 @@ GSF_cadet_lookup_(struct GSF_PendingRequest *pr)
1305 * @param cls the `struct GSF_PendingRequest` 1309 * @param cls the `struct GSF_PendingRequest`
1306 */ 1310 */
1307static void 1311static void
1308warn_delay_task(void *cls) 1312warn_delay_task (void *cls)
1309{ 1313{
1310 struct GSF_PendingRequest *pr = cls; 1314 struct GSF_PendingRequest *pr = cls;
1311 1315
1312 GNUNET_log(GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, 1316 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1313 _("Datastore lookup already took %s!\n"), 1317 _ ("Datastore lookup already took %s!\n"),
1314 GNUNET_STRINGS_relative_time_to_string( 1318 GNUNET_STRINGS_relative_time_to_string (
1315 GNUNET_TIME_absolute_get_duration(pr->qe_start), 1319 GNUNET_TIME_absolute_get_duration (pr->qe_start),
1316 GNUNET_YES)); 1320 GNUNET_YES));
1317 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES, 1321 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1318 &warn_delay_task, 1322 &warn_delay_task,
1319 pr); 1323 pr);
1320} 1324}
1321 1325
1322 1326
@@ -1326,140 +1330,140 @@ warn_delay_task(void *cls)
1326 * @param cls the `struct GSF_PendingRequest` 1330 * @param cls the `struct GSF_PendingRequest`
1327 */ 1331 */
1328static void 1332static void
1329odc_warn_delay_task(void *cls) 1333odc_warn_delay_task (void *cls)
1330{ 1334{
1331 struct GSF_PendingRequest *pr = cls; 1335 struct GSF_PendingRequest *pr = cls;
1332 1336
1333 GNUNET_log(GNUNET_ERROR_TYPE_WARNING, 1337 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1334 _("On-demand lookup already took %s!\n"), 1338 _ ("On-demand lookup already took %s!\n"),
1335 GNUNET_STRINGS_relative_time_to_string( 1339 GNUNET_STRINGS_relative_time_to_string (
1336 GNUNET_TIME_absolute_get_duration(pr->qe_start), 1340 GNUNET_TIME_absolute_get_duration (pr->qe_start),
1337 GNUNET_YES)); 1341 GNUNET_YES));
1338 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES, 1342 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1339 &odc_warn_delay_task, 1343 &odc_warn_delay_task,
1340 pr); 1344 pr);
1341} 1345}
1342 1346
1343 1347
1344/* Call our continuation (if we have any) */ 1348/* Call our continuation (if we have any) */
1345static void 1349static void
1346call_continuation(struct GSF_PendingRequest *pr) 1350call_continuation (struct GSF_PendingRequest *pr)
1347{ 1351{
1348 GSF_LocalLookupContinuation cont = pr->llc_cont; 1352 GSF_LocalLookupContinuation cont = pr->llc_cont;
1349 1353
1350 GNUNET_assert(NULL == pr->qe); 1354 GNUNET_assert (NULL == pr->qe);
1351 if (NULL != pr->warn_task) 1355 if (NULL != pr->warn_task)
1352 { 1356 {
1353 GNUNET_SCHEDULER_cancel(pr->warn_task); 1357 GNUNET_SCHEDULER_cancel (pr->warn_task);
1354 pr->warn_task = NULL; 1358 pr->warn_task = NULL;
1355 } 1359 }
1356 if (NULL == cont) 1360 if (NULL == cont)
1357 return; /* no continuation */ 1361 return; /* no continuation */
1358 pr->llc_cont = NULL; 1362 pr->llc_cont = NULL;
1359 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options)) 1363 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1364 {
1365 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1360 { 1366 {
1361 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result) 1367 /* Signal that we are done and that there won't be any
1362 { 1368 additional results to allow client to clean up state. */
1363 /* Signal that we are done and that there won't be any 1369 pr->rh (pr->rh_cls,
1364 additional results to allow client to clean up state. */ 1370 GNUNET_BLOCK_EVALUATION_OK_LAST,
1365 pr->rh(pr->rh_cls, 1371 pr,
1366 GNUNET_BLOCK_EVALUATION_OK_LAST, 1372 UINT32_MAX,
1367 pr, 1373 GNUNET_TIME_UNIT_ZERO_ABS,
1368 UINT32_MAX, 1374 GNUNET_TIME_UNIT_FOREVER_ABS,
1369 GNUNET_TIME_UNIT_ZERO_ABS, 1375 GNUNET_BLOCK_TYPE_ANY,
1370 GNUNET_TIME_UNIT_FOREVER_ABS, 1376 NULL,
1371 GNUNET_BLOCK_TYPE_ANY, 1377 0);
1372 NULL,
1373 0);
1374 }
1375 /* Finally, call our continuation to signal that we are
1376 done with local processing of this request; i.e. to
1377 start reading again from the client. */
1378 cont(pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1379 return;
1380 } 1378 }
1379 /* Finally, call our continuation to signal that we are
1380 done with local processing of this request; i.e. to
1381 start reading again from the client. */
1382 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1383 return;
1384 }
1381 1385
1382 cont(pr->llc_cont_cls, pr, pr->local_result); 1386 cont (pr->llc_cont_cls, pr, pr->local_result);
1383} 1387}
1384 1388
1385 1389
1386/* Update stats and call continuation */ 1390/* Update stats and call continuation */
1387static void 1391static void
1388no_more_local_results(struct GSF_PendingRequest *pr) 1392no_more_local_results (struct GSF_PendingRequest *pr)
1389{ 1393{
1390 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 1394 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1391 "No further local responses available.\n"); 1395 "No further local responses available.\n");
1392#if INSANE_STATISTICS 1396#if INSANE_STATISTICS
1393 if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) || 1397 if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
1394 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type)) 1398 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type))
1395 GNUNET_STATISTICS_update(GSF_stats, 1399 GNUNET_STATISTICS_update (GSF_stats,
1396 gettext_noop( 1400 gettext_noop (
1397 "# requested DBLOCK or IBLOCK not found"), 1401 "# requested DBLOCK or IBLOCK not found"),
1398 1, 1402 1,
1399 GNUNET_NO); 1403 GNUNET_NO);
1400#endif 1404#endif
1401 call_continuation(pr); 1405 call_continuation (pr);
1402} 1406}
1403 1407
1404 1408
1405/* forward declaration */ 1409/* forward declaration */
1406static void 1410static void
1407process_local_reply(void *cls, 1411process_local_reply (void *cls,
1408 const struct GNUNET_HashCode *key, 1412 const struct GNUNET_HashCode *key,
1409 size_t size, 1413 size_t size,
1410 const void *data, 1414 const void *data,
1411 enum GNUNET_BLOCK_Type type, 1415 enum GNUNET_BLOCK_Type type,
1412 uint32_t priority, 1416 uint32_t priority,
1413 uint32_t anonymity, 1417 uint32_t anonymity,
1414 uint32_t replication, 1418 uint32_t replication,
1415 struct GNUNET_TIME_Absolute expiration, 1419 struct GNUNET_TIME_Absolute expiration,
1416 uint64_t uid); 1420 uint64_t uid);
1417 1421
1418 1422
1419/* Start a local query */ 1423/* Start a local query */
1420static void 1424static void
1421start_local_query(struct GSF_PendingRequest *pr, 1425start_local_query (struct GSF_PendingRequest *pr,
1422 uint64_t next_uid, 1426 uint64_t next_uid,
1423 bool random) 1427 bool random)
1424{ 1428{
1425 pr->qe_start = GNUNET_TIME_absolute_get(); 1429 pr->qe_start = GNUNET_TIME_absolute_get ();
1426 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES, 1430 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1427 &warn_delay_task, 1431 &warn_delay_task,
1428 pr); 1432 pr);
1429 pr->qe = GNUNET_DATASTORE_get_key(GSF_dsh, 1433 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1430 next_uid, 1434 next_uid,
1431 random, 1435 random,
1432 &pr->public_data.query, 1436 &pr->public_data.query,
1433 pr->public_data.type == 1437 pr->public_data.type ==
1434 GNUNET_BLOCK_TYPE_FS_DBLOCK 1438 GNUNET_BLOCK_TYPE_FS_DBLOCK
1435 ? GNUNET_BLOCK_TYPE_ANY 1439 ? GNUNET_BLOCK_TYPE_ANY
1436 : pr->public_data.type, 1440 : pr->public_data.type,
1437 (0 != (GSF_PRO_PRIORITY_UNLIMITED & 1441 (0 != (GSF_PRO_PRIORITY_UNLIMITED
1438 pr->public_data.options)) 1442 & pr->public_data.options))
1439 ? UINT_MAX 1443 ? UINT_MAX
1440 : 1 1444 : 1
1441 /* queue priority */, 1445 /* queue priority */,
1442 (0 != (GSF_PRO_PRIORITY_UNLIMITED & 1446 (0 != (GSF_PRO_PRIORITY_UNLIMITED
1443 pr->public_data.options)) 1447 & pr->public_data.options))
1444 ? UINT_MAX 1448 ? UINT_MAX
1445 : GSF_datastore_queue_size 1449 : GSF_datastore_queue_size
1446 /* max queue size */, 1450 /* max queue size */,
1447 &process_local_reply, 1451 &process_local_reply,
1448 pr); 1452 pr);
1449 if (NULL != pr->qe) 1453 if (NULL != pr->qe)
1450 return; 1454 return;
1451 GNUNET_log( 1455 GNUNET_log (
1452 GNUNET_ERROR_TYPE_DEBUG, 1456 GNUNET_ERROR_TYPE_DEBUG,
1453 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n", 1457 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1454 GNUNET_h2s(&pr->public_data.query), 1458 GNUNET_h2s (&pr->public_data.query),
1455 pr->public_data.type, 1459 pr->public_data.type,
1456 (unsigned long long)next_uid); 1460 (unsigned long long) next_uid);
1457 GNUNET_STATISTICS_update(GSF_stats, 1461 GNUNET_STATISTICS_update (GSF_stats,
1458 gettext_noop( 1462 gettext_noop (
1459 "# Datastore lookups concluded (error queueing)"), 1463 "# Datastore lookups concluded (error queueing)"),
1460 1, 1464 1,
1461 GNUNET_NO); 1465 GNUNET_NO);
1462 call_continuation(pr); 1466 call_continuation (pr);
1463} 1467}
1464 1468
1465 1469
@@ -1482,178 +1486,178 @@ start_local_query(struct GSF_PendingRequest *pr,
1482 * maybe 0 if no unique identifier is available 1486 * maybe 0 if no unique identifier is available
1483 */ 1487 */
1484static void 1488static void
1485process_local_reply(void *cls, 1489process_local_reply (void *cls,
1486 const struct GNUNET_HashCode *key, 1490 const struct GNUNET_HashCode *key,
1487 size_t size, 1491 size_t size,
1488 const void *data, 1492 const void *data,
1489 enum GNUNET_BLOCK_Type type, 1493 enum GNUNET_BLOCK_Type type,
1490 uint32_t priority, 1494 uint32_t priority,
1491 uint32_t anonymity, 1495 uint32_t anonymity,
1492 uint32_t replication, 1496 uint32_t replication,
1493 struct GNUNET_TIME_Absolute expiration, 1497 struct GNUNET_TIME_Absolute expiration,
1494 uint64_t uid) 1498 uint64_t uid)
1495{ 1499{
1496 struct GSF_PendingRequest *pr = cls; 1500 struct GSF_PendingRequest *pr = cls;
1497 struct ProcessReplyClosure prq; 1501 struct ProcessReplyClosure prq;
1498 struct GNUNET_HashCode query; 1502 struct GNUNET_HashCode query;
1499 unsigned int old_rf; 1503 unsigned int old_rf;
1500 1504
1501 GNUNET_SCHEDULER_cancel(pr->warn_task); 1505 GNUNET_SCHEDULER_cancel (pr->warn_task);
1502 pr->warn_task = NULL; 1506 pr->warn_task = NULL;
1503 if (NULL == pr->qe) 1507 if (NULL == pr->qe)
1504 goto called_from_on_demand; 1508 goto called_from_on_demand;
1505 pr->qe = NULL; 1509 pr->qe = NULL;
1506 if ( 1510 if (
1507 (NULL == key) && pr->seen_null && 1511 (NULL == key) && pr->seen_null &&
1508 !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */ 1512 ! pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1509 { 1513 {
1510 /* No results */ 1514 /* No results */
1511#if INSANE_STATISTICS 1515#if INSANE_STATISTICS
1512 GNUNET_STATISTICS_update(GSF_stats, 1516 GNUNET_STATISTICS_update (GSF_stats,
1513 gettext_noop( 1517 gettext_noop (
1514 "# Datastore lookups concluded (no results)"), 1518 "# Datastore lookups concluded (no results)"),
1515 1, 1519 1,
1516 GNUNET_NO); 1520 GNUNET_NO);
1517#endif 1521#endif
1518 no_more_local_results(pr); 1522 no_more_local_results (pr);
1519 return; 1523 return;
1520 } 1524 }
1521 if (((NULL == key) && 1525 if (((NULL == key) &&
1522 pr->seen_null) || /* We have hit the end for the 2nd time OR */ 1526 pr->seen_null) || /* We have hit the end for the 2nd time OR */
1523 (pr->seen_null && pr->have_first_uid && 1527 (pr->seen_null && pr->have_first_uid &&
1524 (uid >= pr->first_uid))) /* We have hit the end and past first UID */ 1528 (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1525 { 1529 {
1526 /* Seen all results */ 1530 /* Seen all results */
1527 GNUNET_STATISTICS_update(GSF_stats, 1531 GNUNET_STATISTICS_update (GSF_stats,
1528 gettext_noop( 1532 gettext_noop (
1529 "# Datastore lookups concluded (seen all)"), 1533 "# Datastore lookups concluded (seen all)"),
1530 1, 1534 1,
1531 GNUNET_NO); 1535 GNUNET_NO);
1532 no_more_local_results(pr); 1536 no_more_local_results (pr);
1533 return; 1537 return;
1534 } 1538 }
1535 if (NULL == key) 1539 if (NULL == key)
1536 { 1540 {
1537 GNUNET_assert(!pr->seen_null); 1541 GNUNET_assert (! pr->seen_null);
1538 pr->seen_null = true; 1542 pr->seen_null = true;
1539 start_local_query(pr, 0 /* next_uid */, false /* random */); 1543 start_local_query (pr, 0 /* next_uid */, false /* random */);
1540 return; 1544 return;
1541 } 1545 }
1542 if (!pr->have_first_uid) 1546 if (! pr->have_first_uid)
1543 { 1547 {
1544 pr->first_uid = uid; 1548 pr->first_uid = uid;
1545 pr->have_first_uid = true; 1549 pr->have_first_uid = true;
1546 } 1550 }
1547 pr->result_count++; 1551 pr->result_count++;
1548 if (pr->result_count > MAX_RESULTS) 1552 if (pr->result_count > MAX_RESULTS)
1549 { 1553 {
1550 GNUNET_STATISTICS_update( 1554 GNUNET_STATISTICS_update (
1551 GSF_stats, 1555 GSF_stats,
1552 gettext_noop("# Datastore lookups aborted (more than MAX_RESULTS)"), 1556 gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1553 1, 1557 1,
1554 GNUNET_NO); 1558 GNUNET_NO);
1555 no_more_local_results(pr); 1559 no_more_local_results (pr);
1556 return; 1560 return;
1557 } 1561 }
1558 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1559 "Received reply for `%s' of type %d with UID %llu from datastore.\n", 1563 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1560 GNUNET_h2s(key), 1564 GNUNET_h2s (key),
1561 type, 1565 type,
1562 (unsigned long long)uid); 1566 (unsigned long long) uid);
1563 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) 1567 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1568 {
1569 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570 "Found ONDEMAND block, performing on-demand encoding\n");
1571 GNUNET_STATISTICS_update (GSF_stats,
1572 gettext_noop (
1573 "# on-demand blocks matched requests"),
1574 1,
1575 GNUNET_NO);
1576 pr->qe_start = GNUNET_TIME_absolute_get ();
1577 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1578 &odc_warn_delay_task,
1579 pr);
1580 if (GNUNET_OK == GNUNET_FS_handle_on_demand_block (key,
1581 size,
1582 data,
1583 type,
1584 priority,
1585 anonymity,
1586 replication,
1587 expiration,
1588 uid,
1589 &process_local_reply,
1590 pr))
1564 { 1591 {
1565 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1592 GNUNET_STATISTICS_update (GSF_stats,
1566 "Found ONDEMAND block, performing on-demand encoding\n"); 1593 gettext_noop (
1567 GNUNET_STATISTICS_update(GSF_stats, 1594 "# on-demand lookups performed successfully"),
1568 gettext_noop( 1595 1,
1569 "# on-demand blocks matched requests"), 1596 GNUNET_NO);
1570 1, 1597 return; /* we're done */
1571 GNUNET_NO);
1572 pr->qe_start = GNUNET_TIME_absolute_get();
1573 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES,
1574 &odc_warn_delay_task,
1575 pr);
1576 if (GNUNET_OK == GNUNET_FS_handle_on_demand_block(key,
1577 size,
1578 data,
1579 type,
1580 priority,
1581 anonymity,
1582 replication,
1583 expiration,
1584 uid,
1585 &process_local_reply,
1586 pr))
1587 {
1588 GNUNET_STATISTICS_update(GSF_stats,
1589 gettext_noop(
1590 "# on-demand lookups performed successfully"),
1591 1,
1592 GNUNET_NO);
1593 return; /* we're done */
1594 }
1595 GNUNET_STATISTICS_update(GSF_stats,
1596 gettext_noop("# on-demand lookups failed"),
1597 1,
1598 GNUNET_NO);
1599 GNUNET_SCHEDULER_cancel(pr->warn_task);
1600 start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1601 return;
1602 } 1598 }
1599 GNUNET_STATISTICS_update (GSF_stats,
1600 gettext_noop ("# on-demand lookups failed"),
1601 1,
1602 GNUNET_NO);
1603 GNUNET_SCHEDULER_cancel (pr->warn_task);
1604 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1605 return;
1606 }
1603called_from_on_demand: 1607called_from_on_demand:
1604 old_rf = pr->public_data.results_found; 1608 old_rf = pr->public_data.results_found;
1605 memset(&prq, 0, sizeof(prq)); 1609 memset (&prq, 0, sizeof(prq));
1606 prq.data = data; 1610 prq.data = data;
1607 prq.expiration = expiration; 1611 prq.expiration = expiration;
1608 prq.size = size; 1612 prq.size = size;
1609 if (GNUNET_OK != 1613 if (GNUNET_OK !=
1610 GNUNET_BLOCK_get_key(GSF_block_ctx, type, data, size, &query)) 1614 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1611 { 1615 {
1612 GNUNET_break(0); 1616 GNUNET_break (0);
1613 GNUNET_DATASTORE_remove(GSF_dsh, 1617 GNUNET_DATASTORE_remove (GSF_dsh,
1614 key, 1618 key,
1615 size, 1619 size,
1616 data, 1620 data,
1617 UINT_MAX, 1621 UINT_MAX,
1618 UINT_MAX, 1622 UINT_MAX,
1619 NULL, 1623 NULL,
1620 NULL); 1624 NULL);
1621 start_local_query(pr, uid + 1 /* next_uid */, false /* random */); 1625 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1622 return; 1626 return;
1623 } 1627 }
1624 prq.type = type; 1628 prq.type = type;
1625 prq.priority = priority; 1629 prq.priority = priority;
1626 prq.request_found = GNUNET_NO; 1630 prq.request_found = GNUNET_NO;
1627 prq.anonymity_level = anonymity; 1631 prq.anonymity_level = anonymity;
1628 if ((0 == old_rf) && (0 == pr->public_data.results_found)) 1632 if ((0 == old_rf) && (0 == pr->public_data.results_found))
1629 GSF_update_datastore_delay_(pr->public_data.start_time); 1633 GSF_update_datastore_delay_ (pr->public_data.start_time);
1630 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO; 1634 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
1631 process_reply(&prq, key, pr); 1635 process_reply (&prq, key, pr);
1632 pr->local_result = prq.eval; 1636 pr->local_result = prq.eval;
1633 if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval) 1637 if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval)
1634 { 1638 {
1635 GNUNET_STATISTICS_update( 1639 GNUNET_STATISTICS_update (
1636 GSF_stats, 1640 GSF_stats,
1637 gettext_noop("# Datastore lookups concluded (found last result)"), 1641 gettext_noop ("# Datastore lookups concluded (found last result)"),
1638 1, 1642 1,
1639 GNUNET_NO); 1643 GNUNET_NO);
1640 call_continuation(pr); 1644 call_continuation (pr);
1641 return; 1645 return;
1642 } 1646 }
1643 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1647 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1644 ((GNUNET_YES == GSF_test_get_load_too_high_(0)) || 1648 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1645 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority))) 1649 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1646 { 1650 {
1647 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n"); 1651 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1648 GNUNET_STATISTICS_update(GSF_stats, 1652 GNUNET_STATISTICS_update (GSF_stats,
1649 gettext_noop( 1653 gettext_noop (
1650 "# Datastore lookups concluded (load too high)"), 1654 "# Datastore lookups concluded (load too high)"),
1651 1, 1655 1,
1652 GNUNET_NO); 1656 GNUNET_NO);
1653 call_continuation(pr); 1657 call_continuation (pr);
1654 return; 1658 return;
1655 } 1659 }
1656 start_local_query(pr, uid + 1 /* next_uid */, false /* random */); 1660 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1657} 1661}
1658 1662
1659 1663
@@ -1665,15 +1669,15 @@ called_from_on_demand:
1665 * @return #GNUNET_YES if this request could be forwarded to the given peer 1669 * @return #GNUNET_YES if this request could be forwarded to the given peer
1666 */ 1670 */
1667int 1671int
1668GSF_pending_request_test_target_(struct GSF_PendingRequest *pr, 1672GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr,
1669 const struct GNUNET_PeerIdentity *target) 1673 const struct GNUNET_PeerIdentity *target)
1670{ 1674{
1671 struct GNUNET_PeerIdentity pi; 1675 struct GNUNET_PeerIdentity pi;
1672 1676
1673 if (0 == pr->origin_pid) 1677 if (0 == pr->origin_pid)
1674 return GNUNET_YES; 1678 return GNUNET_YES;
1675 GNUNET_PEER_resolve(pr->origin_pid, &pi); 1679 GNUNET_PEER_resolve (pr->origin_pid, &pi);
1676 return (0 == memcmp(&pi, target, sizeof(struct GNUNET_PeerIdentity))) 1680 return (0 == memcmp (&pi, target, sizeof(struct GNUNET_PeerIdentity)))
1677 ? GNUNET_NO 1681 ? GNUNET_NO
1678 : GNUNET_YES; 1682 : GNUNET_YES;
1679} 1683}
@@ -1687,22 +1691,22 @@ GSF_pending_request_test_target_(struct GSF_PendingRequest *pr,
1687 * @param cont_cls closure for @a cont 1691 * @param cont_cls closure for @a cont
1688 */ 1692 */
1689void 1693void
1690GSF_local_lookup_(struct GSF_PendingRequest *pr, 1694GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1691 GSF_LocalLookupContinuation cont, 1695 GSF_LocalLookupContinuation cont,
1692 void *cont_cls) 1696 void *cont_cls)
1693{ 1697{
1694 GNUNET_assert(NULL == pr->gh); 1698 GNUNET_assert (NULL == pr->gh);
1695 GNUNET_assert(NULL == pr->cadet_request); 1699 GNUNET_assert (NULL == pr->cadet_request);
1696 GNUNET_assert(NULL == pr->llc_cont); 1700 GNUNET_assert (NULL == pr->llc_cont);
1697 pr->llc_cont = cont; 1701 pr->llc_cont = cont;
1698 pr->llc_cont_cls = cont_cls; 1702 pr->llc_cont_cls = cont_cls;
1699#if INSANE_STATISTICS 1703#if INSANE_STATISTICS
1700 GNUNET_STATISTICS_update(GSF_stats, 1704 GNUNET_STATISTICS_update (GSF_stats,
1701 gettext_noop("# Datastore lookups initiated"), 1705 gettext_noop ("# Datastore lookups initiated"),
1702 1, 1706 1,
1703 GNUNET_NO); 1707 GNUNET_NO);
1704#endif 1708#endif
1705 start_local_query(pr, 0 /* next_uid */, true /* random */); 1709 start_local_query (pr, 0 /* next_uid */, true /* random */);
1706} 1710}
1707 1711
1708 1712
@@ -1716,7 +1720,7 @@ GSF_local_lookup_(struct GSF_PendingRequest *pr,
1716 * @param put the actual message 1720 * @param put the actual message
1717 */ 1721 */
1718void 1722void
1719handle_p2p_put(void *cls, const struct PutMessage *put) 1723handle_p2p_put (void *cls, const struct PutMessage *put)
1720{ 1724{
1721 struct GSF_ConnectedPeer *cp = cls; 1725 struct GSF_ConnectedPeer *cp = cls;
1722 uint16_t msize; 1726 uint16_t msize;
@@ -1729,30 +1733,30 @@ handle_p2p_put(void *cls, const struct PutMessage *put)
1729 double putl; 1733 double putl;
1730 struct PutMigrationContext *pmc; 1734 struct PutMigrationContext *pmc;
1731 1735
1732 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1736 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1733 "Received P2P PUT from %s\n", 1737 "Received P2P PUT from %s\n",
1734 GNUNET_i2s(GSF_get_peer_performance_data_(cp)->peer)); 1738 GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
1735 GSF_cover_content_count++; 1739 GSF_cover_content_count++;
1736 msize = ntohs(put->header.size); 1740 msize = ntohs (put->header.size);
1737 dsize = msize - sizeof(struct PutMessage); 1741 dsize = msize - sizeof(struct PutMessage);
1738 type = ntohl(put->type); 1742 type = ntohl (put->type);
1739 expiration = GNUNET_TIME_absolute_ntoh(put->expiration); 1743 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1740 /* do not allow migrated content to live longer than 1 year */ 1744 /* do not allow migrated content to live longer than 1 year */
1741 expiration = GNUNET_TIME_absolute_min(GNUNET_TIME_relative_to_absolute( 1745 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1742 GNUNET_TIME_UNIT_YEARS), 1746 GNUNET_TIME_UNIT_YEARS),
1743 expiration); 1747 expiration);
1744 if (GNUNET_OK != 1748 if (GNUNET_OK !=
1745 GNUNET_BLOCK_get_key(GSF_block_ctx, type, &put[1], dsize, &query)) 1749 GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query))
1746 { 1750 {
1747 GNUNET_break_op(0); 1751 GNUNET_break_op (0);
1748 return; 1752 return;
1749 } 1753 }
1750 GNUNET_STATISTICS_update(GSF_stats, 1754 GNUNET_STATISTICS_update (GSF_stats,
1751 gettext_noop("# GAP PUT messages received"), 1755 gettext_noop ("# GAP PUT messages received"),
1752 1, 1756 1,
1753 GNUNET_NO); 1757 GNUNET_NO);
1754 /* now, lookup 'query' */ 1758 /* now, lookup 'query' */
1755 prq.data = (const void *)&put[1]; 1759 prq.data = (const void *) &put[1];
1756 prq.sender = cp; 1760 prq.sender = cp;
1757 prq.size = dsize; 1761 prq.size = dsize;
1758 prq.type = type; 1762 prq.type = type;
@@ -1761,80 +1765,80 @@ handle_p2p_put(void *cls, const struct PutMessage *put)
1761 prq.anonymity_level = UINT32_MAX; 1765 prq.anonymity_level = UINT32_MAX;
1762 prq.request_found = GNUNET_NO; 1766 prq.request_found = GNUNET_NO;
1763 prq.eo = GNUNET_BLOCK_EO_NONE; 1767 prq.eo = GNUNET_BLOCK_EO_NONE;
1764 GNUNET_CONTAINER_multihashmap_get_multiple(pr_map, 1768 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
1765 &query, 1769 &query,
1766 &process_reply, 1770 &process_reply,
1767 &prq); 1771 &prq);
1768 if (NULL != cp) 1772 if (NULL != cp)
1769 { 1773 {
1770 GSF_connected_peer_change_preference_(cp, 1774 GSF_connected_peer_change_preference_ (cp,
1771 CONTENT_BANDWIDTH_VALUE + 1775 CONTENT_BANDWIDTH_VALUE
1772 1000 * prq.priority); 1776 + 1000 * prq.priority);
1773 GSF_get_peer_performance_data_(cp)->respect += prq.priority; 1777 GSF_get_peer_performance_data_ (cp)->respect += prq.priority;
1774 } 1778 }
1775 if ((GNUNET_YES == active_to_migration) && (NULL != cp) && 1779 if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1776 (GNUNET_NO == test_put_load_too_high(prq.priority))) 1780 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1781 {
1782 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1783 "Replicating result for query `%s' with priority %u\n",
1784 GNUNET_h2s (&query),
1785 prq.priority);
1786 pmc = GNUNET_new (struct PutMigrationContext);
1787 pmc->start = GNUNET_TIME_absolute_get ();
1788 pmc->requested = prq.request_found;
1789 GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid);
1790 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1791 &pmc->origin);
1792 if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1793 0,
1794 &query,
1795 dsize,
1796 &put[1],
1797 type,
1798 prq.priority,
1799 1 /* anonymity */,
1800 0 /* replication */,
1801 expiration,
1802 1 + prq.priority,
1803 MAX_DATASTORE_QUEUE,
1804 &put_migration_continuation,
1805 pmc))
1777 { 1806 {
1778 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1807 put_migration_continuation (pmc,
1779 "Replicating result for query `%s' with priority %u\n", 1808 GNUNET_SYSERR,
1780 GNUNET_h2s(&query), 1809 GNUNET_TIME_UNIT_ZERO_ABS,
1781 prq.priority); 1810 NULL);
1782 pmc = GNUNET_new(struct PutMigrationContext);
1783 pmc->start = GNUNET_TIME_absolute_get();
1784 pmc->requested = prq.request_found;
1785 GNUNET_assert(0 != GSF_get_peer_performance_data_(cp)->pid);
1786 GNUNET_PEER_resolve(GSF_get_peer_performance_data_(cp)->pid,
1787 &pmc->origin);
1788 if (NULL == GNUNET_DATASTORE_put(GSF_dsh,
1789 0,
1790 &query,
1791 dsize,
1792 &put[1],
1793 type,
1794 prq.priority,
1795 1 /* anonymity */,
1796 0 /* replication */,
1797 expiration,
1798 1 + prq.priority,
1799 MAX_DATASTORE_QUEUE,
1800 &put_migration_continuation,
1801 pmc))
1802 {
1803 put_migration_continuation(pmc,
1804 GNUNET_SYSERR,
1805 GNUNET_TIME_UNIT_ZERO_ABS,
1806 NULL);
1807 }
1808 } 1811 }
1812 }
1809 else if (NULL != cp) 1813 else if (NULL != cp)
1810 { 1814 {
1811 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1815 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1812 "Choosing not to keep content `%s' (%d/%d)\n", 1816 "Choosing not to keep content `%s' (%d/%d)\n",
1813 GNUNET_h2s(&query), 1817 GNUNET_h2s (&query),
1814 active_to_migration, 1818 active_to_migration,
1815 test_put_load_too_high(prq.priority)); 1819 test_put_load_too_high (prq.priority));
1816 } 1820 }
1817 putl = GNUNET_LOAD_get_load(datastore_put_load); 1821 putl = GNUNET_LOAD_get_load (datastore_put_load);
1818 if ((NULL != cp) && (GNUNET_NO == prq.request_found) && 1822 if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1819 ((GNUNET_YES != active_to_migration) || 1823 ((GNUNET_YES != active_to_migration) ||
1820 (putl > 2.5 * (1 + prq.priority)))) 1824 (putl > 2.5 * (1 + prq.priority))))
1821 { 1825 {
1822 if (GNUNET_YES != active_to_migration) 1826 if (GNUNET_YES != active_to_migration)
1823 putl = 1.0 + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 5); 1827 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1824 block_time = GNUNET_TIME_relative_multiply( 1828 block_time = GNUNET_TIME_relative_multiply (
1825 GNUNET_TIME_UNIT_MILLISECONDS, 1829 GNUNET_TIME_UNIT_MILLISECONDS,
1826 5000 + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 1830 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1827 (unsigned int)(60000 * putl * putl))); 1831 (unsigned int) (60000 * putl * putl)));
1828 GNUNET_log( 1832 GNUNET_log (
1829 GNUNET_ERROR_TYPE_DEBUG, 1833 GNUNET_ERROR_TYPE_DEBUG,
1830 "Asking to stop migration for %s because of load %f and events %d/%d\n", 1834 "Asking to stop migration for %s because of load %f and events %d/%d\n",
1831 GNUNET_STRINGS_relative_time_to_string(block_time, GNUNET_YES), 1835 GNUNET_STRINGS_relative_time_to_string (block_time, GNUNET_YES),
1832 putl, 1836 putl,
1833 active_to_migration, 1837 active_to_migration,
1834 (GNUNET_NO == prq.request_found)); 1838 (GNUNET_NO == prq.request_found));
1835 GSF_block_peer_migration_(cp, 1839 GSF_block_peer_migration_ (cp,
1836 GNUNET_TIME_relative_to_absolute(block_time)); 1840 GNUNET_TIME_relative_to_absolute (block_time));
1837 } 1841 }
1838} 1842}
1839 1843
1840 1844
@@ -1845,7 +1849,7 @@ handle_p2p_put(void *cls, const struct PutMessage *put)
1845 * @return #GNUNET_YES if the request is still active 1849 * @return #GNUNET_YES if the request is still active
1846 */ 1850 */
1847int 1851int
1848GSF_pending_request_test_active_(struct GSF_PendingRequest *pr) 1852GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr)
1849{ 1853{
1850 return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO; 1854 return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1851} 1855}
@@ -1855,24 +1859,24 @@ GSF_pending_request_test_active_(struct GSF_PendingRequest *pr)
1855 * Setup the subsystem. 1859 * Setup the subsystem.
1856 */ 1860 */
1857void 1861void
1858GSF_pending_request_init_() 1862GSF_pending_request_init_ ()
1859{ 1863{
1860 if (GNUNET_OK != 1864 if (GNUNET_OK !=
1861 GNUNET_CONFIGURATION_get_value_number(GSF_cfg, 1865 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1862 "fs", 1866 "fs",
1863 "MAX_PENDING_REQUESTS", 1867 "MAX_PENDING_REQUESTS",
1864 &max_pending_requests)) 1868 &max_pending_requests))
1865 { 1869 {
1866 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_INFO, 1870 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
1867 "fs", 1871 "fs",
1868 "MAX_PENDING_REQUESTS"); 1872 "MAX_PENDING_REQUESTS");
1869 } 1873 }
1870 active_to_migration = 1874 active_to_migration =
1871 GNUNET_CONFIGURATION_get_value_yesno(GSF_cfg, "FS", "CONTENT_CACHING"); 1875 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1872 datastore_put_load = GNUNET_LOAD_value_init(DATASTORE_LOAD_AUTODECLINE); 1876 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1873 pr_map = GNUNET_CONTAINER_multihashmap_create(32 * 1024, GNUNET_YES); 1877 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
1874 requests_by_expiration_heap = 1878 requests_by_expiration_heap =
1875 GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN); 1879 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1876} 1880}
1877 1881
1878 1882
@@ -1880,14 +1884,14 @@ GSF_pending_request_init_()
1880 * Shutdown the subsystem. 1884 * Shutdown the subsystem.
1881 */ 1885 */
1882void 1886void
1883GSF_pending_request_done_() 1887GSF_pending_request_done_ ()
1884{ 1888{
1885 GNUNET_CONTAINER_multihashmap_iterate(pr_map, &clean_request, NULL); 1889 GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL);
1886 GNUNET_CONTAINER_multihashmap_destroy(pr_map); 1890 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1887 pr_map = NULL; 1891 pr_map = NULL;
1888 GNUNET_CONTAINER_heap_destroy(requests_by_expiration_heap); 1892 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1889 requests_by_expiration_heap = NULL; 1893 requests_by_expiration_heap = NULL;
1890 GNUNET_LOAD_value_free(datastore_put_load); 1894 GNUNET_LOAD_value_free (datastore_put_load);
1891 datastore_put_load = NULL; 1895 datastore_put_load = NULL;
1892} 1896}
1893 1897