aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c1845
1 files changed, 924 insertions, 921 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index f9702486a..90587cfa0 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -16,7 +16,7 @@
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file fs/gnunet-service-fs_pr.c 22 * @file fs/gnunet-service-fs_pr.c
@@ -74,8 +74,7 @@
74/** 74/**
75 * An active request. 75 * An active request.
76 */ 76 */
77struct GSF_PendingRequest 77struct GSF_PendingRequest {
78{
79 /** 78 /**
80 * Public data for the request. 79 * Public data for the request.
81 */ 80 */
@@ -247,31 +246,31 @@ static unsigned long long max_pending_requests = (32 * 1024);
247 * @param pr request for which the BF is to be recomputed 246 * @param pr request for which the BF is to be recomputed
248 */ 247 */
249static void 248static void
250refresh_bloomfilter (enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr) 249refresh_bloomfilter(enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
251{ 250{
252 if (NULL != pr->bg) 251 if (NULL != pr->bg)
253 { 252 {
254 GNUNET_BLOCK_group_destroy (pr->bg); 253 GNUNET_BLOCK_group_destroy(pr->bg);
255 pr->bg = NULL; 254 pr->bg = NULL;
256 } 255 }
257 if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type) 256 if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
258 return; /* no need */ 257 return; /* no need */
259 pr->bg = 258 pr->bg =
260 GNUNET_BLOCK_group_create (GSF_block_ctx, 259 GNUNET_BLOCK_group_create(GSF_block_ctx,
261 type, 260 type,
262 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 261 GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
263 UINT32_MAX), 262 UINT32_MAX),
264 NULL, 263 NULL,
265 0, 264 0,
266 "seen-set-size", 265 "seen-set-size",
267 pr->replies_seen_count, 266 pr->replies_seen_count,
268 NULL); 267 NULL);
269 if (NULL == pr->bg) 268 if (NULL == pr->bg)
270 return; 269 return;
271 GNUNET_break (GNUNET_OK == 270 GNUNET_break(GNUNET_OK ==
272 GNUNET_BLOCK_group_set_seen (pr->bg, 271 GNUNET_BLOCK_group_set_seen(pr->bg,
273 pr->replies_seen, 272 pr->replies_seen,
274 pr->replies_seen_count)); 273 pr->replies_seen_count));
275} 274}
276 275
277 276
@@ -297,129 +296,129 @@ refresh_bloomfilter (enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
297 * @return handle for the new pending request 296 * @return handle for the new pending request
298 */ 297 */
299struct GSF_PendingRequest * 298struct GSF_PendingRequest *
300GSF_pending_request_create_ (enum GSF_PendingRequestOptions options, 299GSF_pending_request_create_(enum GSF_PendingRequestOptions options,
301 enum GNUNET_BLOCK_Type type, 300 enum GNUNET_BLOCK_Type type,
302 const struct GNUNET_HashCode *query, 301 const struct GNUNET_HashCode *query,
303 const struct GNUNET_PeerIdentity *target, 302 const struct GNUNET_PeerIdentity *target,
304 const char *bf_data, 303 const char *bf_data,
305 size_t bf_size, 304 size_t bf_size,
306 uint32_t mingle, 305 uint32_t mingle,
307 uint32_t anonymity_level, 306 uint32_t anonymity_level,
308 uint32_t priority, 307 uint32_t priority,
309 int32_t ttl, 308 int32_t ttl,
310 GNUNET_PEER_Id sender_pid, 309 GNUNET_PEER_Id sender_pid,
311 GNUNET_PEER_Id origin_pid, 310 GNUNET_PEER_Id origin_pid,
312 const struct GNUNET_HashCode *replies_seen, 311 const struct GNUNET_HashCode *replies_seen,
313 unsigned int replies_seen_count, 312 unsigned int replies_seen_count,
314 GSF_PendingRequestReplyHandler rh, 313 GSF_PendingRequestReplyHandler rh,
315 void *rh_cls) 314 void *rh_cls)
316{ 315{
317 struct GSF_PendingRequest *pr; 316 struct GSF_PendingRequest *pr;
318 struct GSF_PendingRequest *dpr; 317 struct GSF_PendingRequest *dpr;
319 size_t extra; 318 size_t extra;
320 struct GNUNET_HashCode *eptr; 319 struct GNUNET_HashCode *eptr;
321 320
322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 321 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
323 "Creating request handle for `%s' of type %d\n", 322 "Creating request handle for `%s' of type %d\n",
324 GNUNET_h2s (query), 323 GNUNET_h2s(query),
325 type); 324 type);
326#if INSANE_STATISTICS 325#if INSANE_STATISTICS
327 GNUNET_STATISTICS_update (GSF_stats, 326 GNUNET_STATISTICS_update(GSF_stats,
328 gettext_noop ("# Pending requests created"), 327 gettext_noop("# Pending requests created"),
329 1, 328 1,
330 GNUNET_NO); 329 GNUNET_NO);
331#endif 330#endif
332 extra = 0; 331 extra = 0;
333 if (NULL != target) 332 if (NULL != target)
334 extra += sizeof (struct GNUNET_PeerIdentity); 333 extra += sizeof(struct GNUNET_PeerIdentity);
335 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra); 334 pr = GNUNET_malloc(sizeof(struct GSF_PendingRequest) + extra);
336 pr->public_data.query = *query; 335 pr->public_data.query = *query;
337 eptr = (struct GNUNET_HashCode *) &pr[1]; 336 eptr = (struct GNUNET_HashCode *)&pr[1];
338 if (NULL != target) 337 if (NULL != target)
339 { 338 {
340 pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr; 339 pr->public_data.target = (struct GNUNET_PeerIdentity *)eptr;
341 GNUNET_memcpy (eptr, target, sizeof (struct GNUNET_PeerIdentity)); 340 GNUNET_memcpy(eptr, target, sizeof(struct GNUNET_PeerIdentity));
342 } 341 }
343 pr->public_data.anonymity_level = anonymity_level; 342 pr->public_data.anonymity_level = anonymity_level;
344 pr->public_data.priority = priority; 343 pr->public_data.priority = priority;
345 pr->public_data.original_priority = priority; 344 pr->public_data.original_priority = priority;
346 pr->public_data.options = options; 345 pr->public_data.options = options;
347 pr->public_data.type = type; 346 pr->public_data.type = type;
348 pr->public_data.start_time = GNUNET_TIME_absolute_get (); 347 pr->public_data.start_time = GNUNET_TIME_absolute_get();
349 pr->sender_pid = sender_pid; 348 pr->sender_pid = sender_pid;
350 pr->origin_pid = origin_pid; 349 pr->origin_pid = origin_pid;
351 pr->rh = rh; 350 pr->rh = rh;
352 pr->rh_cls = rh_cls; 351 pr->rh_cls = rh_cls;
353 GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY))); 352 GNUNET_assert((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
354 if (ttl >= 0) 353 if (ttl >= 0)
355 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute ( 354 pr->public_data.ttl = GNUNET_TIME_relative_to_absolute(
356 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, (uint32_t) ttl)); 355 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, (uint32_t)ttl));
357 else 356 else
358 pr->public_data.ttl = GNUNET_TIME_absolute_subtract ( 357 pr->public_data.ttl = GNUNET_TIME_absolute_subtract(
359 pr->public_data.start_time, 358 pr->public_data.start_time,
360 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 359 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS,
361 (uint32_t) (-ttl))); 360 (uint32_t)(-ttl)));
362 if (replies_seen_count > 0) 361 if (replies_seen_count > 0)
363 { 362 {
364 pr->replies_seen_size = replies_seen_count; 363 pr->replies_seen_size = replies_seen_count;
365 pr->replies_seen = 364 pr->replies_seen =
366 GNUNET_new_array (pr->replies_seen_size, struct GNUNET_HashCode); 365 GNUNET_new_array(pr->replies_seen_size, struct GNUNET_HashCode);
367 GNUNET_memcpy (pr->replies_seen, 366 GNUNET_memcpy(pr->replies_seen,
368 replies_seen, 367 replies_seen,
369 replies_seen_count * sizeof (struct GNUNET_HashCode)); 368 replies_seen_count * sizeof(struct GNUNET_HashCode));
370 pr->replies_seen_count = replies_seen_count; 369 pr->replies_seen_count = replies_seen_count;
371 } 370 }
372 if ((NULL != bf_data) && 371 if ((NULL != bf_data) &&
373 (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type)) 372 (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type))
374 { 373 {
375 pr->bg = GNUNET_BLOCK_group_create (GSF_block_ctx, 374 pr->bg = GNUNET_BLOCK_group_create(GSF_block_ctx,
376 pr->public_data.type, 375 pr->public_data.type,
377 mingle, 376 mingle,
378 bf_data, 377 bf_data,
379 bf_size, 378 bf_size,
380 "seen-set-size", 379 "seen-set-size",
381 0, 380 0,
382 NULL); 381 NULL);
383 } 382 }
384 else if ((replies_seen_count > 0) && 383 else if ((replies_seen_count > 0) &&
385 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))) 384 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
386 { 385 {
387 refresh_bloomfilter (pr->public_data.type, pr); 386 refresh_bloomfilter(pr->public_data.type, pr);
388 } 387 }
389 GNUNET_CONTAINER_multihashmap_put (pr_map, 388 GNUNET_CONTAINER_multihashmap_put(pr_map,
390 &pr->public_data.query, 389 &pr->public_data.query,
391 pr, 390 pr,
392 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 391 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
393 if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES)) 392 if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
394 {
395 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
396 pr,
397 pr->public_data.ttl.abs_value_us);
398 /* make sure we don't track too many requests */
399 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
400 max_pending_requests)
401 { 393 {
402 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap); 394 pr->hnode = GNUNET_CONTAINER_heap_insert(requests_by_expiration_heap,
403 GNUNET_assert (NULL != dpr); 395 pr,
404 if (pr == dpr) 396 pr->public_data.ttl.abs_value_us);
405 break; /* let the request live briefly... */ 397 /* make sure we don't track too many requests */
406 if (NULL != dpr->rh) 398 while (GNUNET_CONTAINER_heap_get_size(requests_by_expiration_heap) >
407 dpr->rh (dpr->rh_cls, 399 max_pending_requests)
408 GNUNET_BLOCK_EVALUATION_REQUEST_VALID, 400 {
409 dpr, 401 dpr = GNUNET_CONTAINER_heap_peek(requests_by_expiration_heap);
410 UINT32_MAX, 402 GNUNET_assert(NULL != dpr);
411 GNUNET_TIME_UNIT_FOREVER_ABS, 403 if (pr == dpr)
412 GNUNET_TIME_UNIT_FOREVER_ABS, 404 break; /* let the request live briefly... */
413 GNUNET_BLOCK_TYPE_ANY, 405 if (NULL != dpr->rh)
414 NULL, 406 dpr->rh(dpr->rh_cls,
415 0); 407 GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
416 GSF_pending_request_cancel_ (dpr, GNUNET_YES); 408 dpr,
409 UINT32_MAX,
410 GNUNET_TIME_UNIT_FOREVER_ABS,
411 GNUNET_TIME_UNIT_FOREVER_ABS,
412 GNUNET_BLOCK_TYPE_ANY,
413 NULL,
414 0);
415 GSF_pending_request_cancel_(dpr, GNUNET_YES);
416 }
417 } 417 }
418 } 418 GNUNET_STATISTICS_update(GSF_stats,
419 GNUNET_STATISTICS_update (GSF_stats, 419 gettext_noop("# Pending requests active"),
420 gettext_noop ("# Pending requests active"), 420 1,
421 1, 421 GNUNET_NO);
422 GNUNET_NO);
423 return pr; 422 return pr;
424} 423}
425 424
@@ -430,7 +429,7 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
430 * @return associated public data 429 * @return associated public data
431 */ 430 */
432struct GSF_PendingRequestData * 431struct GSF_PendingRequestData *
433GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr) 432GSF_pending_request_get_data_(struct GSF_PendingRequest *pr)
434{ 433{
435 return &pr->public_data; 434 return &pr->public_data;
436} 435}
@@ -446,13 +445,13 @@ GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
446 * @return #GNUNET_OK if the requests are compatible 445 * @return #GNUNET_OK if the requests are compatible
447 */ 446 */
448int 447int
449GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra, 448GSF_pending_request_is_compatible_(struct GSF_PendingRequest *pra,
450 struct GSF_PendingRequest *prb) 449 struct GSF_PendingRequest *prb)
451{ 450{
452 if ((pra->public_data.type != prb->public_data.type) || 451 if ((pra->public_data.type != prb->public_data.type) ||
453 (0 != memcmp (&pra->public_data.query, 452 (0 != memcmp(&pra->public_data.query,
454 &prb->public_data.query, 453 &prb->public_data.query,
455 sizeof (struct GNUNET_HashCode)))) 454 sizeof(struct GNUNET_HashCode))))
456 return GNUNET_NO; 455 return GNUNET_NO;
457 return GNUNET_OK; 456 return GNUNET_OK;
458} 457}
@@ -467,45 +466,45 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
467 * @param replies_seen_count size of the replies_seen array 466 * @param replies_seen_count size of the replies_seen array
468 */ 467 */
469void 468void
470GSF_pending_request_update_ (struct GSF_PendingRequest *pr, 469GSF_pending_request_update_(struct GSF_PendingRequest *pr,
471 const struct GNUNET_HashCode *replies_seen, 470 const struct GNUNET_HashCode *replies_seen,
472 unsigned int replies_seen_count) 471 unsigned int replies_seen_count)
473{ 472{
474 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count) 473 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
475 return; /* integer overflow */ 474 return; /* integer overflow */
476 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) 475 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
477 {
478 /* we're responsible for the BF, full refresh */
479 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
480 GNUNET_array_grow (pr->replies_seen,
481 pr->replies_seen_size,
482 replies_seen_count + pr->replies_seen_count);
483 GNUNET_memcpy (&pr->replies_seen[pr->replies_seen_count],
484 replies_seen,
485 sizeof (struct GNUNET_HashCode) * replies_seen_count);
486 pr->replies_seen_count += replies_seen_count;
487 refresh_bloomfilter (pr->public_data.type, pr);
488 }
489 else
490 {
491 if (NULL == pr->bg)
492 { 476 {
493 /* we're not the initiator, but the initiator did not give us 477 /* we're responsible for the BF, full refresh */
494 * any bloom-filter, so we need to create one on-the-fly */ 478 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
495 refresh_bloomfilter (pr->public_data.type, pr); 479 GNUNET_array_grow(pr->replies_seen,
480 pr->replies_seen_size,
481 replies_seen_count + pr->replies_seen_count);
482 GNUNET_memcpy(&pr->replies_seen[pr->replies_seen_count],
483 replies_seen,
484 sizeof(struct GNUNET_HashCode) * replies_seen_count);
485 pr->replies_seen_count += replies_seen_count;
486 refresh_bloomfilter(pr->public_data.type, pr);
496 } 487 }
497 else 488 else
498 { 489 {
499 GNUNET_break (GNUNET_OK == 490 if (NULL == pr->bg)
500 GNUNET_BLOCK_group_set_seen (pr->bg, 491 {
501 replies_seen, 492 /* we're not the initiator, but the initiator did not give us
502 pr->replies_seen_count)); 493 * any bloom-filter, so we need to create one on-the-fly */
494 refresh_bloomfilter(pr->public_data.type, pr);
495 }
496 else
497 {
498 GNUNET_break(GNUNET_OK ==
499 GNUNET_BLOCK_group_set_seen(pr->bg,
500 replies_seen,
501 pr->replies_seen_count));
502 }
503 } 503 }
504 }
505 if (NULL != pr->gh) 504 if (NULL != pr->gh)
506 GNUNET_DHT_get_filter_known_results (pr->gh, 505 GNUNET_DHT_get_filter_known_results(pr->gh,
507 replies_seen_count, 506 replies_seen_count,
508 replies_seen); 507 replies_seen);
509} 508}
510 509
511 510
@@ -517,7 +516,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
517 * @return envelope with the request message 516 * @return envelope with the request message
518 */ 517 */
519struct GNUNET_MQ_Envelope * 518struct GNUNET_MQ_Envelope *
520GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr) 519GSF_pending_request_get_message_(struct GSF_PendingRequest *pr)
521{ 520{
522 struct GNUNET_MQ_Envelope *env; 521 struct GNUNET_MQ_Envelope *env;
523 struct GetMessage *gm; 522 struct GetMessage *gm;
@@ -532,61 +531,61 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
532 void *bf_data; 531 void *bf_data;
533 uint32_t bf_nonce; 532 uint32_t bf_nonce;
534 533
535 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 534 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
536 "Building request message for `%s' of type %d\n", 535 "Building request message for `%s' of type %d\n",
537 GNUNET_h2s (&pr->public_data.query), 536 GNUNET_h2s(&pr->public_data.query),
538 pr->public_data.type); 537 pr->public_data.type);
539 k = 0; 538 k = 0;
540 bm = 0; 539 bm = 0;
541 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY)); 540 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
542 if ((! do_route) && (pr->sender_pid == 0)) 541 if ((!do_route) && (pr->sender_pid == 0))
543 { 542 {
544 GNUNET_break (0); 543 GNUNET_break(0);
545 do_route = GNUNET_YES; 544 do_route = GNUNET_YES;
546 } 545 }
547 if (! do_route) 546 if (!do_route)
548 { 547 {
549 bm |= GET_MESSAGE_BIT_RETURN_TO; 548 bm |= GET_MESSAGE_BIT_RETURN_TO;
550 k++; 549 k++;
551 } 550 }
552 if (NULL != pr->public_data.target) 551 if (NULL != pr->public_data.target)
553 { 552 {
554 bm |= GET_MESSAGE_BIT_TRANSMIT_TO; 553 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
555 k++; 554 k++;
556 } 555 }
557 if (GNUNET_OK != 556 if (GNUNET_OK !=
558 GNUNET_BLOCK_group_serialize (pr->bg, &bf_nonce, &bf_data, &bf_size)) 557 GNUNET_BLOCK_group_serialize(pr->bg, &bf_nonce, &bf_data, &bf_size))
559 { 558 {
560 bf_size = 0; 559 bf_size = 0;
561 bf_data = NULL; 560 bf_data = NULL;
562 } 561 }
563 env = GNUNET_MQ_msg_extra (gm, 562 env = GNUNET_MQ_msg_extra(gm,
564 bf_size + k * sizeof (struct GNUNET_PeerIdentity), 563 bf_size + k * sizeof(struct GNUNET_PeerIdentity),
565 GNUNET_MESSAGE_TYPE_FS_GET); 564 GNUNET_MESSAGE_TYPE_FS_GET);
566 gm->type = htonl (pr->public_data.type); 565 gm->type = htonl(pr->public_data.type);
567 if (do_route) 566 if (do_route)
568 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 567 prio = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
569 pr->public_data.priority + 1); 568 pr->public_data.priority + 1);
570 else 569 else
571 prio = 0; 570 prio = 0;
572 pr->public_data.priority -= prio; 571 pr->public_data.priority -= prio;
573 pr->public_data.num_transmissions++; 572 pr->public_data.num_transmissions++;
574 pr->public_data.respect_offered += prio; 573 pr->public_data.respect_offered += prio;
575 gm->priority = htonl (prio); 574 gm->priority = htonl(prio);
576 now = GNUNET_TIME_absolute_get (); 575 now = GNUNET_TIME_absolute_get();
577 ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us); 576 ttl = (int64_t)(pr->public_data.ttl.abs_value_us - now.abs_value_us);
578 gm->ttl = htonl (ttl / 1000LL / 1000LL); 577 gm->ttl = htonl(ttl / 1000LL / 1000LL);
579 gm->filter_mutator = htonl (bf_nonce); 578 gm->filter_mutator = htonl(bf_nonce);
580 gm->hash_bitmap = htonl (bm); 579 gm->hash_bitmap = htonl(bm);
581 gm->query = pr->public_data.query; 580 gm->query = pr->public_data.query;
582 ext = (struct GNUNET_PeerIdentity *) &gm[1]; 581 ext = (struct GNUNET_PeerIdentity *)&gm[1];
583 k = 0; 582 k = 0;
584 if (! do_route) 583 if (!do_route)
585 GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]); 584 GNUNET_PEER_resolve(pr->sender_pid, &ext[k++]);
586 if (NULL != pr->public_data.target) 585 if (NULL != pr->public_data.target)
587 ext[k++] = *pr->public_data.target; 586 ext[k++] = *pr->public_data.target;
588 GNUNET_memcpy (&ext[k], bf_data, bf_size); 587 GNUNET_memcpy(&ext[k], bf_data, bf_size);
589 GNUNET_free_non_null (bf_data); 588 GNUNET_free_non_null(bf_data);
590 return env; 589 return env;
591} 590}
592 591
@@ -600,61 +599,61 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
600 * @return #GNUNET_YES (we should continue to iterate) 599 * @return #GNUNET_YES (we should continue to iterate)
601 */ 600 */
602static int 601static int
603clean_request (void *cls, const struct GNUNET_HashCode *key, void *value) 602clean_request(void *cls, const struct GNUNET_HashCode *key, void *value)
604{ 603{
605 struct GSF_PendingRequest *pr = value; 604 struct GSF_PendingRequest *pr = value;
606 GSF_LocalLookupContinuation cont; 605 GSF_LocalLookupContinuation cont;
607 606
608 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 607 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
609 "Cleaning up pending request for `%s'.\n", 608 "Cleaning up pending request for `%s'.\n",
610 GNUNET_h2s (key)); 609 GNUNET_h2s(key));
611 if (NULL != pr->cadet_request) 610 if (NULL != pr->cadet_request)
612 { 611 {
613 pr->cadet_retry_count = CADET_RETRY_MAX; 612 pr->cadet_retry_count = CADET_RETRY_MAX;
614 GSF_cadet_query_cancel (pr->cadet_request); 613 GSF_cadet_query_cancel(pr->cadet_request);
615 pr->cadet_request = NULL; 614 pr->cadet_request = NULL;
616 } 615 }
617 if (NULL != (cont = pr->llc_cont)) 616 if (NULL != (cont = pr->llc_cont))
618 { 617 {
619 pr->llc_cont = NULL; 618 pr->llc_cont = NULL;
620 cont (pr->llc_cont_cls, pr, pr->local_result); 619 cont(pr->llc_cont_cls, pr, pr->local_result);
621 } 620 }
622 GSF_plan_notify_request_done_ (pr); 621 GSF_plan_notify_request_done_(pr);
623 GNUNET_free_non_null (pr->replies_seen); 622 GNUNET_free_non_null(pr->replies_seen);
624 GNUNET_BLOCK_group_destroy (pr->bg); 623 GNUNET_BLOCK_group_destroy(pr->bg);
625 pr->bg = NULL; 624 pr->bg = NULL;
626 GNUNET_PEER_change_rc (pr->sender_pid, -1); 625 GNUNET_PEER_change_rc(pr->sender_pid, -1);
627 pr->sender_pid = 0; 626 pr->sender_pid = 0;
628 GNUNET_PEER_change_rc (pr->origin_pid, -1); 627 GNUNET_PEER_change_rc(pr->origin_pid, -1);
629 pr->origin_pid = 0; 628 pr->origin_pid = 0;
630 if (NULL != pr->hnode) 629 if (NULL != pr->hnode)
631 { 630 {
632 GNUNET_CONTAINER_heap_remove_node (pr->hnode); 631 GNUNET_CONTAINER_heap_remove_node(pr->hnode);
633 pr->hnode = NULL; 632 pr->hnode = NULL;
634 } 633 }
635 if (NULL != pr->qe) 634 if (NULL != pr->qe)
636 { 635 {
637 GNUNET_DATASTORE_cancel (pr->qe); 636 GNUNET_DATASTORE_cancel(pr->qe);
638 pr->qe = NULL; 637 pr->qe = NULL;
639 } 638 }
640 if (NULL != pr->gh) 639 if (NULL != pr->gh)
641 { 640 {
642 GNUNET_DHT_get_stop (pr->gh); 641 GNUNET_DHT_get_stop(pr->gh);
643 pr->gh = NULL; 642 pr->gh = NULL;
644 } 643 }
645 if (NULL != pr->warn_task) 644 if (NULL != pr->warn_task)
646 { 645 {
647 GNUNET_SCHEDULER_cancel (pr->warn_task); 646 GNUNET_SCHEDULER_cancel(pr->warn_task);
648 pr->warn_task = NULL; 647 pr->warn_task = NULL;
649 } 648 }
650 GNUNET_assert ( 649 GNUNET_assert(
651 GNUNET_OK == 650 GNUNET_OK ==
652 GNUNET_CONTAINER_multihashmap_remove (pr_map, &pr->public_data.query, pr)); 651 GNUNET_CONTAINER_multihashmap_remove(pr_map, &pr->public_data.query, pr));
653 GNUNET_STATISTICS_update (GSF_stats, 652 GNUNET_STATISTICS_update(GSF_stats,
654 gettext_noop ("# Pending requests active"), 653 gettext_noop("# Pending requests active"),
655 -1, 654 -1,
656 GNUNET_NO); 655 GNUNET_NO);
657 GNUNET_free (pr); 656 GNUNET_free(pr);
658 return GNUNET_YES; 657 return GNUNET_YES;
659} 658}
660 659
@@ -666,49 +665,49 @@ clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
666 * @param full_cleanup fully purge the request 665 * @param full_cleanup fully purge the request
667 */ 666 */
668void 667void
669GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup) 668GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup)
670{ 669{
671 GSF_LocalLookupContinuation cont; 670 GSF_LocalLookupContinuation cont;
672 671
673 if (NULL == pr_map) 672 if (NULL == pr_map)
674 return; /* already cleaned up! */ 673 return; /* already cleaned up! */
675 if (GNUNET_NO == full_cleanup) 674 if (GNUNET_NO == full_cleanup)
676 {
677 /* make request inactive (we're no longer interested in more results),
678 * but do NOT remove from our data-structures, we still need it there
679 * to prevent the request from looping */
680 pr->rh = NULL;
681 if (NULL != pr->cadet_request)
682 { 675 {
683 pr->cadet_retry_count = CADET_RETRY_MAX; 676 /* make request inactive (we're no longer interested in more results),
684 GSF_cadet_query_cancel (pr->cadet_request); 677 * but do NOT remove from our data-structures, we still need it there
685 pr->cadet_request = NULL; 678 * to prevent the request from looping */
679 pr->rh = NULL;
680 if (NULL != pr->cadet_request)
681 {
682 pr->cadet_retry_count = CADET_RETRY_MAX;
683 GSF_cadet_query_cancel(pr->cadet_request);
684 pr->cadet_request = NULL;
685 }
686 if (NULL != (cont = pr->llc_cont))
687 {
688 pr->llc_cont = NULL;
689 cont(pr->llc_cont_cls, pr, pr->local_result);
690 }
691 GSF_plan_notify_request_done_(pr);
692 if (NULL != pr->qe)
693 {
694 GNUNET_DATASTORE_cancel(pr->qe);
695 pr->qe = NULL;
696 }
697 if (NULL != pr->gh)
698 {
699 GNUNET_DHT_get_stop(pr->gh);
700 pr->gh = NULL;
701 }
702 if (NULL != pr->warn_task)
703 {
704 GNUNET_SCHEDULER_cancel(pr->warn_task);
705 pr->warn_task = NULL;
706 }
707 return;
686 } 708 }
687 if (NULL != (cont = pr->llc_cont)) 709 GNUNET_assert(GNUNET_YES ==
688 { 710 clean_request(NULL, &pr->public_data.query, pr));
689 pr->llc_cont = NULL;
690 cont (pr->llc_cont_cls, pr, pr->local_result);
691 }
692 GSF_plan_notify_request_done_ (pr);
693 if (NULL != pr->qe)
694 {
695 GNUNET_DATASTORE_cancel (pr->qe);
696 pr->qe = NULL;
697 }
698 if (NULL != pr->gh)
699 {
700 GNUNET_DHT_get_stop (pr->gh);
701 pr->gh = NULL;
702 }
703 if (NULL != pr->warn_task)
704 {
705 GNUNET_SCHEDULER_cancel (pr->warn_task);
706 pr->warn_task = NULL;
707 }
708 return;
709 }
710 GNUNET_assert (GNUNET_YES ==
711 clean_request (NULL, &pr->public_data.query, pr));
712} 711}
713 712
714 713
@@ -719,11 +718,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
719 * @param cls closure for @a it 718 * @param cls closure for @a it
720 */ 719 */
721void 720void
722GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls) 721GSF_iterate_pending_requests_(GSF_PendingRequestIterator it, void *cls)
723{ 722{
724 GNUNET_CONTAINER_multihashmap_iterate ( 723 GNUNET_CONTAINER_multihashmap_iterate(
725 pr_map, 724 pr_map,
726 (GNUNET_CONTAINER_MulitHashMapIteratorCallback) it, 725 (GNUNET_CONTAINER_MulitHashMapIteratorCallback)it,
727 cls); 726 cls);
728} 727}
729 728
@@ -731,8 +730,7 @@ GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls)
731/** 730/**
732 * Closure for process_reply() function. 731 * Closure for process_reply() function.
733 */ 732 */
734struct ProcessReplyClosure 733struct ProcessReplyClosure {
735{
736 /** 734 /**
737 * The data for the reply. 735 * The data for the reply.
738 */ 736 */
@@ -793,14 +791,14 @@ struct ProcessReplyClosure
793 * @param pr request that was satisfied 791 * @param pr request that was satisfied
794 */ 792 */
795static void 793static void
796update_request_performance_data (struct ProcessReplyClosure *prq, 794update_request_performance_data(struct ProcessReplyClosure *prq,
797 struct GSF_PendingRequest *pr) 795 struct GSF_PendingRequest *pr)
798{ 796{
799 if (prq->sender == NULL) 797 if (prq->sender == NULL)
800 return; 798 return;
801 GSF_peer_update_performance_ (prq->sender, 799 GSF_peer_update_performance_(prq->sender,
802 pr->public_data.start_time, 800 pr->public_data.start_time,
803 prq->priority); 801 prq->priority);
804} 802}
805 803
806 804
@@ -813,7 +811,7 @@ update_request_performance_data (struct ProcessReplyClosure *prq,
813 * @return #GNUNET_YES (we should continue to iterate) 811 * @return #GNUNET_YES (we should continue to iterate)
814 */ 812 */
815static int 813static int
816process_reply (void *cls, const struct GNUNET_HashCode *key, void *value) 814process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
817{ 815{
818 struct ProcessReplyClosure *prq = cls; 816 struct ProcessReplyClosure *prq = cls;
819 struct GSF_PendingRequest *pr = value; 817 struct GSF_PendingRequest *pr = value;
@@ -822,121 +820,128 @@ process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
822 820
823 if (NULL == pr->rh) 821 if (NULL == pr->rh)
824 return GNUNET_YES; 822 return GNUNET_YES;
825 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 823 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
826 "Matched result (type %u) for query `%s' with pending request\n", 824 "Matched result (type %u) for query `%s' with pending request\n",
827 (unsigned int) prq->type, 825 (unsigned int)prq->type,
828 GNUNET_h2s (key)); 826 GNUNET_h2s(key));
829 GNUNET_STATISTICS_update (GSF_stats, 827 GNUNET_STATISTICS_update(GSF_stats,
830 gettext_noop ("# replies received and matched"), 828 gettext_noop("# replies received and matched"),
831 1, 829 1,
832 GNUNET_NO); 830 GNUNET_NO);
833 prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx, 831 prq->eval = GNUNET_BLOCK_evaluate(GSF_block_ctx,
834 prq->type, 832 prq->type,
835 pr->bg, 833 pr->bg,
836 prq->eo, 834 prq->eo,
837 key, 835 key,
838 NULL, 836 NULL,
839 0, 837 0,
840 prq->data, 838 prq->data,
841 prq->size); 839 prq->size);
842 switch (prq->eval) 840 switch (prq->eval)
843 { 841 {
844 case GNUNET_BLOCK_EVALUATION_OK_MORE: 842 case GNUNET_BLOCK_EVALUATION_OK_MORE:
845 update_request_performance_data (prq, pr); 843 update_request_performance_data(prq, pr);
846 break; 844 break;
847 case GNUNET_BLOCK_EVALUATION_OK_LAST: 845
848 /* short cut: stop processing early, no BF-update, etc. */ 846 case GNUNET_BLOCK_EVALUATION_OK_LAST:
849 update_request_performance_data (prq, pr); 847 /* short cut: stop processing early, no BF-update, etc. */
850 GNUNET_LOAD_update (GSF_rt_entry_lifetime, 848 update_request_performance_data(prq, pr);
851 GNUNET_TIME_absolute_get_duration ( 849 GNUNET_LOAD_update(GSF_rt_entry_lifetime,
852 pr->public_data.start_time) 850 GNUNET_TIME_absolute_get_duration(
853 .rel_value_us); 851 pr->public_data.start_time)
854 if (GNUNET_YES != 852 .rel_value_us);
855 GSF_request_plan_reference_get_last_transmission_ (pr->public_data 853 if (GNUNET_YES !=
856 .pr_head, 854 GSF_request_plan_reference_get_last_transmission_(pr->public_data
857 prq->sender, 855 .pr_head,
858 &last_transmission)) 856 prq->sender,
859 last_transmission.abs_value_us = 857 &last_transmission))
860 GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us; 858 last_transmission.abs_value_us =
861 /* pass on to other peers / local clients */ 859 GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
862 pr->rh (pr->rh_cls, 860 /* pass on to other peers / local clients */
863 prq->eval, 861 pr->rh(pr->rh_cls,
864 pr, 862 prq->eval,
865 prq->anonymity_level, 863 pr,
866 prq->expiration, 864 prq->anonymity_level,
867 last_transmission, 865 prq->expiration,
868 prq->type, 866 last_transmission,
869 prq->data, 867 prq->type,
870 prq->size); 868 prq->data,
871 return GNUNET_YES; 869 prq->size);
872 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: 870 return GNUNET_YES;
871
872 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
873#if INSANE_STATISTICS 873#if INSANE_STATISTICS
874 GNUNET_STATISTICS_update (GSF_stats, 874 GNUNET_STATISTICS_update(GSF_stats,
875 gettext_noop ( 875 gettext_noop(
876 "# duplicate replies discarded (bloomfilter)"), 876 "# duplicate replies discarded (bloomfilter)"),
877 1, 877 1,
878 GNUNET_NO); 878 GNUNET_NO);
879#endif 879#endif
880 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n"); 880 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n");
881 return GNUNET_YES; /* duplicate */ 881 return GNUNET_YES; /* duplicate */
882 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT: 882
883 GNUNET_STATISTICS_update (GSF_stats, 883 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
884 gettext_noop ("# irrelevant replies discarded"), 884 GNUNET_STATISTICS_update(GSF_stats,
885 1, 885 gettext_noop("# irrelevant replies discarded"),
886 GNUNET_NO); 886 1,
887 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n"); 887 GNUNET_NO);
888 return GNUNET_YES; 888 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n");
889 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: 889 return GNUNET_YES;
890 return GNUNET_YES; /* wrong namespace */ 890
891 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: 891 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
892 GNUNET_break (0); 892 return GNUNET_YES; /* wrong namespace */
893 return GNUNET_YES; 893
894 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: 894 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
895 GNUNET_break (0); 895 GNUNET_break(0);
896 return GNUNET_YES; 896 return GNUNET_YES;
897 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: 897
898 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 898 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
899 _ ("Unsupported block type %u\n"), 899 GNUNET_break(0);
900 prq->type); 900 return GNUNET_YES;
901 return GNUNET_NO; 901
902 } 902 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
903 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
904 _("Unsupported block type %u\n"),
905 prq->type);
906 return GNUNET_NO;
907 }
903 /* update bloomfilter */ 908 /* update bloomfilter */
904 GNUNET_CRYPTO_hash (prq->data, prq->size, &chash); 909 GNUNET_CRYPTO_hash(prq->data, prq->size, &chash);
905 GSF_pending_request_update_ (pr, &chash, 1); 910 GSF_pending_request_update_(pr, &chash, 1);
906 if (NULL == prq->sender) 911 if (NULL == prq->sender)
907 { 912 {
908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 913 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
909 "Found result for query `%s' in local datastore\n", 914 "Found result for query `%s' in local datastore\n",
910 GNUNET_h2s (key)); 915 GNUNET_h2s(key));
911 GNUNET_STATISTICS_update (GSF_stats, 916 GNUNET_STATISTICS_update(GSF_stats,
912 gettext_noop ("# results found locally"), 917 gettext_noop("# results found locally"),
913 1, 918 1,
914 GNUNET_NO); 919 GNUNET_NO);
915 } 920 }
916 else 921 else
917 { 922 {
918 GSF_dht_lookup_ (pr); 923 GSF_dht_lookup_(pr);
919 } 924 }
920 prq->priority += pr->public_data.original_priority; 925 prq->priority += pr->public_data.original_priority;
921 pr->public_data.priority = 0; 926 pr->public_data.priority = 0;
922 pr->public_data.original_priority = 0; 927 pr->public_data.original_priority = 0;
923 pr->public_data.results_found++; 928 pr->public_data.results_found++;
924 prq->request_found = GNUNET_YES; 929 prq->request_found = GNUNET_YES;
925 /* finally, pass on to other peer / local client */ 930 /* finally, pass on to other peer / local client */
926 if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data 931 if (!GSF_request_plan_reference_get_last_transmission_(pr->public_data
927 .pr_head, 932 .pr_head,
928 prq->sender, 933 prq->sender,
929 &last_transmission)) 934 &last_transmission))
930 last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us; 935 last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
931 pr->rh (pr->rh_cls, 936 pr->rh(pr->rh_cls,
932 prq->eval, 937 prq->eval,
933 pr, 938 pr,
934 prq->anonymity_level, 939 prq->anonymity_level,
935 prq->expiration, 940 prq->expiration,
936 last_transmission, 941 last_transmission,
937 prq->type, 942 prq->type,
938 prq->data, 943 prq->data,
939 prq->size); 944 prq->size);
940 return GNUNET_YES; 945 return GNUNET_YES;
941} 946}
942 947
@@ -944,9 +949,7 @@ process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
944/** 949/**
945 * Context for put_migration_continuation(). 950 * Context for put_migration_continuation().
946 */ 951 */
947struct PutMigrationContext 952struct PutMigrationContext {
948{
949
950 /** 953 /**
951 * Start time for the operation. 954 * Start time for the operation.
952 */ 955 */
@@ -975,10 +978,10 @@ struct PutMigrationContext
975 * @param msg NULL on success, otherwise an error message 978 * @param msg NULL on success, otherwise an error message
976 */ 979 */
977static void 980static void
978put_migration_continuation (void *cls, 981put_migration_continuation(void *cls,
979 int success, 982 int success,
980 struct GNUNET_TIME_Absolute min_expiration, 983 struct GNUNET_TIME_Absolute min_expiration,
981 const char *msg) 984 const char *msg)
982{ 985{
983 struct PutMigrationContext *pmc = cls; 986 struct PutMigrationContext *pmc = cls;
984 struct GSF_ConnectedPeer *cp; 987 struct GSF_ConnectedPeer *cp;
@@ -986,67 +989,67 @@ put_migration_continuation (void *cls,
986 struct GSF_PeerPerformanceData *ppd; 989 struct GSF_PeerPerformanceData *ppd;
987 990
988 if (NULL != datastore_put_load) 991 if (NULL != datastore_put_load)
989 {
990 if (GNUNET_SYSERR != success)
991 { 992 {
992 GNUNET_LOAD_update (datastore_put_load, 993 if (GNUNET_SYSERR != success)
993 GNUNET_TIME_absolute_get_duration (pmc->start) 994 {
994 .rel_value_us); 995 GNUNET_LOAD_update(datastore_put_load,
996 GNUNET_TIME_absolute_get_duration(pmc->start)
997 .rel_value_us);
998 }
999 else
1000 {
1001 /* on queue failure / timeout, increase the put load dramatically */
1002 GNUNET_LOAD_update(datastore_put_load,
1003 GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1004 }
995 } 1005 }
996 else 1006 cp = GSF_peer_get_(&pmc->origin);
997 {
998 /* on queue failure / timeout, increase the put load dramatically */
999 GNUNET_LOAD_update (datastore_put_load,
1000 GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1001 }
1002 }
1003 cp = GSF_peer_get_ (&pmc->origin);
1004 if (GNUNET_OK == success) 1007 if (GNUNET_OK == success)
1005 {
1006 if (NULL != cp)
1007 { 1008 {
1008 ppd = GSF_get_peer_performance_data_ (cp); 1009 if (NULL != cp)
1009 ppd->migration_delay.rel_value_us /= 2; 1010 {
1011 ppd = GSF_get_peer_performance_data_(cp);
1012 ppd->migration_delay.rel_value_us /= 2;
1013 }
1014 GNUNET_free(pmc);
1015 return;
1010 } 1016 }
1011 GNUNET_free (pmc);
1012 return;
1013 }
1014 if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp)) 1017 if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
1015 {
1016 ppd = GSF_get_peer_performance_data_ (cp);
1017 if (min_expiration.abs_value_us > 0)
1018 {
1019 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020 "Asking to stop migration for %s because datastore is full\n",
1021 GNUNET_STRINGS_relative_time_to_string (
1022 GNUNET_TIME_absolute_get_remaining (min_expiration),
1023 GNUNET_YES));
1024 GSF_block_peer_migration_ (cp, min_expiration);
1025 }
1026 else
1027 { 1018 {
1028 ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS, 1019 ppd = GSF_get_peer_performance_data_(cp);
1029 ppd->migration_delay); 1020 if (min_expiration.abs_value_us > 0)
1030 ppd->migration_delay = 1021 {
1031 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, ppd->migration_delay); 1022 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1032 mig_pause.rel_value_us = 1023 "Asking to stop migration for %s because datastore is full\n",
1033 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, 1024 GNUNET_STRINGS_relative_time_to_string(
1034 ppd->migration_delay.rel_value_us); 1025 GNUNET_TIME_absolute_get_remaining(min_expiration),
1035 ppd->migration_delay = 1026 GNUNET_YES));
1036 GNUNET_TIME_relative_saturating_multiply (ppd->migration_delay, 2); 1027 GSF_block_peer_migration_(cp, min_expiration);
1037 GNUNET_log ( 1028 }
1038 GNUNET_ERROR_TYPE_DEBUG, 1029 else
1039 "Replicated content already exists locally, asking to stop migration for %s\n", 1030 {
1040 GNUNET_STRINGS_relative_time_to_string (mig_pause, GNUNET_YES)); 1031 ppd->migration_delay = GNUNET_TIME_relative_max(GNUNET_TIME_UNIT_SECONDS,
1041 GSF_block_peer_migration_ (cp, 1032 ppd->migration_delay);
1042 GNUNET_TIME_relative_to_absolute (mig_pause)); 1033 ppd->migration_delay =
1034 GNUNET_TIME_relative_min(GNUNET_TIME_UNIT_HOURS, ppd->migration_delay);
1035 mig_pause.rel_value_us =
1036 GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK,
1037 ppd->migration_delay.rel_value_us);
1038 ppd->migration_delay =
1039 GNUNET_TIME_relative_saturating_multiply(ppd->migration_delay, 2);
1040 GNUNET_log(
1041 GNUNET_ERROR_TYPE_DEBUG,
1042 "Replicated content already exists locally, asking to stop migration for %s\n",
1043 GNUNET_STRINGS_relative_time_to_string(mig_pause, GNUNET_YES));
1044 GSF_block_peer_migration_(cp,
1045 GNUNET_TIME_relative_to_absolute(mig_pause));
1046 }
1043 } 1047 }
1044 } 1048 GNUNET_free(pmc);
1045 GNUNET_free (pmc); 1049 GNUNET_STATISTICS_update(GSF_stats,
1046 GNUNET_STATISTICS_update (GSF_stats, 1050 gettext_noop("# Datastore `PUT' failures"),
1047 gettext_noop ("# Datastore `PUT' failures"), 1051 1,
1048 1, 1052 GNUNET_NO);
1049 GNUNET_NO);
1050} 1053}
1051 1054
1052 1055
@@ -1060,22 +1063,22 @@ put_migration_continuation (void *cls,
1060 * #GNUNET_NO to process normally (load normal or low) 1063 * #GNUNET_NO to process normally (load normal or low)
1061 */ 1064 */
1062static int 1065static int
1063test_put_load_too_high (uint32_t priority) 1066test_put_load_too_high(uint32_t priority)
1064{ 1067{
1065 double ld; 1068 double ld;
1066 1069
1067 if (NULL == datastore_put_load) 1070 if (NULL == datastore_put_load)
1068 return GNUNET_NO; 1071 return GNUNET_NO;
1069 if (GNUNET_LOAD_get_average (datastore_put_load) < 50) 1072 if (GNUNET_LOAD_get_average(datastore_put_load) < 50)
1070 return GNUNET_NO; /* very fast */ 1073 return GNUNET_NO; /* very fast */
1071 ld = GNUNET_LOAD_get_load (datastore_put_load); 1074 ld = GNUNET_LOAD_get_load(datastore_put_load);
1072 if (ld < 2.0 * (1 + priority)) 1075 if (ld < 2.0 * (1 + priority))
1073 return GNUNET_NO; 1076 return GNUNET_NO;
1074 GNUNET_STATISTICS_update (GSF_stats, 1077 GNUNET_STATISTICS_update(GSF_stats,
1075 gettext_noop ( 1078 gettext_noop(
1076 "# storage requests dropped due to high load"), 1079 "# storage requests dropped due to high load"),
1077 1, 1080 1,
1078 GNUNET_NO); 1081 GNUNET_NO);
1079 return GNUNET_YES; 1082 return GNUNET_YES;
1080} 1083}
1081 1084
@@ -1096,67 +1099,67 @@ test_put_load_too_high (uint32_t priority)
1096 * @param data pointer to the result data 1099 * @param data pointer to the result data
1097 */ 1100 */
1098static void 1101static void
1099handle_dht_reply (void *cls, 1102handle_dht_reply(void *cls,
1100 struct GNUNET_TIME_Absolute exp, 1103 struct GNUNET_TIME_Absolute exp,
1101 const struct GNUNET_HashCode *key, 1104 const struct GNUNET_HashCode *key,
1102 const struct GNUNET_PeerIdentity *get_path, 1105 const struct GNUNET_PeerIdentity *get_path,
1103 unsigned int get_path_length, 1106 unsigned int get_path_length,
1104 const struct GNUNET_PeerIdentity *put_path, 1107 const struct GNUNET_PeerIdentity *put_path,
1105 unsigned int put_path_length, 1108 unsigned int put_path_length,
1106 enum GNUNET_BLOCK_Type type, 1109 enum GNUNET_BLOCK_Type type,
1107 size_t size, 1110 size_t size,
1108 const void *data) 1111 const void *data)
1109{ 1112{
1110 struct GSF_PendingRequest *pr = cls; 1113 struct GSF_PendingRequest *pr = cls;
1111 struct ProcessReplyClosure prq; 1114 struct ProcessReplyClosure prq;
1112 struct PutMigrationContext *pmc; 1115 struct PutMigrationContext *pmc;
1113 1116
1114 GNUNET_STATISTICS_update (GSF_stats, 1117 GNUNET_STATISTICS_update(GSF_stats,
1115 gettext_noop ("# Replies received from DHT"), 1118 gettext_noop("# Replies received from DHT"),
1116 1, 1119 1,
1117 GNUNET_NO); 1120 GNUNET_NO);
1118 memset (&prq, 0, sizeof (prq)); 1121 memset(&prq, 0, sizeof(prq));
1119 prq.data = data; 1122 prq.data = data;
1120 prq.expiration = exp; 1123 prq.expiration = exp;
1121 /* do not allow migrated content to live longer than 1 year */ 1124 /* do not allow migrated content to live longer than 1 year */
1122 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute ( 1125 prq.expiration = GNUNET_TIME_absolute_min(GNUNET_TIME_relative_to_absolute(
1123 GNUNET_TIME_UNIT_YEARS), 1126 GNUNET_TIME_UNIT_YEARS),
1124 prq.expiration); 1127 prq.expiration);
1125 prq.size = size; 1128 prq.size = size;
1126 prq.type = type; 1129 prq.type = type;
1127 prq.eo = GNUNET_BLOCK_EO_NONE; 1130 prq.eo = GNUNET_BLOCK_EO_NONE;
1128 process_reply (&prq, key, pr); 1131 process_reply(&prq, key, pr);
1129 if ((GNUNET_YES == active_to_migration) && 1132 if ((GNUNET_YES == active_to_migration) &&
1130 (GNUNET_NO == test_put_load_too_high (prq.priority))) 1133 (GNUNET_NO == test_put_load_too_high(prq.priority)))
1131 {
1132 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133 "Replicating result for query `%s' with priority %u\n",
1134 GNUNET_h2s (key),
1135 prq.priority);
1136 pmc = GNUNET_new (struct PutMigrationContext);
1137 pmc->start = GNUNET_TIME_absolute_get ();
1138 pmc->requested = GNUNET_YES;
1139 if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1140 0,
1141 key,
1142 size,
1143 data,
1144 type,
1145 prq.priority,
1146 1 /* anonymity */,
1147 0 /* replication */,
1148 exp,
1149 1 + prq.priority,
1150 MAX_DATASTORE_QUEUE,
1151 &put_migration_continuation,
1152 pmc))
1153 { 1134 {
1154 put_migration_continuation (pmc, 1135 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1155 GNUNET_SYSERR, 1136 "Replicating result for query `%s' with priority %u\n",
1156 GNUNET_TIME_UNIT_ZERO_ABS, 1137 GNUNET_h2s(key),
1157 NULL); 1138 prq.priority);
1139 pmc = GNUNET_new(struct PutMigrationContext);
1140 pmc->start = GNUNET_TIME_absolute_get();
1141 pmc->requested = GNUNET_YES;
1142 if (NULL == GNUNET_DATASTORE_put(GSF_dsh,
1143 0,
1144 key,
1145 size,
1146 data,
1147 type,
1148 prq.priority,
1149 1 /* anonymity */,
1150 0 /* replication */,
1151 exp,
1152 1 + prq.priority,
1153 MAX_DATASTORE_QUEUE,
1154 &put_migration_continuation,
1155 pmc))
1156 {
1157 put_migration_continuation(pmc,
1158 GNUNET_SYSERR,
1159 GNUNET_TIME_UNIT_ZERO_ABS,
1160 NULL);
1161 }
1158 } 1162 }
1159 }
1160} 1163}
1161 1164
1162 1165
@@ -1166,42 +1169,42 @@ handle_dht_reply (void *cls,
1166 * @param pr the pending request to process 1169 * @param pr the pending request to process
1167 */ 1170 */
1168void 1171void
1169GSF_dht_lookup_ (struct GSF_PendingRequest *pr) 1172GSF_dht_lookup_(struct GSF_PendingRequest *pr)
1170{ 1173{
1171 const void *xquery; 1174 const void *xquery;
1172 size_t xquery_size; 1175 size_t xquery_size;
1173 struct GNUNET_PeerIdentity pi; 1176 struct GNUNET_PeerIdentity pi;
1174 char buf[sizeof (struct GNUNET_HashCode) * 2] GNUNET_ALIGN; 1177 char buf[sizeof(struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
1175 1178
1176 if (0 != pr->public_data.anonymity_level) 1179 if (0 != pr->public_data.anonymity_level)
1177 return; 1180 return;
1178 if (NULL != pr->gh) 1181 if (NULL != pr->gh)
1179 { 1182 {
1180 GNUNET_DHT_get_stop (pr->gh); 1183 GNUNET_DHT_get_stop(pr->gh);
1181 pr->gh = NULL; 1184 pr->gh = NULL;
1182 } 1185 }
1183 xquery = NULL; 1186 xquery = NULL;
1184 xquery_size = 0; 1187 xquery_size = 0;
1185 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY)) 1188 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1186 { 1189 {
1187 GNUNET_assert (0 != pr->sender_pid); 1190 GNUNET_assert(0 != pr->sender_pid);
1188 GNUNET_PEER_resolve (pr->sender_pid, &pi); 1191 GNUNET_PEER_resolve(pr->sender_pid, &pi);
1189 GNUNET_memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity)); 1192 GNUNET_memcpy(&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity));
1190 xquery_size += sizeof (struct GNUNET_PeerIdentity); 1193 xquery_size += sizeof(struct GNUNET_PeerIdentity);
1191 } 1194 }
1192 pr->gh = GNUNET_DHT_get_start (GSF_dht, 1195 pr->gh = GNUNET_DHT_get_start(GSF_dht,
1193 pr->public_data.type, 1196 pr->public_data.type,
1194 &pr->public_data.query, 1197 &pr->public_data.query,
1195 DHT_GET_REPLICATION, 1198 DHT_GET_REPLICATION,
1196 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE, 1199 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1197 xquery, 1200 xquery,
1198 xquery_size, 1201 xquery_size,
1199 &handle_dht_reply, 1202 &handle_dht_reply,
1200 pr); 1203 pr);
1201 if ((NULL != pr->gh) && (0 != pr->replies_seen_count)) 1204 if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1202 GNUNET_DHT_get_filter_known_results (pr->gh, 1205 GNUNET_DHT_get_filter_known_results(pr->gh,
1203 pr->replies_seen_count, 1206 pr->replies_seen_count,
1204 pr->replies_seen); 1207 pr->replies_seen);
1205} 1208}
1206 1209
1207 1210
@@ -1215,11 +1218,11 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1215 * @param data reply block data, NULL on error 1218 * @param data reply block data, NULL on error
1216 */ 1219 */
1217static void 1220static void
1218cadet_reply_proc (void *cls, 1221cadet_reply_proc(void *cls,
1219 enum GNUNET_BLOCK_Type type, 1222 enum GNUNET_BLOCK_Type type,
1220 struct GNUNET_TIME_Absolute expiration, 1223 struct GNUNET_TIME_Absolute expiration,
1221 size_t data_size, 1224 size_t data_size,
1222 const void *data) 1225 const void *data)
1223{ 1226{
1224 struct GSF_PendingRequest *pr = cls; 1227 struct GSF_PendingRequest *pr = cls;
1225 struct ProcessReplyClosure prq; 1228 struct ProcessReplyClosure prq;
@@ -1227,46 +1230,46 @@ cadet_reply_proc (void *cls,
1227 1230
1228 pr->cadet_request = NULL; 1231 pr->cadet_request = NULL;
1229 if (GNUNET_BLOCK_TYPE_ANY == type) 1232 if (GNUNET_BLOCK_TYPE_ANY == type)
1230 { 1233 {
1231 GNUNET_break (NULL == data); 1234 GNUNET_break(NULL == data);
1232 GNUNET_break (0 == data_size); 1235 GNUNET_break(0 == data_size);
1233 pr->cadet_retry_count++; 1236 pr->cadet_retry_count++;
1234 if (pr->cadet_retry_count >= CADET_RETRY_MAX) 1237 if (pr->cadet_retry_count >= CADET_RETRY_MAX)
1235 return; /* give up on cadet */ 1238 return; /* give up on cadet */
1236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n"); 1239 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1237 /* retry -- without delay, as this is non-anonymous 1240 /* retry -- without delay, as this is non-anonymous
1238 and cadet/cadet connect will take some time anyway */ 1241 and cadet/cadet connect will take some time anyway */
1239 pr->cadet_request = GSF_cadet_query (pr->public_data.target, 1242 pr->cadet_request = GSF_cadet_query(pr->public_data.target,
1240 &pr->public_data.query, 1243 &pr->public_data.query,
1241 pr->public_data.type, 1244 pr->public_data.type,
1242 &cadet_reply_proc, 1245 &cadet_reply_proc,
1243 pr); 1246 pr);
1244 return; 1247 return;
1245 } 1248 }
1246 if (GNUNET_YES != 1249 if (GNUNET_YES !=
1247 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, data_size, &query)) 1250 GNUNET_BLOCK_get_key(GSF_block_ctx, type, data, data_size, &query))
1248 { 1251 {
1249 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1252 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1250 "Failed to derive key for block of type %d\n", 1253 "Failed to derive key for block of type %d\n",
1251 (int) type); 1254 (int)type);
1252 GNUNET_break_op (0); 1255 GNUNET_break_op(0);
1253 return; 1256 return;
1254 } 1257 }
1255 GNUNET_STATISTICS_update (GSF_stats, 1258 GNUNET_STATISTICS_update(GSF_stats,
1256 gettext_noop ("# Replies received from CADET"), 1259 gettext_noop("# Replies received from CADET"),
1257 1, 1260 1,
1258 GNUNET_NO); 1261 GNUNET_NO);
1259 memset (&prq, 0, sizeof (prq)); 1262 memset(&prq, 0, sizeof(prq));
1260 prq.data = data; 1263 prq.data = data;
1261 prq.expiration = expiration; 1264 prq.expiration = expiration;
1262 /* do not allow migrated content to live longer than 1 year */ 1265 /* do not allow migrated content to live longer than 1 year */
1263 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute ( 1266 prq.expiration = GNUNET_TIME_absolute_min(GNUNET_TIME_relative_to_absolute(
1264 GNUNET_TIME_UNIT_YEARS), 1267 GNUNET_TIME_UNIT_YEARS),
1265 prq.expiration); 1268 prq.expiration);
1266 prq.size = data_size; 1269 prq.size = data_size;
1267 prq.type = type; 1270 prq.type = type;
1268 prq.eo = GNUNET_BLOCK_EO_NONE; 1271 prq.eo = GNUNET_BLOCK_EO_NONE;
1269 process_reply (&prq, &query, pr); 1272 process_reply(&prq, &query, pr);
1270} 1273}
1271 1274
1272 1275
@@ -1276,23 +1279,23 @@ cadet_reply_proc (void *cls,
1276 * @param pr the pending request to process 1279 * @param pr the pending request to process
1277 */ 1280 */
1278void 1281void
1279GSF_cadet_lookup_ (struct GSF_PendingRequest *pr) 1282GSF_cadet_lookup_(struct GSF_PendingRequest *pr)
1280{ 1283{
1281 if (0 != pr->public_data.anonymity_level) 1284 if (0 != pr->public_data.anonymity_level)
1282 return; 1285 return;
1283 if (0 == pr->public_data.target) 1286 if (0 == pr->public_data.target)
1284 { 1287 {
1285 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1288 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1286 "Cannot do cadet-based download, target peer not known\n"); 1289 "Cannot do cadet-based download, target peer not known\n");
1287 return; 1290 return;
1288 } 1291 }
1289 if (NULL != pr->cadet_request) 1292 if (NULL != pr->cadet_request)
1290 return; 1293 return;
1291 pr->cadet_request = GSF_cadet_query (pr->public_data.target, 1294 pr->cadet_request = GSF_cadet_query(pr->public_data.target,
1292 &pr->public_data.query, 1295 &pr->public_data.query,
1293 pr->public_data.type, 1296 pr->public_data.type,
1294 &cadet_reply_proc, 1297 &cadet_reply_proc,
1295 pr); 1298 pr);
1296} 1299}
1297 1300
1298 1301
@@ -1302,18 +1305,18 @@ GSF_cadet_lookup_ (struct GSF_PendingRequest *pr)
1302 * @param cls the `struct GSF_PendingRequest` 1305 * @param cls the `struct GSF_PendingRequest`
1303 */ 1306 */
1304static void 1307static void
1305warn_delay_task (void *cls) 1308warn_delay_task(void *cls)
1306{ 1309{
1307 struct GSF_PendingRequest *pr = cls; 1310 struct GSF_PendingRequest *pr = cls;
1308 1311
1309 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, 1312 GNUNET_log(GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1310 _ ("Datastore lookup already took %s!\n"), 1313 _("Datastore lookup already took %s!\n"),
1311 GNUNET_STRINGS_relative_time_to_string ( 1314 GNUNET_STRINGS_relative_time_to_string(
1312 GNUNET_TIME_absolute_get_duration (pr->qe_start), 1315 GNUNET_TIME_absolute_get_duration(pr->qe_start),
1313 GNUNET_YES)); 1316 GNUNET_YES));
1314 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1317 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES,
1315 &warn_delay_task, 1318 &warn_delay_task,
1316 pr); 1319 pr);
1317} 1320}
1318 1321
1319 1322
@@ -1323,140 +1326,140 @@ warn_delay_task (void *cls)
1323 * @param cls the `struct GSF_PendingRequest` 1326 * @param cls the `struct GSF_PendingRequest`
1324 */ 1327 */
1325static void 1328static void
1326odc_warn_delay_task (void *cls) 1329odc_warn_delay_task(void *cls)
1327{ 1330{
1328 struct GSF_PendingRequest *pr = cls; 1331 struct GSF_PendingRequest *pr = cls;
1329 1332
1330 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1333 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
1331 _ ("On-demand lookup already took %s!\n"), 1334 _("On-demand lookup already took %s!\n"),
1332 GNUNET_STRINGS_relative_time_to_string ( 1335 GNUNET_STRINGS_relative_time_to_string(
1333 GNUNET_TIME_absolute_get_duration (pr->qe_start), 1336 GNUNET_TIME_absolute_get_duration(pr->qe_start),
1334 GNUNET_YES)); 1337 GNUNET_YES));
1335 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1338 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES,
1336 &odc_warn_delay_task, 1339 &odc_warn_delay_task,
1337 pr); 1340 pr);
1338} 1341}
1339 1342
1340 1343
1341/* Call our continuation (if we have any) */ 1344/* Call our continuation (if we have any) */
1342static void 1345static void
1343call_continuation (struct GSF_PendingRequest *pr) 1346call_continuation(struct GSF_PendingRequest *pr)
1344{ 1347{
1345 GSF_LocalLookupContinuation cont = pr->llc_cont; 1348 GSF_LocalLookupContinuation cont = pr->llc_cont;
1346 1349
1347 GNUNET_assert (NULL == pr->qe); 1350 GNUNET_assert(NULL == pr->qe);
1348 if (NULL != pr->warn_task) 1351 if (NULL != pr->warn_task)
1349 { 1352 {
1350 GNUNET_SCHEDULER_cancel (pr->warn_task); 1353 GNUNET_SCHEDULER_cancel(pr->warn_task);
1351 pr->warn_task = NULL; 1354 pr->warn_task = NULL;
1352 } 1355 }
1353 if (NULL == cont) 1356 if (NULL == cont)
1354 return; /* no continuation */ 1357 return; /* no continuation */
1355 pr->llc_cont = NULL; 1358 pr->llc_cont = NULL;
1356 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options)) 1359 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1357 {
1358 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1359 { 1360 {
1360 /* Signal that we are done and that there won't be any 1361 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1361 additional results to allow client to clean up state. */ 1362 {
1362 pr->rh (pr->rh_cls, 1363 /* Signal that we are done and that there won't be any
1363 GNUNET_BLOCK_EVALUATION_OK_LAST, 1364 additional results to allow client to clean up state. */
1364 pr, 1365 pr->rh(pr->rh_cls,
1365 UINT32_MAX, 1366 GNUNET_BLOCK_EVALUATION_OK_LAST,
1366 GNUNET_TIME_UNIT_ZERO_ABS, 1367 pr,
1367 GNUNET_TIME_UNIT_FOREVER_ABS, 1368 UINT32_MAX,
1368 GNUNET_BLOCK_TYPE_ANY, 1369 GNUNET_TIME_UNIT_ZERO_ABS,
1369 NULL, 1370 GNUNET_TIME_UNIT_FOREVER_ABS,
1370 0); 1371 GNUNET_BLOCK_TYPE_ANY,
1372 NULL,
1373 0);
1374 }
1375 /* Finally, call our continuation to signal that we are
1376 done with local processing of this request; i.e. to
1377 start reading again from the client. */
1378 cont(pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1379 return;
1371 } 1380 }
1372 /* Finally, call our continuation to signal that we are
1373 done with local processing of this request; i.e. to
1374 start reading again from the client. */
1375 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1376 return;
1377 }
1378 1381
1379 cont (pr->llc_cont_cls, pr, pr->local_result); 1382 cont(pr->llc_cont_cls, pr, pr->local_result);
1380} 1383}
1381 1384
1382 1385
1383/* Update stats and call continuation */ 1386/* Update stats and call continuation */
1384static void 1387static void
1385no_more_local_results (struct GSF_PendingRequest *pr) 1388no_more_local_results(struct GSF_PendingRequest *pr)
1386{ 1389{
1387 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, 1390 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1388 "No further local responses available.\n"); 1391 "No further local responses available.\n");
1389#if INSANE_STATISTICS 1392#if INSANE_STATISTICS
1390 if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) || 1393 if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
1391 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type)) 1394 (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type))
1392 GNUNET_STATISTICS_update (GSF_stats, 1395 GNUNET_STATISTICS_update(GSF_stats,
1393 gettext_noop ( 1396 gettext_noop(
1394 "# requested DBLOCK or IBLOCK not found"), 1397 "# requested DBLOCK or IBLOCK not found"),
1395 1, 1398 1,
1396 GNUNET_NO); 1399 GNUNET_NO);
1397#endif 1400#endif
1398 call_continuation (pr); 1401 call_continuation(pr);
1399} 1402}
1400 1403
1401 1404
1402/* forward declaration */ 1405/* forward declaration */
1403static void 1406static void
1404process_local_reply (void *cls, 1407process_local_reply(void *cls,
1405 const struct GNUNET_HashCode *key, 1408 const struct GNUNET_HashCode *key,
1406 size_t size, 1409 size_t size,
1407 const void *data, 1410 const void *data,
1408 enum GNUNET_BLOCK_Type type, 1411 enum GNUNET_BLOCK_Type type,
1409 uint32_t priority, 1412 uint32_t priority,
1410 uint32_t anonymity, 1413 uint32_t anonymity,
1411 uint32_t replication, 1414 uint32_t replication,
1412 struct GNUNET_TIME_Absolute expiration, 1415 struct GNUNET_TIME_Absolute expiration,
1413 uint64_t uid); 1416 uint64_t uid);
1414 1417
1415 1418
1416/* Start a local query */ 1419/* Start a local query */
1417static void 1420static void
1418start_local_query (struct GSF_PendingRequest *pr, 1421start_local_query(struct GSF_PendingRequest *pr,
1419 uint64_t next_uid, 1422 uint64_t next_uid,
1420 bool random) 1423 bool random)
1421{ 1424{
1422 pr->qe_start = GNUNET_TIME_absolute_get (); 1425 pr->qe_start = GNUNET_TIME_absolute_get();
1423 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 1426 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES,
1424 &warn_delay_task, 1427 &warn_delay_task,
1425 pr); 1428 pr);
1426 pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 1429 pr->qe = GNUNET_DATASTORE_get_key(GSF_dsh,
1427 next_uid, 1430 next_uid,
1428 random, 1431 random,
1429 &pr->public_data.query, 1432 &pr->public_data.query,
1430 pr->public_data.type == 1433 pr->public_data.type ==
1431 GNUNET_BLOCK_TYPE_FS_DBLOCK 1434 GNUNET_BLOCK_TYPE_FS_DBLOCK
1432 ? GNUNET_BLOCK_TYPE_ANY 1435 ? GNUNET_BLOCK_TYPE_ANY
1433 : pr->public_data.type, 1436 : pr->public_data.type,
1434 (0 != (GSF_PRO_PRIORITY_UNLIMITED & 1437 (0 != (GSF_PRO_PRIORITY_UNLIMITED &
1435 pr->public_data.options)) 1438 pr->public_data.options))
1436 ? UINT_MAX 1439 ? UINT_MAX
1437 : 1 1440 : 1
1438 /* queue priority */, 1441 /* queue priority */,
1439 (0 != (GSF_PRO_PRIORITY_UNLIMITED & 1442 (0 != (GSF_PRO_PRIORITY_UNLIMITED &
1440 pr->public_data.options)) 1443 pr->public_data.options))
1441 ? UINT_MAX 1444 ? UINT_MAX
1442 : GSF_datastore_queue_size 1445 : GSF_datastore_queue_size
1443 /* max queue size */, 1446 /* max queue size */,
1444 &process_local_reply, 1447 &process_local_reply,
1445 pr); 1448 pr);
1446 if (NULL != pr->qe) 1449 if (NULL != pr->qe)
1447 return; 1450 return;
1448 GNUNET_log ( 1451 GNUNET_log(
1449 GNUNET_ERROR_TYPE_DEBUG, 1452 GNUNET_ERROR_TYPE_DEBUG,
1450 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n", 1453 "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1451 GNUNET_h2s (&pr->public_data.query), 1454 GNUNET_h2s(&pr->public_data.query),
1452 pr->public_data.type, 1455 pr->public_data.type,
1453 (unsigned long long) next_uid); 1456 (unsigned long long)next_uid);
1454 GNUNET_STATISTICS_update (GSF_stats, 1457 GNUNET_STATISTICS_update(GSF_stats,
1455 gettext_noop ( 1458 gettext_noop(
1456 "# Datastore lookups concluded (error queueing)"), 1459 "# Datastore lookups concluded (error queueing)"),
1457 1, 1460 1,
1458 GNUNET_NO); 1461 GNUNET_NO);
1459 call_continuation (pr); 1462 call_continuation(pr);
1460} 1463}
1461 1464
1462 1465
@@ -1479,178 +1482,178 @@ start_local_query (struct GSF_PendingRequest *pr,
1479 * maybe 0 if no unique identifier is available 1482 * maybe 0 if no unique identifier is available
1480 */ 1483 */
1481static void 1484static void
1482process_local_reply (void *cls, 1485process_local_reply(void *cls,
1483 const struct GNUNET_HashCode *key, 1486 const struct GNUNET_HashCode *key,
1484 size_t size, 1487 size_t size,
1485 const void *data, 1488 const void *data,
1486 enum GNUNET_BLOCK_Type type, 1489 enum GNUNET_BLOCK_Type type,
1487 uint32_t priority, 1490 uint32_t priority,
1488 uint32_t anonymity, 1491 uint32_t anonymity,
1489 uint32_t replication, 1492 uint32_t replication,
1490 struct GNUNET_TIME_Absolute expiration, 1493 struct GNUNET_TIME_Absolute expiration,
1491 uint64_t uid) 1494 uint64_t uid)
1492{ 1495{
1493 struct GSF_PendingRequest *pr = cls; 1496 struct GSF_PendingRequest *pr = cls;
1494 struct ProcessReplyClosure prq; 1497 struct ProcessReplyClosure prq;
1495 struct GNUNET_HashCode query; 1498 struct GNUNET_HashCode query;
1496 unsigned int old_rf; 1499 unsigned int old_rf;
1497 1500
1498 GNUNET_SCHEDULER_cancel (pr->warn_task); 1501 GNUNET_SCHEDULER_cancel(pr->warn_task);
1499 pr->warn_task = NULL; 1502 pr->warn_task = NULL;
1500 if (NULL == pr->qe) 1503 if (NULL == pr->qe)
1501 goto called_from_on_demand; 1504 goto called_from_on_demand;
1502 pr->qe = NULL; 1505 pr->qe = NULL;
1503 if ( 1506 if (
1504 (NULL == key) && pr->seen_null && 1507 (NULL == key) && pr->seen_null &&
1505 ! pr->have_first_uid) /* We have hit the end for the 2nd time with no results */ 1508 !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1506 { 1509 {
1507 /* No results */ 1510 /* No results */
1508#if INSANE_STATISTICS 1511#if INSANE_STATISTICS
1509 GNUNET_STATISTICS_update (GSF_stats, 1512 GNUNET_STATISTICS_update(GSF_stats,
1510 gettext_noop ( 1513 gettext_noop(
1511 "# Datastore lookups concluded (no results)"), 1514 "# Datastore lookups concluded (no results)"),
1512 1, 1515 1,
1513 GNUNET_NO); 1516 GNUNET_NO);
1514#endif 1517#endif
1515 no_more_local_results (pr); 1518 no_more_local_results(pr);
1516 return; 1519 return;
1517 } 1520 }
1518 if (((NULL == key) && 1521 if (((NULL == key) &&
1519 pr->seen_null) || /* We have hit the end for the 2nd time OR */ 1522 pr->seen_null) || /* We have hit the end for the 2nd time OR */
1520 (pr->seen_null && pr->have_first_uid && 1523 (pr->seen_null && pr->have_first_uid &&
1521 (uid >= pr->first_uid))) /* We have hit the end and past first UID */ 1524 (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1522 { 1525 {
1523 /* Seen all results */ 1526 /* Seen all results */
1524 GNUNET_STATISTICS_update (GSF_stats, 1527 GNUNET_STATISTICS_update(GSF_stats,
1525 gettext_noop ( 1528 gettext_noop(
1526 "# Datastore lookups concluded (seen all)"), 1529 "# Datastore lookups concluded (seen all)"),
1527 1, 1530 1,
1528 GNUNET_NO); 1531 GNUNET_NO);
1529 no_more_local_results (pr); 1532 no_more_local_results(pr);
1530 return; 1533 return;
1531 } 1534 }
1532 if (NULL == key) 1535 if (NULL == key)
1533 { 1536 {
1534 GNUNET_assert (! pr->seen_null); 1537 GNUNET_assert(!pr->seen_null);
1535 pr->seen_null = true; 1538 pr->seen_null = true;
1536 start_local_query (pr, 0 /* next_uid */, false /* random */); 1539 start_local_query(pr, 0 /* next_uid */, false /* random */);
1537 return; 1540 return;
1538 } 1541 }
1539 if (! pr->have_first_uid) 1542 if (!pr->have_first_uid)
1540 { 1543 {
1541 pr->first_uid = uid; 1544 pr->first_uid = uid;
1542 pr->have_first_uid = true; 1545 pr->have_first_uid = true;
1543 } 1546 }
1544 pr->result_count++; 1547 pr->result_count++;
1545 if (pr->result_count > MAX_RESULTS) 1548 if (pr->result_count > MAX_RESULTS)
1546 { 1549 {
1547 GNUNET_STATISTICS_update ( 1550 GNUNET_STATISTICS_update(
1548 GSF_stats, 1551 GSF_stats,
1549 gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"), 1552 gettext_noop("# Datastore lookups aborted (more than MAX_RESULTS)"),
1550 1, 1553 1,
1551 GNUNET_NO); 1554 GNUNET_NO);
1552 no_more_local_results (pr); 1555 no_more_local_results(pr);
1553 return; 1556 return;
1554 } 1557 }
1555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1558 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1556 "Received reply for `%s' of type %d with UID %llu from datastore.\n", 1559 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1557 GNUNET_h2s (key), 1560 GNUNET_h2s(key),
1558 type, 1561 type,
1559 (unsigned long long) uid); 1562 (unsigned long long)uid);
1560 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) 1563 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1561 {
1562 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1563 "Found ONDEMAND block, performing on-demand encoding\n");
1564 GNUNET_STATISTICS_update (GSF_stats,
1565 gettext_noop (
1566 "# on-demand blocks matched requests"),
1567 1,
1568 GNUNET_NO);
1569 pr->qe_start = GNUNET_TIME_absolute_get ();
1570 pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1571 &odc_warn_delay_task,
1572 pr);
1573 if (GNUNET_OK == GNUNET_FS_handle_on_demand_block (key,
1574 size,
1575 data,
1576 type,
1577 priority,
1578 anonymity,
1579 replication,
1580 expiration,
1581 uid,
1582 &process_local_reply,
1583 pr))
1584 { 1564 {
1585 GNUNET_STATISTICS_update (GSF_stats, 1565 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1586 gettext_noop ( 1566 "Found ONDEMAND block, performing on-demand encoding\n");
1587 "# on-demand lookups performed successfully"), 1567 GNUNET_STATISTICS_update(GSF_stats,
1588 1, 1568 gettext_noop(
1589 GNUNET_NO); 1569 "# on-demand blocks matched requests"),
1590 return; /* we're done */ 1570 1,
1571 GNUNET_NO);
1572 pr->qe_start = GNUNET_TIME_absolute_get();
1573 pr->warn_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_MINUTES,
1574 &odc_warn_delay_task,
1575 pr);
1576 if (GNUNET_OK == GNUNET_FS_handle_on_demand_block(key,
1577 size,
1578 data,
1579 type,
1580 priority,
1581 anonymity,
1582 replication,
1583 expiration,
1584 uid,
1585 &process_local_reply,
1586 pr))
1587 {
1588 GNUNET_STATISTICS_update(GSF_stats,
1589 gettext_noop(
1590 "# on-demand lookups performed successfully"),
1591 1,
1592 GNUNET_NO);
1593 return; /* we're done */
1594 }
1595 GNUNET_STATISTICS_update(GSF_stats,
1596 gettext_noop("# on-demand lookups failed"),
1597 1,
1598 GNUNET_NO);
1599 GNUNET_SCHEDULER_cancel(pr->warn_task);
1600 start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1601 return;
1591 } 1602 }
1592 GNUNET_STATISTICS_update (GSF_stats,
1593 gettext_noop ("# on-demand lookups failed"),
1594 1,
1595 GNUNET_NO);
1596 GNUNET_SCHEDULER_cancel (pr->warn_task);
1597 start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1598 return;
1599 }
1600called_from_on_demand: 1603called_from_on_demand:
1601 old_rf = pr->public_data.results_found; 1604 old_rf = pr->public_data.results_found;
1602 memset (&prq, 0, sizeof (prq)); 1605 memset(&prq, 0, sizeof(prq));
1603 prq.data = data; 1606 prq.data = data;
1604 prq.expiration = expiration; 1607 prq.expiration = expiration;
1605 prq.size = size; 1608 prq.size = size;
1606 if (GNUNET_OK != 1609 if (GNUNET_OK !=
1607 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query)) 1610 GNUNET_BLOCK_get_key(GSF_block_ctx, type, data, size, &query))
1608 { 1611 {
1609 GNUNET_break (0); 1612 GNUNET_break(0);
1610 GNUNET_DATASTORE_remove (GSF_dsh, 1613 GNUNET_DATASTORE_remove(GSF_dsh,
1611 key, 1614 key,
1612 size, 1615 size,
1613 data, 1616 data,
1614 UINT_MAX, 1617 UINT_MAX,
1615 UINT_MAX, 1618 UINT_MAX,
1616 NULL, 1619 NULL,
1617 NULL); 1620 NULL);
1618 start_local_query (pr, uid + 1 /* next_uid */, false /* random */); 1621 start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1619 return; 1622 return;
1620 } 1623 }
1621 prq.type = type; 1624 prq.type = type;
1622 prq.priority = priority; 1625 prq.priority = priority;
1623 prq.request_found = GNUNET_NO; 1626 prq.request_found = GNUNET_NO;
1624 prq.anonymity_level = anonymity; 1627 prq.anonymity_level = anonymity;
1625 if ((0 == old_rf) && (0 == pr->public_data.results_found)) 1628 if ((0 == old_rf) && (0 == pr->public_data.results_found))
1626 GSF_update_datastore_delay_ (pr->public_data.start_time); 1629 GSF_update_datastore_delay_(pr->public_data.start_time);
1627 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO; 1630 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
1628 process_reply (&prq, key, pr); 1631 process_reply(&prq, key, pr);
1629 pr->local_result = prq.eval; 1632 pr->local_result = prq.eval;
1630 if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval) 1633 if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval)
1631 { 1634 {
1632 GNUNET_STATISTICS_update ( 1635 GNUNET_STATISTICS_update(
1633 GSF_stats, 1636 GSF_stats,
1634 gettext_noop ("# Datastore lookups concluded (found last result)"), 1637 gettext_noop("# Datastore lookups concluded (found last result)"),
1635 1, 1638 1,
1636 GNUNET_NO); 1639 GNUNET_NO);
1637 call_continuation (pr); 1640 call_continuation(pr);
1638 return; 1641 return;
1639 } 1642 }
1640 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) && 1643 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1641 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) || 1644 ((GNUNET_YES == GSF_test_get_load_too_high_(0)) ||
1642 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority))) 1645 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1643 { 1646 {
1644 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n"); 1647 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1645 GNUNET_STATISTICS_update (GSF_stats, 1648 GNUNET_STATISTICS_update(GSF_stats,
1646 gettext_noop ( 1649 gettext_noop(
1647 "# Datastore lookups concluded (load too high)"), 1650 "# Datastore lookups concluded (load too high)"),
1648 1, 1651 1,
1649 GNUNET_NO); 1652 GNUNET_NO);
1650 call_continuation (pr); 1653 call_continuation(pr);
1651 return; 1654 return;
1652 } 1655 }
1653 start_local_query (pr, uid + 1 /* next_uid */, false /* random */); 1656 start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1654} 1657}
1655 1658
1656 1659
@@ -1662,17 +1665,17 @@ called_from_on_demand:
1662 * @return #GNUNET_YES if this request could be forwarded to the given peer 1665 * @return #GNUNET_YES if this request could be forwarded to the given peer
1663 */ 1666 */
1664int 1667int
1665GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr, 1668GSF_pending_request_test_target_(struct GSF_PendingRequest *pr,
1666 const struct GNUNET_PeerIdentity *target) 1669 const struct GNUNET_PeerIdentity *target)
1667{ 1670{
1668 struct GNUNET_PeerIdentity pi; 1671 struct GNUNET_PeerIdentity pi;
1669 1672
1670 if (0 == pr->origin_pid) 1673 if (0 == pr->origin_pid)
1671 return GNUNET_YES; 1674 return GNUNET_YES;
1672 GNUNET_PEER_resolve (pr->origin_pid, &pi); 1675 GNUNET_PEER_resolve(pr->origin_pid, &pi);
1673 return (0 == memcmp (&pi, target, sizeof (struct GNUNET_PeerIdentity))) 1676 return (0 == memcmp(&pi, target, sizeof(struct GNUNET_PeerIdentity)))
1674 ? GNUNET_NO 1677 ? GNUNET_NO
1675 : GNUNET_YES; 1678 : GNUNET_YES;
1676} 1679}
1677 1680
1678 1681
@@ -1684,22 +1687,22 @@ GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr,
1684 * @param cont_cls closure for @a cont 1687 * @param cont_cls closure for @a cont
1685 */ 1688 */
1686void 1689void
1687GSF_local_lookup_ (struct GSF_PendingRequest *pr, 1690GSF_local_lookup_(struct GSF_PendingRequest *pr,
1688 GSF_LocalLookupContinuation cont, 1691 GSF_LocalLookupContinuation cont,
1689 void *cont_cls) 1692 void *cont_cls)
1690{ 1693{
1691 GNUNET_assert (NULL == pr->gh); 1694 GNUNET_assert(NULL == pr->gh);
1692 GNUNET_assert (NULL == pr->cadet_request); 1695 GNUNET_assert(NULL == pr->cadet_request);
1693 GNUNET_assert (NULL == pr->llc_cont); 1696 GNUNET_assert(NULL == pr->llc_cont);
1694 pr->llc_cont = cont; 1697 pr->llc_cont = cont;
1695 pr->llc_cont_cls = cont_cls; 1698 pr->llc_cont_cls = cont_cls;
1696#if INSANE_STATISTICS 1699#if INSANE_STATISTICS
1697 GNUNET_STATISTICS_update (GSF_stats, 1700 GNUNET_STATISTICS_update(GSF_stats,
1698 gettext_noop ("# Datastore lookups initiated"), 1701 gettext_noop("# Datastore lookups initiated"),
1699 1, 1702 1,
1700 GNUNET_NO); 1703 GNUNET_NO);
1701#endif 1704#endif
1702 start_local_query (pr, 0 /* next_uid */, true /* random */); 1705 start_local_query(pr, 0 /* next_uid */, true /* random */);
1703} 1706}
1704 1707
1705 1708
@@ -1713,7 +1716,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1713 * @param put the actual message 1716 * @param put the actual message
1714 */ 1717 */
1715void 1718void
1716handle_p2p_put (void *cls, const struct PutMessage *put) 1719handle_p2p_put(void *cls, const struct PutMessage *put)
1717{ 1720{
1718 struct GSF_ConnectedPeer *cp = cls; 1721 struct GSF_ConnectedPeer *cp = cls;
1719 uint16_t msize; 1722 uint16_t msize;
@@ -1726,30 +1729,30 @@ handle_p2p_put (void *cls, const struct PutMessage *put)
1726 double putl; 1729 double putl;
1727 struct PutMigrationContext *pmc; 1730 struct PutMigrationContext *pmc;
1728 1731
1729 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1732 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1730 "Received P2P PUT from %s\n", 1733 "Received P2P PUT from %s\n",
1731 GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer)); 1734 GNUNET_i2s(GSF_get_peer_performance_data_(cp)->peer));
1732 GSF_cover_content_count++; 1735 GSF_cover_content_count++;
1733 msize = ntohs (put->header.size); 1736 msize = ntohs(put->header.size);
1734 dsize = msize - sizeof (struct PutMessage); 1737 dsize = msize - sizeof(struct PutMessage);
1735 type = ntohl (put->type); 1738 type = ntohl(put->type);
1736 expiration = GNUNET_TIME_absolute_ntoh (put->expiration); 1739 expiration = GNUNET_TIME_absolute_ntoh(put->expiration);
1737 /* do not allow migrated content to live longer than 1 year */ 1740 /* do not allow migrated content to live longer than 1 year */
1738 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute ( 1741 expiration = GNUNET_TIME_absolute_min(GNUNET_TIME_relative_to_absolute(
1739 GNUNET_TIME_UNIT_YEARS), 1742 GNUNET_TIME_UNIT_YEARS),
1740 expiration); 1743 expiration);
1741 if (GNUNET_OK != 1744 if (GNUNET_OK !=
1742 GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query)) 1745 GNUNET_BLOCK_get_key(GSF_block_ctx, type, &put[1], dsize, &query))
1743 { 1746 {
1744 GNUNET_break_op (0); 1747 GNUNET_break_op(0);
1745 return; 1748 return;
1746 } 1749 }
1747 GNUNET_STATISTICS_update (GSF_stats, 1750 GNUNET_STATISTICS_update(GSF_stats,
1748 gettext_noop ("# GAP PUT messages received"), 1751 gettext_noop("# GAP PUT messages received"),
1749 1, 1752 1,
1750 GNUNET_NO); 1753 GNUNET_NO);
1751 /* now, lookup 'query' */ 1754 /* now, lookup 'query' */
1752 prq.data = (const void *) &put[1]; 1755 prq.data = (const void *)&put[1];
1753 prq.sender = cp; 1756 prq.sender = cp;
1754 prq.size = dsize; 1757 prq.size = dsize;
1755 prq.type = type; 1758 prq.type = type;
@@ -1758,80 +1761,80 @@ handle_p2p_put (void *cls, const struct PutMessage *put)
1758 prq.anonymity_level = UINT32_MAX; 1761 prq.anonymity_level = UINT32_MAX;
1759 prq.request_found = GNUNET_NO; 1762 prq.request_found = GNUNET_NO;
1760 prq.eo = GNUNET_BLOCK_EO_NONE; 1763 prq.eo = GNUNET_BLOCK_EO_NONE;
1761 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map, 1764 GNUNET_CONTAINER_multihashmap_get_multiple(pr_map,
1762 &query, 1765 &query,
1763 &process_reply, 1766 &process_reply,
1764 &prq); 1767 &prq);
1765 if (NULL != cp) 1768 if (NULL != cp)
1766 { 1769 {
1767 GSF_connected_peer_change_preference_ (cp, 1770 GSF_connected_peer_change_preference_(cp,
1768 CONTENT_BANDWIDTH_VALUE + 1771 CONTENT_BANDWIDTH_VALUE +
1769 1000 * prq.priority); 1772 1000 * prq.priority);
1770 GSF_get_peer_performance_data_ (cp)->respect += prq.priority; 1773 GSF_get_peer_performance_data_(cp)->respect += prq.priority;
1771 } 1774 }
1772 if ((GNUNET_YES == active_to_migration) && (NULL != cp) && 1775 if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1773 (GNUNET_NO == test_put_load_too_high (prq.priority))) 1776 (GNUNET_NO == test_put_load_too_high(prq.priority)))
1774 {
1775 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1776 "Replicating result for query `%s' with priority %u\n",
1777 GNUNET_h2s (&query),
1778 prq.priority);
1779 pmc = GNUNET_new (struct PutMigrationContext);
1780 pmc->start = GNUNET_TIME_absolute_get ();
1781 pmc->requested = prq.request_found;
1782 GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid);
1783 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1784 &pmc->origin);
1785 if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1786 0,
1787 &query,
1788 dsize,
1789 &put[1],
1790 type,
1791 prq.priority,
1792 1 /* anonymity */,
1793 0 /* replication */,
1794 expiration,
1795 1 + prq.priority,
1796 MAX_DATASTORE_QUEUE,
1797 &put_migration_continuation,
1798 pmc))
1799 { 1777 {
1800 put_migration_continuation (pmc, 1778 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1801 GNUNET_SYSERR, 1779 "Replicating result for query `%s' with priority %u\n",
1802 GNUNET_TIME_UNIT_ZERO_ABS, 1780 GNUNET_h2s(&query),
1803 NULL); 1781 prq.priority);
1782 pmc = GNUNET_new(struct PutMigrationContext);
1783 pmc->start = GNUNET_TIME_absolute_get();
1784 pmc->requested = prq.request_found;
1785 GNUNET_assert(0 != GSF_get_peer_performance_data_(cp)->pid);
1786 GNUNET_PEER_resolve(GSF_get_peer_performance_data_(cp)->pid,
1787 &pmc->origin);
1788 if (NULL == GNUNET_DATASTORE_put(GSF_dsh,
1789 0,
1790 &query,
1791 dsize,
1792 &put[1],
1793 type,
1794 prq.priority,
1795 1 /* anonymity */,
1796 0 /* replication */,
1797 expiration,
1798 1 + prq.priority,
1799 MAX_DATASTORE_QUEUE,
1800 &put_migration_continuation,
1801 pmc))
1802 {
1803 put_migration_continuation(pmc,
1804 GNUNET_SYSERR,
1805 GNUNET_TIME_UNIT_ZERO_ABS,
1806 NULL);
1807 }
1804 } 1808 }
1805 }
1806 else if (NULL != cp) 1809 else if (NULL != cp)
1807 { 1810 {
1808 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1811 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1809 "Choosing not to keep content `%s' (%d/%d)\n", 1812 "Choosing not to keep content `%s' (%d/%d)\n",
1810 GNUNET_h2s (&query), 1813 GNUNET_h2s(&query),
1811 active_to_migration, 1814 active_to_migration,
1812 test_put_load_too_high (prq.priority)); 1815 test_put_load_too_high(prq.priority));
1813 } 1816 }
1814 putl = GNUNET_LOAD_get_load (datastore_put_load); 1817 putl = GNUNET_LOAD_get_load(datastore_put_load);
1815 if ((NULL != cp) && (GNUNET_NO == prq.request_found) && 1818 if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1816 ((GNUNET_YES != active_to_migration) || 1819 ((GNUNET_YES != active_to_migration) ||
1817 (putl > 2.5 * (1 + prq.priority)))) 1820 (putl > 2.5 * (1 + prq.priority))))
1818 { 1821 {
1819 if (GNUNET_YES != active_to_migration) 1822 if (GNUNET_YES != active_to_migration)
1820 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5); 1823 putl = 1.0 + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 5);
1821 block_time = GNUNET_TIME_relative_multiply ( 1824 block_time = GNUNET_TIME_relative_multiply(
1822 GNUNET_TIME_UNIT_MILLISECONDS, 1825 GNUNET_TIME_UNIT_MILLISECONDS,
1823 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1826 5000 + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
1824 (unsigned int) (60000 * putl * putl))); 1827 (unsigned int)(60000 * putl * putl)));
1825 GNUNET_log ( 1828 GNUNET_log(
1826 GNUNET_ERROR_TYPE_DEBUG, 1829 GNUNET_ERROR_TYPE_DEBUG,
1827 "Asking to stop migration for %s because of load %f and events %d/%d\n", 1830 "Asking to stop migration for %s because of load %f and events %d/%d\n",
1828 GNUNET_STRINGS_relative_time_to_string (block_time, GNUNET_YES), 1831 GNUNET_STRINGS_relative_time_to_string(block_time, GNUNET_YES),
1829 putl, 1832 putl,
1830 active_to_migration, 1833 active_to_migration,
1831 (GNUNET_NO == prq.request_found)); 1834 (GNUNET_NO == prq.request_found));
1832 GSF_block_peer_migration_ (cp, 1835 GSF_block_peer_migration_(cp,
1833 GNUNET_TIME_relative_to_absolute (block_time)); 1836 GNUNET_TIME_relative_to_absolute(block_time));
1834 } 1837 }
1835} 1838}
1836 1839
1837 1840
@@ -1842,7 +1845,7 @@ handle_p2p_put (void *cls, const struct PutMessage *put)
1842 * @return #GNUNET_YES if the request is still active 1845 * @return #GNUNET_YES if the request is still active
1843 */ 1846 */
1844int 1847int
1845GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr) 1848GSF_pending_request_test_active_(struct GSF_PendingRequest *pr)
1846{ 1849{
1847 return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO; 1850 return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1848} 1851}
@@ -1852,24 +1855,24 @@ GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr)
1852 * Setup the subsystem. 1855 * Setup the subsystem.
1853 */ 1856 */
1854void 1857void
1855GSF_pending_request_init_ () 1858GSF_pending_request_init_()
1856{ 1859{
1857 if (GNUNET_OK != 1860 if (GNUNET_OK !=
1858 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, 1861 GNUNET_CONFIGURATION_get_value_number(GSF_cfg,
1859 "fs", 1862 "fs",
1860 "MAX_PENDING_REQUESTS", 1863 "MAX_PENDING_REQUESTS",
1861 &max_pending_requests)) 1864 &max_pending_requests))
1862 { 1865 {
1863 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, 1866 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_INFO,
1864 "fs", 1867 "fs",
1865 "MAX_PENDING_REQUESTS"); 1868 "MAX_PENDING_REQUESTS");
1866 } 1869 }
1867 active_to_migration = 1870 active_to_migration =
1868 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING"); 1871 GNUNET_CONFIGURATION_get_value_yesno(GSF_cfg, "FS", "CONTENT_CACHING");
1869 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 1872 datastore_put_load = GNUNET_LOAD_value_init(DATASTORE_LOAD_AUTODECLINE);
1870 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES); 1873 pr_map = GNUNET_CONTAINER_multihashmap_create(32 * 1024, GNUNET_YES);
1871 requests_by_expiration_heap = 1874 requests_by_expiration_heap =
1872 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 1875 GNUNET_CONTAINER_heap_create(GNUNET_CONTAINER_HEAP_ORDER_MIN);
1873} 1876}
1874 1877
1875 1878
@@ -1877,14 +1880,14 @@ GSF_pending_request_init_ ()
1877 * Shutdown the subsystem. 1880 * Shutdown the subsystem.
1878 */ 1881 */
1879void 1882void
1880GSF_pending_request_done_ () 1883GSF_pending_request_done_()
1881{ 1884{
1882 GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL); 1885 GNUNET_CONTAINER_multihashmap_iterate(pr_map, &clean_request, NULL);
1883 GNUNET_CONTAINER_multihashmap_destroy (pr_map); 1886 GNUNET_CONTAINER_multihashmap_destroy(pr_map);
1884 pr_map = NULL; 1887 pr_map = NULL;
1885 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap); 1888 GNUNET_CONTAINER_heap_destroy(requests_by_expiration_heap);
1886 requests_by_expiration_heap = NULL; 1889 requests_by_expiration_heap = NULL;
1887 GNUNET_LOAD_value_free (datastore_put_load); 1890 GNUNET_LOAD_value_free(datastore_put_load);
1888 datastore_put_load = NULL; 1891 datastore_put_load = NULL;
1889} 1892}
1890 1893