aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c1451
1 files changed, 717 insertions, 734 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index a3a521221..6dab37869 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -11,12 +11,12 @@
11 WITHOUT ANY WARRANTY; without even the implied warranty of 11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details. 13 Affero General Public License for more details.
14 14
15 You should have received a copy of the GNU Affero General Public License 15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20/** 20/**
21 * @file fs/gnunet-service-fs_cp.c 21 * @file fs/gnunet-service-fs_cp.c
22 * @brief API to handle 'connected peers' 22 * @brief API to handle 'connected peers'
@@ -43,12 +43,12 @@
43/** 43/**
44 * How often do we flush respect values to disk? 44 * How often do we flush respect values to disk?
45 */ 45 */
46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5) 46#define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
47 47
48/** 48/**
49 * After how long do we discard a reply? 49 * After how long do we discard a reply?
50 */ 50 */
51#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) 51#define REPLY_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
52 52
53/** 53/**
54 * Collect an instane number of statistics? May cause excessive IPC. 54 * Collect an instane number of statistics? May cause excessive IPC.
@@ -59,9 +59,7 @@
59/** 59/**
60 * Handle to cancel a transmission request. 60 * Handle to cancel a transmission request.
61 */ 61 */
62struct GSF_PeerTransmitHandle 62struct GSF_PeerTransmitHandle {
63{
64
65 /** 63 /**
66 * Kept in a doubly-linked list. 64 * Kept in a doubly-linked list.
67 */ 65 */
@@ -101,16 +99,13 @@ struct GSF_PeerTransmitHandle
101 * Priority of this request. 99 * Priority of this request.
102 */ 100 */
103 uint32_t priority; 101 uint32_t priority;
104
105}; 102};
106 103
107 104
108/** 105/**
109 * Handle for an entry in our delay list. 106 * Handle for an entry in our delay list.
110 */ 107 */
111struct GSF_DelayedHandle 108struct GSF_DelayedHandle {
112{
113
114 /** 109 /**
115 * Kept in a doubly-linked list. 110 * Kept in a doubly-linked list.
116 */ 111 */
@@ -140,16 +135,13 @@ struct GSF_DelayedHandle
140 * Size of the message. 135 * Size of the message.
141 */ 136 */
142 size_t msize; 137 size_t msize;
143
144}; 138};
145 139
146 140
147/** 141/**
148 * Information per peer and request. 142 * Information per peer and request.
149 */ 143 */
150struct PeerRequest 144struct PeerRequest {
151{
152
153 /** 145 /**
154 * Handle to generic request (generic: from peer or local client). 146 * Handle to generic request (generic: from peer or local client).
155 */ 147 */
@@ -164,16 +156,13 @@ struct PeerRequest
164 * Task for asynchronous stopping of this request. 156 * Task for asynchronous stopping of this request.
165 */ 157 */
166 struct GNUNET_SCHEDULER_Task *kill_task; 158 struct GNUNET_SCHEDULER_Task *kill_task;
167
168}; 159};
169 160
170 161
171/** 162/**
172 * A connected peer. 163 * A connected peer.
173 */ 164 */
174struct GSF_ConnectedPeer 165struct GSF_ConnectedPeer {
175{
176
177 /** 166 /**
178 * Performance data for this peer. 167 * Performance data for this peer.
179 */ 168 */
@@ -278,7 +267,6 @@ struct GSF_ConnectedPeer
278 * Handle to the PEERSTORE iterate request for peer respect value 267 * Handle to the PEERSTORE iterate request for peer respect value
279 */ 268 */
280 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; 269 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
281
282}; 270};
283 271
284 272
@@ -305,16 +293,16 @@ static struct GNUNET_SCHEDULER_Task *fr_task;
305 * @param latency current latency value 293 * @param latency current latency value
306 */ 294 */
307void 295void
308GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id, 296GSF_update_peer_latency_(const struct GNUNET_PeerIdentity *id,
309 struct GNUNET_TIME_Relative latency) 297 struct GNUNET_TIME_Relative latency)
310{ 298{
311 struct GSF_ConnectedPeer *cp; 299 struct GSF_ConnectedPeer *cp;
312 300
313 cp = GSF_peer_get_ (id); 301 cp = GSF_peer_get_(id);
314 if (NULL == cp) 302 if (NULL == cp)
315 return; /* we're not yet connected at the core level, ignore */ 303 return; /* we're not yet connected at the core level, ignore */
316 GNUNET_LOAD_value_set_decline (cp->ppd.transmission_delay, 304 GNUNET_LOAD_value_set_decline(cp->ppd.transmission_delay,
317 latency); 305 latency);
318} 306}
319 307
320 308
@@ -325,7 +313,7 @@ GSF_update_peer_latency_ (const struct GNUNET_PeerIdentity *id,
325 * @return performance data record for the peer 313 * @return performance data record for the peer
326 */ 314 */
327struct GSF_PeerPerformanceData * 315struct GSF_PeerPerformanceData *
328GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) 316GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
329{ 317{
330 return &cp->ppd; 318 return &cp->ppd;
331} 319}
@@ -337,7 +325,7 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
337 * @param cp which peer to send a message to 325 * @param cp which peer to send a message to
338 */ 326 */
339static void 327static void
340peer_transmit (struct GSF_ConnectedPeer *cp); 328peer_transmit(struct GSF_ConnectedPeer *cp);
341 329
342 330
343/** 331/**
@@ -351,10 +339,10 @@ peer_transmit (struct GSF_ConnectedPeer *cp);
351 * long should the client wait until re-trying? 339 * long should the client wait until re-trying?
352 */ 340 */
353static void 341static void
354ats_reserve_callback (void *cls, 342ats_reserve_callback(void *cls,
355 const struct GNUNET_PeerIdentity *peer, 343 const struct GNUNET_PeerIdentity *peer,
356 int32_t amount, 344 int32_t amount,
357 struct GNUNET_TIME_Relative res_delay); 345 struct GNUNET_TIME_Relative res_delay);
358 346
359 347
360/** 348/**
@@ -364,42 +352,42 @@ ats_reserve_callback (void *cls,
364 * @param pth transmission handle to schedule 352 * @param pth transmission handle to schedule
365 */ 353 */
366static void 354static void
367schedule_transmission (struct GSF_PeerTransmitHandle *pth) 355schedule_transmission(struct GSF_PeerTransmitHandle *pth)
368{ 356{
369 struct GSF_ConnectedPeer *cp; 357 struct GSF_ConnectedPeer *cp;
370 struct GNUNET_PeerIdentity target; 358 struct GNUNET_PeerIdentity target;
371 359
372 cp = pth->cp; 360 cp = pth->cp;
373 GNUNET_assert (0 != cp->ppd.pid); 361 GNUNET_assert(0 != cp->ppd.pid);
374 GNUNET_PEER_resolve (cp->ppd.pid, &target); 362 GNUNET_PEER_resolve(cp->ppd.pid, &target);
375 363
376 if (0 != cp->inc_preference) 364 if (0 != cp->inc_preference)
377 { 365 {
378 GNUNET_ATS_performance_change_preference (GSF_ats, 366 GNUNET_ATS_performance_change_preference(GSF_ats,
379 &target, 367 &target,
380 GNUNET_ATS_PREFERENCE_BANDWIDTH, 368 GNUNET_ATS_PREFERENCE_BANDWIDTH,
381 (double) cp->inc_preference, 369 (double)cp->inc_preference,
382 GNUNET_ATS_PREFERENCE_END); 370 GNUNET_ATS_PREFERENCE_END);
383 cp->inc_preference = 0; 371 cp->inc_preference = 0;
384 } 372 }
385 373
386 if ( (GNUNET_YES == pth->is_query) && 374 if ((GNUNET_YES == pth->is_query) &&
387 (GNUNET_YES != pth->was_reserved) ) 375 (GNUNET_YES != pth->was_reserved))
388 { 376 {
389 /* query, need reservation */ 377 /* query, need reservation */
390 if (GNUNET_YES != cp->did_reserve) 378 if (GNUNET_YES != cp->did_reserve)
391 return; /* not ready */ 379 return; /* not ready */
392 cp->did_reserve = GNUNET_NO; 380 cp->did_reserve = GNUNET_NO;
393 /* reservation already done! */ 381 /* reservation already done! */
394 pth->was_reserved = GNUNET_YES; 382 pth->was_reserved = GNUNET_YES;
395 cp->rc = GNUNET_ATS_reserve_bandwidth (GSF_ats, 383 cp->rc = GNUNET_ATS_reserve_bandwidth(GSF_ats,
396 &target, 384 &target,
397 DBLOCK_SIZE, 385 DBLOCK_SIZE,
398 &ats_reserve_callback, 386 &ats_reserve_callback,
399 cp); 387 cp);
400 return; 388 return;
401 } 389 }
402 peer_transmit (cp); 390 peer_transmit(cp);
403} 391}
404 392
405 393
@@ -409,38 +397,38 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
409 * @param cp which peer to send a message to 397 * @param cp which peer to send a message to
410 */ 398 */
411static void 399static void
412peer_transmit (struct GSF_ConnectedPeer *cp) 400peer_transmit(struct GSF_ConnectedPeer *cp)
413{ 401{
414 struct GSF_PeerTransmitHandle *pth = cp->pth_head; 402 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
415 struct GSF_PeerTransmitHandle *pos; 403 struct GSF_PeerTransmitHandle *pos;
416 404
417 if (NULL == pth) 405 if (NULL == pth)
418 return; 406 return;
419 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 407 GNUNET_CONTAINER_DLL_remove(cp->pth_head,
420 cp->pth_tail, 408 cp->pth_tail,
421 pth); 409 pth);
422 if (GNUNET_YES == pth->is_query) 410 if (GNUNET_YES == pth->is_query)
423 { 411 {
424 cp->ppd.last_request_times[(cp->last_request_times_off++) % 412 cp->ppd.last_request_times[(cp->last_request_times_off++) %
425 MAX_QUEUE_PER_PEER] = 413 MAX_QUEUE_PER_PEER] =
426 GNUNET_TIME_absolute_get (); 414 GNUNET_TIME_absolute_get();
427 GNUNET_assert (0 < cp->ppd.pending_queries--); 415 GNUNET_assert(0 < cp->ppd.pending_queries--);
428 } 416 }
429 else if (GNUNET_NO == pth->is_query) 417 else if (GNUNET_NO == pth->is_query)
430 { 418 {
431 GNUNET_assert (0 < cp->ppd.pending_replies--); 419 GNUNET_assert(0 < cp->ppd.pending_replies--);
432 } 420 }
433 GNUNET_LOAD_update (cp->ppd.transmission_delay, 421 GNUNET_LOAD_update(cp->ppd.transmission_delay,
434 GNUNET_TIME_absolute_get_duration 422 GNUNET_TIME_absolute_get_duration
435 (pth->transmission_request_start_time).rel_value_us); 423 (pth->transmission_request_start_time).rel_value_us);
436 GNUNET_MQ_send (cp->mq, 424 GNUNET_MQ_send(cp->mq,
437 pth->env); 425 pth->env);
438 GNUNET_free (pth); 426 GNUNET_free(pth);
439 if (NULL != (pos = cp->pth_head)) 427 if (NULL != (pos = cp->pth_head))
440 { 428 {
441 GNUNET_assert (pos != pth); 429 GNUNET_assert(pos != pth);
442 schedule_transmission (pos); 430 schedule_transmission(pos);
443 } 431 }
444} 432}
445 433
446 434
@@ -450,18 +438,18 @@ peer_transmit (struct GSF_ConnectedPeer *cp)
450 * @param cls the `struct GSF_ConnectedPeer` to reserve from 438 * @param cls the `struct GSF_ConnectedPeer` to reserve from
451 */ 439 */
452static void 440static void
453retry_reservation (void *cls) 441retry_reservation(void *cls)
454{ 442{
455 struct GSF_ConnectedPeer *cp = cls; 443 struct GSF_ConnectedPeer *cp = cls;
456 struct GNUNET_PeerIdentity target; 444 struct GNUNET_PeerIdentity target;
457 445
458 GNUNET_PEER_resolve (cp->ppd.pid, &target); 446 GNUNET_PEER_resolve(cp->ppd.pid, &target);
459 cp->rc_delay_task = NULL; 447 cp->rc_delay_task = NULL;
460 cp->rc = 448 cp->rc =
461 GNUNET_ATS_reserve_bandwidth (GSF_ats, 449 GNUNET_ATS_reserve_bandwidth(GSF_ats,
462 &target, 450 &target,
463 DBLOCK_SIZE, 451 DBLOCK_SIZE,
464 &ats_reserve_callback, cp); 452 &ats_reserve_callback, cp);
465} 453}
466 454
467 455
@@ -476,34 +464,34 @@ retry_reservation (void *cls)
476 * long should the client wait until re-trying? 464 * long should the client wait until re-trying?
477 */ 465 */
478static void 466static void
479ats_reserve_callback (void *cls, 467ats_reserve_callback(void *cls,
480 const struct GNUNET_PeerIdentity *peer, 468 const struct GNUNET_PeerIdentity *peer,
481 int32_t amount, 469 int32_t amount,
482 struct GNUNET_TIME_Relative res_delay) 470 struct GNUNET_TIME_Relative res_delay)
483{ 471{
484 struct GSF_ConnectedPeer *cp = cls; 472 struct GSF_ConnectedPeer *cp = cls;
485 struct GSF_PeerTransmitHandle *pth; 473 struct GSF_PeerTransmitHandle *pth;
486 474
487 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 475 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
488 "Reserved %d bytes / need to wait %s for reservation\n", 476 "Reserved %d bytes / need to wait %s for reservation\n",
489 (int) amount, 477 (int)amount,
490 GNUNET_STRINGS_relative_time_to_string (res_delay, GNUNET_YES)); 478 GNUNET_STRINGS_relative_time_to_string(res_delay, GNUNET_YES));
491 cp->rc = NULL; 479 cp->rc = NULL;
492 if (0 == amount) 480 if (0 == amount)
493 { 481 {
494 cp->rc_delay_task = 482 cp->rc_delay_task =
495 GNUNET_SCHEDULER_add_delayed (res_delay, 483 GNUNET_SCHEDULER_add_delayed(res_delay,
496 &retry_reservation, 484 &retry_reservation,
497 cp); 485 cp);
498 return; 486 return;
499 } 487 }
500 cp->did_reserve = GNUNET_YES; 488 cp->did_reserve = GNUNET_YES;
501 pth = cp->pth_head; 489 pth = cp->pth_head;
502 if (NULL != pth) 490 if (NULL != pth)
503 { 491 {
504 /* reservation success, try transmission now! */ 492 /* reservation success, try transmission now! */
505 peer_transmit (cp); 493 peer_transmit(cp);
506 } 494 }
507} 495}
508 496
509 497
@@ -515,22 +503,22 @@ ats_reserve_callback (void *cls,
515 * @param emsg error message, or NULL if no errors 503 * @param emsg error message, or NULL if no errors
516 */ 504 */
517static void 505static void
518peer_respect_cb (void *cls, 506peer_respect_cb(void *cls,
519 const struct GNUNET_PEERSTORE_Record *record, 507 const struct GNUNET_PEERSTORE_Record *record,
520 const char *emsg) 508 const char *emsg)
521{ 509{
522 struct GSF_ConnectedPeer *cp = cls; 510 struct GSF_ConnectedPeer *cp = cls;
523 511
524 GNUNET_assert (NULL != cp->respect_iterate_req); 512 GNUNET_assert(NULL != cp->respect_iterate_req);
525 if ( (NULL != record) && 513 if ((NULL != record) &&
526 (sizeof (cp->disk_respect) == record->value_size)) 514 (sizeof(cp->disk_respect) == record->value_size))
527 { 515 {
528 cp->disk_respect = *((uint32_t *)record->value); 516 cp->disk_respect = *((uint32_t *)record->value);
529 cp->ppd.respect += *((uint32_t *)record->value); 517 cp->ppd.respect += *((uint32_t *)record->value);
530 } 518 }
531 GSF_push_start_ (cp); 519 GSF_push_start_(cp);
532 if (NULL != record) 520 if (NULL != record)
533 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); 521 GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req);
534 cp->respect_iterate_req = NULL; 522 cp->respect_iterate_req = NULL;
535} 523}
536 524
@@ -546,27 +534,27 @@ peer_respect_cb (void *cls,
546 * @return #GNUNET_YES to continue to iterate 534 * @return #GNUNET_YES to continue to iterate
547 */ 535 */
548static int 536static int
549consider_peer_for_forwarding (void *cls, 537consider_peer_for_forwarding(void *cls,
550 const struct GNUNET_HashCode *key, 538 const struct GNUNET_HashCode *key,
551 struct GSF_PendingRequest *pr) 539 struct GSF_PendingRequest *pr)
552{ 540{
553 struct GSF_ConnectedPeer *cp = cls; 541 struct GSF_ConnectedPeer *cp = cls;
554 struct GNUNET_PeerIdentity pid; 542 struct GNUNET_PeerIdentity pid;
555 543
556 if (GNUNET_YES != 544 if (GNUNET_YES !=
557 GSF_pending_request_test_active_ (pr)) 545 GSF_pending_request_test_active_(pr))
558 return GNUNET_YES; /* request is not actually active, skip! */ 546 return GNUNET_YES; /* request is not actually active, skip! */
559 GSF_connected_peer_get_identity_ (cp, &pid); 547 GSF_connected_peer_get_identity_(cp, &pid);
560 if (GNUNET_YES != 548 if (GNUNET_YES !=
561 GSF_pending_request_test_target_ (pr, &pid)) 549 GSF_pending_request_test_target_(pr, &pid))
562 { 550 {
563 GNUNET_STATISTICS_update (GSF_stats, 551 GNUNET_STATISTICS_update(GSF_stats,
564 gettext_noop ("# Loopback routes suppressed"), 552 gettext_noop("# Loopback routes suppressed"),
565 1, 553 1,
566 GNUNET_NO); 554 GNUNET_NO);
567 return GNUNET_YES; 555 return GNUNET_YES;
568 } 556 }
569 GSF_plan_add_ (cp, pr); 557 GSF_plan_add_(cp, pr);
570 return GNUNET_YES; 558 return GNUNET_YES;
571} 559}
572 560
@@ -581,49 +569,49 @@ consider_peer_for_forwarding (void *cls,
581 * @return our internal handle for the peer 569 * @return our internal handle for the peer
582 */ 570 */
583void * 571void *
584GSF_peer_connect_handler (void *cls, 572GSF_peer_connect_handler(void *cls,
585 const struct GNUNET_PeerIdentity *peer, 573 const struct GNUNET_PeerIdentity *peer,
586 struct GNUNET_MQ_Handle *mq) 574 struct GNUNET_MQ_Handle *mq)
587{ 575{
588 struct GSF_ConnectedPeer *cp; 576 struct GSF_ConnectedPeer *cp;
589 577
590 if (0 == 578 if (0 ==
591 GNUNET_memcmp (&GSF_my_id, 579 GNUNET_memcmp(&GSF_my_id,
592 peer)) 580 peer))
593 return NULL; 581 return NULL;
594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 582 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
595 "Connected to peer %s\n", 583 "Connected to peer %s\n",
596 GNUNET_i2s (peer)); 584 GNUNET_i2s(peer));
597 cp = GNUNET_new (struct GSF_ConnectedPeer); 585 cp = GNUNET_new(struct GSF_ConnectedPeer);
598 cp->ppd.pid = GNUNET_PEER_intern (peer); 586 cp->ppd.pid = GNUNET_PEER_intern(peer);
599 cp->ppd.peer = peer; 587 cp->ppd.peer = peer;
600 cp->mq = mq; 588 cp->mq = mq;
601 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); 589 cp->ppd.transmission_delay = GNUNET_LOAD_value_init(GNUNET_TIME_UNIT_ZERO);
602 cp->rc = 590 cp->rc =
603 GNUNET_ATS_reserve_bandwidth (GSF_ats, 591 GNUNET_ATS_reserve_bandwidth(GSF_ats,
604 peer, 592 peer,
605 DBLOCK_SIZE, 593 DBLOCK_SIZE,
606 &ats_reserve_callback, cp); 594 &ats_reserve_callback, cp);
607 cp->request_map = GNUNET_CONTAINER_multihashmap_create (128, 595 cp->request_map = GNUNET_CONTAINER_multihashmap_create(128,
608 GNUNET_YES); 596 GNUNET_YES);
609 GNUNET_break (GNUNET_OK == 597 GNUNET_break(GNUNET_OK ==
610 GNUNET_CONTAINER_multipeermap_put (cp_map, 598 GNUNET_CONTAINER_multipeermap_put(cp_map,
611 GSF_connected_peer_get_identity2_ (cp), 599 GSF_connected_peer_get_identity2_(cp),
612 cp, 600 cp,
613 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 601 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
614 GNUNET_STATISTICS_set (GSF_stats, 602 GNUNET_STATISTICS_set(GSF_stats,
615 gettext_noop ("# peers connected"), 603 gettext_noop("# peers connected"),
616 GNUNET_CONTAINER_multipeermap_size (cp_map), 604 GNUNET_CONTAINER_multipeermap_size(cp_map),
617 GNUNET_NO); 605 GNUNET_NO);
618 cp->respect_iterate_req 606 cp->respect_iterate_req
619 = GNUNET_PEERSTORE_iterate (peerstore, 607 = GNUNET_PEERSTORE_iterate(peerstore,
620 "fs", 608 "fs",
621 peer, 609 peer,
622 "respect", 610 "respect",
623 &peer_respect_cb, 611 &peer_respect_cb,
612 cp);
613 GSF_iterate_pending_requests_(&consider_peer_for_forwarding,
624 cp); 614 cp);
625 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
626 cp);
627 return cp; 615 return cp;
628} 616}
629 617
@@ -635,21 +623,21 @@ GSF_peer_connect_handler (void *cls,
635 * @param cls the `struct GSF_ConnectedPeer` 623 * @param cls the `struct GSF_ConnectedPeer`
636 */ 624 */
637static void 625static void
638revive_migration (void *cls) 626revive_migration(void *cls)
639{ 627{
640 struct GSF_ConnectedPeer *cp = cls; 628 struct GSF_ConnectedPeer *cp = cls;
641 struct GNUNET_TIME_Relative bt; 629 struct GNUNET_TIME_Relative bt;
642 630
643 cp->mig_revive_task = NULL; 631 cp->mig_revive_task = NULL;
644 bt = GNUNET_TIME_absolute_get_remaining (cp->ppd.migration_blocked_until); 632 bt = GNUNET_TIME_absolute_get_remaining(cp->ppd.migration_blocked_until);
645 if (0 != bt.rel_value_us) 633 if (0 != bt.rel_value_us)
646 { 634 {
647 /* still time left... */ 635 /* still time left... */
648 cp->mig_revive_task = 636 cp->mig_revive_task =
649 GNUNET_SCHEDULER_add_delayed (bt, &revive_migration, cp); 637 GNUNET_SCHEDULER_add_delayed(bt, &revive_migration, cp);
650 return; 638 return;
651 } 639 }
652 GSF_push_start_ (cp); 640 GSF_push_start_(cp);
653} 641}
654 642
655 643
@@ -660,44 +648,44 @@ revive_migration (void *cls)
660 * @return NULL if the peer is not currently connected 648 * @return NULL if the peer is not currently connected
661 */ 649 */
662struct GSF_ConnectedPeer * 650struct GSF_ConnectedPeer *
663GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer) 651GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
664{ 652{
665 if (NULL == cp_map) 653 if (NULL == cp_map)
666 return NULL; 654 return NULL;
667 return GNUNET_CONTAINER_multipeermap_get (cp_map, peer); 655 return GNUNET_CONTAINER_multipeermap_get(cp_map, peer);
668} 656}
669 657
670 658
671/** 659/**
672 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. 660 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
673 * 661 *
674 * @param cls closure, the `struct GSF_ConnectedPeer` 662 * @param cls closure, the `struct GSF_ConnectedPeer`
675 * @param msm the actual message 663 * @param msm the actual message
676 */ 664 */
677void 665void
678handle_p2p_migration_stop (void *cls, 666handle_p2p_migration_stop(void *cls,
679 const struct MigrationStopMessage *msm) 667 const struct MigrationStopMessage *msm)
680{ 668{
681 struct GSF_ConnectedPeer *cp = cls; 669 struct GSF_ConnectedPeer *cp = cls;
682 struct GNUNET_TIME_Relative bt; 670 struct GNUNET_TIME_Relative bt;
683 671
684 GNUNET_STATISTICS_update (GSF_stats, 672 GNUNET_STATISTICS_update(GSF_stats,
685 gettext_noop ("# migration stop messages received"), 673 gettext_noop("# migration stop messages received"),
686 1, GNUNET_NO); 674 1, GNUNET_NO);
687 bt = GNUNET_TIME_relative_ntoh (msm->duration); 675 bt = GNUNET_TIME_relative_ntoh(msm->duration);
688 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 676 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
689 _("Migration of content to peer `%s' blocked for %s\n"), 677 _("Migration of content to peer `%s' blocked for %s\n"),
690 GNUNET_i2s (cp->ppd.peer), 678 GNUNET_i2s(cp->ppd.peer),
691 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); 679 GNUNET_STRINGS_relative_time_to_string(bt, GNUNET_YES));
692 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); 680 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute(bt);
693 if ( (NULL == cp->mig_revive_task) && 681 if ((NULL == cp->mig_revive_task) &&
694 (NULL == cp->respect_iterate_req) ) 682 (NULL == cp->respect_iterate_req))
695 { 683 {
696 GSF_push_stop_ (cp); 684 GSF_push_stop_(cp);
697 cp->mig_revive_task = 685 cp->mig_revive_task =
698 GNUNET_SCHEDULER_add_delayed (bt, 686 GNUNET_SCHEDULER_add_delayed(bt,
699 &revive_migration, cp); 687 &revive_migration, cp);
700 } 688 }
701} 689}
702 690
703 691
@@ -707,26 +695,26 @@ handle_p2p_migration_stop (void *cls,
707 * @param peerreq request to free 695 * @param peerreq request to free
708 */ 696 */
709static void 697static void
710free_pending_request (struct PeerRequest *peerreq) 698free_pending_request(struct PeerRequest *peerreq)
711{ 699{
712 struct GSF_ConnectedPeer *cp = peerreq->cp; 700 struct GSF_ConnectedPeer *cp = peerreq->cp;
713 struct GSF_PendingRequestData *prd; 701 struct GSF_PendingRequestData *prd;
714 702
715 prd = GSF_pending_request_get_data_ (peerreq->pr); 703 prd = GSF_pending_request_get_data_(peerreq->pr);
716 if (NULL != peerreq->kill_task) 704 if (NULL != peerreq->kill_task)
717 { 705 {
718 GNUNET_SCHEDULER_cancel (peerreq->kill_task); 706 GNUNET_SCHEDULER_cancel(peerreq->kill_task);
719 peerreq->kill_task = NULL; 707 peerreq->kill_task = NULL;
720 } 708 }
721 GNUNET_STATISTICS_update (GSF_stats, 709 GNUNET_STATISTICS_update(GSF_stats,
722 gettext_noop ("# P2P searches active"), 710 gettext_noop("# P2P searches active"),
723 -1, 711 -1,
724 GNUNET_NO); 712 GNUNET_NO);
725 GNUNET_break (GNUNET_YES == 713 GNUNET_break(GNUNET_YES ==
726 GNUNET_CONTAINER_multihashmap_remove (cp->request_map, 714 GNUNET_CONTAINER_multihashmap_remove(cp->request_map,
727 &prd->query, 715 &prd->query,
728 peerreq)); 716 peerreq));
729 GNUNET_free (peerreq); 717 GNUNET_free(peerreq);
730} 718}
731 719
732 720
@@ -739,16 +727,16 @@ free_pending_request (struct PeerRequest *peerreq)
739 * @return #GNUNET_YES (continue to iterate) 727 * @return #GNUNET_YES (continue to iterate)
740 */ 728 */
741static int 729static int
742cancel_pending_request (void *cls, 730cancel_pending_request(void *cls,
743 const struct GNUNET_HashCode *query, 731 const struct GNUNET_HashCode *query,
744 void *value) 732 void *value)
745{ 733{
746 struct PeerRequest *peerreq = value; 734 struct PeerRequest *peerreq = value;
747 struct GSF_PendingRequest *pr = peerreq->pr; 735 struct GSF_PendingRequest *pr = peerreq->pr;
748 736
749 free_pending_request (peerreq); 737 free_pending_request(peerreq);
750 GSF_pending_request_cancel_ (pr, 738 GSF_pending_request_cancel_(pr,
751 GNUNET_NO); 739 GNUNET_NO);
752 return GNUNET_OK; 740 return GNUNET_OK;
753} 741}
754 742
@@ -759,17 +747,17 @@ cancel_pending_request (void *cls,
759 * @param cls the request to free 747 * @param cls the request to free
760 */ 748 */
761static void 749static void
762peer_request_destroy (void *cls) 750peer_request_destroy(void *cls)
763{ 751{
764 struct PeerRequest *peerreq = cls; 752 struct PeerRequest *peerreq = cls;
765 struct GSF_PendingRequest *pr = peerreq->pr; 753 struct GSF_PendingRequest *pr = peerreq->pr;
766 struct GSF_PendingRequestData *prd; 754 struct GSF_PendingRequestData *prd;
767 755
768 peerreq->kill_task = NULL; 756 peerreq->kill_task = NULL;
769 prd = GSF_pending_request_get_data_ (pr); 757 prd = GSF_pending_request_get_data_(pr);
770 cancel_pending_request (NULL, 758 cancel_pending_request(NULL,
771 &prd->query, 759 &prd->query,
772 peerreq); 760 peerreq);
773} 761}
774 762
775 763
@@ -779,20 +767,20 @@ peer_request_destroy (void *cls)
779 * @param cls the `struct GSF_DelayedHandle` with the message 767 * @param cls the `struct GSF_DelayedHandle` with the message
780 */ 768 */
781static void 769static void
782transmit_delayed_now (void *cls) 770transmit_delayed_now(void *cls)
783{ 771{
784 struct GSF_DelayedHandle *dh = cls; 772 struct GSF_DelayedHandle *dh = cls;
785 struct GSF_ConnectedPeer *cp = dh->cp; 773 struct GSF_ConnectedPeer *cp = dh->cp;
786 774
787 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, 775 GNUNET_CONTAINER_DLL_remove(cp->delayed_head,
788 cp->delayed_tail, 776 cp->delayed_tail,
789 dh); 777 dh);
790 cp->delay_queue_size--; 778 cp->delay_queue_size--;
791 GSF_peer_transmit_ (cp, 779 GSF_peer_transmit_(cp,
792 GNUNET_NO, 780 GNUNET_NO,
793 UINT32_MAX, 781 UINT32_MAX,
794 dh->env); 782 dh->env);
795 GNUNET_free (dh); 783 GNUNET_free(dh);
796} 784}
797 785
798 786
@@ -802,20 +790,20 @@ transmit_delayed_now (void *cls)
802 * @return desired delay 790 * @return desired delay
803 */ 791 */
804static struct GNUNET_TIME_Relative 792static struct GNUNET_TIME_Relative
805get_randomized_delay () 793get_randomized_delay()
806{ 794{
807 struct GNUNET_TIME_Relative ret; 795 struct GNUNET_TIME_Relative ret;
808 796
809 ret = 797 ret =
810 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 798 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS,
811 GNUNET_CRYPTO_random_u32 799 GNUNET_CRYPTO_random_u32
812 (GNUNET_CRYPTO_QUALITY_WEAK, 800 (GNUNET_CRYPTO_QUALITY_WEAK,
813 2 * GSF_avg_latency.rel_value_us + 1)); 801 2 * GSF_avg_latency.rel_value_us + 1));
814#if INSANE_STATISTICS 802#if INSANE_STATISTICS
815 GNUNET_STATISTICS_update (GSF_stats, 803 GNUNET_STATISTICS_update(GSF_stats,
816 gettext_noop 804 gettext_noop
817 ("# artificial delays introduced (ms)"), 805 ("# artificial delays introduced (ms)"),
818 ret.rel_value_us / 1000LL, GNUNET_NO); 806 ret.rel_value_us / 1000LL, GNUNET_NO);
819#endif 807#endif
820 return ret; 808 return ret;
821} 809}
@@ -840,15 +828,15 @@ get_randomized_delay ()
840 * @param data_len number of bytes in @a data 828 * @param data_len number of bytes in @a data
841 */ 829 */
842static void 830static void
843handle_p2p_reply (void *cls, 831handle_p2p_reply(void *cls,
844 enum GNUNET_BLOCK_EvaluationResult eval, 832 enum GNUNET_BLOCK_EvaluationResult eval,
845 struct GSF_PendingRequest *pr, 833 struct GSF_PendingRequest *pr,
846 uint32_t reply_anonymity_level, 834 uint32_t reply_anonymity_level,
847 struct GNUNET_TIME_Absolute expiration, 835 struct GNUNET_TIME_Absolute expiration,
848 struct GNUNET_TIME_Absolute last_transmission, 836 struct GNUNET_TIME_Absolute last_transmission,
849 enum GNUNET_BLOCK_Type type, 837 enum GNUNET_BLOCK_Type type,
850 const void *data, 838 const void *data,
851 size_t data_len) 839 size_t data_len)
852{ 840{
853 struct PeerRequest *peerreq = cls; 841 struct PeerRequest *peerreq = cls;
854 struct GSF_ConnectedPeer *cp = peerreq->cp; 842 struct GSF_ConnectedPeer *cp = peerreq->cp;
@@ -857,96 +845,96 @@ handle_p2p_reply (void *cls,
857 struct PutMessage *pm; 845 struct PutMessage *pm;
858 size_t msize; 846 size_t msize;
859 847
860 GNUNET_assert (data_len + sizeof (struct PutMessage) < 848 GNUNET_assert(data_len + sizeof(struct PutMessage) <
861 GNUNET_MAX_MESSAGE_SIZE); 849 GNUNET_MAX_MESSAGE_SIZE);
862 GNUNET_assert (peerreq->pr == pr); 850 GNUNET_assert(peerreq->pr == pr);
863 prd = GSF_pending_request_get_data_ (pr); 851 prd = GSF_pending_request_get_data_(pr);
864 if (NULL == data) 852 if (NULL == data)
865 { 853 {
866 free_pending_request (peerreq); 854 free_pending_request(peerreq);
867 return; 855 return;
868 } 856 }
869 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type); 857 GNUNET_break(GNUNET_BLOCK_TYPE_ANY != type);
870 if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type)) 858 if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
871 { 859 {
872 GNUNET_STATISTICS_update (GSF_stats, 860 GNUNET_STATISTICS_update(GSF_stats,
873 gettext_noop 861 gettext_noop
874 ("# replies dropped due to type mismatch"), 862 ("# replies dropped due to type mismatch"),
875 1, GNUNET_NO); 863 1, GNUNET_NO);
876 return; 864 return;
877 } 865 }
878 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 866 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
879 "Transmitting result for query `%s' to peer\n", 867 "Transmitting result for query `%s' to peer\n",
880 GNUNET_h2s (&prd->query)); 868 GNUNET_h2s(&prd->query));
881 GNUNET_STATISTICS_update (GSF_stats, 869 GNUNET_STATISTICS_update(GSF_stats,
882 gettext_noop ("# replies received for other peers"), 870 gettext_noop("# replies received for other peers"),
883 1, GNUNET_NO); 871 1, GNUNET_NO);
884 msize = sizeof (struct PutMessage) + data_len; 872 msize = sizeof(struct PutMessage) + data_len;
885 if (msize >= GNUNET_MAX_MESSAGE_SIZE) 873 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
886 {
887 GNUNET_break (0);
888 return;
889 }
890 if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
891 {
892 if (reply_anonymity_level - 1 > GSF_cover_content_count)
893 { 874 {
894 GNUNET_STATISTICS_update (GSF_stats, 875 GNUNET_break(0);
895 gettext_noop
896 ("# replies dropped due to insufficient cover traffic"),
897 1, GNUNET_NO);
898 return; 876 return;
899 } 877 }
900 GSF_cover_content_count -= (reply_anonymity_level - 1); 878 if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
901 } 879 {
902 880 if (reply_anonymity_level - 1 > GSF_cover_content_count)
903 env = GNUNET_MQ_msg_extra (pm, 881 {
904 data_len, 882 GNUNET_STATISTICS_update(GSF_stats,
905 GNUNET_MESSAGE_TYPE_FS_PUT); 883 gettext_noop
906 pm->type = htonl (type); 884 ("# replies dropped due to insufficient cover traffic"),
907 pm->expiration = GNUNET_TIME_absolute_hton (expiration); 885 1, GNUNET_NO);
908 GNUNET_memcpy (&pm[1], 886 return;
909 data, 887 }
910 data_len); 888 GSF_cover_content_count -= (reply_anonymity_level - 1);
911 if ( (UINT32_MAX != reply_anonymity_level) && 889 }
912 (0 != reply_anonymity_level) && 890
913 (GNUNET_YES == GSF_enable_randomized_delays) ) 891 env = GNUNET_MQ_msg_extra(pm,
914 { 892 data_len,
915 struct GSF_DelayedHandle *dh; 893 GNUNET_MESSAGE_TYPE_FS_PUT);
916 894 pm->type = htonl(type);
917 dh = GNUNET_new (struct GSF_DelayedHandle); 895 pm->expiration = GNUNET_TIME_absolute_hton(expiration);
918 dh->cp = cp; 896 GNUNET_memcpy(&pm[1],
919 dh->env = env; 897 data,
920 dh->msize = msize; 898 data_len);
921 GNUNET_CONTAINER_DLL_insert (cp->delayed_head, 899 if ((UINT32_MAX != reply_anonymity_level) &&
922 cp->delayed_tail, 900 (0 != reply_anonymity_level) &&
923 dh); 901 (GNUNET_YES == GSF_enable_randomized_delays))
924 cp->delay_queue_size++; 902 {
925 dh->delay_task = 903 struct GSF_DelayedHandle *dh;
926 GNUNET_SCHEDULER_add_delayed (get_randomized_delay (), 904
927 &transmit_delayed_now, 905 dh = GNUNET_new(struct GSF_DelayedHandle);
928 dh); 906 dh->cp = cp;
929 } 907 dh->env = env;
908 dh->msize = msize;
909 GNUNET_CONTAINER_DLL_insert(cp->delayed_head,
910 cp->delayed_tail,
911 dh);
912 cp->delay_queue_size++;
913 dh->delay_task =
914 GNUNET_SCHEDULER_add_delayed(get_randomized_delay(),
915 &transmit_delayed_now,
916 dh);
917 }
930 else 918 else
931 { 919 {
932 GSF_peer_transmit_ (cp, 920 GSF_peer_transmit_(cp,
933 GNUNET_NO, 921 GNUNET_NO,
934 UINT32_MAX, 922 UINT32_MAX,
935 env); 923 env);
936 } 924 }
937 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) 925 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
938 return; 926 return;
939 if (NULL == peerreq->kill_task) 927 if (NULL == peerreq->kill_task)
940 { 928 {
941 GNUNET_STATISTICS_update (GSF_stats, 929 GNUNET_STATISTICS_update(GSF_stats,
942 gettext_noop 930 gettext_noop
943 ("# P2P searches destroyed due to ultimate reply"), 931 ("# P2P searches destroyed due to ultimate reply"),
944 1, 932 1,
945 GNUNET_NO); 933 GNUNET_NO);
946 peerreq->kill_task = 934 peerreq->kill_task =
947 GNUNET_SCHEDULER_add_now (&peer_request_destroy, 935 GNUNET_SCHEDULER_add_now(&peer_request_destroy,
948 peerreq); 936 peerreq);
949 } 937 }
950} 938}
951 939
952 940
@@ -959,31 +947,31 @@ handle_p2p_reply (void *cls,
959 * @returns the actual change in respect (positive or negative) 947 * @returns the actual change in respect (positive or negative)
960 */ 948 */
961static int 949static int
962change_peer_respect (struct GSF_ConnectedPeer *cp, int value) 950change_peer_respect(struct GSF_ConnectedPeer *cp, int value)
963{ 951{
964 if (0 == value) 952 if (0 == value)
965 return 0; 953 return 0;
966 GNUNET_assert (NULL != cp); 954 GNUNET_assert(NULL != cp);
967 if (value > 0) 955 if (value > 0)
968 {
969 if (cp->ppd.respect + value < cp->ppd.respect)
970 { 956 {
971 value = UINT32_MAX - cp->ppd.respect; 957 if (cp->ppd.respect + value < cp->ppd.respect)
972 cp->ppd.respect = UINT32_MAX; 958 {
959 value = UINT32_MAX - cp->ppd.respect;
960 cp->ppd.respect = UINT32_MAX;
961 }
962 else
963 cp->ppd.respect += value;
973 } 964 }
974 else
975 cp->ppd.respect += value;
976 }
977 else 965 else
978 {
979 if (cp->ppd.respect < -value)
980 { 966 {
981 value = -cp->ppd.respect; 967 if (cp->ppd.respect < -value)
982 cp->ppd.respect = 0; 968 {
969 value = -cp->ppd.respect;
970 cp->ppd.respect = 0;
971 }
972 else
973 cp->ppd.respect += value;
983 } 974 }
984 else
985 cp->ppd.respect += value;
986 }
987 return value; 975 return value;
988} 976}
989 977
@@ -997,58 +985,58 @@ change_peer_respect (struct GSF_ConnectedPeer *cp, int value)
997 * @return effective priority 985 * @return effective priority
998 */ 986 */
999static int32_t 987static int32_t
1000bound_priority (uint32_t prio_in, 988bound_priority(uint32_t prio_in,
1001 struct GSF_ConnectedPeer *cp) 989 struct GSF_ConnectedPeer *cp)
1002{ 990{
1003#define N ((double)128.0) 991#define N ((double)128.0)
1004 uint32_t ret; 992 uint32_t ret;
1005 double rret; 993 double rret;
1006 int ld; 994 int ld;
1007 995
1008 ld = GSF_test_get_load_too_high_ (0); 996 ld = GSF_test_get_load_too_high_(0);
1009 if (GNUNET_SYSERR == ld) 997 if (GNUNET_SYSERR == ld)
1010 { 998 {
1011#if INSANE_STATISTICS 999#if INSANE_STATISTICS
1012 GNUNET_STATISTICS_update (GSF_stats, 1000 GNUNET_STATISTICS_update(GSF_stats,
1013 gettext_noop 1001 gettext_noop
1014 ("# requests done for free (low load)"), 1, 1002 ("# requests done for free (low load)"), 1,
1015 GNUNET_NO); 1003 GNUNET_NO);
1016#endif 1004#endif
1017 return 0; /* excess resources */ 1005 return 0; /* excess resources */
1018 } 1006 }
1019 if (prio_in > INT32_MAX) 1007 if (prio_in > INT32_MAX)
1020 prio_in = INT32_MAX; 1008 prio_in = INT32_MAX;
1021 ret = -change_peer_respect (cp, -(int) prio_in); 1009 ret = -change_peer_respect(cp, -(int)prio_in);
1022 if (ret > 0) 1010 if (ret > 0)
1023 { 1011 {
1024 if (ret > GSF_current_priorities + N) 1012 if (ret > GSF_current_priorities + N)
1025 rret = GSF_current_priorities + N; 1013 rret = GSF_current_priorities + N;
1026 else 1014 else
1027 rret = ret; 1015 rret = ret;
1028 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N; 1016 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1029 } 1017 }
1030 if ((GNUNET_YES == ld) && (ret > 0)) 1018 if ((GNUNET_YES == ld) && (ret > 0))
1031 { 1019 {
1032 /* try with charging */ 1020 /* try with charging */
1033 ld = GSF_test_get_load_too_high_ (ret); 1021 ld = GSF_test_get_load_too_high_(ret);
1034 } 1022 }
1035 if (GNUNET_YES == ld) 1023 if (GNUNET_YES == ld)
1036 { 1024 {
1037 GNUNET_STATISTICS_update (GSF_stats, 1025 GNUNET_STATISTICS_update(GSF_stats,
1038 gettext_noop 1026 gettext_noop
1039 ("# request dropped, priority insufficient"), 1, 1027 ("# request dropped, priority insufficient"), 1,
1040 GNUNET_NO); 1028 GNUNET_NO);
1041 /* undo charge */ 1029 /* undo charge */
1042 change_peer_respect (cp, (int) ret); 1030 change_peer_respect(cp, (int)ret);
1043 return -1; /* not enough resources */ 1031 return -1; /* not enough resources */
1044 } 1032 }
1045 else 1033 else
1046 { 1034 {
1047 GNUNET_STATISTICS_update (GSF_stats, 1035 GNUNET_STATISTICS_update(GSF_stats,
1048 gettext_noop 1036 gettext_noop
1049 ("# requests done for a price (normal load)"), 1, 1037 ("# requests done for a price (normal load)"), 1,
1050 GNUNET_NO); 1038 GNUNET_NO);
1051 } 1039 }
1052#undef N 1040#undef N
1053 return ret; 1041 return ret;
1054} 1042}
@@ -1064,20 +1052,20 @@ bound_priority (uint32_t prio_in,
1064 * otherwise the ttl-limit for the given @a prio 1052 * otherwise the ttl-limit for the given @a prio
1065 */ 1053 */
1066static int32_t 1054static int32_t
1067bound_ttl (int32_t ttl_in, 1055bound_ttl(int32_t ttl_in,
1068 uint32_t prio) 1056 uint32_t prio)
1069{ 1057{
1070 unsigned long long allowed; 1058 unsigned long long allowed;
1071 1059
1072 if (ttl_in <= 0) 1060 if (ttl_in <= 0)
1073 return ttl_in; 1061 return ttl_in;
1074 allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 1062 allowed = ((unsigned long long)prio) * TTL_DECREMENT / 1000;
1075 if (ttl_in > allowed) 1063 if (ttl_in > allowed)
1076 { 1064 {
1077 if (allowed >= (1 << 30)) 1065 if (allowed >= (1 << 30))
1078 return 1 << 30; 1066 return 1 << 30;
1079 return allowed; 1067 return allowed;
1080 } 1068 }
1081 return ttl_in; 1069 return ttl_in;
1082} 1070}
1083 1071
@@ -1085,9 +1073,7 @@ bound_ttl (int32_t ttl_in,
1085/** 1073/**
1086 * Closure for #test_exist_cb(). 1074 * Closure for #test_exist_cb().
1087 */ 1075 */
1088struct TestExistClosure 1076struct TestExistClosure {
1089{
1090
1091 /** 1077 /**
1092 * Priority of the incoming request. 1078 * Priority of the incoming request.
1093 */ 1079 */
@@ -1107,7 +1093,6 @@ struct TestExistClosure
1107 * Set to #GNUNET_YES if we are done handling the query. 1093 * Set to #GNUNET_YES if we are done handling the query.
1108 */ 1094 */
1109 int finished; 1095 int finished;
1110
1111}; 1096};
1112 1097
1113 1098
@@ -1122,9 +1107,9 @@ struct TestExistClosure
1122 * #GNUNET_NO if we successfully merged 1107 * #GNUNET_NO if we successfully merged
1123 */ 1108 */
1124static int 1109static int
1125test_exist_cb (void *cls, 1110test_exist_cb(void *cls,
1126 const struct GNUNET_HashCode *hc, 1111 const struct GNUNET_HashCode *hc,
1127 void *value) 1112 void *value)
1128{ 1113{
1129 struct TestExistClosure *tec = cls; 1114 struct TestExistClosure *tec = cls;
1130 struct PeerRequest *peerreq = value; 1115 struct PeerRequest *peerreq = value;
@@ -1132,28 +1117,28 @@ test_exist_cb (void *cls,
1132 struct GSF_PendingRequestData *prd; 1117 struct GSF_PendingRequestData *prd;
1133 1118
1134 pr = peerreq->pr; 1119 pr = peerreq->pr;
1135 prd = GSF_pending_request_get_data_ (pr); 1120 prd = GSF_pending_request_get_data_(pr);
1136 if (prd->type != tec->type) 1121 if (prd->type != tec->type)
1137 return GNUNET_YES; 1122 return GNUNET_YES;
1138 if (prd->ttl.abs_value_us >= 1123 if (prd->ttl.abs_value_us >=
1139 GNUNET_TIME_absolute_get ().abs_value_us + tec->ttl * 1000LL) 1124 GNUNET_TIME_absolute_get().abs_value_us + tec->ttl * 1000LL)
1140 { 1125 {
1141 /* existing request has higher TTL, drop new one! */ 1126 /* existing request has higher TTL, drop new one! */
1142 prd->priority += tec->priority; 1127 prd->priority += tec->priority;
1143 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1128 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1144 "Have existing request with higher TTL, dropping new request.\n"); 1129 "Have existing request with higher TTL, dropping new request.\n");
1145 GNUNET_STATISTICS_update (GSF_stats, 1130 GNUNET_STATISTICS_update(GSF_stats,
1146 gettext_noop 1131 gettext_noop
1147 ("# requests dropped due to higher-TTL request"), 1132 ("# requests dropped due to higher-TTL request"),
1148 1, GNUNET_NO); 1133 1, GNUNET_NO);
1149 tec->finished = GNUNET_YES; 1134 tec->finished = GNUNET_YES;
1150 return GNUNET_NO; 1135 return GNUNET_NO;
1151 } 1136 }
1152 /* existing request has lower TTL, drop old one! */ 1137 /* existing request has lower TTL, drop old one! */
1153 tec->priority += prd->priority; 1138 tec->priority += prd->priority;
1154 free_pending_request (peerreq); 1139 free_pending_request(peerreq);
1155 GSF_pending_request_cancel_ (pr, 1140 GSF_pending_request_cancel_(pr,
1156 GNUNET_YES); 1141 GNUNET_YES);
1157 return GNUNET_NO; 1142 return GNUNET_NO;
1158} 1143}
1159 1144
@@ -1168,8 +1153,8 @@ test_exist_cb (void *cls,
1168 * @param gm the GET message 1153 * @param gm the GET message
1169 */ 1154 */
1170void 1155void
1171handle_p2p_get (void *cls, 1156handle_p2p_get(void *cls,
1172 const struct GetMessage *gm) 1157 const struct GetMessage *gm)
1173{ 1158{
1174 struct GSF_ConnectedPeer *cps = cls; 1159 struct GSF_ConnectedPeer *cps = cls;
1175 struct PeerRequest *peerreq; 1160 struct PeerRequest *peerreq;
@@ -1187,167 +1172,167 @@ handle_p2p_get (void *cls,
1187 GNUNET_PEER_Id spid; 1172 GNUNET_PEER_Id spid;
1188 const struct GSF_PendingRequestData *prd; 1173 const struct GSF_PendingRequestData *prd;
1189 1174
1190 msize = ntohs (gm->header.size); 1175 msize = ntohs(gm->header.size);
1191 tec.type = ntohl (gm->type); 1176 tec.type = ntohl(gm->type);
1192 bm = ntohl (gm->hash_bitmap); 1177 bm = ntohl(gm->hash_bitmap);
1193 bits = 0; 1178 bits = 0;
1194 while (bm > 0) 1179 while (bm > 0)
1195 { 1180 {
1196 if (1 == (bm & 1)) 1181 if (1 == (bm & 1))
1197 bits++; 1182 bits++;
1198 bm >>= 1; 1183 bm >>= 1;
1199 } 1184 }
1200 opt = (const struct GNUNET_PeerIdentity *) &gm[1]; 1185 opt = (const struct GNUNET_PeerIdentity *)&gm[1];
1201 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); 1186 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct GNUNET_PeerIdentity);
1202 GNUNET_STATISTICS_update (GSF_stats, 1187 GNUNET_STATISTICS_update(GSF_stats,
1203 gettext_noop 1188 gettext_noop
1204 ("# GET requests received (from other peers)"), 1189 ("# GET requests received (from other peers)"),
1205 1, 1190 1,
1206 GNUNET_NO); 1191 GNUNET_NO);
1207 GSF_cover_query_count++; 1192 GSF_cover_query_count++;
1208 bm = ntohl (gm->hash_bitmap); 1193 bm = ntohl(gm->hash_bitmap);
1209 bits = 0; 1194 bits = 0;
1210 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1195 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1211 cp = GSF_peer_get_ (&opt[bits++]); 1196 cp = GSF_peer_get_(&opt[bits++]);
1212 else 1197 else
1213 cp = cps; 1198 cp = cps;
1214 if (NULL == cp) 1199 if (NULL == cp)
1215 { 1200 {
1216 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1201 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1217 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1202 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1218 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n", 1203 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1219 GNUNET_i2s (&opt[bits - 1])); 1204 GNUNET_i2s(&opt[bits - 1]));
1220 1205
1221 else 1206 else
1222 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1207 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1223 "Failed to find peer `%s' in connection set. Dropping query.\n", 1208 "Failed to find peer `%s' in connection set. Dropping query.\n",
1224 GNUNET_i2s (cps->ppd.peer)); 1209 GNUNET_i2s(cps->ppd.peer));
1225 GNUNET_STATISTICS_update (GSF_stats, 1210 GNUNET_STATISTICS_update(GSF_stats,
1226 gettext_noop 1211 gettext_noop
1227 ("# requests dropped due to missing reverse route"), 1212 ("# requests dropped due to missing reverse route"),
1228 1, 1213 1,
1229 GNUNET_NO); 1214 GNUNET_NO);
1230 return; 1215 return;
1231 } 1216 }
1232 unsigned int queue_size = GNUNET_MQ_get_length (cp->mq); 1217 unsigned int queue_size = GNUNET_MQ_get_length(cp->mq);
1233 queue_size += cp->ppd.pending_replies + cp->delay_queue_size; 1218 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1234 if (queue_size > MAX_QUEUE_PER_PEER) 1219 if (queue_size > MAX_QUEUE_PER_PEER)
1235 { 1220 {
1236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1221 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1237 "Peer `%s' has too many replies queued already. Dropping query.\n", 1222 "Peer `%s' has too many replies queued already. Dropping query.\n",
1238 GNUNET_i2s (cps->ppd.peer)); 1223 GNUNET_i2s(cps->ppd.peer));
1239 GNUNET_STATISTICS_update (GSF_stats, 1224 GNUNET_STATISTICS_update(GSF_stats,
1240 gettext_noop ("# requests dropped due to full reply queue"), 1225 gettext_noop("# requests dropped due to full reply queue"),
1241 1, 1226 1,
1242 GNUNET_NO); 1227 GNUNET_NO);
1243 return; 1228 return;
1244 } 1229 }
1245 /* note that we can really only check load here since otherwise 1230 /* note that we can really only check load here since otherwise
1246 * peers could find out that we are overloaded by not being 1231 * peers could find out that we are overloaded by not being
1247 * disconnected after sending us a malformed query... */ 1232 * disconnected after sending us a malformed query... */
1248 tec.priority = bound_priority (ntohl (gm->priority), 1233 tec.priority = bound_priority(ntohl(gm->priority),
1249 cps); 1234 cps);
1250 if (tec.priority < 0) 1235 if (tec.priority < 0)
1251 { 1236 {
1252 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1237 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1253 "Dropping query from `%s', this peer is too busy.\n", 1238 "Dropping query from `%s', this peer is too busy.\n",
1254 GNUNET_i2s (cps->ppd.peer)); 1239 GNUNET_i2s(cps->ppd.peer));
1255 return; 1240 return;
1256 } 1241 }
1257 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1242 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1258 "Received request for `%s' of type %u from peer `%s' with flags %u\n", 1243 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1259 GNUNET_h2s (&gm->query), 1244 GNUNET_h2s(&gm->query),
1260 (unsigned int) tec.type, 1245 (unsigned int)tec.type,
1261 GNUNET_i2s (cps->ppd.peer), 1246 GNUNET_i2s(cps->ppd.peer),
1262 (unsigned int) bm); 1247 (unsigned int)bm);
1263 target = 1248 target =
1264 (0 != 1249 (0 !=
1265 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL; 1250 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1266 options = GSF_PRO_DEFAULTS; 1251 options = GSF_PRO_DEFAULTS;
1267 spid = 0; 1252 spid = 0;
1268 if ((GNUNET_LOAD_get_load (cp->ppd.transmission_delay) > 3 * (1 + tec.priority)) 1253 if ((GNUNET_LOAD_get_load(cp->ppd.transmission_delay) > 3 * (1 + tec.priority))
1269 || (GNUNET_LOAD_get_average (cp->ppd.transmission_delay) > 1254 || (GNUNET_LOAD_get_average(cp->ppd.transmission_delay) >
1270 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 + 1255 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1271 GNUNET_LOAD_get_average (GSF_rt_entry_lifetime))) 1256 GNUNET_LOAD_get_average(GSF_rt_entry_lifetime)))
1272 { 1257 {
1273 /* don't have BW to send to peer, or would likely take longer than we have for it, 1258 /* don't have BW to send to peer, or would likely take longer than we have for it,
1274 * so at best indirect the query */ 1259 * so at best indirect the query */
1275 tec.priority = 0; 1260 tec.priority = 0;
1276 options |= GSF_PRO_FORWARD_ONLY; 1261 options |= GSF_PRO_FORWARD_ONLY;
1277 spid = GNUNET_PEER_intern (cps->ppd.peer); 1262 spid = GNUNET_PEER_intern(cps->ppd.peer);
1278 GNUNET_assert (0 != spid); 1263 GNUNET_assert(0 != spid);
1279 } 1264 }
1280 tec.ttl = bound_ttl (ntohl (gm->ttl), 1265 tec.ttl = bound_ttl(ntohl(gm->ttl),
1281 tec.priority); 1266 tec.priority);
1282 /* decrement ttl (always) */ 1267 /* decrement ttl (always) */
1283 ttl_decrement = 1268 ttl_decrement =
1284 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1269 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
1285 TTL_DECREMENT); 1270 TTL_DECREMENT);
1286 if ( (tec.ttl < 0) && 1271 if ((tec.ttl < 0) &&
1287 (((int32_t) (tec.ttl - ttl_decrement)) > 0) ) 1272 (((int32_t)(tec.ttl - ttl_decrement)) > 0))
1288 { 1273 {
1289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1274 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1290 "Dropping query from `%s' due to TTL underflow (%d - %u).\n", 1275 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1291 GNUNET_i2s (cps->ppd.peer), 1276 GNUNET_i2s(cps->ppd.peer),
1292 tec.ttl, 1277 tec.ttl,
1293 ttl_decrement); 1278 ttl_decrement);
1294 GNUNET_STATISTICS_update (GSF_stats, 1279 GNUNET_STATISTICS_update(GSF_stats,
1295 gettext_noop 1280 gettext_noop
1296 ("# requests dropped due TTL underflow"), 1, 1281 ("# requests dropped due TTL underflow"), 1,
1297 GNUNET_NO); 1282 GNUNET_NO);
1298 /* integer underflow => drop (should be very rare)! */ 1283 /* integer underflow => drop (should be very rare)! */
1299 return; 1284 return;
1300 } 1285 }
1301 tec.ttl -= ttl_decrement; 1286 tec.ttl -= ttl_decrement;
1302 1287
1303 /* test if the request already exists */ 1288 /* test if the request already exists */
1304 tec.finished = GNUNET_NO; 1289 tec.finished = GNUNET_NO;
1305 GNUNET_CONTAINER_multihashmap_get_multiple (cp->request_map, 1290 GNUNET_CONTAINER_multihashmap_get_multiple(cp->request_map,
1306 &gm->query, 1291 &gm->query,
1307 &test_exist_cb, 1292 &test_exist_cb,
1308 &tec); 1293 &tec);
1309 if (GNUNET_YES == tec.finished) 1294 if (GNUNET_YES == tec.finished)
1310 return; /* merged into existing request, we're done */ 1295 return; /* merged into existing request, we're done */
1311 1296
1312 peerreq = GNUNET_new (struct PeerRequest); 1297 peerreq = GNUNET_new(struct PeerRequest);
1313 peerreq->cp = cp; 1298 peerreq->cp = cp;
1314 pr = GSF_pending_request_create_ (options, 1299 pr = GSF_pending_request_create_(options,
1315 tec.type, 1300 tec.type,
1316 &gm->query, 1301 &gm->query,
1317 target, 1302 target,
1318 (bfsize > 0) 1303 (bfsize > 0)
1319 ? (const char *) &opt[bits] 1304 ? (const char *)&opt[bits]
1320 : NULL, 1305 : NULL,
1321 bfsize, 1306 bfsize,
1322 ntohl (gm->filter_mutator), 1307 ntohl(gm->filter_mutator),
1323 1 /* anonymity */, 1308 1 /* anonymity */,
1324 (uint32_t) tec.priority, 1309 (uint32_t)tec.priority,
1325 tec.ttl, 1310 tec.ttl,
1326 spid, 1311 spid,
1327 GNUNET_PEER_intern (cps->ppd.peer), 1312 GNUNET_PEER_intern(cps->ppd.peer),
1328 NULL, 0, /* replies_seen */ 1313 NULL, 0, /* replies_seen */
1329 &handle_p2p_reply, 1314 &handle_p2p_reply,
1330 peerreq); 1315 peerreq);
1331 GNUNET_assert (NULL != pr); 1316 GNUNET_assert(NULL != pr);
1332 prd = GSF_pending_request_get_data_ (pr); 1317 prd = GSF_pending_request_get_data_(pr);
1333 peerreq->pr = pr; 1318 peerreq->pr = pr;
1334 GNUNET_break (GNUNET_OK == 1319 GNUNET_break(GNUNET_OK ==
1335 GNUNET_CONTAINER_multihashmap_put (cp->request_map, 1320 GNUNET_CONTAINER_multihashmap_put(cp->request_map,
1336 &prd->query, 1321 &prd->query,
1337 peerreq, 1322 peerreq,
1338 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 1323 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1339 GNUNET_STATISTICS_update (GSF_stats, 1324 GNUNET_STATISTICS_update(GSF_stats,
1340 gettext_noop ("# P2P query messages received and processed"), 1325 gettext_noop("# P2P query messages received and processed"),
1341 1, 1326 1,
1342 GNUNET_NO); 1327 GNUNET_NO);
1343 GNUNET_STATISTICS_update (GSF_stats, 1328 GNUNET_STATISTICS_update(GSF_stats,
1344 gettext_noop ("# P2P searches active"), 1329 gettext_noop("# P2P searches active"),
1345 1, 1330 1,
1346 GNUNET_NO); 1331 GNUNET_NO);
1347 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; 1332 GSF_pending_request_get_data_(pr)->has_started = GNUNET_YES;
1348 GSF_local_lookup_ (pr, 1333 GSF_local_lookup_(pr,
1349 &GSF_consider_forwarding, 1334 &GSF_consider_forwarding,
1350 NULL); 1335 NULL);
1351} 1336}
1352 1337
1353 1338
@@ -1359,22 +1344,22 @@ handle_p2p_get (void *cls,
1359 * @param cp target peer 1344 * @param cp target peer
1360 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) 1345 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1361 * @param priority how important is this request? 1346 * @param priority how important is this request?
1362 * @param timeout when does this request timeout 1347 * @param timeout when does this request timeout
1363 * @param size number of bytes we would like to send to the peer 1348 * @param size number of bytes we would like to send to the peer
1364 * @param env message to send 1349 * @param env message to send
1365 */ 1350 */
1366void 1351void
1367GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, 1352GSF_peer_transmit_(struct GSF_ConnectedPeer *cp,
1368 int is_query, 1353 int is_query,
1369 uint32_t priority, 1354 uint32_t priority,
1370 struct GNUNET_MQ_Envelope *env) 1355 struct GNUNET_MQ_Envelope *env)
1371{ 1356{
1372 struct GSF_PeerTransmitHandle *pth; 1357 struct GSF_PeerTransmitHandle *pth;
1373 struct GSF_PeerTransmitHandle *pos; 1358 struct GSF_PeerTransmitHandle *pos;
1374 struct GSF_PeerTransmitHandle *prev; 1359 struct GSF_PeerTransmitHandle *prev;
1375 1360
1376 pth = GNUNET_new (struct GSF_PeerTransmitHandle); 1361 pth = GNUNET_new(struct GSF_PeerTransmitHandle);
1377 pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); 1362 pth->transmission_request_start_time = GNUNET_TIME_absolute_get();
1378 pth->env = env; 1363 pth->env = env;
1379 pth->is_query = is_query; 1364 pth->is_query = is_query;
1380 pth->priority = priority; 1365 pth->priority = priority;
@@ -1383,19 +1368,19 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1383 prev = NULL; 1368 prev = NULL;
1384 pos = cp->pth_head; 1369 pos = cp->pth_head;
1385 while ((NULL != pos) && (pos->priority > priority)) 1370 while ((NULL != pos) && (pos->priority > priority))
1386 { 1371 {
1387 prev = pos; 1372 prev = pos;
1388 pos = pos->next; 1373 pos = pos->next;
1389 } 1374 }
1390 GNUNET_CONTAINER_DLL_insert_after (cp->pth_head, 1375 GNUNET_CONTAINER_DLL_insert_after(cp->pth_head,
1391 cp->pth_tail, 1376 cp->pth_tail,
1392 prev, 1377 prev,
1393 pth); 1378 pth);
1394 if (GNUNET_YES == is_query) 1379 if (GNUNET_YES == is_query)
1395 cp->ppd.pending_queries++; 1380 cp->ppd.pending_queries++;
1396 else if (GNUNET_NO == is_query) 1381 else if (GNUNET_NO == is_query)
1397 cp->ppd.pending_replies++; 1382 cp->ppd.pending_replies++;
1398 schedule_transmission (pth); 1383 schedule_transmission(pth);
1399} 1384}
1400 1385
1401 1386
@@ -1407,19 +1392,19 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1407 * @param request_priority priority of the original request 1392 * @param request_priority priority of the original request
1408 */ 1393 */
1409void 1394void
1410GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp, 1395GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp,
1411 struct GNUNET_TIME_Absolute request_time, 1396 struct GNUNET_TIME_Absolute request_time,
1412 uint32_t request_priority) 1397 uint32_t request_priority)
1413{ 1398{
1414 struct GNUNET_TIME_Relative delay; 1399 struct GNUNET_TIME_Relative delay;
1415 1400
1416 delay = GNUNET_TIME_absolute_get_duration (request_time); 1401 delay = GNUNET_TIME_absolute_get_duration(request_time);
1417 cp->ppd.avg_reply_delay.rel_value_us = 1402 cp->ppd.avg_reply_delay.rel_value_us =
1418 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) + 1403 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1419 delay.rel_value_us) / RUNAVG_DELAY_N; 1404 delay.rel_value_us) / RUNAVG_DELAY_N;
1420 cp->ppd.avg_priority = 1405 cp->ppd.avg_priority =
1421 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) + 1406 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1422 request_priority) / RUNAVG_DELAY_N; 1407 request_priority) / RUNAVG_DELAY_N;
1423} 1408}
1424 1409
1425 1410
@@ -1431,8 +1416,8 @@ GSF_peer_update_performance_ (struct GSF_ConnectedPeer *cp,
1431 * @param initiator_client local client on responsible for query 1416 * @param initiator_client local client on responsible for query
1432 */ 1417 */
1433void 1418void
1434GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp, 1419GSF_peer_update_responder_client_(struct GSF_ConnectedPeer *cp,
1435 struct GSF_LocalClient *initiator_client) 1420 struct GSF_LocalClient *initiator_client)
1436{ 1421{
1437 cp->ppd.last_client_replies[cp->last_client_replies_woff++ % 1422 cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1438 CS2P_SUCCESS_LIST_SIZE] = initiator_client; 1423 CS2P_SUCCESS_LIST_SIZE] = initiator_client;
@@ -1447,15 +1432,15 @@ GSF_peer_update_responder_client_ (struct GSF_ConnectedPeer *cp,
1447 * @param initiator_peer other peer responsible for query 1432 * @param initiator_peer other peer responsible for query
1448 */ 1433 */
1449void 1434void
1450GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp, 1435GSF_peer_update_responder_peer_(struct GSF_ConnectedPeer *cp,
1451 const struct GSF_ConnectedPeer *initiator_peer) 1436 const struct GSF_ConnectedPeer *initiator_peer)
1452{ 1437{
1453 unsigned int woff; 1438 unsigned int woff;
1454 1439
1455 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE; 1440 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1456 GNUNET_PEER_change_rc (cp->ppd.last_p2p_replies[woff], -1); 1441 GNUNET_PEER_change_rc(cp->ppd.last_p2p_replies[woff], -1);
1457 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid; 1442 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1458 GNUNET_PEER_change_rc (initiator_peer->ppd.pid, 1); 1443 GNUNET_PEER_change_rc(initiator_peer->ppd.pid, 1);
1459 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE; 1444 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1460} 1445}
1461 1446
@@ -1469,23 +1454,23 @@ GSF_peer_update_responder_peer_ (struct GSF_ConnectedPeer *cp,
1469 * @return #GNUNET_OK to continue iteration 1454 * @return #GNUNET_OK to continue iteration
1470 */ 1455 */
1471static int 1456static int
1472flush_respect (void *cls, 1457flush_respect(void *cls,
1473 const struct GNUNET_PeerIdentity *key, 1458 const struct GNUNET_PeerIdentity *key,
1474 void *value) 1459 void *value)
1475{ 1460{
1476 struct GSF_ConnectedPeer *cp = value; 1461 struct GSF_ConnectedPeer *cp = value;
1477 struct GNUNET_PeerIdentity pid; 1462 struct GNUNET_PeerIdentity pid;
1478 1463
1479 if (cp->ppd.respect == cp->disk_respect) 1464 if (cp->ppd.respect == cp->disk_respect)
1480 return GNUNET_OK; /* unchanged */ 1465 return GNUNET_OK; /* unchanged */
1481 GNUNET_assert (0 != cp->ppd.pid); 1466 GNUNET_assert(0 != cp->ppd.pid);
1482 GNUNET_PEER_resolve (cp->ppd.pid, &pid); 1467 GNUNET_PEER_resolve(cp->ppd.pid, &pid);
1483 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, 1468 GNUNET_PEERSTORE_store(peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1484 sizeof (cp->ppd.respect), 1469 sizeof(cp->ppd.respect),
1485 GNUNET_TIME_UNIT_FOREVER_ABS, 1470 GNUNET_TIME_UNIT_FOREVER_ABS,
1486 GNUNET_PEERSTORE_STOREOPTION_REPLACE, 1471 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1487 NULL, 1472 NULL,
1488 NULL); 1473 NULL);
1489 return GNUNET_OK; 1474 return GNUNET_OK;
1490} 1475}
1491 1476
@@ -1499,9 +1484,9 @@ flush_respect (void *cls,
1499 * @param internal_cls the corresponding `struct GSF_ConnectedPeer` 1484 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1500 */ 1485 */
1501void 1486void
1502GSF_peer_disconnect_handler (void *cls, 1487GSF_peer_disconnect_handler(void *cls,
1503 const struct GNUNET_PeerIdentity *peer, 1488 const struct GNUNET_PeerIdentity *peer,
1504 void *internal_cls) 1489 void *internal_cls)
1505{ 1490{
1506 struct GSF_ConnectedPeer *cp = internal_cls; 1491 struct GSF_ConnectedPeer *cp = internal_cls;
1507 struct GSF_PeerTransmitHandle *pth; 1492 struct GSF_PeerTransmitHandle *pth;
@@ -1509,84 +1494,83 @@ GSF_peer_disconnect_handler (void *cls,
1509 1494
1510 if (NULL == cp) 1495 if (NULL == cp)
1511 return; /* must have been disconnect from core with 1496 return; /* must have been disconnect from core with
1512 * 'peer' == my_id, ignore */ 1497 * 'peer' == my_id, ignore */
1513 flush_respect (NULL, 1498 flush_respect(NULL,
1514 peer, 1499 peer,
1515 cp); 1500 cp);
1516 GNUNET_assert (GNUNET_YES == 1501 GNUNET_assert(GNUNET_YES ==
1517 GNUNET_CONTAINER_multipeermap_remove (cp_map, 1502 GNUNET_CONTAINER_multipeermap_remove(cp_map,
1518 peer, 1503 peer,
1519 cp)); 1504 cp));
1520 GNUNET_STATISTICS_set (GSF_stats, 1505 GNUNET_STATISTICS_set(GSF_stats,
1521 gettext_noop ("# peers connected"), 1506 gettext_noop("# peers connected"),
1522 GNUNET_CONTAINER_multipeermap_size (cp_map), 1507 GNUNET_CONTAINER_multipeermap_size(cp_map),
1523 GNUNET_NO); 1508 GNUNET_NO);
1524 if (NULL != cp->respect_iterate_req) 1509 if (NULL != cp->respect_iterate_req)
1525 { 1510 {
1526 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); 1511 GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req);
1527 cp->respect_iterate_req = NULL; 1512 cp->respect_iterate_req = NULL;
1528 } 1513 }
1529 if (NULL != cp->rc) 1514 if (NULL != cp->rc)
1530 { 1515 {
1531 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); 1516 GNUNET_ATS_reserve_bandwidth_cancel(cp->rc);
1532 cp->rc = NULL; 1517 cp->rc = NULL;
1533 } 1518 }
1534 if (NULL != cp->rc_delay_task) 1519 if (NULL != cp->rc_delay_task)
1535 { 1520 {
1536 GNUNET_SCHEDULER_cancel (cp->rc_delay_task); 1521 GNUNET_SCHEDULER_cancel(cp->rc_delay_task);
1537 cp->rc_delay_task = NULL; 1522 cp->rc_delay_task = NULL;
1538 } 1523 }
1539 GNUNET_CONTAINER_multihashmap_iterate (cp->request_map, 1524 GNUNET_CONTAINER_multihashmap_iterate(cp->request_map,
1540 &cancel_pending_request, 1525 &cancel_pending_request,
1541 cp); 1526 cp);
1542 GNUNET_CONTAINER_multihashmap_destroy (cp->request_map); 1527 GNUNET_CONTAINER_multihashmap_destroy(cp->request_map);
1543 cp->request_map = NULL; 1528 cp->request_map = NULL;
1544 GSF_plan_notify_peer_disconnect_ (cp); 1529 GSF_plan_notify_peer_disconnect_(cp);
1545 GNUNET_LOAD_value_free (cp->ppd.transmission_delay); 1530 GNUNET_LOAD_value_free(cp->ppd.transmission_delay);
1546 GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, 1531 GNUNET_PEER_decrement_rcs(cp->ppd.last_p2p_replies,
1547 P2P_SUCCESS_LIST_SIZE); 1532 P2P_SUCCESS_LIST_SIZE);
1548 memset (cp->ppd.last_p2p_replies, 1533 memset(cp->ppd.last_p2p_replies,
1549 0, 1534 0,
1550 sizeof (cp->ppd.last_p2p_replies)); 1535 sizeof(cp->ppd.last_p2p_replies));
1551 GSF_push_stop_ (cp); 1536 GSF_push_stop_(cp);
1552 while (NULL != (pth = cp->pth_head)) 1537 while (NULL != (pth = cp->pth_head))
1553 { 1538 {
1554 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 1539 GNUNET_CONTAINER_DLL_remove(cp->pth_head,
1555 cp->pth_tail, 1540 cp->pth_tail,
1556 pth); 1541 pth);
1557 if (GNUNET_YES == pth->is_query) 1542 if (GNUNET_YES == pth->is_query)
1558 GNUNET_assert (0 < cp->ppd.pending_queries--); 1543 GNUNET_assert(0 < cp->ppd.pending_queries--);
1559 else if (GNUNET_NO == pth->is_query) 1544 else if (GNUNET_NO == pth->is_query)
1560 GNUNET_assert (0 < cp->ppd.pending_replies--); 1545 GNUNET_assert(0 < cp->ppd.pending_replies--);
1561 GNUNET_free (pth); 1546 GNUNET_free(pth);
1562 } 1547 }
1563 while (NULL != (dh = cp->delayed_head)) 1548 while (NULL != (dh = cp->delayed_head))
1564 { 1549 {
1565 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, 1550 GNUNET_CONTAINER_DLL_remove(cp->delayed_head,
1566 cp->delayed_tail, 1551 cp->delayed_tail,
1567 dh); 1552 dh);
1568 GNUNET_MQ_discard (dh->env); 1553 GNUNET_MQ_discard(dh->env);
1569 cp->delay_queue_size--; 1554 cp->delay_queue_size--;
1570 GNUNET_SCHEDULER_cancel (dh->delay_task); 1555 GNUNET_SCHEDULER_cancel(dh->delay_task);
1571 GNUNET_free (dh); 1556 GNUNET_free(dh);
1572 } 1557 }
1573 GNUNET_PEER_change_rc (cp->ppd.pid, -1); 1558 GNUNET_PEER_change_rc(cp->ppd.pid, -1);
1574 if (NULL != cp->mig_revive_task) 1559 if (NULL != cp->mig_revive_task)
1575 { 1560 {
1576 GNUNET_SCHEDULER_cancel (cp->mig_revive_task); 1561 GNUNET_SCHEDULER_cancel(cp->mig_revive_task);
1577 cp->mig_revive_task = NULL; 1562 cp->mig_revive_task = NULL;
1578 } 1563 }
1579 GNUNET_break (0 == cp->ppd.pending_queries); 1564 GNUNET_break(0 == cp->ppd.pending_queries);
1580 GNUNET_break (0 == cp->ppd.pending_replies); 1565 GNUNET_break(0 == cp->ppd.pending_replies);
1581 GNUNET_free (cp); 1566 GNUNET_free(cp);
1582} 1567}
1583 1568
1584 1569
1585/** 1570/**
1586 * Closure for #call_iterator(). 1571 * Closure for #call_iterator().
1587 */ 1572 */
1588struct IterationContext 1573struct IterationContext {
1589{
1590 /** 1574 /**
1591 * Function to call on each entry. 1575 * Function to call on each entry.
1592 */ 1576 */
@@ -1608,16 +1592,16 @@ struct IterationContext
1608 * @return #GNUNET_YES to continue iteration 1592 * @return #GNUNET_YES to continue iteration
1609 */ 1593 */
1610static int 1594static int
1611call_iterator (void *cls, 1595call_iterator(void *cls,
1612 const struct GNUNET_PeerIdentity *key, 1596 const struct GNUNET_PeerIdentity *key,
1613 void *value) 1597 void *value)
1614{ 1598{
1615 struct IterationContext *ic = cls; 1599 struct IterationContext *ic = cls;
1616 struct GSF_ConnectedPeer *cp = value; 1600 struct GSF_ConnectedPeer *cp = value;
1617 1601
1618 ic->it (ic->it_cls, 1602 ic->it(ic->it_cls,
1619 key, cp, 1603 key, cp,
1620 &cp->ppd); 1604 &cp->ppd);
1621 return GNUNET_YES; 1605 return GNUNET_YES;
1622} 1606}
1623 1607
@@ -1629,16 +1613,16 @@ call_iterator (void *cls,
1629 * @param it_cls closure for @a it 1613 * @param it_cls closure for @a it
1630 */ 1614 */
1631void 1615void
1632GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it, 1616GSF_iterate_connected_peers_(GSF_ConnectedPeerIterator it,
1633 void *it_cls) 1617 void *it_cls)
1634{ 1618{
1635 struct IterationContext ic; 1619 struct IterationContext ic;
1636 1620
1637 ic.it = it; 1621 ic.it = it;
1638 ic.it_cls = it_cls; 1622 ic.it_cls = it_cls;
1639 GNUNET_CONTAINER_multipeermap_iterate (cp_map, 1623 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1640 &call_iterator, 1624 &call_iterator,
1641 &ic); 1625 &ic);
1642} 1626}
1643 1627
1644 1628
@@ -1649,11 +1633,11 @@ GSF_iterate_connected_peers_ (GSF_ConnectedPeerIterator it,
1649 * @param id identity to set (written to) 1633 * @param id identity to set (written to)
1650 */ 1634 */
1651void 1635void
1652GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp, 1636GSF_connected_peer_get_identity_(const struct GSF_ConnectedPeer *cp,
1653 struct GNUNET_PeerIdentity *id) 1637 struct GNUNET_PeerIdentity *id)
1654{ 1638{
1655 GNUNET_assert (0 != cp->ppd.pid); 1639 GNUNET_assert(0 != cp->ppd.pid);
1656 GNUNET_PEER_resolve (cp->ppd.pid, id); 1640 GNUNET_PEER_resolve(cp->ppd.pid, id);
1657} 1641}
1658 1642
1659 1643
@@ -1664,10 +1648,10 @@ GSF_connected_peer_get_identity_ (const struct GSF_ConnectedPeer *cp,
1664 * @return reference to peer identity, valid until peer disconnects (!) 1648 * @return reference to peer identity, valid until peer disconnects (!)
1665 */ 1649 */
1666const struct GNUNET_PeerIdentity * 1650const struct GNUNET_PeerIdentity *
1667GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp) 1651GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp)
1668{ 1652{
1669 GNUNET_assert (0 != cp->ppd.pid); 1653 GNUNET_assert(0 != cp->ppd.pid);
1670 return GNUNET_PEER_resolve2 (cp->ppd.pid); 1654 return GNUNET_PEER_resolve2(cp->ppd.pid);
1671} 1655}
1672 1656
1673 1657
@@ -1679,38 +1663,38 @@ GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1679 * @param block_time until when to block 1663 * @param block_time until when to block
1680 */ 1664 */
1681void 1665void
1682GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, 1666GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp,
1683 struct GNUNET_TIME_Absolute block_time) 1667 struct GNUNET_TIME_Absolute block_time)
1684{ 1668{
1685 struct GNUNET_MQ_Envelope *env; 1669 struct GNUNET_MQ_Envelope *env;
1686 struct MigrationStopMessage *msm; 1670 struct MigrationStopMessage *msm;
1687 1671
1688 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) 1672 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1689 { 1673 {
1690 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1674 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1691 "Migration already blocked for another %s\n", 1675 "Migration already blocked for another %s\n",
1692 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining 1676 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining
1693 (cp->last_migration_block), GNUNET_YES)); 1677 (cp->last_migration_block), GNUNET_YES));
1694 return; /* already blocked */ 1678 return; /* already blocked */
1695 } 1679 }
1696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n", 1680 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1697 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), 1681 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(block_time),
1698 GNUNET_YES)); 1682 GNUNET_YES));
1699 cp->last_migration_block = block_time; 1683 cp->last_migration_block = block_time;
1700 env = GNUNET_MQ_msg (msm, 1684 env = GNUNET_MQ_msg(msm,
1701 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); 1685 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1702 msm->reserved = htonl (0); 1686 msm->reserved = htonl(0);
1703 msm->duration 1687 msm->duration
1704 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining 1688 = GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining
1705 (cp->last_migration_block)); 1689 (cp->last_migration_block));
1706 GNUNET_STATISTICS_update (GSF_stats, 1690 GNUNET_STATISTICS_update(GSF_stats,
1707 gettext_noop ("# migration stop messages sent"), 1691 gettext_noop("# migration stop messages sent"),
1708 1, 1692 1,
1709 GNUNET_NO); 1693 GNUNET_NO);
1710 GSF_peer_transmit_ (cp, 1694 GSF_peer_transmit_(cp,
1711 GNUNET_SYSERR, 1695 GNUNET_SYSERR,
1712 UINT32_MAX, 1696 UINT32_MAX,
1713 env); 1697 env);
1714} 1698}
1715 1699
1716 1700
@@ -1724,8 +1708,8 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1724 * @param pref preference change 1708 * @param pref preference change
1725 */ 1709 */
1726void 1710void
1727GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp, 1711GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp,
1728 uint64_t pref) 1712 uint64_t pref)
1729{ 1713{
1730 cp->inc_preference += pref; 1714 cp->inc_preference += pref;
1731} 1715}
@@ -1737,15 +1721,15 @@ GSF_connected_peer_change_preference_ (struct GSF_ConnectedPeer *cp,
1737 * @param cls closure, not used 1721 * @param cls closure, not used
1738 */ 1722 */
1739static void 1723static void
1740cron_flush_respect (void *cls) 1724cron_flush_respect(void *cls)
1741{ 1725{
1742 fr_task = NULL; 1726 fr_task = NULL;
1743 GNUNET_CONTAINER_multipeermap_iterate (cp_map, 1727 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1744 &flush_respect, 1728 &flush_respect,
1745 NULL); 1729 NULL);
1746 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority (RESPECT_FLUSH_FREQ, 1730 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority(RESPECT_FLUSH_FREQ,
1747 GNUNET_SCHEDULER_PRIORITY_HIGH, 1731 GNUNET_SCHEDULER_PRIORITY_HIGH,
1748 &cron_flush_respect, NULL); 1732 &cron_flush_respect, NULL);
1749} 1733}
1750 1734
1751 1735
@@ -1753,12 +1737,12 @@ cron_flush_respect (void *cls)
1753 * Initialize peer management subsystem. 1737 * Initialize peer management subsystem.
1754 */ 1738 */
1755void 1739void
1756GSF_connected_peer_init_ () 1740GSF_connected_peer_init_()
1757{ 1741{
1758 cp_map = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES); 1742 cp_map = GNUNET_CONTAINER_multipeermap_create(128, GNUNET_YES);
1759 peerstore = GNUNET_PEERSTORE_connect (GSF_cfg); 1743 peerstore = GNUNET_PEERSTORE_connect(GSF_cfg);
1760 fr_task = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH, 1744 fr_task = GNUNET_SCHEDULER_add_with_priority(GNUNET_SCHEDULER_PRIORITY_HIGH,
1761 &cron_flush_respect, NULL); 1745 &cron_flush_respect, NULL);
1762} 1746}
1763 1747
1764 1748
@@ -1766,18 +1750,17 @@ GSF_connected_peer_init_ ()
1766 * Shutdown peer management subsystem. 1750 * Shutdown peer management subsystem.
1767 */ 1751 */
1768void 1752void
1769GSF_connected_peer_done_ () 1753GSF_connected_peer_done_()
1770{ 1754{
1771 GNUNET_CONTAINER_multipeermap_iterate (cp_map, 1755 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1772 &flush_respect, 1756 &flush_respect,
1773 NULL); 1757 NULL);
1774 GNUNET_SCHEDULER_cancel (fr_task); 1758 GNUNET_SCHEDULER_cancel(fr_task);
1775 fr_task = NULL; 1759 fr_task = NULL;
1776 GNUNET_CONTAINER_multipeermap_destroy (cp_map); 1760 GNUNET_CONTAINER_multipeermap_destroy(cp_map);
1777 cp_map = NULL; 1761 cp_map = NULL;
1778 GNUNET_PEERSTORE_disconnect (peerstore, 1762 GNUNET_PEERSTORE_disconnect(peerstore,
1779 GNUNET_YES); 1763 GNUNET_YES);
1780
1781} 1764}
1782 1765
1783 1766
@@ -1790,9 +1773,9 @@ GSF_connected_peer_done_ ()
1790 * @return #GNUNET_YES (we should continue to iterate) 1773 * @return #GNUNET_YES (we should continue to iterate)
1791 */ 1774 */
1792static int 1775static int
1793clean_local_client (void *cls, 1776clean_local_client(void *cls,
1794 const struct GNUNET_PeerIdentity *key, 1777 const struct GNUNET_PeerIdentity *key,
1795 void *value) 1778 void *value)
1796{ 1779{
1797 const struct GSF_LocalClient *lc = cls; 1780 const struct GSF_LocalClient *lc = cls;
1798 struct GSF_ConnectedPeer *cp = value; 1781 struct GSF_ConnectedPeer *cp = value;
@@ -1812,13 +1795,13 @@ clean_local_client (void *cls,
1812 * @param lc handle to the local client (henceforth invalid) 1795 * @param lc handle to the local client (henceforth invalid)
1813 */ 1796 */
1814void 1797void
1815GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc) 1798GSF_handle_local_client_disconnect_(const struct GSF_LocalClient *lc)
1816{ 1799{
1817 if (NULL == cp_map) 1800 if (NULL == cp_map)
1818 return; /* already cleaned up */ 1801 return; /* already cleaned up */
1819 GNUNET_CONTAINER_multipeermap_iterate (cp_map, 1802 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1820 &clean_local_client, 1803 &clean_local_client,
1821 (void *) lc); 1804 (void *)lc);
1822} 1805}
1823 1806
1824 1807