aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c1421
1 files changed, 721 insertions, 700 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index 6dab37869..dca8ae2a8 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -43,12 +43,14 @@
43/** 43/**
44 * How often do we flush respect values to disk? 44 * How often do we flush respect values to disk?
45 */ 45 */
46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5) 46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply ( \
47 GNUNET_TIME_UNIT_MINUTES, 5)
47 48
48/** 49/**
49 * After how long do we discard a reply? 50 * After how long do we discard a reply?
50 */ 51 */
51#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2) 52#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
53 2)
52 54
53/** 55/**
54 * Collect an instane number of statistics? May cause excessive IPC. 56 * Collect an instane number of statistics? May cause excessive IPC.
@@ -59,7 +61,8 @@
59/** 61/**
60 * Handle to cancel a transmission request. 62 * Handle to cancel a transmission request.
61 */ 63 */
62struct GSF_PeerTransmitHandle { 64struct GSF_PeerTransmitHandle
65{
63 /** 66 /**
64 * Kept in a doubly-linked list. 67 * Kept in a doubly-linked list.
65 */ 68 */
@@ -105,7 +108,8 @@ struct GSF_PeerTransmitHandle {
105/** 108/**
106 * Handle for an entry in our delay list. 109 * Handle for an entry in our delay list.
107 */ 110 */
108struct GSF_DelayedHandle { 111struct GSF_DelayedHandle
112{
109 /** 113 /**
110 * Kept in a doubly-linked list. 114 * Kept in a doubly-linked list.
111 */ 115 */
@@ -141,7 +145,8 @@ struct GSF_DelayedHandle {
141/** 145/**
142 * Information per peer and request. 146 * Information per peer and request.
143 */ 147 */
144struct PeerRequest { 148struct PeerRequest
149{
145 /** 150 /**
146 * Handle to generic request (generic: from peer or local client). 151 * Handle to generic request (generic: from peer or local client).
147 */ 152 */
@@ -162,7 +167,8 @@ struct PeerRequest {
162/** 167/**
163 * A connected peer. 168 * A connected peer.
164 */ 169 */
165struct GSF_ConnectedPeer { 170struct GSF_ConnectedPeer
171{
166 /** 172 /**
167 * Performance data for this peer. 173 * Performance data for this peer.
168 */ 174 */
@@ -293,16 +299,16 @@ static struct GNUNET_SCHEDULER_Task *fr_task;
293 * @param latency current latency value 299 * @param latency current latency value
294 */ 300 */
295void 301void
296GSF_update_peer_latency_(const struct GNUNET_PeerIdentity *id, 302GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
297 struct GNUNET_TIME_Relative latency) 303 struct GNUNET_TIME_Relative latency)
298{ 304{
299 struct GSF_ConnectedPeer *cp; 305 struct GSF_ConnectedPeer *cp;
300 306
301 cp = GSF_peer_get_(id); 307 cp = GSF_peer_get_ (id);
302 if (NULL == cp) 308 if (NULL == cp)
303 return; /* we're not yet connected at the core level, ignore */ 309 return; /* we're not yet connected at the core level, ignore */
304 GNUNET_LOAD_value_set_decline(cp->ppd.transmission_delay, 310 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay,
305 latency); 311 latency);
306} 312}
307 313
308 314
@@ -313,7 +319,7 @@ GSF_update_peer_latency_(const struct GNUNET_PeerIdentity *id,
313 * @return performance data record for the peer 319 * @return performance data record for the peer
314 */ 320 */
315struct GSF_PeerPerformanceData * 321struct GSF_PeerPerformanceData *
316GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp) 322GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
317{ 323{
318 return &cp->ppd; 324 return &cp->ppd;
319} 325}
@@ -325,7 +331,7 @@ GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
325 * @param cp which peer to send a message to 331 * @param cp which peer to send a message to
326 */ 332 */
327static void 333static void
328peer_transmit(struct GSF_ConnectedPeer *cp); 334peer_transmit (struct GSF_ConnectedPeer *cp);
329 335
330 336
331/** 337/**
@@ -339,10 +345,10 @@ peer_transmit(struct GSF_ConnectedPeer *cp);
339 * long should the client wait until re-trying? 345 * long should the client wait until re-trying?
340 */ 346 */
341static void 347static void
342ats_reserve_callback(void *cls, 348ats_reserve_callback (void *cls,
343 const struct GNUNET_PeerIdentity *peer, 349 const struct GNUNET_PeerIdentity *peer,
344 int32_t amount, 350 int32_t amount,
345 struct GNUNET_TIME_Relative res_delay); 351 struct GNUNET_TIME_Relative res_delay);
346 352
347 353
348/** 354/**
@@ -352,42 +358,42 @@ ats_reserve_callback(void *cls,
352 * @param pth transmission handle to schedule 358 * @param pth transmission handle to schedule
353 */ 359 */
354static void 360static void
355schedule_transmission(struct GSF_PeerTransmitHandle *pth) 361schedule_transmission (struct GSF_PeerTransmitHandle *pth)
356{ 362{
357 struct GSF_ConnectedPeer *cp; 363 struct GSF_ConnectedPeer *cp;
358 struct GNUNET_PeerIdentity target; 364 struct GNUNET_PeerIdentity target;
359 365
360 cp = pth->cp; 366 cp = pth->cp;
361 GNUNET_assert(0 != cp->ppd.pid); 367 GNUNET_assert (0 != cp->ppd.pid);
362 GNUNET_PEER_resolve(cp->ppd.pid, &target); 368 GNUNET_PEER_resolve (cp->ppd.pid, &target);
363 369
364 if (0 != cp->inc_preference) 370 if (0 != cp->inc_preference)
365 { 371 {
366 GNUNET_ATS_performance_change_preference(GSF_ats, 372 GNUNET_ATS_performance_change_preference (GSF_ats,
367 &target, 373 &target,
368 GNUNET_ATS_PREFERENCE_BANDWIDTH, 374 GNUNET_ATS_PREFERENCE_BANDWIDTH,
369 (double)cp->inc_preference, 375 (double) cp->inc_preference,
370 GNUNET_ATS_PREFERENCE_END); 376 GNUNET_ATS_PREFERENCE_END);
371 cp->inc_preference = 0; 377 cp->inc_preference = 0;
372 } 378 }
373 379
374 if ((GNUNET_YES == pth->is_query) && 380 if ((GNUNET_YES == pth->is_query) &&
375 (GNUNET_YES != pth->was_reserved)) 381 (GNUNET_YES != pth->was_reserved))
376 { 382 {
377 /* query, need reservation */ 383 /* query, need reservation */
378 if (GNUNET_YES != cp->did_reserve) 384 if (GNUNET_YES != cp->did_reserve)
379 return; /* not ready */ 385 return; /* not ready */
380 cp->did_reserve = GNUNET_NO; 386 cp->did_reserve = GNUNET_NO;
381 /* reservation already done! */ 387 /* reservation already done! */
382 pth->was_reserved = GNUNET_YES; 388 pth->was_reserved = GNUNET_YES;
383 cp->rc = GNUNET_ATS_reserve_bandwidth(GSF_ats, 389 cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats,
384 &target, 390 &target,
385 DBLOCK_SIZE, 391 DBLOCK_SIZE,
386 &ats_reserve_callback, 392 &ats_reserve_callback,
387 cp); 393 cp);
388 return; 394 return;
389 } 395 }
390 peer_transmit(cp); 396 peer_transmit (cp);
391} 397}
392 398
393 399
@@ -397,38 +403,38 @@ schedule_transmission(struct GSF_PeerTransmitHandle *pth)
397 * @param cp which peer to send a message to 403 * @param cp which peer to send a message to
398 */ 404 */
399static void 405static void
400peer_transmit(struct GSF_ConnectedPeer *cp) 406peer_transmit (struct GSF_ConnectedPeer *cp)
401{ 407{
402 struct GSF_PeerTransmitHandle *pth = cp->pth_head; 408 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
403 struct GSF_PeerTransmitHandle *pos; 409 struct GSF_PeerTransmitHandle *pos;
404 410
405 if (NULL == pth) 411 if (NULL == pth)
406 return; 412 return;
407 GNUNET_CONTAINER_DLL_remove(cp->pth_head, 413 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
408 cp->pth_tail, 414 cp->pth_tail,
409 pth); 415 pth);
410 if (GNUNET_YES == pth->is_query) 416 if (GNUNET_YES == pth->is_query)
411 { 417 {
412 cp->ppd.last_request_times[(cp->last_request_times_off++) % 418 cp->ppd.last_request_times[(cp->last_request_times_off++)
413 MAX_QUEUE_PER_PEER] = 419 % MAX_QUEUE_PER_PEER] =
414 GNUNET_TIME_absolute_get(); 420 GNUNET_TIME_absolute_get ();
415 GNUNET_assert(0 < cp->ppd.pending_queries--); 421 GNUNET_assert (0 < cp->ppd.pending_queries--);
416 } 422 }
417 else if (GNUNET_NO == pth->is_query) 423 else if (GNUNET_NO == pth->is_query)
418 { 424 {
419 GNUNET_assert(0 < cp->ppd.pending_replies--); 425 GNUNET_assert (0 < cp->ppd.pending_replies--);
420 } 426 }
421 GNUNET_LOAD_update(cp->ppd.transmission_delay, 427 GNUNET_LOAD_update (cp->ppd.transmission_delay,
422 GNUNET_TIME_absolute_get_duration 428 GNUNET_TIME_absolute_get_duration
423 (pth->transmission_request_start_time).rel_value_us); 429 (pth->transmission_request_start_time).rel_value_us);
424 GNUNET_MQ_send(cp->mq, 430 GNUNET_MQ_send (cp->mq,
425 pth->env); 431 pth->env);
426 GNUNET_free(pth); 432 GNUNET_free (pth);
427 if (NULL != (pos = cp->pth_head)) 433 if (NULL != (pos = cp->pth_head))
428 { 434 {
429 GNUNET_assert(pos != pth); 435 GNUNET_assert (pos != pth);
430 schedule_transmission(pos); 436 schedule_transmission (pos);
431 } 437 }
432} 438}
433 439
434 440
@@ -438,18 +444,18 @@ peer_transmit(struct GSF_ConnectedPeer *cp)
438 * @param cls the `struct GSF_ConnectedPeer` to reserve from 444 * @param cls the `struct GSF_ConnectedPeer` to reserve from
439 */ 445 */
440static void 446static void
441retry_reservation(void *cls) 447retry_reservation (void *cls)
442{ 448{
443 struct GSF_ConnectedPeer *cp = cls; 449 struct GSF_ConnectedPeer *cp = cls;
444 struct GNUNET_PeerIdentity target; 450 struct GNUNET_PeerIdentity target;
445 451
446 GNUNET_PEER_resolve(cp->ppd.pid, &target); 452 GNUNET_PEER_resolve (cp->ppd.pid, &target);
447 cp->rc_delay_task = NULL; 453 cp->rc_delay_task = NULL;
448 cp->rc = 454 cp->rc =
449 GNUNET_ATS_reserve_bandwidth(GSF_ats, 455 GNUNET_ATS_reserve_bandwidth (GSF_ats,
450 &target, 456 &target,
451 DBLOCK_SIZE, 457 DBLOCK_SIZE,
452 &ats_reserve_callback, cp); 458 &ats_reserve_callback, cp);
453} 459}
454 460
455 461
@@ -464,34 +470,34 @@ retry_reservation(void *cls)
464 * long should the client wait until re-trying? 470 * long should the client wait until re-trying?
465 */ 471 */
466static void 472static void
467ats_reserve_callback(void *cls, 473ats_reserve_callback (void *cls,
468 const struct GNUNET_PeerIdentity *peer, 474 const struct GNUNET_PeerIdentity *peer,
469 int32_t amount, 475 int32_t amount,
470 struct GNUNET_TIME_Relative res_delay) 476 struct GNUNET_TIME_Relative res_delay)
471{ 477{
472 struct GSF_ConnectedPeer *cp = cls; 478 struct GSF_ConnectedPeer *cp = cls;
473 struct GSF_PeerTransmitHandle *pth; 479 struct GSF_PeerTransmitHandle *pth;
474 480
475 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
476 "Reserved %d bytes / need to wait %s for reservation\n", 482 "Reserved %d bytes / need to wait %s for reservation\n",
477 (int)amount, 483 (int) amount,
478 GNUNET_STRINGS_relative_time_to_string(res_delay, GNUNET_YES)); 484 GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES));
479 cp->rc = NULL; 485 cp->rc = NULL;
480 if (0 == amount) 486 if (0 == amount)
481 { 487 {
482 cp->rc_delay_task = 488 cp->rc_delay_task =
483 GNUNET_SCHEDULER_add_delayed(res_delay, 489 GNUNET_SCHEDULER_add_delayed (res_delay,
484 &retry_reservation, 490 &retry_reservation,
485 cp); 491 cp);
486 return; 492 return;
487 } 493 }
488 cp->did_reserve = GNUNET_YES; 494 cp->did_reserve = GNUNET_YES;
489 pth = cp->pth_head; 495 pth = cp->pth_head;
490 if (NULL != pth) 496 if (NULL != pth)
491 { 497 {
492 /* reservation success, try transmission now! */ 498 /* reservation success, try transmission now! */
493 peer_transmit(cp); 499 peer_transmit (cp);
494 } 500 }
495} 501}
496 502
497 503
@@ -503,22 +509,22 @@ ats_reserve_callback(void *cls,
503 * @param emsg error message, or NULL if no errors 509 * @param emsg error message, or NULL if no errors
504 */ 510 */
505static void 511static void
506peer_respect_cb(void *cls, 512peer_respect_cb (void *cls,
507 const struct GNUNET_PEERSTORE_Record *record, 513 const struct GNUNET_PEERSTORE_Record *record,
508 const char *emsg) 514 const char *emsg)
509{ 515{
510 struct GSF_ConnectedPeer *cp = cls; 516 struct GSF_ConnectedPeer *cp = cls;
511 517
512 GNUNET_assert(NULL != cp->respect_iterate_req); 518 GNUNET_assert (NULL != cp->respect_iterate_req);
513 if ((NULL != record) && 519 if ((NULL != record) &&
514 (sizeof(cp->disk_respect) == record->value_size)) 520 (sizeof(cp->disk_respect) == record->value_size))
515 { 521 {
516 cp->disk_respect = *((uint32_t *)record->value); 522 cp->disk_respect = *((uint32_t *) record->value);
517 cp->ppd.respect += *((uint32_t *)record->value); 523 cp->ppd.respect += *((uint32_t *) record->value);
518 } 524 }
519 GSF_push_start_(cp); 525 GSF_push_start_ (cp);
520 if (NULL != record) 526 if (NULL != record)
521 GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req); 527 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
522 cp->respect_iterate_req = NULL; 528 cp->respect_iterate_req = NULL;
523} 529}
524 530
@@ -534,27 +540,27 @@ peer_respect_cb(void *cls,
534 * @return #GNUNET_YES to continue to iterate 540 * @return #GNUNET_YES to continue to iterate
535 */ 541 */
536static int 542static int
537consider_peer_for_forwarding(void *cls, 543consider_peer_for_forwarding (void *cls,
538 const struct GNUNET_HashCode *key, 544 const struct GNUNET_HashCode *key,
539 struct GSF_PendingRequest *pr) 545 struct GSF_PendingRequest *pr)
540{ 546{
541 struct GSF_ConnectedPeer *cp = cls; 547 struct GSF_ConnectedPeer *cp = cls;
542 struct GNUNET_PeerIdentity pid; 548 struct GNUNET_PeerIdentity pid;
543 549
544 if (GNUNET_YES != 550 if (GNUNET_YES !=
545 GSF_pending_request_test_active_(pr)) 551 GSF_pending_request_test_active_ (pr))
546 return GNUNET_YES; /* request is not actually active, skip! */ 552 return GNUNET_YES; /* request is not actually active, skip! */
547 GSF_connected_peer_get_identity_(cp, &pid); 553 GSF_connected_peer_get_identity_ (cp, &pid);
548 if (GNUNET_YES != 554 if (GNUNET_YES !=
549 GSF_pending_request_test_target_(pr, &pid)) 555 GSF_pending_request_test_target_ (pr, &pid))
550 { 556 {
551 GNUNET_STATISTICS_update(GSF_stats, 557 GNUNET_STATISTICS_update (GSF_stats,
552 gettext_noop("# Loopback routes suppressed"), 558 gettext_noop ("# Loopback routes suppressed"),
553 1, 559 1,
554 GNUNET_NO); 560 GNUNET_NO);
555 return GNUNET_YES; 561 return GNUNET_YES;
556 } 562 }
557 GSF_plan_add_(cp, pr); 563 GSF_plan_add_ (cp, pr);
558 return GNUNET_YES; 564 return GNUNET_YES;
559} 565}
560 566
@@ -569,49 +575,50 @@ consider_peer_for_forwarding(void *cls,
569 * @return our internal handle for the peer 575 * @return our internal handle for the peer
570 */ 576 */
571void * 577void *
572GSF_peer_connect_handler(void *cls, 578GSF_peer_connect_handler (void *cls,
573 const struct GNUNET_PeerIdentity *peer, 579 const struct GNUNET_PeerIdentity *peer,
574 struct GNUNET_MQ_Handle *mq) 580 struct GNUNET_MQ_Handle *mq)
575{ 581{
576 struct GSF_ConnectedPeer *cp; 582 struct GSF_ConnectedPeer *cp;
577 583
578 if (0 == 584 if (0 ==
579 GNUNET_memcmp(&GSF_my_id, 585 GNUNET_memcmp (&GSF_my_id,
580 peer)) 586 peer))
581 return NULL; 587 return NULL;
582 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 588 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
583 "Connected to peer %s\n", 589 "Connected to peer %s\n",
584 GNUNET_i2s(peer)); 590 GNUNET_i2s (peer));
585 cp = GNUNET_new(struct GSF_ConnectedPeer); 591 cp = GNUNET_new (struct GSF_ConnectedPeer);
586 cp->ppd.pid = GNUNET_PEER_intern(peer); 592 cp->ppd.pid = GNUNET_PEER_intern (peer);
587 cp->ppd.peer = peer; 593 cp->ppd.peer = peer;
588 cp->mq = mq; 594 cp->mq = mq;
589 cp->ppd.transmission_delay = GNUNET_LOAD_value_init(GNUNET_TIME_UNIT_ZERO); 595 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
590 cp->rc = 596 cp->rc =
591 GNUNET_ATS_reserve_bandwidth(GSF_ats, 597 GNUNET_ATS_reserve_bandwidth (GSF_ats,
592 peer, 598 peer,
593 DBLOCK_SIZE, 599 DBLOCK_SIZE,
594 &ats_reserve_callback, cp); 600 &ats_reserve_callback, cp);
595 cp->request_map = GNUNET_CONTAINER_multihashmap_create(128, 601 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128,
596 GNUNET_YES); 602 GNUNET_YES);
597 GNUNET_break(GNUNET_OK == 603 GNUNET_break (GNUNET_OK ==
598 GNUNET_CONTAINER_multipeermap_put(cp_map, 604 GNUNET_CONTAINER_multipeermap_put (cp_map,
599 GSF_connected_peer_get_identity2_(cp), 605 GSF_connected_peer_get_identity2_ (
600 cp, 606 cp),
601 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 607 cp,
602 GNUNET_STATISTICS_set(GSF_stats, 608 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
603 gettext_noop("# peers connected"), 609 GNUNET_STATISTICS_set (GSF_stats,
604 GNUNET_CONTAINER_multipeermap_size(cp_map), 610 gettext_noop ("# peers connected"),
605 GNUNET_NO); 611 GNUNET_CONTAINER_multipeermap_size (cp_map),
612 GNUNET_NO);
606 cp->respect_iterate_req 613 cp->respect_iterate_req
607 = GNUNET_PEERSTORE_iterate(peerstore, 614 = GNUNET_PEERSTORE_iterate (peerstore,
608 "fs", 615 "fs",
609 peer, 616 peer,
610 "respect", 617 "respect",
611 &peer_respect_cb, 618 &peer_respect_cb,
612 cp);
613 GSF_iterate_pending_requests_(&consider_peer_for_forwarding,
614 cp); 619 cp);
620 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
621 cp);
615 return cp; 622 return cp;
616} 623}
617 624
@@ -623,21 +630,21 @@ GSF_peer_connect_handler(void *cls,
623 * @param cls the `struct GSF_ConnectedPeer` 630 * @param cls the `struct GSF_ConnectedPeer`
624 */ 631 */
625static void 632static void
626revive_migration(void *cls) 633revive_migration (void *cls)
627{ 634{
628 struct GSF_ConnectedPeer *cp = cls; 635 struct GSF_ConnectedPeer *cp = cls;
629 struct GNUNET_TIME_Relative bt; 636 struct GNUNET_TIME_Relative bt;
630 637
631 cp->mig_revive_task = NULL; 638 cp->mig_revive_task = NULL;
632 bt = GNUNET_TIME_absolute_get_remaining(cp->ppd.migration_blocked_until); 639 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until);
633 if (0 != bt.rel_value_us) 640 if (0 != bt.rel_value_us)
634 { 641 {
635 /* still time left... */ 642 /* still time left... */
636 cp->mig_revive_task = 643 cp->mig_revive_task =
637 GNUNET_SCHEDULER_add_delayed(bt, &revive_migration, cp); 644 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp);
638 return; 645 return;
639 } 646 }
640 GSF_push_start_(cp); 647 GSF_push_start_ (cp);
641} 648}
642 649
643 650
@@ -648,11 +655,11 @@ revive_migration(void *cls)
648 * @return NULL if the peer is not currently connected 655 * @return NULL if the peer is not currently connected
649 */ 656 */
650struct GSF_ConnectedPeer * 657struct GSF_ConnectedPeer *
651GSF_peer_get_(const struct GNUNET_PeerIdentity *peer) 658GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
652{ 659{
653 if (NULL == cp_map) 660 if (NULL == cp_map)
654 return NULL; 661 return NULL;
655 return GNUNET_CONTAINER_multipeermap_get(cp_map, peer); 662 return GNUNET_CONTAINER_multipeermap_get (cp_map, peer);
656} 663}
657 664
658 665
@@ -663,29 +670,29 @@ GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
663 * @param msm the actual message 670 * @param msm the actual message
664 */ 671 */
665void 672void
666handle_p2p_migration_stop(void *cls, 673handle_p2p_migration_stop (void *cls,
667 const struct MigrationStopMessage *msm) 674 const struct MigrationStopMessage *msm)
668{ 675{
669 struct GSF_ConnectedPeer *cp = cls; 676 struct GSF_ConnectedPeer *cp = cls;
670 struct GNUNET_TIME_Relative bt; 677 struct GNUNET_TIME_Relative bt;
671 678
672 GNUNET_STATISTICS_update(GSF_stats, 679 GNUNET_STATISTICS_update (GSF_stats,
673 gettext_noop("# migration stop messages received"), 680 gettext_noop ("# migration stop messages received"),
674 1, GNUNET_NO); 681 1, GNUNET_NO);
675 bt = GNUNET_TIME_relative_ntoh(msm->duration); 682 bt = GNUNET_TIME_relative_ntoh (msm->duration);
676 GNUNET_log(GNUNET_ERROR_TYPE_INFO, 683 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
677 _("Migration of content to peer `%s' blocked for %s\n"), 684 _ ("Migration of content to peer `%s' blocked for %s\n"),
678 GNUNET_i2s(cp->ppd.peer), 685 GNUNET_i2s (cp->ppd.peer),
679 GNUNET_STRINGS_relative_time_to_string(bt, GNUNET_YES)); 686 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
680 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute(bt); 687 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
681 if ((NULL == cp->mig_revive_task) && 688 if ((NULL == cp->mig_revive_task) &&
682 (NULL == cp->respect_iterate_req)) 689 (NULL == cp->respect_iterate_req))
683 { 690 {
684 GSF_push_stop_(cp); 691 GSF_push_stop_ (cp);
685 cp->mig_revive_task = 692 cp->mig_revive_task =
686 GNUNET_SCHEDULER_add_delayed(bt, 693 GNUNET_SCHEDULER_add_delayed (bt,
687 &revive_migration, cp); 694 &revive_migration, cp);
688 } 695 }
689} 696}
690 697
691 698
@@ -695,26 +702,26 @@ handle_p2p_migration_stop(void *cls,
695 * @param peerreq request to free 702 * @param peerreq request to free
696 */ 703 */
697static void 704static void
698free_pending_request(struct PeerRequest *peerreq) 705free_pending_request (struct PeerRequest *peerreq)
699{ 706{
700 struct GSF_ConnectedPeer *cp = peerreq->cp; 707 struct GSF_ConnectedPeer *cp = peerreq->cp;
701 struct GSF_PendingRequestData *prd; 708 struct GSF_PendingRequestData *prd;
702 709
703 prd = GSF_pending_request_get_data_(peerreq->pr); 710 prd = GSF_pending_request_get_data_ (peerreq->pr);
704 if (NULL != peerreq->kill_task) 711 if (NULL != peerreq->kill_task)
705 { 712 {
706 GNUNET_SCHEDULER_cancel(peerreq->kill_task); 713 GNUNET_SCHEDULER_cancel (peerreq->kill_task);
707 peerreq->kill_task = NULL; 714 peerreq->kill_task = NULL;
708 } 715 }
709 GNUNET_STATISTICS_update(GSF_stats, 716 GNUNET_STATISTICS_update (GSF_stats,
710 gettext_noop("# P2P searches active"), 717 gettext_noop ("# P2P searches active"),
711 -1, 718 -1,
712 GNUNET_NO); 719 GNUNET_NO);
713 GNUNET_break(GNUNET_YES == 720 GNUNET_break (GNUNET_YES ==
714 GNUNET_CONTAINER_multihashmap_remove(cp->request_map, 721 GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
715 &prd->query, 722 &prd->query,
716 peerreq)); 723 peerreq));
717 GNUNET_free(peerreq); 724 GNUNET_free (peerreq);
718} 725}
719 726
720 727
@@ -727,16 +734,16 @@ free_pending_request(struct PeerRequest *peerreq)
727 * @return #GNUNET_YES (continue to iterate) 734 * @return #GNUNET_YES (continue to iterate)
728 */ 735 */
729static int 736static int
730cancel_pending_request(void *cls, 737cancel_pending_request (void *cls,
731 const struct GNUNET_HashCode *query, 738 const struct GNUNET_HashCode *query,
732 void *value) 739 void *value)
733{ 740{
734 struct PeerRequest *peerreq = value; 741 struct PeerRequest *peerreq = value;
735 struct GSF_PendingRequest *pr = peerreq->pr; 742 struct GSF_PendingRequest *pr = peerreq->pr;
736 743
737 free_pending_request(peerreq); 744 free_pending_request (peerreq);
738 GSF_pending_request_cancel_(pr, 745 GSF_pending_request_cancel_ (pr,
739 GNUNET_NO); 746 GNUNET_NO);
740 return GNUNET_OK; 747 return GNUNET_OK;
741} 748}
742 749
@@ -747,17 +754,17 @@ cancel_pending_request(void *cls,
747 * @param cls the request to free 754 * @param cls the request to free
748 */ 755 */
749static void 756static void
750peer_request_destroy(void *cls) 757peer_request_destroy (void *cls)
751{ 758{
752 struct PeerRequest *peerreq = cls; 759 struct PeerRequest *peerreq = cls;
753 struct GSF_PendingRequest *pr = peerreq->pr; 760 struct GSF_PendingRequest *pr = peerreq->pr;
754 struct GSF_PendingRequestData *prd; 761 struct GSF_PendingRequestData *prd;
755 762
756 peerreq->kill_task = NULL; 763 peerreq->kill_task = NULL;
757 prd = GSF_pending_request_get_data_(pr); 764 prd = GSF_pending_request_get_data_ (pr);
758 cancel_pending_request(NULL, 765 cancel_pending_request (NULL,
759 &prd->query, 766 &prd->query,
760 peerreq); 767 peerreq);
761} 768}
762 769
763 770
@@ -767,20 +774,20 @@ peer_request_destroy(void *cls)
767 * @param cls the `struct GSF_DelayedHandle` with the message 774 * @param cls the `struct GSF_DelayedHandle` with the message
768 */ 775 */
769static void 776static void
770transmit_delayed_now(void *cls) 777transmit_delayed_now (void *cls)
771{ 778{
772 struct GSF_DelayedHandle *dh = cls; 779 struct GSF_DelayedHandle *dh = cls;
773 struct GSF_ConnectedPeer *cp = dh->cp; 780 struct GSF_ConnectedPeer *cp = dh->cp;
774 781
775 GNUNET_CONTAINER_DLL_remove(cp->delayed_head, 782 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
776 cp->delayed_tail, 783 cp->delayed_tail,
777 dh); 784 dh);
778 cp->delay_queue_size--; 785 cp->delay_queue_size--;
779 GSF_peer_transmit_(cp, 786 GSF_peer_transmit_ (cp,
780 GNUNET_NO, 787 GNUNET_NO,
781 UINT32_MAX, 788 UINT32_MAX,
782 dh->env); 789 dh->env);
783 GNUNET_free(dh); 790 GNUNET_free (dh);
784} 791}
785 792
786 793
@@ -790,20 +797,20 @@ transmit_delayed_now(void *cls)
790 * @return desired delay 797 * @return desired delay
791 */ 798 */
792static struct GNUNET_TIME_Relative 799static struct GNUNET_TIME_Relative
793get_randomized_delay() 800get_randomized_delay ()
794{ 801{
795 struct GNUNET_TIME_Relative ret; 802 struct GNUNET_TIME_Relative ret;
796 803
797 ret = 804 ret =
798 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 805 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
799 GNUNET_CRYPTO_random_u32 806 GNUNET_CRYPTO_random_u32
800 (GNUNET_CRYPTO_QUALITY_WEAK, 807 (GNUNET_CRYPTO_QUALITY_WEAK,
801 2 * GSF_avg_latency.rel_value_us + 1)); 808 2 * GSF_avg_latency.rel_value_us + 1));
802#if INSANE_STATISTICS 809#if INSANE_STATISTICS
803 GNUNET_STATISTICS_update(GSF_stats, 810 GNUNET_STATISTICS_update (GSF_stats,
804 gettext_noop 811 gettext_noop
805 ("# artificial delays introduced (ms)"), 812 ("# artificial delays introduced (ms)"),
806 ret.rel_value_us / 1000LL, GNUNET_NO); 813 ret.rel_value_us / 1000LL, GNUNET_NO);
807#endif 814#endif
808 return ret; 815 return ret;
809} 816}
@@ -828,15 +835,15 @@ get_randomized_delay()
828 * @param data_len number of bytes in @a data 835 * @param data_len number of bytes in @a data
829 */ 836 */
830static void 837static void
831handle_p2p_reply(void *cls, 838handle_p2p_reply (void *cls,
832 enum GNUNET_BLOCK_EvaluationResult eval, 839 enum GNUNET_BLOCK_EvaluationResult eval,
833 struct GSF_PendingRequest *pr, 840 struct GSF_PendingRequest *pr,
834 uint32_t reply_anonymity_level, 841 uint32_t reply_anonymity_level,
835 struct GNUNET_TIME_Absolute expiration, 842 struct GNUNET_TIME_Absolute expiration,
836 struct GNUNET_TIME_Absolute last_transmission, 843 struct GNUNET_TIME_Absolute last_transmission,
837 enum GNUNET_BLOCK_Type type, 844 enum GNUNET_BLOCK_Type type,
838 const void *data, 845 const void *data,
839 size_t data_len) 846 size_t data_len)
840{ 847{
841 struct PeerRequest *peerreq = cls; 848 struct PeerRequest *peerreq = cls;
842 struct GSF_ConnectedPeer *cp = peerreq->cp; 849 struct GSF_ConnectedPeer *cp = peerreq->cp;
@@ -845,96 +852,98 @@ handle_p2p_reply(void *cls,
845 struct PutMessage *pm; 852 struct PutMessage *pm;
846 size_t msize; 853 size_t msize;
847 854
848 GNUNET_assert(data_len + sizeof(struct PutMessage) < 855 GNUNET_assert (data_len + sizeof(struct PutMessage) <
849 GNUNET_MAX_MESSAGE_SIZE); 856 GNUNET_MAX_MESSAGE_SIZE);
850 GNUNET_assert(peerreq->pr == pr); 857 GNUNET_assert (peerreq->pr == pr);
851 prd = GSF_pending_request_get_data_(pr); 858 prd = GSF_pending_request_get_data_ (pr);
852 if (NULL == data) 859 if (NULL == data)
853 { 860 {
854 free_pending_request(peerreq); 861 free_pending_request (peerreq);
855 return; 862 return;
856 } 863 }
857 GNUNET_break(GNUNET_BLOCK_TYPE_ANY != type); 864 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
858 if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type)) 865 if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
859 { 866 {
860 GNUNET_STATISTICS_update(GSF_stats, 867 GNUNET_STATISTICS_update (GSF_stats,
861 gettext_noop 868 gettext_noop
862 ("# replies dropped due to type mismatch"), 869 ("# replies dropped due to type mismatch"),
863 1, GNUNET_NO); 870 1, GNUNET_NO);
864 return; 871 return;
865 } 872 }
866 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867 "Transmitting result for query `%s' to peer\n", 874 "Transmitting result for query `%s' to peer\n",
868 GNUNET_h2s(&prd->query)); 875 GNUNET_h2s (&prd->query));
869 GNUNET_STATISTICS_update(GSF_stats, 876 GNUNET_STATISTICS_update (GSF_stats,
870 gettext_noop("# replies received for other peers"), 877 gettext_noop ("# replies received for other peers"),
871 1, GNUNET_NO); 878 1, GNUNET_NO);
872 msize = sizeof(struct PutMessage) + data_len; 879 msize = sizeof(struct PutMessage) + data_len;
873 if (msize >= GNUNET_MAX_MESSAGE_SIZE) 880 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
874 { 881 {
875 GNUNET_break(0); 882 GNUNET_break (0);
876 return; 883 return;
877 } 884 }
878 if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1)) 885 if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
886 {
887 if (reply_anonymity_level - 1 > GSF_cover_content_count)
879 { 888 {
880 if (reply_anonymity_level - 1 > GSF_cover_content_count) 889 GNUNET_STATISTICS_update (GSF_stats,
881 { 890 gettext_noop
882 GNUNET_STATISTICS_update(GSF_stats, 891 (
883 gettext_noop 892 "# replies dropped due to insufficient cover traffic"),
884 ("# replies dropped due to insufficient cover traffic"), 893 1, GNUNET_NO);
885 1, GNUNET_NO); 894 return;
886 return;
887 }
888 GSF_cover_content_count -= (reply_anonymity_level - 1);
889 } 895 }
890 896 GSF_cover_content_count -= (reply_anonymity_level - 1);
891 env = GNUNET_MQ_msg_extra(pm, 897 }
892 data_len, 898
893 GNUNET_MESSAGE_TYPE_FS_PUT); 899 env = GNUNET_MQ_msg_extra (pm,
894 pm->type = htonl(type); 900 data_len,
895 pm->expiration = GNUNET_TIME_absolute_hton(expiration); 901 GNUNET_MESSAGE_TYPE_FS_PUT);
896 GNUNET_memcpy(&pm[1], 902 pm->type = htonl (type);
897 data, 903 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
898 data_len); 904 GNUNET_memcpy (&pm[1],
905 data,
906 data_len);
899 if ((UINT32_MAX != reply_anonymity_level) && 907 if ((UINT32_MAX != reply_anonymity_level) &&
900 (0 != reply_anonymity_level) && 908 (0 != reply_anonymity_level) &&
901 (GNUNET_YES == GSF_enable_randomized_delays)) 909 (GNUNET_YES == GSF_enable_randomized_delays))
902 { 910 {
903 struct GSF_DelayedHandle *dh; 911 struct GSF_DelayedHandle *dh;
904 912
905 dh = GNUNET_new(struct GSF_DelayedHandle); 913 dh = GNUNET_new (struct GSF_DelayedHandle);
906 dh->cp = cp; 914 dh->cp = cp;
907 dh->env = env; 915 dh->env = env;
908 dh->msize = msize; 916 dh->msize = msize;
909 GNUNET_CONTAINER_DLL_insert(cp->delayed_head, 917 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
910 cp->delayed_tail, 918 cp->delayed_tail,
911 dh); 919 dh);
912 cp->delay_queue_size++; 920 cp->delay_queue_size++;
913 dh->delay_task = 921 dh->delay_task =
914 GNUNET_SCHEDULER_add_delayed(get_randomized_delay(), 922 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (),
915 &transmit_delayed_now, 923 &transmit_delayed_now,
916 dh); 924 dh);
917 } 925 }
918 else 926 else
919 { 927 {
920 GSF_peer_transmit_(cp, 928 GSF_peer_transmit_ (cp,
921 GNUNET_NO, 929 GNUNET_NO,
922 UINT32_MAX, 930 UINT32_MAX,
923 env); 931 env);
924 } 932 }
925 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) 933 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
926 return; 934 return;
927 if (NULL == peerreq->kill_task) 935 if (NULL == peerreq->kill_task)
928 { 936 {
929 GNUNET_STATISTICS_update(GSF_stats, 937 GNUNET_STATISTICS_update (GSF_stats,
930 gettext_noop 938 gettext_noop
931 ("# P2P searches destroyed due to ultimate reply"), 939 (
932 1, 940 "# P2P searches destroyed due to ultimate reply"),
933 GNUNET_NO); 941 1,
934 peerreq->kill_task = 942 GNUNET_NO);
935 GNUNET_SCHEDULER_add_now(&peer_request_destroy, 943 peerreq->kill_task =
936 peerreq); 944 GNUNET_SCHEDULER_add_now (&peer_request_destroy,
937 } 945 peerreq);
946 }
938} 947}
939 948
940 949
@@ -947,31 +956,31 @@ handle_p2p_reply(void *cls,
947 * @returns the actual change in respect (positive or negative) 956 * @returns the actual change in respect (positive or negative)
948 */ 957 */
949static int 958static int
950change_peer_respect(struct GSF_ConnectedPeer *cp, int value) 959change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
951{ 960{
952 if (0 == value) 961 if (0 == value)
953 return 0; 962 return 0;
954 GNUNET_assert(NULL != cp); 963 GNUNET_assert (NULL != cp);
955 if (value > 0) 964 if (value > 0)
965 {
966 if (cp->ppd.respect + value < cp->ppd.respect)
956 { 967 {
957 if (cp->ppd.respect + value < cp->ppd.respect) 968 value = UINT32_MAX - cp->ppd.respect;
958 { 969 cp->ppd.respect = UINT32_MAX;
959 value = UINT32_MAX - cp->ppd.respect;
960 cp->ppd.respect = UINT32_MAX;
961 }
962 else
963 cp->ppd.respect += value;
964 } 970 }
971 else
972 cp->ppd.respect += value;
973 }
965 else 974 else
975 {
976 if (cp->ppd.respect < -value)
966 { 977 {
967 if (cp->ppd.respect < -value) 978 value = -cp->ppd.respect;
968 { 979 cp->ppd.respect = 0;
969 value = -cp->ppd.respect;
970 cp->ppd.respect = 0;
971 }
972 else
973 cp->ppd.respect += value;
974 } 980 }
981 else
982 cp->ppd.respect += value;
983 }
975 return value; 984 return value;
976} 985}
977 986
@@ -985,58 +994,59 @@ change_peer_respect(struct GSF_ConnectedPeer *cp, int value)
985 * @return effective priority 994 * @return effective priority
986 */ 995 */
987static int32_t 996static int32_t
988bound_priority(uint32_t prio_in, 997bound_priority (uint32_t prio_in,
989 struct GSF_ConnectedPeer *cp) 998 struct GSF_ConnectedPeer *cp)
990{ 999{
991#define N ((double)128.0) 1000#define N ((double) 128.0)
992 uint32_t ret; 1001 uint32_t ret;
993 double rret; 1002 double rret;
994 int ld; 1003 int ld;
995 1004
996 ld = GSF_test_get_load_too_high_(0); 1005 ld = GSF_test_get_load_too_high_ (0);
997 if (GNUNET_SYSERR == ld) 1006 if (GNUNET_SYSERR == ld)
998 { 1007 {
999#if INSANE_STATISTICS 1008#if INSANE_STATISTICS
1000 GNUNET_STATISTICS_update(GSF_stats, 1009 GNUNET_STATISTICS_update (GSF_stats,
1001 gettext_noop 1010 gettext_noop
1002 ("# requests done for free (low load)"), 1, 1011 ("# requests done for free (low load)"), 1,
1003 GNUNET_NO); 1012 GNUNET_NO);
1004#endif 1013#endif
1005 return 0; /* excess resources */ 1014 return 0; /* excess resources */
1006 } 1015 }
1007 if (prio_in > INT32_MAX) 1016 if (prio_in > INT32_MAX)
1008 prio_in = INT32_MAX; 1017 prio_in = INT32_MAX;
1009 ret = -change_peer_respect(cp, -(int)prio_in); 1018 ret = -change_peer_respect (cp, -(int) prio_in);
1010 if (ret > 0) 1019 if (ret > 0)
1011 { 1020 {
1012 if (ret > GSF_current_priorities + N) 1021 if (ret > GSF_current_priorities + N)
1013 rret = GSF_current_priorities + N; 1022 rret = GSF_current_priorities + N;
1014 else 1023 else
1015 rret = ret; 1024 rret = ret;
1016 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N; 1025 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1017 } 1026 }
1018 if ((GNUNET_YES == ld) && (ret > 0)) 1027 if ((GNUNET_YES == ld) && (ret > 0))
1019 { 1028 {
1020 /* try with charging */ 1029 /* try with charging */
1021 ld = GSF_test_get_load_too_high_(ret); 1030 ld = GSF_test_get_load_too_high_ (ret);
1022 } 1031 }
1023 if (GNUNET_YES == ld) 1032 if (GNUNET_YES == ld)
1024 { 1033 {
1025 GNUNET_STATISTICS_update(GSF_stats, 1034 GNUNET_STATISTICS_update (GSF_stats,
1026 gettext_noop 1035 gettext_noop
1027 ("# request dropped, priority insufficient"), 1, 1036 ("# request dropped, priority insufficient"), 1,
1028 GNUNET_NO); 1037 GNUNET_NO);
1029 /* undo charge */ 1038 /* undo charge */
1030 change_peer_respect(cp, (int)ret); 1039 change_peer_respect (cp, (int) ret);
1031 return -1; /* not enough resources */ 1040 return -1; /* not enough resources */
1032 } 1041 }
1033 else 1042 else
1034 { 1043 {
1035 GNUNET_STATISTICS_update(GSF_stats, 1044 GNUNET_STATISTICS_update (GSF_stats,
1036 gettext_noop 1045 gettext_noop
1037 ("# requests done for a price (normal load)"), 1, 1046 ("# requests done for a price (normal load)"),
1038 GNUNET_NO); 1047 1,
1039 } 1048 GNUNET_NO);
1049 }
1040#undef N 1050#undef N
1041 return ret; 1051 return ret;
1042} 1052}
@@ -1052,20 +1062,20 @@ bound_priority(uint32_t prio_in,
1052 * otherwise the ttl-limit for the given @a prio 1062 * otherwise the ttl-limit for the given @a prio
1053 */ 1063 */
1054static int32_t 1064static int32_t
1055bound_ttl(int32_t ttl_in, 1065bound_ttl (int32_t ttl_in,
1056 uint32_t prio) 1066 uint32_t prio)
1057{ 1067{
1058 unsigned long long allowed; 1068 unsigned long long allowed;
1059 1069
1060 if (ttl_in <= 0) 1070 if (ttl_in <= 0)
1061 return ttl_in; 1071 return ttl_in;
1062 allowed = ((unsigned long long)prio) * TTL_DECREMENT / 1000; 1072 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000;
1063 if (ttl_in > allowed) 1073 if (ttl_in > allowed)
1064 { 1074 {
1065 if (allowed >= (1 << 30)) 1075 if (allowed >= (1 << 30))
1066 return 1 << 30; 1076 return 1 << 30;
1067 return allowed; 1077 return allowed;
1068 } 1078 }
1069 return ttl_in; 1079 return ttl_in;
1070} 1080}
1071 1081
@@ -1073,7 +1083,8 @@ bound_ttl(int32_t ttl_in,
1073/** 1083/**
1074 * Closure for #test_exist_cb(). 1084 * Closure for #test_exist_cb().
1075 */ 1085 */
1076struct TestExistClosure { 1086struct TestExistClosure
1087{
1077 /** 1088 /**
1078 * Priority of the incoming request. 1089 * Priority of the incoming request.
1079 */ 1090 */
@@ -1107,9 +1118,9 @@ struct TestExistClosure {
1107 * #GNUNET_NO if we successfully merged 1118 * #GNUNET_NO if we successfully merged
1108 */ 1119 */
1109static int 1120static int
1110test_exist_cb(void *cls, 1121test_exist_cb (void *cls,
1111 const struct GNUNET_HashCode *hc, 1122 const struct GNUNET_HashCode *hc,
1112 void *value) 1123 void *value)
1113{ 1124{
1114 struct TestExistClosure *tec = cls; 1125 struct TestExistClosure *tec = cls;
1115 struct PeerRequest *peerreq = value; 1126 struct PeerRequest *peerreq = value;
@@ -1117,28 +1128,28 @@ test_exist_cb(void *cls,
1117 struct GSF_PendingRequestData *prd; 1128 struct GSF_PendingRequestData *prd;
1118 1129
1119 pr = peerreq->pr; 1130 pr = peerreq->pr;
1120 prd = GSF_pending_request_get_data_(pr); 1131 prd = GSF_pending_request_get_data_ (pr);
1121 if (prd->type != tec->type) 1132 if (prd->type != tec->type)
1122 return GNUNET_YES; 1133 return GNUNET_YES;
1123 if (prd->ttl.abs_value_us >= 1134 if (prd->ttl.abs_value_us >=
1124 GNUNET_TIME_absolute_get().abs_value_us + tec->ttl * 1000LL) 1135 GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL)
1125 { 1136 {
1126 /* existing request has higher TTL, drop new one! */ 1137 /* existing request has higher TTL, drop new one! */
1127 prd->priority += tec->priority; 1138 prd->priority += tec->priority;
1128 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1139 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129 "Have existing request with higher TTL, dropping new request.\n"); 1140 "Have existing request with higher TTL, dropping new request.\n");
1130 GNUNET_STATISTICS_update(GSF_stats, 1141 GNUNET_STATISTICS_update (GSF_stats,
1131 gettext_noop 1142 gettext_noop
1132 ("# requests dropped due to higher-TTL request"), 1143 ("# requests dropped due to higher-TTL request"),
1133 1, GNUNET_NO); 1144 1, GNUNET_NO);
1134 tec->finished = GNUNET_YES; 1145 tec->finished = GNUNET_YES;
1135 return GNUNET_NO; 1146 return GNUNET_NO;
1136 } 1147 }
1137 /* existing request has lower TTL, drop old one! */ 1148 /* existing request has lower TTL, drop old one! */
1138 tec->priority += prd->priority; 1149 tec->priority += prd->priority;
1139 free_pending_request(peerreq); 1150 free_pending_request (peerreq);
1140 GSF_pending_request_cancel_(pr, 1151 GSF_pending_request_cancel_ (pr,
1141 GNUNET_YES); 1152 GNUNET_YES);
1142 return GNUNET_NO; 1153 return GNUNET_NO;
1143} 1154}
1144 1155
@@ -1153,8 +1164,8 @@ test_exist_cb(void *cls,
1153 * @param gm the GET message 1164 * @param gm the GET message
1154 */ 1165 */
1155void 1166void
1156handle_p2p_get(void *cls, 1167handle_p2p_get (void *cls,
1157 const struct GetMessage *gm) 1168 const struct GetMessage *gm)
1158{ 1169{
1159 struct GSF_ConnectedPeer *cps = cls; 1170 struct GSF_ConnectedPeer *cps = cls;
1160 struct PeerRequest *peerreq; 1171 struct PeerRequest *peerreq;
@@ -1172,167 +1183,172 @@ handle_p2p_get(void *cls,
1172 GNUNET_PEER_Id spid; 1183 GNUNET_PEER_Id spid;
1173 const struct GSF_PendingRequestData *prd; 1184 const struct GSF_PendingRequestData *prd;
1174 1185
1175 msize = ntohs(gm->header.size); 1186 msize = ntohs (gm->header.size);
1176 tec.type = ntohl(gm->type); 1187 tec.type = ntohl (gm->type);
1177 bm = ntohl(gm->hash_bitmap); 1188 bm = ntohl (gm->hash_bitmap);
1178 bits = 0; 1189 bits = 0;
1179 while (bm > 0) 1190 while (bm > 0)
1180 { 1191 {
1181 if (1 == (bm & 1)) 1192 if (1 == (bm & 1))
1182 bits++; 1193 bits++;
1183 bm >>= 1; 1194 bm >>= 1;
1184 } 1195 }
1185 opt = (const struct GNUNET_PeerIdentity *)&gm[1]; 1196 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1186 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct GNUNET_PeerIdentity); 1197 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct
1187 GNUNET_STATISTICS_update(GSF_stats, 1198 GNUNET_PeerIdentity);
1188 gettext_noop 1199 GNUNET_STATISTICS_update (GSF_stats,
1189 ("# GET requests received (from other peers)"), 1200 gettext_noop
1190 1, 1201 ("# GET requests received (from other peers)"),
1191 GNUNET_NO); 1202 1,
1203 GNUNET_NO);
1192 GSF_cover_query_count++; 1204 GSF_cover_query_count++;
1193 bm = ntohl(gm->hash_bitmap); 1205 bm = ntohl (gm->hash_bitmap);
1194 bits = 0; 1206 bits = 0;
1195 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1207 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1196 cp = GSF_peer_get_(&opt[bits++]); 1208 cp = GSF_peer_get_ (&opt[bits++]);
1197 else 1209 else
1198 cp = cps; 1210 cp = cps;
1199 if (NULL == cp) 1211 if (NULL == cp)
1200 { 1212 {
1201 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1213 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1202 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1214 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n", 1215 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1204 GNUNET_i2s(&opt[bits - 1])); 1216 GNUNET_i2s (&opt[bits - 1]));
1205 1217
1206 else 1218 else
1207 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1208 "Failed to find peer `%s' in connection set. Dropping query.\n", 1220 "Failed to find peer `%s' in connection set. Dropping query.\n",
1209 GNUNET_i2s(cps->ppd.peer)); 1221 GNUNET_i2s (cps->ppd.peer));
1210 GNUNET_STATISTICS_update(GSF_stats, 1222 GNUNET_STATISTICS_update (GSF_stats,
1211 gettext_noop 1223 gettext_noop
1212 ("# requests dropped due to missing reverse route"), 1224 (
1213 1, 1225 "# requests dropped due to missing reverse route"),
1214 GNUNET_NO); 1226 1,
1215 return; 1227 GNUNET_NO);
1216 } 1228 return;
1217 unsigned int queue_size = GNUNET_MQ_get_length(cp->mq); 1229 }
1230 unsigned int queue_size = GNUNET_MQ_get_length (cp->mq);
1218 queue_size += cp->ppd.pending_replies + cp->delay_queue_size; 1231 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1219 if (queue_size > MAX_QUEUE_PER_PEER) 1232 if (queue_size > MAX_QUEUE_PER_PEER)
1220 { 1233 {
1221 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1234 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222 "Peer `%s' has too many replies queued already. Dropping query.\n", 1235 "Peer `%s' has too many replies queued already. Dropping query.\n",
1223 GNUNET_i2s(cps->ppd.peer)); 1236 GNUNET_i2s (cps->ppd.peer));
1224 GNUNET_STATISTICS_update(GSF_stats, 1237 GNUNET_STATISTICS_update (GSF_stats,
1225 gettext_noop("# requests dropped due to full reply queue"), 1238 gettext_noop (
1226 1, 1239 "# requests dropped due to full reply queue"),
1227 GNUNET_NO); 1240 1,
1228 return; 1241 GNUNET_NO);
1229 } 1242 return;
1243 }
1230 /* note that we can really only check load here since otherwise 1244 /* note that we can really only check load here since otherwise
1231 * peers could find out that we are overloaded by not being 1245 * peers could find out that we are overloaded by not being
1232 * disconnected after sending us a malformed query... */ 1246 * disconnected after sending us a malformed query... */
1233 tec.priority = bound_priority(ntohl(gm->priority), 1247 tec.priority = bound_priority (ntohl (gm->priority),
1234 cps); 1248 cps);
1235 if (tec.priority < 0) 1249 if (tec.priority < 0)
1236 { 1250 {
1237 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1238 "Dropping query from `%s', this peer is too busy.\n", 1252 "Dropping query from `%s', this peer is too busy.\n",
1239 GNUNET_i2s(cps->ppd.peer)); 1253 GNUNET_i2s (cps->ppd.peer));
1240 return; 1254 return;
1241 } 1255 }
1242 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243 "Received request for `%s' of type %u from peer `%s' with flags %u\n", 1257 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1244 GNUNET_h2s(&gm->query), 1258 GNUNET_h2s (&gm->query),
1245 (unsigned int)tec.type, 1259 (unsigned int) tec.type,
1246 GNUNET_i2s(cps->ppd.peer), 1260 GNUNET_i2s (cps->ppd.peer),
1247 (unsigned int)bm); 1261 (unsigned int) bm);
1248 target = 1262 target =
1249 (0 != 1263 (0 !=
1250 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL; 1264 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1251 options = GSF_PRO_DEFAULTS; 1265 options = GSF_PRO_DEFAULTS;
1252 spid = 0; 1266 spid = 0;
1253 if ((GNUNET_LOAD_get_load(cp->ppd.transmission_delay) > 3 * (1 + tec.priority)) 1267 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1
1254 || (GNUNET_LOAD_get_average(cp->ppd.transmission_delay) > 1268 + tec.priority))
1255 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 + 1269 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) >
1256 GNUNET_LOAD_get_average(GSF_rt_entry_lifetime))) 1270 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2
1257 { 1271 + GNUNET_LOAD_get_average (GSF_rt_entry_lifetime)))
1258 /* don't have BW to send to peer, or would likely take longer than we have for it, 1272 {
1259 * so at best indirect the query */ 1273 /* don't have BW to send to peer, or would likely take longer than we have for it,
1260 tec.priority = 0; 1274 * so at best indirect the query */
1261 options |= GSF_PRO_FORWARD_ONLY; 1275 tec.priority = 0;
1262 spid = GNUNET_PEER_intern(cps->ppd.peer); 1276 options |= GSF_PRO_FORWARD_ONLY;
1263 GNUNET_assert(0 != spid); 1277 spid = GNUNET_PEER_intern (cps->ppd.peer);
1264 } 1278 GNUNET_assert (0 != spid);
1265 tec.ttl = bound_ttl(ntohl(gm->ttl), 1279 }
1266 tec.priority); 1280 tec.ttl = bound_ttl (ntohl (gm->ttl),
1281 tec.priority);
1267 /* decrement ttl (always) */ 1282 /* decrement ttl (always) */
1268 ttl_decrement = 1283 ttl_decrement =
1269 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 1284 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1270 TTL_DECREMENT); 1285 TTL_DECREMENT);
1271 if ((tec.ttl < 0) && 1286 if ((tec.ttl < 0) &&
1272 (((int32_t)(tec.ttl - ttl_decrement)) > 0)) 1287 (((int32_t) (tec.ttl - ttl_decrement)) > 0))
1273 { 1288 {
1274 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1275 "Dropping query from `%s' due to TTL underflow (%d - %u).\n", 1290 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1276 GNUNET_i2s(cps->ppd.peer), 1291 GNUNET_i2s (cps->ppd.peer),
1277 tec.ttl, 1292 tec.ttl,
1278 ttl_decrement); 1293 ttl_decrement);
1279 GNUNET_STATISTICS_update(GSF_stats, 1294 GNUNET_STATISTICS_update (GSF_stats,
1280 gettext_noop 1295 gettext_noop
1281 ("# requests dropped due TTL underflow"), 1, 1296 ("# requests dropped due TTL underflow"), 1,
1282 GNUNET_NO); 1297 GNUNET_NO);
1283 /* integer underflow => drop (should be very rare)! */ 1298 /* integer underflow => drop (should be very rare)! */
1284 return; 1299 return;
1285 } 1300 }
1286 tec.ttl -= ttl_decrement; 1301 tec.ttl -= ttl_decrement;
1287 1302
1288 /* test if the request already exists */ 1303 /* test if the request already exists */
1289 tec.finished = GNUNET_NO; 1304 tec.finished = GNUNET_NO;
1290 GNUNET_CONTAINER_multihashmap_get_multiple(cp->request_map, 1305 GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map,
1291 &gm->query, 1306 &gm->query,
1292 &test_exist_cb, 1307 &test_exist_cb,
1293 &tec); 1308 &tec);
1294 if (GNUNET_YES == tec.finished) 1309 if (GNUNET_YES == tec.finished)
1295 return; /* merged into existing request, we're done */ 1310 return; /* merged into existing request, we're done */
1296 1311
1297 peerreq = GNUNET_new(struct PeerRequest); 1312 peerreq = GNUNET_new (struct PeerRequest);
1298 peerreq->cp = cp; 1313 peerreq->cp = cp;
1299 pr = GSF_pending_request_create_(options, 1314 pr = GSF_pending_request_create_ (options,
1300 tec.type, 1315 tec.type,
1301 &gm->query, 1316 &gm->query,
1302 target, 1317 target,
1303 (bfsize > 0) 1318 (bfsize > 0)
1304 ? (const char *)&opt[bits] 1319 ? (const char *) &opt[bits]
1305 : NULL, 1320 : NULL,
1306 bfsize, 1321 bfsize,
1307 ntohl(gm->filter_mutator), 1322 ntohl (gm->filter_mutator),
1308 1 /* anonymity */, 1323 1 /* anonymity */,
1309 (uint32_t)tec.priority, 1324 (uint32_t) tec.priority,
1310 tec.ttl, 1325 tec.ttl,
1311 spid, 1326 spid,
1312 GNUNET_PEER_intern(cps->ppd.peer), 1327 GNUNET_PEER_intern (cps->ppd.peer),
1313 NULL, 0, /* replies_seen */ 1328 NULL, 0, /* replies_seen */
1314 &handle_p2p_reply, 1329 &handle_p2p_reply,
1315 peerreq); 1330 peerreq);
1316 GNUNET_assert(NULL != pr); 1331 GNUNET_assert (NULL != pr);
1317 prd = GSF_pending_request_get_data_(pr); 1332 prd = GSF_pending_request_get_data_ (pr);
1318 peerreq->pr = pr; 1333 peerreq->pr = pr;
1319 GNUNET_break(GNUNET_OK == 1334 GNUNET_break (GNUNET_OK ==
1320 GNUNET_CONTAINER_multihashmap_put(cp->request_map, 1335 GNUNET_CONTAINER_multihashmap_put (cp->request_map,
1321 &prd->query, 1336 &prd->query,
1322 peerreq, 1337 peerreq,
1323 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 1338 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1324 GNUNET_STATISTICS_update(GSF_stats, 1339 GNUNET_STATISTICS_update (GSF_stats,
1325 gettext_noop("# P2P query messages received and processed"), 1340 gettext_noop (
1326 1, 1341 "# P2P query messages received and processed"),
1327 GNUNET_NO); 1342 1,
1328 GNUNET_STATISTICS_update(GSF_stats, 1343 GNUNET_NO);
1329 gettext_noop("# P2P searches active"), 1344 GNUNET_STATISTICS_update (GSF_stats,
1330 1, 1345 gettext_noop ("# P2P searches active"),
1331 GNUNET_NO); 1346 1,
1332 GSF_pending_request_get_data_(pr)->has_started = GNUNET_YES; 1347 GNUNET_NO);
1333 GSF_local_lookup_(pr, 1348 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1334 &GSF_consider_forwarding, 1349 GSF_local_lookup_ (pr,
1335 NULL); 1350 &GSF_consider_forwarding,
1351 NULL);
1336} 1352}
1337 1353
1338 1354
@@ -1349,17 +1365,17 @@ handle_p2p_get(void *cls,
1349 * @param env message to send 1365 * @param env message to send
1350 */ 1366 */
1351void 1367void
1352GSF_peer_transmit_(struct GSF_ConnectedPeer *cp, 1368GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1353 int is_query, 1369 int is_query,
1354 uint32_t priority, 1370 uint32_t priority,
1355 struct GNUNET_MQ_Envelope *env) 1371 struct GNUNET_MQ_Envelope *env)
1356{ 1372{
1357 struct GSF_PeerTransmitHandle *pth; 1373 struct GSF_PeerTransmitHandle *pth;
1358 struct GSF_PeerTransmitHandle *pos; 1374 struct GSF_PeerTransmitHandle *pos;
1359 struct GSF_PeerTransmitHandle *prev; 1375 struct GSF_PeerTransmitHandle *prev;
1360 1376
1361 pth = GNUNET_new(struct GSF_PeerTransmitHandle); 1377 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1362 pth->transmission_request_start_time = GNUNET_TIME_absolute_get(); 1378 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1363 pth->env = env; 1379 pth->env = env;
1364 pth->is_query = is_query; 1380 pth->is_query = is_query;
1365 pth->priority = priority; 1381 pth->priority = priority;
@@ -1368,19 +1384,19 @@ GSF_peer_transmit_(struct GSF_ConnectedPeer *cp,
1368 prev = NULL; 1384 prev = NULL;
1369 pos = cp->pth_head; 1385 pos = cp->pth_head;
1370 while ((NULL != pos) && (pos->priority > priority)) 1386 while ((NULL != pos) && (pos->priority > priority))
1371 { 1387 {
1372 prev = pos; 1388 prev = pos;
1373 pos = pos->next; 1389 pos = pos->next;
1374 } 1390 }
1375 GNUNET_CONTAINER_DLL_insert_after(cp->pth_head, 1391 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head,
1376 cp->pth_tail, 1392 cp->pth_tail,
1377 prev, 1393 prev,
1378 pth); 1394 pth);
1379 if (GNUNET_YES == is_query) 1395 if (GNUNET_YES == is_query)
1380 cp->ppd.pending_queries++; 1396 cp->ppd.pending_queries++;
1381 else if (GNUNET_NO == is_query) 1397 else if (GNUNET_NO == is_query)
1382 cp->ppd.pending_replies++; 1398 cp->ppd.pending_replies++;
1383 schedule_transmission(pth); 1399 schedule_transmission (pth);
1384} 1400}
1385 1401
1386 1402
@@ -1392,19 +1408,19 @@ GSF_peer_transmit_(struct GSF_ConnectedPeer *cp,
1392 * @param request_priority priority of the original request 1408 * @param request_priority priority of the original request
1393 */ 1409 */
1394void 1410void
1395GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp, 1411GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1396 struct GNUNET_TIME_Absolute request_time, 1412 struct GNUNET_TIME_Absolute request_time,
1397 uint32_t request_priority) 1413 uint32_t request_priority)
1398{ 1414{
1399 struct GNUNET_TIME_Relative delay; 1415 struct GNUNET_TIME_Relative delay;
1400 1416
1401 delay = GNUNET_TIME_absolute_get_duration(request_time); 1417 delay = GNUNET_TIME_absolute_get_duration (request_time);
1402 cp->ppd.avg_reply_delay.rel_value_us = 1418 cp->ppd.avg_reply_delay.rel_value_us =
1403 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) + 1419 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1)
1404 delay.rel_value_us) / RUNAVG_DELAY_N; 1420 + delay.rel_value_us) / RUNAVG_DELAY_N;
1405 cp->ppd.avg_priority = 1421 cp->ppd.avg_priority =
1406 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) + 1422 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1)
1407 request_priority) / RUNAVG_DELAY_N; 1423 + request_priority) / RUNAVG_DELAY_N;
1408} 1424}
1409 1425
1410 1426
@@ -1416,11 +1432,11 @@ GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp,
1416 * @param initiator_client local client on responsible for query 1432 * @param initiator_client local client on responsible for query
1417 */ 1433 */
1418void 1434void
1419GSF_peer_update_responder_client_(struct GSF_ConnectedPeer *cp, 1435GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1420 struct GSF_LocalClient *initiator_client) 1436 struct GSF_LocalClient *initiator_client)
1421{ 1437{
1422 cp->ppd.last_client_replies[cp->last_client_replies_woff++ % 1438 cp->ppd.last_client_replies[cp->last_client_replies_woff++
1423 CS2P_SUCCESS_LIST_SIZE] = initiator_client; 1439 % CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1424} 1440}
1425 1441
1426 1442
@@ -1432,15 +1448,15 @@ GSF_peer_update_responder_client_(struct GSF_ConnectedPeer *cp,
1432 * @param initiator_peer other peer responsible for query 1448 * @param initiator_peer other peer responsible for query
1433 */ 1449 */
1434void 1450void
1435GSF_peer_update_responder_peer_(struct GSF_ConnectedPeer *cp, 1451GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1436 const struct GSF_ConnectedPeer *initiator_peer) 1452 const struct GSF_ConnectedPeer *initiator_peer)
1437{ 1453{
1438 unsigned int woff; 1454 unsigned int woff;
1439 1455
1440 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE; 1456 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1441 GNUNET_PEER_change_rc(cp->ppd.last_p2p_replies[woff], -1); 1457 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1);
1442 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid; 1458 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1443 GNUNET_PEER_change_rc(initiator_peer->ppd.pid, 1); 1459 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1);
1444 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE; 1460 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1445} 1461}
1446 1462
@@ -1454,23 +1470,23 @@ GSF_peer_update_responder_peer_(struct GSF_ConnectedPeer *cp,
1454 * @return #GNUNET_OK to continue iteration 1470 * @return #GNUNET_OK to continue iteration
1455 */ 1471 */
1456static int 1472static int
1457flush_respect(void *cls, 1473flush_respect (void *cls,
1458 const struct GNUNET_PeerIdentity *key, 1474 const struct GNUNET_PeerIdentity *key,
1459 void *value) 1475 void *value)
1460{ 1476{
1461 struct GSF_ConnectedPeer *cp = value; 1477 struct GSF_ConnectedPeer *cp = value;
1462 struct GNUNET_PeerIdentity pid; 1478 struct GNUNET_PeerIdentity pid;
1463 1479
1464 if (cp->ppd.respect == cp->disk_respect) 1480 if (cp->ppd.respect == cp->disk_respect)
1465 return GNUNET_OK; /* unchanged */ 1481 return GNUNET_OK; /* unchanged */
1466 GNUNET_assert(0 != cp->ppd.pid); 1482 GNUNET_assert (0 != cp->ppd.pid);
1467 GNUNET_PEER_resolve(cp->ppd.pid, &pid); 1483 GNUNET_PEER_resolve (cp->ppd.pid, &pid);
1468 GNUNET_PEERSTORE_store(peerstore, "fs", &pid, "respect", &cp->ppd.respect, 1484 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1469 sizeof(cp->ppd.respect), 1485 sizeof(cp->ppd.respect),
1470 GNUNET_TIME_UNIT_FOREVER_ABS, 1486 GNUNET_TIME_UNIT_FOREVER_ABS,
1471 GNUNET_PEERSTORE_STOREOPTION_REPLACE, 1487 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1472 NULL, 1488 NULL,
1473 NULL); 1489 NULL);
1474 return GNUNET_OK; 1490 return GNUNET_OK;
1475} 1491}
1476 1492
@@ -1484,9 +1500,9 @@ flush_respect(void *cls,
1484 * @param internal_cls the corresponding `struct GSF_ConnectedPeer` 1500 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1485 */ 1501 */
1486void 1502void
1487GSF_peer_disconnect_handler(void *cls, 1503GSF_peer_disconnect_handler (void *cls,
1488 const struct GNUNET_PeerIdentity *peer, 1504 const struct GNUNET_PeerIdentity *peer,
1489 void *internal_cls) 1505 void *internal_cls)
1490{ 1506{
1491 struct GSF_ConnectedPeer *cp = internal_cls; 1507 struct GSF_ConnectedPeer *cp = internal_cls;
1492 struct GSF_PeerTransmitHandle *pth; 1508 struct GSF_PeerTransmitHandle *pth;
@@ -1495,82 +1511,83 @@ GSF_peer_disconnect_handler(void *cls,
1495 if (NULL == cp) 1511 if (NULL == cp)
1496 return; /* must have been disconnect from core with 1512 return; /* must have been disconnect from core with
1497 * 'peer' == my_id, ignore */ 1513 * 'peer' == my_id, ignore */
1498 flush_respect(NULL, 1514 flush_respect (NULL,
1499 peer, 1515 peer,
1500 cp); 1516 cp);
1501 GNUNET_assert(GNUNET_YES == 1517 GNUNET_assert (GNUNET_YES ==
1502 GNUNET_CONTAINER_multipeermap_remove(cp_map, 1518 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1503 peer, 1519 peer,
1504 cp)); 1520 cp));
1505 GNUNET_STATISTICS_set(GSF_stats, 1521 GNUNET_STATISTICS_set (GSF_stats,
1506 gettext_noop("# peers connected"), 1522 gettext_noop ("# peers connected"),
1507 GNUNET_CONTAINER_multipeermap_size(cp_map), 1523 GNUNET_CONTAINER_multipeermap_size (cp_map),
1508 GNUNET_NO); 1524 GNUNET_NO);
1509 if (NULL != cp->respect_iterate_req) 1525 if (NULL != cp->respect_iterate_req)
1510 { 1526 {
1511 GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req); 1527 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1512 cp->respect_iterate_req = NULL; 1528 cp->respect_iterate_req = NULL;
1513 } 1529 }
1514 if (NULL != cp->rc) 1530 if (NULL != cp->rc)
1515 { 1531 {
1516 GNUNET_ATS_reserve_bandwidth_cancel(cp->rc); 1532 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
1517 cp->rc = NULL; 1533 cp->rc = NULL;
1518 } 1534 }
1519 if (NULL != cp->rc_delay_task) 1535 if (NULL != cp->rc_delay_task)
1520 { 1536 {
1521 GNUNET_SCHEDULER_cancel(cp->rc_delay_task); 1537 GNUNET_SCHEDULER_cancel (cp->rc_delay_task);
1522 cp->rc_delay_task = NULL; 1538 cp->rc_delay_task = NULL;
1523 } 1539 }
1524 GNUNET_CONTAINER_multihashmap_iterate(cp->request_map, 1540 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
1525 &cancel_pending_request, 1541 &cancel_pending_request,
1526 cp); 1542 cp);
1527 GNUNET_CONTAINER_multihashmap_destroy(cp->request_map); 1543 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
1528 cp->request_map = NULL; 1544 cp->request_map = NULL;
1529 GSF_plan_notify_peer_disconnect_(cp); 1545 GSF_plan_notify_peer_disconnect_ (cp);
1530 GNUNET_LOAD_value_free(cp->ppd.transmission_delay); 1546 GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
1531 GNUNET_PEER_decrement_rcs(cp->ppd.last_p2p_replies, 1547 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies,
1532 P2P_SUCCESS_LIST_SIZE); 1548 P2P_SUCCESS_LIST_SIZE);
1533 memset(cp->ppd.last_p2p_replies, 1549 memset (cp->ppd.last_p2p_replies,
1534 0, 1550 0,
1535 sizeof(cp->ppd.last_p2p_replies)); 1551 sizeof(cp->ppd.last_p2p_replies));
1536 GSF_push_stop_(cp); 1552 GSF_push_stop_ (cp);
1537 while (NULL != (pth = cp->pth_head)) 1553 while (NULL != (pth = cp->pth_head))
1538 { 1554 {
1539 GNUNET_CONTAINER_DLL_remove(cp->pth_head, 1555 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1540 cp->pth_tail, 1556 cp->pth_tail,
1541 pth); 1557 pth);
1542 if (GNUNET_YES == pth->is_query) 1558 if (GNUNET_YES == pth->is_query)
1543 GNUNET_assert(0 < cp->ppd.pending_queries--); 1559 GNUNET_assert (0 < cp->ppd.pending_queries--);
1544 else if (GNUNET_NO == pth->is_query) 1560 else if (GNUNET_NO == pth->is_query)
1545 GNUNET_assert(0 < cp->ppd.pending_replies--); 1561 GNUNET_assert (0 < cp->ppd.pending_replies--);
1546 GNUNET_free(pth); 1562 GNUNET_free (pth);
1547 } 1563 }
1548 while (NULL != (dh = cp->delayed_head)) 1564 while (NULL != (dh = cp->delayed_head))
1549 { 1565 {
1550 GNUNET_CONTAINER_DLL_remove(cp->delayed_head, 1566 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1551 cp->delayed_tail, 1567 cp->delayed_tail,
1552 dh); 1568 dh);
1553 GNUNET_MQ_discard(dh->env); 1569 GNUNET_MQ_discard (dh->env);
1554 cp->delay_queue_size--; 1570 cp->delay_queue_size--;
1555 GNUNET_SCHEDULER_cancel(dh->delay_task); 1571 GNUNET_SCHEDULER_cancel (dh->delay_task);
1556 GNUNET_free(dh); 1572 GNUNET_free (dh);
1557 } 1573 }
1558 GNUNET_PEER_change_rc(cp->ppd.pid, -1); 1574 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
1559 if (NULL != cp->mig_revive_task) 1575 if (NULL != cp->mig_revive_task)
1560 { 1576 {
1561 GNUNET_SCHEDULER_cancel(cp->mig_revive_task); 1577 GNUNET_SCHEDULER_cancel (cp->mig_revive_task);
1562 cp->mig_revive_task = NULL; 1578 cp->mig_revive_task = NULL;
1563 } 1579 }
1564 GNUNET_break(0 == cp->ppd.pending_queries); 1580 GNUNET_break (0 == cp->ppd.pending_queries);
1565 GNUNET_break(0 == cp->ppd.pending_replies); 1581 GNUNET_break (0 == cp->ppd.pending_replies);
1566 GNUNET_free(cp); 1582 GNUNET_free (cp);
1567} 1583}
1568 1584
1569 1585
1570/** 1586/**
1571 * Closure for #call_iterator(). 1587 * Closure for #call_iterator().
1572 */ 1588 */
1573struct IterationContext { 1589struct IterationContext
1590{
1574 /** 1591 /**
1575 * Function to call on each entry. 1592 * Function to call on each entry.
1576 */ 1593 */
@@ -1592,16 +1609,16 @@ struct IterationContext {
1592 * @return #GNUNET_YES to continue iteration 1609 * @return #GNUNET_YES to continue iteration
1593 */ 1610 */
1594static int 1611static int
1595call_iterator(void *cls, 1612call_iterator (void *cls,
1596 const struct GNUNET_PeerIdentity *key, 1613 const struct GNUNET_PeerIdentity *key,
1597 void *value) 1614 void *value)
1598{ 1615{
1599 struct IterationContext *ic = cls; 1616 struct IterationContext *ic = cls;
1600 struct GSF_ConnectedPeer *cp = value; 1617 struct GSF_ConnectedPeer *cp = value;
1601 1618
1602 ic->it(ic->it_cls, 1619 ic->it (ic->it_cls,
1603 key, cp, 1620 key, cp,
1604 &cp->ppd); 1621 &cp->ppd);
1605 return GNUNET_YES; 1622 return GNUNET_YES;
1606} 1623}
1607 1624
@@ -1613,16 +1630,16 @@ call_iterator(void *cls,
1613 * @param it_cls closure for @a it 1630 * @param it_cls closure for @a it
1614 */ 1631 */
1615void 1632void
1616GSF_iterate_connected_peers_(GSF_ConnectedPeerIterator it, 1633GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1617 void *it_cls) 1634 void *it_cls)
1618{ 1635{
1619 struct IterationContext ic; 1636 struct IterationContext ic;
1620 1637
1621 ic.it = it; 1638 ic.it = it;
1622 ic.it_cls = it_cls; 1639 ic.it_cls = it_cls;
1623 GNUNET_CONTAINER_multipeermap_iterate(cp_map, 1640 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1624 &call_iterator, 1641 &call_iterator,
1625 &ic); 1642 &ic);
1626} 1643}
1627 1644
1628 1645
@@ -1633,11 +1650,11 @@ GSF_iterate_connected_peers_(GSF_ConnectedPeerIterator it,
1633 * @param id identity to set (written to) 1650 * @param id identity to set (written to)
1634 */ 1651 */
1635void 1652void
1636GSF_connected_peer_get_identity_(const struct GSF_ConnectedPeer *cp, 1653GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1637 struct GNUNET_PeerIdentity *id) 1654 struct GNUNET_PeerIdentity *id)
1638{ 1655{
1639 GNUNET_assert(0 != cp->ppd.pid); 1656 GNUNET_assert (0 != cp->ppd.pid);
1640 GNUNET_PEER_resolve(cp->ppd.pid, id); 1657 GNUNET_PEER_resolve (cp->ppd.pid, id);
1641} 1658}
1642 1659
1643 1660
@@ -1648,10 +1665,10 @@ GSF_connected_peer_get_identity_(const struct GSF_ConnectedPeer *cp,
1648 * @return reference to peer identity, valid until peer disconnects (!) 1665 * @return reference to peer identity, valid until peer disconnects (!)
1649 */ 1666 */
1650const struct GNUNET_PeerIdentity * 1667const struct GNUNET_PeerIdentity *
1651GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp) 1668GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1652{ 1669{
1653 GNUNET_assert(0 != cp->ppd.pid); 1670 GNUNET_assert (0 != cp->ppd.pid);
1654 return GNUNET_PEER_resolve2(cp->ppd.pid); 1671 return GNUNET_PEER_resolve2 (cp->ppd.pid);
1655} 1672}
1656 1673
1657 1674
@@ -1663,38 +1680,41 @@ GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp)
1663 * @param block_time until when to block 1680 * @param block_time until when to block
1664 */ 1681 */
1665void 1682void
1666GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp, 1683GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1667 struct GNUNET_TIME_Absolute block_time) 1684 struct GNUNET_TIME_Absolute block_time)
1668{ 1685{
1669 struct GNUNET_MQ_Envelope *env; 1686 struct GNUNET_MQ_Envelope *env;
1670 struct MigrationStopMessage *msm; 1687 struct MigrationStopMessage *msm;
1671 1688
1672 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) 1689 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1673 { 1690 {
1674 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 1691 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1675 "Migration already blocked for another %s\n", 1692 "Migration already blocked for another %s\n",
1676 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining 1693 GNUNET_STRINGS_relative_time_to_string (
1677 (cp->last_migration_block), GNUNET_YES)); 1694 GNUNET_TIME_absolute_get_remaining
1678 return; /* already blocked */ 1695 (cp->
1679 } 1696 last_migration_block), GNUNET_YES));
1680 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n", 1697 return; /* already blocked */
1681 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(block_time), 1698 }
1682 GNUNET_YES)); 1699 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1700 GNUNET_STRINGS_relative_time_to_string (
1701 GNUNET_TIME_absolute_get_remaining (block_time),
1702 GNUNET_YES));
1683 cp->last_migration_block = block_time; 1703 cp->last_migration_block = block_time;
1684 env = GNUNET_MQ_msg(msm, 1704 env = GNUNET_MQ_msg (msm,
1685 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); 1705 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1686 msm->reserved = htonl(0); 1706 msm->reserved = htonl (0);
1687 msm->duration 1707 msm->duration
1688 = GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining 1708 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1689 (cp->last_migration_block)); 1709 (cp->last_migration_block));
1690 GNUNET_STATISTICS_update(GSF_stats, 1710 GNUNET_STATISTICS_update (GSF_stats,
1691 gettext_noop("# migration stop messages sent"), 1711 gettext_noop ("# migration stop messages sent"),
1692 1, 1712 1,
1693 GNUNET_NO); 1713 GNUNET_NO);
1694 GSF_peer_transmit_(cp, 1714 GSF_peer_transmit_ (cp,
1695 GNUNET_SYSERR, 1715 GNUNET_SYSERR,
1696 UINT32_MAX, 1716 UINT32_MAX,
1697 env); 1717 env);
1698} 1718}
1699 1719
1700 1720
@@ -1708,8 +1728,8 @@ GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp,
1708 * @param pref preference change 1728 * @param pref preference change
1709 */ 1729 */
1710void 1730void
1711GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp, 1731GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1712 uint64_t pref) 1732 uint64_t pref)
1713{ 1733{
1714 cp->inc_preference += pref; 1734 cp->inc_preference += pref;
1715} 1735}
@@ -1721,15 +1741,16 @@ GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp,
1721 * @param cls closure, not used 1741 * @param cls closure, not used
1722 */ 1742 */
1723static void 1743static void
1724cron_flush_respect(void *cls) 1744cron_flush_respect (void *cls)
1725{ 1745{
1726 fr_task = NULL; 1746 fr_task = NULL;
1727 GNUNET_CONTAINER_multipeermap_iterate(cp_map, 1747 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1728 &flush_respect, 1748 &flush_respect,
1729 NULL); 1749 NULL);
1730 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority(RESPECT_FLUSH_FREQ, 1750 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ,
1731 GNUNET_SCHEDULER_PRIORITY_HIGH, 1751 GNUNET_SCHEDULER_PRIORITY_HIGH,
1732 &cron_flush_respect, NULL); 1752 &cron_flush_respect,
1753 NULL);
1733} 1754}
1734 1755
1735 1756
@@ -1737,12 +1758,12 @@ cron_flush_respect(void *cls)
1737 * Initialize peer management subsystem. 1758 * Initialize peer management subsystem.
1738 */ 1759 */
1739void 1760void
1740GSF_connected_peer_init_() 1761GSF_connected_peer_init_ ()
1741{ 1762{
1742 cp_map = GNUNET_CONTAINER_multipeermap_create(128, GNUNET_YES); 1763 cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
1743 peerstore = GNUNET_PEERSTORE_connect(GSF_cfg); 1764 peerstore = GNUNET_PEERSTORE_connect (GSF_cfg);
1744 fr_task = GNUNET_SCHEDULER_add_with_priority(GNUNET_SCHEDULER_PRIORITY_HIGH, 1765 fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
1745 &cron_flush_respect, NULL); 1766 &cron_flush_respect, NULL);
1746} 1767}
1747 1768
1748 1769
@@ -1750,17 +1771,17 @@ GSF_connected_peer_init_()
1750 * Shutdown peer management subsystem. 1771 * Shutdown peer management subsystem.
1751 */ 1772 */
1752void 1773void
1753GSF_connected_peer_done_() 1774GSF_connected_peer_done_ ()
1754{ 1775{
1755 GNUNET_CONTAINER_multipeermap_iterate(cp_map, 1776 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1756 &flush_respect, 1777 &flush_respect,
1757 NULL); 1778 NULL);
1758 GNUNET_SCHEDULER_cancel(fr_task); 1779 GNUNET_SCHEDULER_cancel (fr_task);
1759 fr_task = NULL; 1780 fr_task = NULL;
1760 GNUNET_CONTAINER_multipeermap_destroy(cp_map); 1781 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
1761 cp_map = NULL; 1782 cp_map = NULL;
1762 GNUNET_PEERSTORE_disconnect(peerstore, 1783 GNUNET_PEERSTORE_disconnect (peerstore,
1763 GNUNET_YES); 1784 GNUNET_YES);
1764} 1785}
1765 1786
1766 1787
@@ -1773,9 +1794,9 @@ GSF_connected_peer_done_()
1773 * @return #GNUNET_YES (we should continue to iterate) 1794 * @return #GNUNET_YES (we should continue to iterate)
1774 */ 1795 */
1775static int 1796static int
1776clean_local_client(void *cls, 1797clean_local_client (void *cls,
1777 const struct GNUNET_PeerIdentity *key, 1798 const struct GNUNET_PeerIdentity *key,
1778 void *value) 1799 void *value)
1779{ 1800{
1780 const struct GSF_LocalClient *lc = cls; 1801 const struct GSF_LocalClient *lc = cls;
1781 struct GSF_ConnectedPeer *cp = value; 1802 struct GSF_ConnectedPeer *cp = value;
@@ -1795,13 +1816,13 @@ clean_local_client(void *cls,
1795 * @param lc handle to the local client (henceforth invalid) 1816 * @param lc handle to the local client (henceforth invalid)
1796 */ 1817 */
1797void 1818void
1798GSF_handle_local_client_disconnect_(const struct GSF_LocalClient *lc) 1819GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
1799{ 1820{
1800 if (NULL == cp_map) 1821 if (NULL == cp_map)
1801 return; /* already cleaned up */ 1822 return; /* already cleaned up */
1802 GNUNET_CONTAINER_multipeermap_iterate(cp_map, 1823 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1803 &clean_local_client, 1824 &clean_local_client,
1804 (void *)lc); 1825 (void *) lc);
1805} 1826}
1806 1827
1807 1828