diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-31 21:23:23 +0000 |
commit | a78990b412db2c0ead2da8061c4f454f068991d1 (patch) | |
tree | 2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_cp.c | |
parent | 406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff) | |
download | gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip |
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r-- | src/fs/gnunet-service-fs_cp.c | 612 |
1 files changed, 178 insertions, 434 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c index bda33d766..3f7783ded 100644 --- a/src/fs/gnunet-service-fs_cp.c +++ b/src/fs/gnunet-service-fs_cp.c | |||
@@ -1,6 +1,6 @@ | |||
1 | /* | 1 | /* |
2 | This file is part of GNUnet. | 2 | This file is part of GNUnet. |
3 | Copyright (C) 2011 GNUnet e.V. | 3 | Copyright (C) 2011, 2016 GNUnet e.V. |
4 | 4 | ||
5 | GNUnet is free software; you can redistribute it and/or modify | 5 | GNUnet is free software; you can redistribute it and/or modify |
6 | it under the terms of the GNU General Public License as published | 6 | it under the terms of the GNU General Public License as published |
@@ -78,19 +78,9 @@ struct GSF_PeerTransmitHandle | |||
78 | struct GNUNET_TIME_Absolute transmission_request_start_time; | 78 | struct GNUNET_TIME_Absolute transmission_request_start_time; |
79 | 79 | ||
80 | /** | 80 | /** |
81 | * Timeout for this request. | 81 | * Envelope with the actual message. |
82 | */ | 82 | */ |
83 | struct GNUNET_TIME_Absolute timeout; | 83 | struct GNUNET_MQ_Envelope *env; |
84 | |||
85 | /** | ||
86 | * Task called on timeout, or 0 for none. | ||
87 | */ | ||
88 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
89 | |||
90 | /** | ||
91 | * Function to call to get the actual message. | ||
92 | */ | ||
93 | GSF_GetMessageCallback gmc; | ||
94 | 84 | ||
95 | /** | 85 | /** |
96 | * Peer this request targets. | 86 | * Peer this request targets. |
@@ -98,16 +88,6 @@ struct GSF_PeerTransmitHandle | |||
98 | struct GSF_ConnectedPeer *cp; | 88 | struct GSF_ConnectedPeer *cp; |
99 | 89 | ||
100 | /** | 90 | /** |
101 | * Closure for @e gmc. | ||
102 | */ | ||
103 | void *gmc_cls; | ||
104 | |||
105 | /** | ||
106 | * Size of the message to be transmitted. | ||
107 | */ | ||
108 | size_t size; | ||
109 | |||
110 | /** | ||
111 | * #GNUNET_YES if this is a query, #GNUNET_NO for content. | 91 | * #GNUNET_YES if this is a query, #GNUNET_NO for content. |
112 | */ | 92 | */ |
113 | int is_query; | 93 | int is_query; |
@@ -147,9 +127,9 @@ struct GSF_DelayedHandle | |||
147 | struct GSF_ConnectedPeer *cp; | 127 | struct GSF_ConnectedPeer *cp; |
148 | 128 | ||
149 | /** | 129 | /** |
150 | * The PUT that was delayed. | 130 | * Envelope of the message that was delayed. |
151 | */ | 131 | */ |
152 | struct PutMessage *pm; | 132 | struct GNUNET_MQ_Envelope *env; |
153 | 133 | ||
154 | /** | 134 | /** |
155 | * Task for the delay. | 135 | * Task for the delay. |
@@ -235,11 +215,6 @@ struct GSF_ConnectedPeer | |||
235 | struct GSF_DelayedHandle *delayed_tail; | 215 | struct GSF_DelayedHandle *delayed_tail; |
236 | 216 | ||
237 | /** | 217 | /** |
238 | * Migration stop message in our queue, or NULL if we have none pending. | ||
239 | */ | ||
240 | struct GSF_PeerTransmitHandle *migration_pth; | ||
241 | |||
242 | /** | ||
243 | * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). | 218 | * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). |
244 | */ | 219 | */ |
245 | struct GNUNET_ATS_ReservationContext *rc; | 220 | struct GNUNET_ATS_ReservationContext *rc; |
@@ -256,9 +231,9 @@ struct GSF_ConnectedPeer | |||
256 | 231 | ||
257 | /** | 232 | /** |
258 | * Handle for an active request for transmission to this | 233 | * Handle for an active request for transmission to this |
259 | * peer, or NULL (if core queue was full). | 234 | * peer. |
260 | */ | 235 | */ |
261 | struct GNUNET_CORE_TransmitHandle *cth; | 236 | struct GNUNET_MQ_Handle *mq; |
262 | 237 | ||
263 | /** | 238 | /** |
264 | * Increase in traffic preference still to be submitted | 239 | * Increase in traffic preference still to be submitted |
@@ -267,14 +242,6 @@ struct GSF_ConnectedPeer | |||
267 | uint64_t inc_preference; | 242 | uint64_t inc_preference; |
268 | 243 | ||
269 | /** | 244 | /** |
270 | * Set to 1 if we're currently in the process of calling | ||
271 | * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is | ||
272 | * NULL, we should not call notify_transmit_ready for this | ||
273 | * handle right now). | ||
274 | */ | ||
275 | unsigned int cth_in_progress; | ||
276 | |||
277 | /** | ||
278 | * Number of entries in @e delayed_head DLL. | 245 | * Number of entries in @e delayed_head DLL. |
279 | */ | 246 | */ |
280 | unsigned int delay_queue_size; | 247 | unsigned int delay_queue_size; |
@@ -308,16 +275,6 @@ struct GSF_ConnectedPeer | |||
308 | int did_reserve; | 275 | int did_reserve; |
309 | 276 | ||
310 | /** | 277 | /** |
311 | * Function called when the creation of this record is complete. | ||
312 | */ | ||
313 | GSF_ConnectedPeerCreationCallback creation_cb; | ||
314 | |||
315 | /** | ||
316 | * Closure for @e creation_cb | ||
317 | */ | ||
318 | void *creation_cb_cls; | ||
319 | |||
320 | /** | ||
321 | * Handle to the PEERSTORE iterate request for peer respect value | 278 | * Handle to the PEERSTORE iterate request for peer respect value |
322 | */ | 279 | */ |
323 | struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; | 280 | struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; |
@@ -377,15 +334,10 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp) | |||
377 | /** | 334 | /** |
378 | * Core is ready to transmit to a peer, get the message. | 335 | * Core is ready to transmit to a peer, get the message. |
379 | * | 336 | * |
380 | * @param cls the `struct GSF_PeerTransmitHandle` of the message | 337 | * @param cp which peer to send a message to |
381 | * @param size number of bytes core is willing to take | ||
382 | * @param buf where to copy the message | ||
383 | * @return number of bytes copied to @a buf | ||
384 | */ | 338 | */ |
385 | static size_t | 339 | static void |
386 | peer_transmit_ready_cb (void *cls, | 340 | peer_transmit (struct GSF_ConnectedPeer *cp); |
387 | size_t size, | ||
388 | void *buf); | ||
389 | 341 | ||
390 | 342 | ||
391 | /** | 343 | /** |
@@ -418,8 +370,6 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth) | |||
418 | struct GNUNET_PeerIdentity target; | 370 | struct GNUNET_PeerIdentity target; |
419 | 371 | ||
420 | cp = pth->cp; | 372 | cp = pth->cp; |
421 | if ((NULL != cp->cth) || (0 != cp->cth_in_progress)) | ||
422 | return; /* already done */ | ||
423 | GNUNET_assert (0 != cp->ppd.pid); | 373 | GNUNET_assert (0 != cp->ppd.pid); |
424 | GNUNET_PEER_resolve (cp->ppd.pid, &target); | 374 | GNUNET_PEER_resolve (cp->ppd.pid, &target); |
425 | 375 | ||
@@ -449,52 +399,23 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth) | |||
449 | cp); | 399 | cp); |
450 | return; | 400 | return; |
451 | } | 401 | } |
452 | GNUNET_assert (NULL == cp->cth); | 402 | peer_transmit (cp); |
453 | cp->cth_in_progress++; | ||
454 | cp->cth = | ||
455 | GNUNET_CORE_notify_transmit_ready (GSF_core, | ||
456 | GNUNET_YES, | ||
457 | GNUNET_CORE_PRIO_BACKGROUND, | ||
458 | GNUNET_TIME_absolute_get_remaining (pth->timeout), | ||
459 | &target, | ||
460 | pth->size, | ||
461 | &peer_transmit_ready_cb, cp); | ||
462 | GNUNET_assert (NULL != cp->cth); | ||
463 | GNUNET_assert (0 < cp->cth_in_progress--); | ||
464 | } | 403 | } |
465 | 404 | ||
466 | 405 | ||
467 | /** | 406 | /** |
468 | * Core is ready to transmit to a peer, get the message. | 407 | * Core is ready to transmit to a peer, get the message. |
469 | * | 408 | * |
470 | * @param cls the `struct GSF_PeerTransmitHandle` of the message | 409 | * @param cp which peer to send a message to |
471 | * @param size number of bytes core is willing to take | ||
472 | * @param buf where to copy the message | ||
473 | * @return number of bytes copied to @a buf | ||
474 | */ | 410 | */ |
475 | static size_t | 411 | static void |
476 | peer_transmit_ready_cb (void *cls, | 412 | peer_transmit (struct GSF_ConnectedPeer *cp) |
477 | size_t size, | ||
478 | void *buf) | ||
479 | { | 413 | { |
480 | struct GSF_ConnectedPeer *cp = cls; | ||
481 | struct GSF_PeerTransmitHandle *pth = cp->pth_head; | 414 | struct GSF_PeerTransmitHandle *pth = cp->pth_head; |
482 | struct GSF_PeerTransmitHandle *pos; | 415 | struct GSF_PeerTransmitHandle *pos; |
483 | size_t ret; | ||
484 | 416 | ||
485 | cp->cth = NULL; | ||
486 | if (NULL == pth) | 417 | if (NULL == pth) |
487 | return 0; | 418 | return; |
488 | if (pth->size > size) | ||
489 | { | ||
490 | schedule_transmission (pth); | ||
491 | return 0; | ||
492 | } | ||
493 | if (NULL != pth->timeout_task) | ||
494 | { | ||
495 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | ||
496 | pth->timeout_task = NULL; | ||
497 | } | ||
498 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 419 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
499 | cp->pth_tail, | 420 | cp->pth_tail, |
500 | pth); | 421 | pth); |
@@ -512,14 +433,14 @@ peer_transmit_ready_cb (void *cls, | |||
512 | GNUNET_LOAD_update (cp->ppd.transmission_delay, | 433 | GNUNET_LOAD_update (cp->ppd.transmission_delay, |
513 | GNUNET_TIME_absolute_get_duration | 434 | GNUNET_TIME_absolute_get_duration |
514 | (pth->transmission_request_start_time).rel_value_us); | 435 | (pth->transmission_request_start_time).rel_value_us); |
515 | ret = pth->gmc (pth->gmc_cls, size, buf); | 436 | GNUNET_MQ_send (cp->mq, |
437 | pth->env); | ||
438 | GNUNET_free (pth); | ||
516 | if (NULL != (pos = cp->pth_head)) | 439 | if (NULL != (pos = cp->pth_head)) |
517 | { | 440 | { |
518 | GNUNET_assert (pos != pth); | 441 | GNUNET_assert (pos != pth); |
519 | schedule_transmission (pos); | 442 | schedule_transmission (pos); |
520 | } | 443 | } |
521 | GNUNET_free (pth); | ||
522 | return ret; | ||
523 | } | 444 | } |
524 | 445 | ||
525 | 446 | ||
@@ -578,23 +499,10 @@ ats_reserve_callback (void *cls, | |||
578 | } | 499 | } |
579 | cp->did_reserve = GNUNET_YES; | 500 | cp->did_reserve = GNUNET_YES; |
580 | pth = cp->pth_head; | 501 | pth = cp->pth_head; |
581 | if ( (NULL != pth) && | 502 | if (NULL != pth) |
582 | (NULL == cp->cth) && | ||
583 | (0 == cp->cth_in_progress) ) | ||
584 | { | 503 | { |
585 | /* reservation success, try transmission now! */ | 504 | /* reservation success, try transmission now! */ |
586 | cp->cth_in_progress++; | 505 | peer_transmit (cp); |
587 | cp->cth = | ||
588 | GNUNET_CORE_notify_transmit_ready (GSF_core, | ||
589 | GNUNET_YES, | ||
590 | GNUNET_CORE_PRIO_BACKGROUND, | ||
591 | GNUNET_TIME_absolute_get_remaining (pth->timeout), | ||
592 | peer, | ||
593 | pth->size, | ||
594 | &peer_transmit_ready_cb, | ||
595 | cp); | ||
596 | GNUNET_assert (NULL != cp->cth); | ||
597 | GNUNET_assert (0 < cp->cth_in_progress--); | ||
598 | } | 506 | } |
599 | } | 507 | } |
600 | 508 | ||
@@ -614,11 +522,13 @@ peer_respect_cb (void *cls, | |||
614 | struct GSF_ConnectedPeer *cp = cls; | 522 | struct GSF_ConnectedPeer *cp = cls; |
615 | 523 | ||
616 | GNUNET_assert (NULL != cp->respect_iterate_req); | 524 | GNUNET_assert (NULL != cp->respect_iterate_req); |
617 | if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size)) | 525 | if ( (NULL != record) && |
618 | cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value); | 526 | (sizeof (cp->disk_respect) == record->value_size)) |
527 | { | ||
528 | cp->disk_respect = *((uint32_t *)record->value); | ||
529 | cp->ppd.respect += *((uint32_t *)record->value); | ||
530 | } | ||
619 | GSF_push_start_ (cp); | 531 | GSF_push_start_ (cp); |
620 | if (NULL != cp->creation_cb) | ||
621 | cp->creation_cb (cp->creation_cb_cls, cp); | ||
622 | if (NULL != record) | 532 | if (NULL != record) |
623 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); | 533 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); |
624 | cp->respect_iterate_req = NULL; | 534 | cp->respect_iterate_req = NULL; |
@@ -626,25 +536,68 @@ peer_respect_cb (void *cls, | |||
626 | 536 | ||
627 | 537 | ||
628 | /** | 538 | /** |
539 | * Function called for each pending request whenever a new | ||
540 | * peer connects, giving us a chance to decide about submitting | ||
541 | * the existing request to the new peer. | ||
542 | * | ||
543 | * @param cls the `struct GSF_ConnectedPeer` of the new peer | ||
544 | * @param key query for the request | ||
545 | * @param pr handle to the pending request | ||
546 | * @return #GNUNET_YES to continue to iterate | ||
547 | */ | ||
548 | static int | ||
549 | consider_peer_for_forwarding (void *cls, | ||
550 | const struct GNUNET_HashCode *key, | ||
551 | struct GSF_PendingRequest *pr) | ||
552 | { | ||
553 | struct GSF_ConnectedPeer *cp = cls; | ||
554 | struct GNUNET_PeerIdentity pid; | ||
555 | |||
556 | if (GNUNET_YES != | ||
557 | GSF_pending_request_test_active_ (pr)) | ||
558 | return GNUNET_YES; /* request is not actually active, skip! */ | ||
559 | GSF_connected_peer_get_identity_ (cp, &pid); | ||
560 | if (GNUNET_YES != | ||
561 | GSF_pending_request_test_target_ (pr, &pid)) | ||
562 | { | ||
563 | GNUNET_STATISTICS_update (GSF_stats, | ||
564 | gettext_noop ("# Loopback routes suppressed"), | ||
565 | 1, | ||
566 | GNUNET_NO); | ||
567 | return GNUNET_YES; | ||
568 | } | ||
569 | GSF_plan_add_ (cp, pr); | ||
570 | return GNUNET_YES; | ||
571 | } | ||
572 | |||
573 | |||
574 | /** | ||
629 | * A peer connected to us. Setup the connected peer | 575 | * A peer connected to us. Setup the connected peer |
630 | * records. | 576 | * records. |
631 | * | 577 | * |
578 | * @param cls NULL | ||
632 | * @param peer identity of peer that connected | 579 | * @param peer identity of peer that connected |
633 | * @param creation_cb callback function when the record is created. | 580 | * @param mq message queue for talking to @a peer |
634 | * @param creation_cb_cls closure for @creation_cb | 581 | * @return our internal handle for the peer |
635 | */ | 582 | */ |
636 | void | 583 | void * |
637 | GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, | 584 | GSF_peer_connect_handler (void *cls, |
638 | GSF_ConnectedPeerCreationCallback creation_cb, | 585 | const struct GNUNET_PeerIdentity *peer, |
639 | void *creation_cb_cls) | 586 | struct GNUNET_MQ_Handle *mq) |
640 | { | 587 | { |
641 | struct GSF_ConnectedPeer *cp; | 588 | struct GSF_ConnectedPeer *cp; |
642 | 589 | ||
590 | if (0 == | ||
591 | GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id, | ||
592 | peer)) | ||
593 | return NULL; | ||
643 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 594 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
644 | "Connected to peer %s\n", | 595 | "Connected to peer %s\n", |
645 | GNUNET_i2s (peer)); | 596 | GNUNET_i2s (peer)); |
646 | cp = GNUNET_new (struct GSF_ConnectedPeer); | 597 | cp = GNUNET_new (struct GSF_ConnectedPeer); |
647 | cp->ppd.pid = GNUNET_PEER_intern (peer); | 598 | cp->ppd.pid = GNUNET_PEER_intern (peer); |
599 | cp->ppd.peer = peer; | ||
600 | cp->mq = mq; | ||
648 | cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); | 601 | cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); |
649 | cp->rc = | 602 | cp->rc = |
650 | GNUNET_ATS_reserve_bandwidth (GSF_ats, | 603 | GNUNET_ATS_reserve_bandwidth (GSF_ats, |
@@ -662,14 +615,17 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, | |||
662 | gettext_noop ("# peers connected"), | 615 | gettext_noop ("# peers connected"), |
663 | GNUNET_CONTAINER_multipeermap_size (cp_map), | 616 | GNUNET_CONTAINER_multipeermap_size (cp_map), |
664 | GNUNET_NO); | 617 | GNUNET_NO); |
665 | cp->creation_cb = creation_cb; | 618 | cp->respect_iterate_req |
666 | cp->creation_cb_cls = creation_cb_cls; | 619 | = GNUNET_PEERSTORE_iterate (peerstore, |
667 | cp->respect_iterate_req = | 620 | "fs", |
668 | GNUNET_PEERSTORE_iterate (peerstore, "fs", | 621 | peer, |
669 | peer, "respect", | 622 | "respect", |
670 | GNUNET_TIME_UNIT_FOREVER_REL, | 623 | GNUNET_TIME_UNIT_FOREVER_REL, |
671 | &peer_respect_cb, | 624 | &peer_respect_cb, |
672 | cp); | 625 | cp); |
626 | GSF_iterate_pending_requests_ (&consider_peer_for_forwarding, | ||
627 | cp); | ||
628 | return cp; | ||
673 | } | 629 | } |
674 | 630 | ||
675 | 631 | ||
@@ -714,38 +670,25 @@ GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer) | |||
714 | 670 | ||
715 | 671 | ||
716 | /** | 672 | /** |
717 | * Handle P2P "MIGRATION_STOP" message. | 673 | * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message. |
718 | * | 674 | * |
719 | * @param cls closure, always NULL | 675 | * @param cls closure, the `struct GSF_ConnectedPeer` |
720 | * @param other the other peer involved (sender or receiver, NULL | 676 | * @param msm the actual message |
721 | * for loopback messages where we are both sender and receiver) | ||
722 | * @param message the actual message | ||
723 | * @return #GNUNET_OK to keep the connection open, | ||
724 | * #GNUNET_SYSERR to close it (signal serious error) | ||
725 | */ | 677 | */ |
726 | int | 678 | void |
727 | GSF_handle_p2p_migration_stop_ (void *cls, | 679 | handle_p2p_migration_stop (void *cls, |
728 | const struct GNUNET_PeerIdentity *other, | 680 | const struct MigrationStopMessage *msm) |
729 | const struct GNUNET_MessageHeader *message) | ||
730 | { | 681 | { |
731 | struct GSF_ConnectedPeer *cp; | 682 | struct GSF_ConnectedPeer *cp = cls; |
732 | const struct MigrationStopMessage *msm; | ||
733 | struct GNUNET_TIME_Relative bt; | 683 | struct GNUNET_TIME_Relative bt; |
734 | 684 | ||
735 | msm = (const struct MigrationStopMessage *) message; | ||
736 | cp = GSF_peer_get_ (other); | ||
737 | if (NULL == cp) | ||
738 | { | ||
739 | GNUNET_break (0); | ||
740 | return GNUNET_OK; | ||
741 | } | ||
742 | GNUNET_STATISTICS_update (GSF_stats, | 685 | GNUNET_STATISTICS_update (GSF_stats, |
743 | gettext_noop ("# migration stop messages received"), | 686 | gettext_noop ("# migration stop messages received"), |
744 | 1, GNUNET_NO); | 687 | 1, GNUNET_NO); |
745 | bt = GNUNET_TIME_relative_ntoh (msm->duration); | 688 | bt = GNUNET_TIME_relative_ntoh (msm->duration); |
746 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | 689 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
747 | _("Migration of content to peer `%s' blocked for %s\n"), | 690 | _("Migration of content to peer `%s' blocked for %s\n"), |
748 | GNUNET_i2s (other), | 691 | GNUNET_i2s (cp->ppd.peer), |
749 | GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); | 692 | GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); |
750 | cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); | 693 | cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); |
751 | if ( (NULL == cp->mig_revive_task) && | 694 | if ( (NULL == cp->mig_revive_task) && |
@@ -756,46 +699,6 @@ GSF_handle_p2p_migration_stop_ (void *cls, | |||
756 | GNUNET_SCHEDULER_add_delayed (bt, | 699 | GNUNET_SCHEDULER_add_delayed (bt, |
757 | &revive_migration, cp); | 700 | &revive_migration, cp); |
758 | } | 701 | } |
759 | return GNUNET_OK; | ||
760 | } | ||
761 | |||
762 | |||
763 | /** | ||
764 | * Copy reply and free put message. | ||
765 | * | ||
766 | * @param cls the `struct PutMessage` | ||
767 | * @param buf_size number of bytes available in @a buf | ||
768 | * @param buf where to copy the message, NULL on error (peer disconnect) | ||
769 | * @return number of bytes copied to @a buf, can be 0 (without indicating an error) | ||
770 | */ | ||
771 | static size_t | ||
772 | copy_reply (void *cls, | ||
773 | size_t buf_size, | ||
774 | void *buf) | ||
775 | { | ||
776 | struct PutMessage *pm = cls; | ||
777 | size_t size; | ||
778 | |||
779 | if (NULL != buf) | ||
780 | { | ||
781 | GNUNET_assert (buf_size >= ntohs (pm->header.size)); | ||
782 | size = ntohs (pm->header.size); | ||
783 | GNUNET_memcpy (buf, pm, size); | ||
784 | GNUNET_STATISTICS_update (GSF_stats, | ||
785 | gettext_noop ("# replies transmitted to other peers"), | ||
786 | 1, | ||
787 | GNUNET_NO); | ||
788 | } | ||
789 | else | ||
790 | { | ||
791 | size = 0; | ||
792 | GNUNET_STATISTICS_update (GSF_stats, | ||
793 | gettext_noop ("# replies dropped"), | ||
794 | 1, | ||
795 | GNUNET_NO); | ||
796 | } | ||
797 | GNUNET_free (pm); | ||
798 | return size; | ||
799 | } | 702 | } |
800 | 703 | ||
801 | 704 | ||
@@ -886,13 +789,10 @@ transmit_delayed_now (void *cls) | |||
886 | cp->delayed_tail, | 789 | cp->delayed_tail, |
887 | dh); | 790 | dh); |
888 | cp->delay_queue_size--; | 791 | cp->delay_queue_size--; |
889 | (void) GSF_peer_transmit_ (cp, | 792 | GSF_peer_transmit_ (cp, |
890 | GNUNET_NO, | 793 | GNUNET_NO, |
891 | UINT32_MAX, | 794 | UINT32_MAX, |
892 | REPLY_TIMEOUT, | 795 | dh->env); |
893 | dh->msize, | ||
894 | ©_reply, | ||
895 | dh->pm); | ||
896 | GNUNET_free (dh); | 796 | GNUNET_free (dh); |
897 | } | 797 | } |
898 | 798 | ||
@@ -954,6 +854,7 @@ handle_p2p_reply (void *cls, | |||
954 | struct PeerRequest *peerreq = cls; | 854 | struct PeerRequest *peerreq = cls; |
955 | struct GSF_ConnectedPeer *cp = peerreq->cp; | 855 | struct GSF_ConnectedPeer *cp = peerreq->cp; |
956 | struct GSF_PendingRequestData *prd; | 856 | struct GSF_PendingRequestData *prd; |
857 | struct GNUNET_MQ_Envelope *env; | ||
957 | struct PutMessage *pm; | 858 | struct PutMessage *pm; |
958 | size_t msize; | 859 | size_t msize; |
959 | 860 | ||
@@ -1000,12 +901,14 @@ handle_p2p_reply (void *cls, | |||
1000 | GSF_cover_content_count -= (reply_anonymity_level - 1); | 901 | GSF_cover_content_count -= (reply_anonymity_level - 1); |
1001 | } | 902 | } |
1002 | 903 | ||
1003 | pm = GNUNET_malloc (msize); | 904 | env = GNUNET_MQ_msg_extra (pm, |
1004 | pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); | 905 | data_len, |
1005 | pm->header.size = htons (msize); | 906 | GNUNET_MESSAGE_TYPE_FS_PUT); |
1006 | pm->type = htonl (type); | 907 | pm->type = htonl (type); |
1007 | pm->expiration = GNUNET_TIME_absolute_hton (expiration); | 908 | pm->expiration = GNUNET_TIME_absolute_hton (expiration); |
1008 | GNUNET_memcpy (&pm[1], data, data_len); | 909 | GNUNET_memcpy (&pm[1], |
910 | data, | ||
911 | data_len); | ||
1009 | if ( (UINT32_MAX != reply_anonymity_level) && | 912 | if ( (UINT32_MAX != reply_anonymity_level) && |
1010 | (0 != reply_anonymity_level) && | 913 | (0 != reply_anonymity_level) && |
1011 | (GNUNET_YES == GSF_enable_randomized_delays) ) | 914 | (GNUNET_YES == GSF_enable_randomized_delays) ) |
@@ -1014,7 +917,7 @@ handle_p2p_reply (void *cls, | |||
1014 | 917 | ||
1015 | dh = GNUNET_new (struct GSF_DelayedHandle); | 918 | dh = GNUNET_new (struct GSF_DelayedHandle); |
1016 | dh->cp = cp; | 919 | dh->cp = cp; |
1017 | dh->pm = pm; | 920 | dh->env = env; |
1018 | dh->msize = msize; | 921 | dh->msize = msize; |
1019 | GNUNET_CONTAINER_DLL_insert (cp->delayed_head, | 922 | GNUNET_CONTAINER_DLL_insert (cp->delayed_head, |
1020 | cp->delayed_tail, | 923 | cp->delayed_tail, |
@@ -1027,13 +930,10 @@ handle_p2p_reply (void *cls, | |||
1027 | } | 930 | } |
1028 | else | 931 | else |
1029 | { | 932 | { |
1030 | (void) GSF_peer_transmit_ (cp, | 933 | GSF_peer_transmit_ (cp, |
1031 | GNUNET_NO, | 934 | GNUNET_NO, |
1032 | UINT32_MAX, | 935 | UINT32_MAX, |
1033 | REPLY_TIMEOUT, | 936 | env); |
1034 | msize, | ||
1035 | ©_reply, | ||
1036 | pm); | ||
1037 | } | 937 | } |
1038 | if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) | 938 | if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) |
1039 | return; | 939 | return; |
@@ -1265,23 +1165,20 @@ test_exist_cb (void *cls, | |||
1265 | * process replies properly. Does not initiate forwarding or | 1165 | * process replies properly. Does not initiate forwarding or |
1266 | * local database lookups. | 1166 | * local database lookups. |
1267 | * | 1167 | * |
1268 | * @param other the other peer involved (sender or receiver, NULL | 1168 | * @param cls the other peer involved (sender of the message) |
1269 | * for loopback messages where we are both sender and receiver) | 1169 | * @param gm the GET message |
1270 | * @param message the actual message | ||
1271 | * @return pending request handle, NULL on error | ||
1272 | */ | 1170 | */ |
1273 | struct GSF_PendingRequest * | 1171 | void |
1274 | GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | 1172 | handle_p2p_get (void *cls, |
1275 | const struct GNUNET_MessageHeader *message) | 1173 | const struct GetMessage *gm) |
1276 | { | 1174 | { |
1175 | struct GSF_ConnectedPeer *cps = cls; | ||
1277 | struct PeerRequest *peerreq; | 1176 | struct PeerRequest *peerreq; |
1278 | struct GSF_PendingRequest *pr; | 1177 | struct GSF_PendingRequest *pr; |
1279 | struct GSF_ConnectedPeer *cp; | 1178 | struct GSF_ConnectedPeer *cp; |
1280 | struct GSF_ConnectedPeer *cps; | ||
1281 | const struct GNUNET_PeerIdentity *target; | 1179 | const struct GNUNET_PeerIdentity *target; |
1282 | enum GSF_PendingRequestOptions options; | 1180 | enum GSF_PendingRequestOptions options; |
1283 | uint16_t msize; | 1181 | uint16_t msize; |
1284 | const struct GetMessage *gm; | ||
1285 | unsigned int bits; | 1182 | unsigned int bits; |
1286 | const struct GNUNET_PeerIdentity *opt; | 1183 | const struct GNUNET_PeerIdentity *opt; |
1287 | uint32_t bm; | 1184 | uint32_t bm; |
@@ -1291,18 +1188,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1291 | GNUNET_PEER_Id spid; | 1188 | GNUNET_PEER_Id spid; |
1292 | const struct GSF_PendingRequestData *prd; | 1189 | const struct GSF_PendingRequestData *prd; |
1293 | 1190 | ||
1294 | msize = ntohs (message->size); | 1191 | msize = ntohs (gm->header.size); |
1295 | if (msize < sizeof (struct GetMessage)) | ||
1296 | { | ||
1297 | GNUNET_break_op (0); | ||
1298 | return NULL; | ||
1299 | } | ||
1300 | GNUNET_STATISTICS_update (GSF_stats, | ||
1301 | gettext_noop | ||
1302 | ("# GET requests received (from other peers)"), | ||
1303 | 1, | ||
1304 | GNUNET_NO); | ||
1305 | gm = (const struct GetMessage *) message; | ||
1306 | tec.type = ntohl (gm->type); | 1192 | tec.type = ntohl (gm->type); |
1307 | bm = ntohl (gm->hash_bitmap); | 1193 | bm = ntohl (gm->hash_bitmap); |
1308 | bits = 0; | 1194 | bits = 0; |
@@ -1312,32 +1198,16 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1312 | bits++; | 1198 | bits++; |
1313 | bm >>= 1; | 1199 | bm >>= 1; |
1314 | } | 1200 | } |
1315 | if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity)) | ||
1316 | { | ||
1317 | GNUNET_break_op (0); | ||
1318 | return NULL; | ||
1319 | } | ||
1320 | opt = (const struct GNUNET_PeerIdentity *) &gm[1]; | 1201 | opt = (const struct GNUNET_PeerIdentity *) &gm[1]; |
1321 | bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); | 1202 | bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); |
1322 | /* bfsize must be power of 2, check! */ | 1203 | GNUNET_STATISTICS_update (GSF_stats, |
1323 | if (0 != ((bfsize - 1) & bfsize)) | 1204 | gettext_noop |
1324 | { | 1205 | ("# GET requests received (from other peers)"), |
1325 | GNUNET_break_op (0); | 1206 | 1, |
1326 | return NULL; | 1207 | GNUNET_NO); |
1327 | } | ||
1328 | GSF_cover_query_count++; | 1208 | GSF_cover_query_count++; |
1329 | bm = ntohl (gm->hash_bitmap); | 1209 | bm = ntohl (gm->hash_bitmap); |
1330 | bits = 0; | 1210 | bits = 0; |
1331 | cps = GSF_peer_get_ (other); | ||
1332 | if (NULL == cps) | ||
1333 | { | ||
1334 | /* peer must have just disconnected */ | ||
1335 | GNUNET_STATISTICS_update (GSF_stats, | ||
1336 | gettext_noop | ||
1337 | ("# requests dropped due to initiator not being connected"), | ||
1338 | 1, GNUNET_NO); | ||
1339 | return NULL; | ||
1340 | } | ||
1341 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) | 1211 | if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) |
1342 | cp = GSF_peer_get_ (&opt[bits++]); | 1212 | cp = GSF_peer_get_ (&opt[bits++]); |
1343 | else | 1213 | else |
@@ -1352,24 +1222,24 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1352 | else | 1222 | else |
1353 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1223 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1354 | "Failed to find peer `%s' in connection set. Dropping query.\n", | 1224 | "Failed to find peer `%s' in connection set. Dropping query.\n", |
1355 | GNUNET_i2s (other)); | 1225 | GNUNET_i2s (cps->ppd.peer)); |
1356 | GNUNET_STATISTICS_update (GSF_stats, | 1226 | GNUNET_STATISTICS_update (GSF_stats, |
1357 | gettext_noop | 1227 | gettext_noop |
1358 | ("# requests dropped due to missing reverse route"), | 1228 | ("# requests dropped due to missing reverse route"), |
1359 | 1, | 1229 | 1, |
1360 | GNUNET_NO); | 1230 | GNUNET_NO); |
1361 | return NULL; | 1231 | return; |
1362 | } | 1232 | } |
1363 | if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) | 1233 | if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) |
1364 | { | 1234 | { |
1365 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1235 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1366 | "Peer `%s' has too many replies queued already. Dropping query.\n", | 1236 | "Peer `%s' has too many replies queued already. Dropping query.\n", |
1367 | GNUNET_i2s (other)); | 1237 | GNUNET_i2s (cps->ppd.peer)); |
1368 | GNUNET_STATISTICS_update (GSF_stats, | 1238 | GNUNET_STATISTICS_update (GSF_stats, |
1369 | gettext_noop ("# requests dropped due to full reply queue"), | 1239 | gettext_noop ("# requests dropped due to full reply queue"), |
1370 | 1, | 1240 | 1, |
1371 | GNUNET_NO); | 1241 | GNUNET_NO); |
1372 | return NULL; | 1242 | return; |
1373 | } | 1243 | } |
1374 | /* note that we can really only check load here since otherwise | 1244 | /* note that we can really only check load here since otherwise |
1375 | * peers could find out that we are overloaded by not being | 1245 | * peers could find out that we are overloaded by not being |
@@ -1380,14 +1250,14 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1380 | { | 1250 | { |
1381 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1251 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1382 | "Dropping query from `%s', this peer is too busy.\n", | 1252 | "Dropping query from `%s', this peer is too busy.\n", |
1383 | GNUNET_i2s (other)); | 1253 | GNUNET_i2s (cps->ppd.peer)); |
1384 | return NULL; | 1254 | return; |
1385 | } | 1255 | } |
1386 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1256 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1387 | "Received request for `%s' of type %u from peer `%s' with flags %u\n", | 1257 | "Received request for `%s' of type %u from peer `%s' with flags %u\n", |
1388 | GNUNET_h2s (&gm->query), | 1258 | GNUNET_h2s (&gm->query), |
1389 | (unsigned int) tec.type, | 1259 | (unsigned int) tec.type, |
1390 | GNUNET_i2s (other), | 1260 | GNUNET_i2s (cps->ppd.peer), |
1391 | (unsigned int) bm); | 1261 | (unsigned int) bm); |
1392 | target = | 1262 | target = |
1393 | (0 != | 1263 | (0 != |
@@ -1403,7 +1273,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1403 | * so at best indirect the query */ | 1273 | * so at best indirect the query */ |
1404 | tec.priority = 0; | 1274 | tec.priority = 0; |
1405 | options |= GSF_PRO_FORWARD_ONLY; | 1275 | options |= GSF_PRO_FORWARD_ONLY; |
1406 | spid = GNUNET_PEER_intern (other); | 1276 | spid = GNUNET_PEER_intern (cps->ppd.peer); |
1407 | GNUNET_assert (0 != spid); | 1277 | GNUNET_assert (0 != spid); |
1408 | } | 1278 | } |
1409 | tec.ttl = bound_ttl (ntohl (gm->ttl), | 1279 | tec.ttl = bound_ttl (ntohl (gm->ttl), |
@@ -1412,11 +1282,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1412 | ttl_decrement = | 1282 | ttl_decrement = |
1413 | 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, | 1283 | 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, |
1414 | TTL_DECREMENT); | 1284 | TTL_DECREMENT); |
1415 | if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0)) | 1285 | if ( (tec.ttl < 0) && |
1286 | (((int32_t) (tec.ttl - ttl_decrement)) > 0) ) | ||
1416 | { | 1287 | { |
1417 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1288 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1418 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", | 1289 | "Dropping query from `%s' due to TTL underflow (%d - %u).\n", |
1419 | GNUNET_i2s (other), | 1290 | GNUNET_i2s (cps->ppd.peer), |
1420 | tec.ttl, | 1291 | tec.ttl, |
1421 | ttl_decrement); | 1292 | ttl_decrement); |
1422 | GNUNET_STATISTICS_update (GSF_stats, | 1293 | GNUNET_STATISTICS_update (GSF_stats, |
@@ -1424,7 +1295,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1424 | ("# requests dropped due TTL underflow"), 1, | 1295 | ("# requests dropped due TTL underflow"), 1, |
1425 | GNUNET_NO); | 1296 | GNUNET_NO); |
1426 | /* integer underflow => drop (should be very rare)! */ | 1297 | /* integer underflow => drop (should be very rare)! */ |
1427 | return NULL; | 1298 | return; |
1428 | } | 1299 | } |
1429 | tec.ttl -= ttl_decrement; | 1300 | tec.ttl -= ttl_decrement; |
1430 | 1301 | ||
@@ -1435,7 +1306,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1435 | &test_exist_cb, | 1306 | &test_exist_cb, |
1436 | &tec); | 1307 | &tec); |
1437 | if (GNUNET_YES == tec.finished) | 1308 | if (GNUNET_YES == tec.finished) |
1438 | return NULL; /* merged into existing request, we're done */ | 1309 | return; /* merged into existing request, we're done */ |
1439 | 1310 | ||
1440 | peerreq = GNUNET_new (struct PeerRequest); | 1311 | peerreq = GNUNET_new (struct PeerRequest); |
1441 | peerreq->cp = cp; | 1312 | peerreq->cp = cp; |
@@ -1452,7 +1323,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1452 | (uint32_t) tec.priority, | 1323 | (uint32_t) tec.priority, |
1453 | tec.ttl, | 1324 | tec.ttl, |
1454 | spid, | 1325 | spid, |
1455 | GNUNET_PEER_intern (other), | 1326 | GNUNET_PEER_intern (cps->ppd.peer), |
1456 | NULL, 0, /* replies_seen */ | 1327 | NULL, 0, /* replies_seen */ |
1457 | &handle_p2p_reply, | 1328 | &handle_p2p_reply, |
1458 | peerreq); | 1329 | peerreq); |
@@ -1472,43 +1343,10 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, | |||
1472 | gettext_noop ("# P2P searches active"), | 1343 | gettext_noop ("# P2P searches active"), |
1473 | 1, | 1344 | 1, |
1474 | GNUNET_NO); | 1345 | GNUNET_NO); |
1475 | return pr; | 1346 | GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES; |
1476 | } | 1347 | GSF_local_lookup_ (pr, |
1477 | 1348 | &GSF_consider_forwarding, | |
1478 | 1349 | NULL); | |
1479 | /** | ||
1480 | * Function called if there has been a timeout trying to satisfy | ||
1481 | * a transmission request. | ||
1482 | * | ||
1483 | * @param cls the `struct GSF_PeerTransmitHandle` of the request | ||
1484 | */ | ||
1485 | static void | ||
1486 | peer_transmit_timeout (void *cls) | ||
1487 | { | ||
1488 | struct GSF_PeerTransmitHandle *pth = cls; | ||
1489 | struct GSF_ConnectedPeer *cp; | ||
1490 | |||
1491 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1492 | "Timeout trying to transmit to other peer\n"); | ||
1493 | pth->timeout_task = NULL; | ||
1494 | cp = pth->cp; | ||
1495 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | ||
1496 | cp->pth_tail, | ||
1497 | pth); | ||
1498 | if (GNUNET_YES == pth->is_query) | ||
1499 | GNUNET_assert (0 < cp->ppd.pending_queries--); | ||
1500 | else if (GNUNET_NO == pth->is_query) | ||
1501 | GNUNET_assert (0 < cp->ppd.pending_replies--); | ||
1502 | GNUNET_LOAD_update (cp->ppd.transmission_delay, | ||
1503 | UINT64_MAX); | ||
1504 | if (NULL != cp->cth) | ||
1505 | { | ||
1506 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
1507 | cp->cth = NULL; | ||
1508 | } | ||
1509 | pth->gmc (pth->gmc_cls, 0, NULL); | ||
1510 | GNUNET_assert (0 == cp->cth_in_progress); | ||
1511 | GNUNET_free (pth); | ||
1512 | } | 1350 | } |
1513 | 1351 | ||
1514 | 1352 | ||
@@ -1520,19 +1358,15 @@ peer_transmit_timeout (void *cls) | |||
1520 | * @param cp target peer | 1358 | * @param cp target peer |
1521 | * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) | 1359 | * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) |
1522 | * @param priority how important is this request? | 1360 | * @param priority how important is this request? |
1523 | * @param timeout when does this request timeout (call gmc with error) | 1361 | * @param timeout when does this request timeout |
1524 | * @param size number of bytes we would like to send to the peer | 1362 | * @param size number of bytes we would like to send to the peer |
1525 | * @param gmc function to call to get the message | 1363 | * @param env message to send |
1526 | * @param gmc_cls closure for @a gmc | ||
1527 | * @return handle to cancel request | ||
1528 | */ | 1364 | */ |
1529 | struct GSF_PeerTransmitHandle * | 1365 | void |
1530 | GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | 1366 | GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, |
1531 | int is_query, | 1367 | int is_query, |
1532 | uint32_t priority, | 1368 | uint32_t priority, |
1533 | struct GNUNET_TIME_Relative timeout, | 1369 | struct GNUNET_MQ_Envelope *env) |
1534 | size_t size, | ||
1535 | GSF_GetMessageCallback gmc, void *gmc_cls) | ||
1536 | { | 1370 | { |
1537 | struct GSF_PeerTransmitHandle *pth; | 1371 | struct GSF_PeerTransmitHandle *pth; |
1538 | struct GSF_PeerTransmitHandle *pos; | 1372 | struct GSF_PeerTransmitHandle *pos; |
@@ -1540,10 +1374,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | |||
1540 | 1374 | ||
1541 | pth = GNUNET_new (struct GSF_PeerTransmitHandle); | 1375 | pth = GNUNET_new (struct GSF_PeerTransmitHandle); |
1542 | pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); | 1376 | pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); |
1543 | pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); | 1377 | pth->env = env; |
1544 | pth->gmc = gmc; | ||
1545 | pth->gmc_cls = gmc_cls; | ||
1546 | pth->size = size; | ||
1547 | pth->is_query = is_query; | 1378 | pth->is_query = is_query; |
1548 | pth->priority = priority; | 1379 | pth->priority = priority; |
1549 | pth->cp = cp; | 1380 | pth->cp = cp; |
@@ -1563,39 +1394,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, | |||
1563 | cp->ppd.pending_queries++; | 1394 | cp->ppd.pending_queries++; |
1564 | else if (GNUNET_NO == is_query) | 1395 | else if (GNUNET_NO == is_query) |
1565 | cp->ppd.pending_replies++; | 1396 | cp->ppd.pending_replies++; |
1566 | pth->timeout_task | ||
1567 | = GNUNET_SCHEDULER_add_delayed (timeout, | ||
1568 | &peer_transmit_timeout, | ||
1569 | pth); | ||
1570 | schedule_transmission (pth); | 1397 | schedule_transmission (pth); |
1571 | return pth; | ||
1572 | } | ||
1573 | |||
1574 | |||
1575 | /** | ||
1576 | * Cancel an earlier request for transmission. | ||
1577 | * | ||
1578 | * @param pth request to cancel | ||
1579 | */ | ||
1580 | void | ||
1581 | GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth) | ||
1582 | { | ||
1583 | struct GSF_ConnectedPeer *cp; | ||
1584 | |||
1585 | if (NULL != pth->timeout_task) | ||
1586 | { | ||
1587 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | ||
1588 | pth->timeout_task = NULL; | ||
1589 | } | ||
1590 | cp = pth->cp; | ||
1591 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | ||
1592 | cp->pth_tail, | ||
1593 | pth); | ||
1594 | if (GNUNET_YES == pth->is_query) | ||
1595 | GNUNET_assert (0 < cp->ppd.pending_queries--); | ||
1596 | else if (GNUNET_NO == pth->is_query) | ||
1597 | GNUNET_assert (0 < cp->ppd.pending_replies--); | ||
1598 | GNUNET_free (pth); | ||
1599 | } | 1398 | } |
1600 | 1399 | ||
1601 | 1400 | ||
@@ -1683,7 +1482,9 @@ flush_respect (void *cls, | |||
1683 | GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, | 1482 | GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, |
1684 | sizeof (cp->ppd.respect), | 1483 | sizeof (cp->ppd.respect), |
1685 | GNUNET_TIME_UNIT_FOREVER_ABS, | 1484 | GNUNET_TIME_UNIT_FOREVER_ABS, |
1686 | GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); | 1485 | GNUNET_PEERSTORE_STOREOPTION_REPLACE, |
1486 | NULL, | ||
1487 | NULL); | ||
1687 | return GNUNET_OK; | 1488 | return GNUNET_OK; |
1688 | } | 1489 | } |
1689 | 1490 | ||
@@ -1693,26 +1494,30 @@ flush_respect (void *cls, | |||
1693 | * record. | 1494 | * record. |
1694 | * | 1495 | * |
1695 | * @param cls unused | 1496 | * @param cls unused |
1696 | * @param peer identity of peer that connected | 1497 | * @param peer identity of peer that disconnected |
1498 | * @param internal_cls the corresponding `struct GSF_ConnectedPeer` | ||
1697 | */ | 1499 | */ |
1698 | void | 1500 | void |
1699 | GSF_peer_disconnect_handler_ (void *cls, | 1501 | GSF_peer_disconnect_handler (void *cls, |
1700 | const struct GNUNET_PeerIdentity *peer) | 1502 | const struct GNUNET_PeerIdentity *peer, |
1503 | void *internal_cls) | ||
1701 | { | 1504 | { |
1702 | struct GSF_ConnectedPeer *cp; | 1505 | struct GSF_ConnectedPeer *cp = internal_cls; |
1703 | struct GSF_PeerTransmitHandle *pth; | 1506 | struct GSF_PeerTransmitHandle *pth; |
1704 | struct GSF_DelayedHandle *dh; | 1507 | struct GSF_DelayedHandle *dh; |
1705 | 1508 | ||
1706 | cp = GSF_peer_get_ (peer); | ||
1707 | if (NULL == cp) | 1509 | if (NULL == cp) |
1708 | return; /* must have been disconnect from core with | 1510 | return; /* must have been disconnect from core with |
1709 | * 'peer' == my_id, ignore */ | 1511 | * 'peer' == my_id, ignore */ |
1710 | flush_respect (NULL, peer, cp); | 1512 | flush_respect (NULL, |
1513 | peer, | ||
1514 | cp); | ||
1711 | GNUNET_assert (GNUNET_YES == | 1515 | GNUNET_assert (GNUNET_YES == |
1712 | GNUNET_CONTAINER_multipeermap_remove (cp_map, | 1516 | GNUNET_CONTAINER_multipeermap_remove (cp_map, |
1713 | peer, | 1517 | peer, |
1714 | cp)); | 1518 | cp)); |
1715 | GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"), | 1519 | GNUNET_STATISTICS_set (GSF_stats, |
1520 | gettext_noop ("# peers connected"), | ||
1716 | GNUNET_CONTAINER_multipeermap_size (cp_map), | 1521 | GNUNET_CONTAINER_multipeermap_size (cp_map), |
1717 | GNUNET_NO); | 1522 | GNUNET_NO); |
1718 | if (NULL != cp->respect_iterate_req) | 1523 | if (NULL != cp->respect_iterate_req) |
@@ -1720,11 +1525,6 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1720 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); | 1525 | GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); |
1721 | cp->respect_iterate_req = NULL; | 1526 | cp->respect_iterate_req = NULL; |
1722 | } | 1527 | } |
1723 | if (NULL != cp->migration_pth) | ||
1724 | { | ||
1725 | GSF_peer_transmit_cancel_ (cp->migration_pth); | ||
1726 | cp->migration_pth = NULL; | ||
1727 | } | ||
1728 | if (NULL != cp->rc) | 1528 | if (NULL != cp->rc) |
1729 | { | 1529 | { |
1730 | GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); | 1530 | GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); |
@@ -1748,19 +1548,8 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1748 | 0, | 1548 | 0, |
1749 | sizeof (cp->ppd.last_p2p_replies)); | 1549 | sizeof (cp->ppd.last_p2p_replies)); |
1750 | GSF_push_stop_ (cp); | 1550 | GSF_push_stop_ (cp); |
1751 | if (NULL != cp->cth) | ||
1752 | { | ||
1753 | GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); | ||
1754 | cp->cth = NULL; | ||
1755 | } | ||
1756 | GNUNET_assert (0 == cp->cth_in_progress); | ||
1757 | while (NULL != (pth = cp->pth_head)) | 1551 | while (NULL != (pth = cp->pth_head)) |
1758 | { | 1552 | { |
1759 | if (pth->timeout_task != NULL) | ||
1760 | { | ||
1761 | GNUNET_SCHEDULER_cancel (pth->timeout_task); | ||
1762 | pth->timeout_task = NULL; | ||
1763 | } | ||
1764 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, | 1553 | GNUNET_CONTAINER_DLL_remove (cp->pth_head, |
1765 | cp->pth_tail, | 1554 | cp->pth_tail, |
1766 | pth); | 1555 | pth); |
@@ -1768,7 +1557,6 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1768 | GNUNET_assert (0 < cp->ppd.pending_queries--); | 1557 | GNUNET_assert (0 < cp->ppd.pending_queries--); |
1769 | else if (GNUNET_NO == pth->is_query) | 1558 | else if (GNUNET_NO == pth->is_query) |
1770 | GNUNET_assert (0 < cp->ppd.pending_replies--); | 1559 | GNUNET_assert (0 < cp->ppd.pending_replies--); |
1771 | pth->gmc (pth->gmc_cls, 0, NULL); | ||
1772 | GNUNET_free (pth); | 1560 | GNUNET_free (pth); |
1773 | } | 1561 | } |
1774 | while (NULL != (dh = cp->delayed_head)) | 1562 | while (NULL != (dh = cp->delayed_head)) |
@@ -1776,9 +1564,9 @@ GSF_peer_disconnect_handler_ (void *cls, | |||
1776 | GNUNET_CONTAINER_DLL_remove (cp->delayed_head, | 1564 | GNUNET_CONTAINER_DLL_remove (cp->delayed_head, |
1777 | cp->delayed_tail, | 1565 | cp->delayed_tail, |
1778 | dh); | 1566 | dh); |
1567 | GNUNET_MQ_discard (dh->env); | ||
1779 | cp->delay_queue_size--; | 1568 | cp->delay_queue_size--; |
1780 | GNUNET_SCHEDULER_cancel (dh->delay_task); | 1569 | GNUNET_SCHEDULER_cancel (dh->delay_task); |
1781 | GNUNET_free (dh->pm); | ||
1782 | GNUNET_free (dh); | 1570 | GNUNET_free (dh); |
1783 | } | 1571 | } |
1784 | GNUNET_PEER_change_rc (cp->ppd.pid, -1); | 1572 | GNUNET_PEER_change_rc (cp->ppd.pid, -1); |
@@ -1883,40 +1671,6 @@ GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp) | |||
1883 | 1671 | ||
1884 | 1672 | ||
1885 | /** | 1673 | /** |
1886 | * Assemble a migration stop message for transmission. | ||
1887 | * | ||
1888 | * @param cls the `struct GSF_ConnectedPeer` to use | ||
1889 | * @param size number of bytes we're allowed to write to @a buf | ||
1890 | * @param buf where to copy the message | ||
1891 | * @return number of bytes copied to @a buf | ||
1892 | */ | ||
1893 | static size_t | ||
1894 | create_migration_stop_message (void *cls, | ||
1895 | size_t size, | ||
1896 | void *buf) | ||
1897 | { | ||
1898 | struct GSF_ConnectedPeer *cp = cls; | ||
1899 | struct MigrationStopMessage msm; | ||
1900 | |||
1901 | cp->migration_pth = NULL; | ||
1902 | if (NULL == buf) | ||
1903 | return 0; | ||
1904 | GNUNET_assert (size >= sizeof (struct MigrationStopMessage)); | ||
1905 | msm.header.size = htons (sizeof (struct MigrationStopMessage)); | ||
1906 | msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); | ||
1907 | msm.reserved = htonl (0); | ||
1908 | msm.duration = | ||
1909 | GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining | ||
1910 | (cp->last_migration_block)); | ||
1911 | GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage)); | ||
1912 | GNUNET_STATISTICS_update (GSF_stats, | ||
1913 | gettext_noop ("# migration stop messages sent"), | ||
1914 | 1, GNUNET_NO); | ||
1915 | return sizeof (struct MigrationStopMessage); | ||
1916 | } | ||
1917 | |||
1918 | |||
1919 | /** | ||
1920 | * Ask a peer to stop migrating data to us until the given point | 1674 | * Ask a peer to stop migrating data to us until the given point |
1921 | * in time. | 1675 | * in time. |
1922 | * | 1676 | * |
@@ -1927,6 +1681,9 @@ void | |||
1927 | GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, | 1681 | GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, |
1928 | struct GNUNET_TIME_Absolute block_time) | 1682 | struct GNUNET_TIME_Absolute block_time) |
1929 | { | 1683 | { |
1684 | struct GNUNET_MQ_Envelope *env; | ||
1685 | struct MigrationStopMessage *msm; | ||
1686 | |||
1930 | if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) | 1687 | if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) |
1931 | { | 1688 | { |
1932 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1689 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
@@ -1939,13 +1696,20 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, | |||
1939 | GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), | 1696 | GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), |
1940 | GNUNET_YES)); | 1697 | GNUNET_YES)); |
1941 | cp->last_migration_block = block_time; | 1698 | cp->last_migration_block = block_time; |
1942 | if (NULL != cp->migration_pth) | 1699 | env = GNUNET_MQ_msg (msm, |
1943 | GSF_peer_transmit_cancel_ (cp->migration_pth); | 1700 | GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP); |
1944 | cp->migration_pth = | 1701 | msm->reserved = htonl (0); |
1945 | GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX, | 1702 | msm->duration |
1946 | GNUNET_TIME_UNIT_FOREVER_REL, | 1703 | = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining |
1947 | sizeof (struct MigrationStopMessage), | 1704 | (cp->last_migration_block)); |
1948 | &create_migration_stop_message, cp); | 1705 | GNUNET_STATISTICS_update (GSF_stats, |
1706 | gettext_noop ("# migration stop messages sent"), | ||
1707 | 1, | ||
1708 | GNUNET_NO); | ||
1709 | GSF_peer_transmit_ (cp, | ||
1710 | GNUNET_SYSERR, | ||
1711 | UINT32_MAX, | ||
1712 | env); | ||
1949 | } | 1713 | } |
1950 | 1714 | ||
1951 | 1715 | ||
@@ -1998,24 +1762,6 @@ GSF_connected_peer_init_ () | |||
1998 | 1762 | ||
1999 | 1763 | ||
2000 | /** | 1764 | /** |
2001 | * Iterator to free peer entries. | ||
2002 | * | ||
2003 | * @param cls closure, unused | ||
2004 | * @param key current key code | ||
2005 | * @param value value in the hash map (peer entry) | ||
2006 | * @return #GNUNET_YES (we should continue to iterate) | ||
2007 | */ | ||
2008 | static int | ||
2009 | clean_peer (void *cls, | ||
2010 | const struct GNUNET_PeerIdentity *key, | ||
2011 | void *value) | ||
2012 | { | ||
2013 | GSF_peer_disconnect_handler_ (NULL, key); | ||
2014 | return GNUNET_YES; | ||
2015 | } | ||
2016 | |||
2017 | |||
2018 | /** | ||
2019 | * Shutdown peer management subsystem. | 1765 | * Shutdown peer management subsystem. |
2020 | */ | 1766 | */ |
2021 | void | 1767 | void |
@@ -2024,9 +1770,6 @@ GSF_connected_peer_done_ () | |||
2024 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, | 1770 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, |
2025 | &flush_respect, | 1771 | &flush_respect, |
2026 | NULL); | 1772 | NULL); |
2027 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, | ||
2028 | &clean_peer, | ||
2029 | NULL); | ||
2030 | GNUNET_SCHEDULER_cancel (fr_task); | 1773 | GNUNET_SCHEDULER_cancel (fr_task); |
2031 | fr_task = NULL; | 1774 | fr_task = NULL; |
2032 | GNUNET_CONTAINER_multipeermap_destroy (cp_map); | 1775 | GNUNET_CONTAINER_multipeermap_destroy (cp_map); |
@@ -2072,7 +1815,8 @@ GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc) | |||
2072 | { | 1815 | { |
2073 | if (NULL == cp_map) | 1816 | if (NULL == cp_map) |
2074 | return; /* already cleaned up */ | 1817 | return; /* already cleaned up */ |
2075 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client, | 1818 | GNUNET_CONTAINER_multipeermap_iterate (cp_map, |
1819 | &clean_local_client, | ||
2076 | (void *) lc); | 1820 | (void *) lc); |
2077 | } | 1821 | } |
2078 | 1822 | ||