aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cp.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-31 21:23:23 +0000
commita78990b412db2c0ead2da8061c4f454f068991d1 (patch)
tree2e87adae62fd1ca3cd5a9f4c69248986f78bb106 /src/fs/gnunet-service-fs_cp.c
parent406c7d2d2d126c994a1fff13470b1f96439c6f9d (diff)
downloadgnunet-a78990b412db2c0ead2da8061c4f454f068991d1.tar.gz
gnunet-a78990b412db2c0ead2da8061c4f454f068991d1.zip
converting FS to new MQ-based core API
Diffstat (limited to 'src/fs/gnunet-service-fs_cp.c')
-rw-r--r--src/fs/gnunet-service-fs_cp.c612
1 files changed, 178 insertions, 434 deletions
diff --git a/src/fs/gnunet-service-fs_cp.c b/src/fs/gnunet-service-fs_cp.c
index bda33d766..3f7783ded 100644
--- a/src/fs/gnunet-service-fs_cp.c
+++ b/src/fs/gnunet-service-fs_cp.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2011 GNUnet e.V. 3 Copyright (C) 2011, 2016 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -78,19 +78,9 @@ struct GSF_PeerTransmitHandle
78 struct GNUNET_TIME_Absolute transmission_request_start_time; 78 struct GNUNET_TIME_Absolute transmission_request_start_time;
79 79
80 /** 80 /**
81 * Timeout for this request. 81 * Envelope with the actual message.
82 */ 82 */
83 struct GNUNET_TIME_Absolute timeout; 83 struct GNUNET_MQ_Envelope *env;
84
85 /**
86 * Task called on timeout, or 0 for none.
87 */
88 struct GNUNET_SCHEDULER_Task *timeout_task;
89
90 /**
91 * Function to call to get the actual message.
92 */
93 GSF_GetMessageCallback gmc;
94 84
95 /** 85 /**
96 * Peer this request targets. 86 * Peer this request targets.
@@ -98,16 +88,6 @@ struct GSF_PeerTransmitHandle
98 struct GSF_ConnectedPeer *cp; 88 struct GSF_ConnectedPeer *cp;
99 89
100 /** 90 /**
101 * Closure for @e gmc.
102 */
103 void *gmc_cls;
104
105 /**
106 * Size of the message to be transmitted.
107 */
108 size_t size;
109
110 /**
111 * #GNUNET_YES if this is a query, #GNUNET_NO for content. 91 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
112 */ 92 */
113 int is_query; 93 int is_query;
@@ -147,9 +127,9 @@ struct GSF_DelayedHandle
147 struct GSF_ConnectedPeer *cp; 127 struct GSF_ConnectedPeer *cp;
148 128
149 /** 129 /**
150 * The PUT that was delayed. 130 * Envelope of the message that was delayed.
151 */ 131 */
152 struct PutMessage *pm; 132 struct GNUNET_MQ_Envelope *env;
153 133
154 /** 134 /**
155 * Task for the delay. 135 * Task for the delay.
@@ -235,11 +215,6 @@ struct GSF_ConnectedPeer
235 struct GSF_DelayedHandle *delayed_tail; 215 struct GSF_DelayedHandle *delayed_tail;
236 216
237 /** 217 /**
238 * Migration stop message in our queue, or NULL if we have none pending.
239 */
240 struct GSF_PeerTransmitHandle *migration_pth;
241
242 /**
243 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL). 218 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
244 */ 219 */
245 struct GNUNET_ATS_ReservationContext *rc; 220 struct GNUNET_ATS_ReservationContext *rc;
@@ -256,9 +231,9 @@ struct GSF_ConnectedPeer
256 231
257 /** 232 /**
258 * Handle for an active request for transmission to this 233 * Handle for an active request for transmission to this
259 * peer, or NULL (if core queue was full). 234 * peer.
260 */ 235 */
261 struct GNUNET_CORE_TransmitHandle *cth; 236 struct GNUNET_MQ_Handle *mq;
262 237
263 /** 238 /**
264 * Increase in traffic preference still to be submitted 239 * Increase in traffic preference still to be submitted
@@ -267,14 +242,6 @@ struct GSF_ConnectedPeer
267 uint64_t inc_preference; 242 uint64_t inc_preference;
268 243
269 /** 244 /**
270 * Set to 1 if we're currently in the process of calling
271 * #GNUNET_CORE_notify_transmit_ready() (so while @e cth is
272 * NULL, we should not call notify_transmit_ready for this
273 * handle right now).
274 */
275 unsigned int cth_in_progress;
276
277 /**
278 * Number of entries in @e delayed_head DLL. 245 * Number of entries in @e delayed_head DLL.
279 */ 246 */
280 unsigned int delay_queue_size; 247 unsigned int delay_queue_size;
@@ -308,16 +275,6 @@ struct GSF_ConnectedPeer
308 int did_reserve; 275 int did_reserve;
309 276
310 /** 277 /**
311 * Function called when the creation of this record is complete.
312 */
313 GSF_ConnectedPeerCreationCallback creation_cb;
314
315 /**
316 * Closure for @e creation_cb
317 */
318 void *creation_cb_cls;
319
320 /**
321 * Handle to the PEERSTORE iterate request for peer respect value 278 * Handle to the PEERSTORE iterate request for peer respect value
322 */ 279 */
323 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req; 280 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
@@ -377,15 +334,10 @@ GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
377/** 334/**
378 * Core is ready to transmit to a peer, get the message. 335 * Core is ready to transmit to a peer, get the message.
379 * 336 *
380 * @param cls the `struct GSF_PeerTransmitHandle` of the message 337 * @param cp which peer to send a message to
381 * @param size number of bytes core is willing to take
382 * @param buf where to copy the message
383 * @return number of bytes copied to @a buf
384 */ 338 */
385static size_t 339static void
386peer_transmit_ready_cb (void *cls, 340peer_transmit (struct GSF_ConnectedPeer *cp);
387 size_t size,
388 void *buf);
389 341
390 342
391/** 343/**
@@ -418,8 +370,6 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
418 struct GNUNET_PeerIdentity target; 370 struct GNUNET_PeerIdentity target;
419 371
420 cp = pth->cp; 372 cp = pth->cp;
421 if ((NULL != cp->cth) || (0 != cp->cth_in_progress))
422 return; /* already done */
423 GNUNET_assert (0 != cp->ppd.pid); 373 GNUNET_assert (0 != cp->ppd.pid);
424 GNUNET_PEER_resolve (cp->ppd.pid, &target); 374 GNUNET_PEER_resolve (cp->ppd.pid, &target);
425 375
@@ -449,52 +399,23 @@ schedule_transmission (struct GSF_PeerTransmitHandle *pth)
449 cp); 399 cp);
450 return; 400 return;
451 } 401 }
452 GNUNET_assert (NULL == cp->cth); 402 peer_transmit (cp);
453 cp->cth_in_progress++;
454 cp->cth =
455 GNUNET_CORE_notify_transmit_ready (GSF_core,
456 GNUNET_YES,
457 GNUNET_CORE_PRIO_BACKGROUND,
458 GNUNET_TIME_absolute_get_remaining (pth->timeout),
459 &target,
460 pth->size,
461 &peer_transmit_ready_cb, cp);
462 GNUNET_assert (NULL != cp->cth);
463 GNUNET_assert (0 < cp->cth_in_progress--);
464} 403}
465 404
466 405
467/** 406/**
468 * Core is ready to transmit to a peer, get the message. 407 * Core is ready to transmit to a peer, get the message.
469 * 408 *
470 * @param cls the `struct GSF_PeerTransmitHandle` of the message 409 * @param cp which peer to send a message to
471 * @param size number of bytes core is willing to take
472 * @param buf where to copy the message
473 * @return number of bytes copied to @a buf
474 */ 410 */
475static size_t 411static void
476peer_transmit_ready_cb (void *cls, 412peer_transmit (struct GSF_ConnectedPeer *cp)
477 size_t size,
478 void *buf)
479{ 413{
480 struct GSF_ConnectedPeer *cp = cls;
481 struct GSF_PeerTransmitHandle *pth = cp->pth_head; 414 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
482 struct GSF_PeerTransmitHandle *pos; 415 struct GSF_PeerTransmitHandle *pos;
483 size_t ret;
484 416
485 cp->cth = NULL;
486 if (NULL == pth) 417 if (NULL == pth)
487 return 0; 418 return;
488 if (pth->size > size)
489 {
490 schedule_transmission (pth);
491 return 0;
492 }
493 if (NULL != pth->timeout_task)
494 {
495 GNUNET_SCHEDULER_cancel (pth->timeout_task);
496 pth->timeout_task = NULL;
497 }
498 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 419 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
499 cp->pth_tail, 420 cp->pth_tail,
500 pth); 421 pth);
@@ -512,14 +433,14 @@ peer_transmit_ready_cb (void *cls,
512 GNUNET_LOAD_update (cp->ppd.transmission_delay, 433 GNUNET_LOAD_update (cp->ppd.transmission_delay,
513 GNUNET_TIME_absolute_get_duration 434 GNUNET_TIME_absolute_get_duration
514 (pth->transmission_request_start_time).rel_value_us); 435 (pth->transmission_request_start_time).rel_value_us);
515 ret = pth->gmc (pth->gmc_cls, size, buf); 436 GNUNET_MQ_send (cp->mq,
437 pth->env);
438 GNUNET_free (pth);
516 if (NULL != (pos = cp->pth_head)) 439 if (NULL != (pos = cp->pth_head))
517 { 440 {
518 GNUNET_assert (pos != pth); 441 GNUNET_assert (pos != pth);
519 schedule_transmission (pos); 442 schedule_transmission (pos);
520 } 443 }
521 GNUNET_free (pth);
522 return ret;
523} 444}
524 445
525 446
@@ -578,23 +499,10 @@ ats_reserve_callback (void *cls,
578 } 499 }
579 cp->did_reserve = GNUNET_YES; 500 cp->did_reserve = GNUNET_YES;
580 pth = cp->pth_head; 501 pth = cp->pth_head;
581 if ( (NULL != pth) && 502 if (NULL != pth)
582 (NULL == cp->cth) &&
583 (0 == cp->cth_in_progress) )
584 { 503 {
585 /* reservation success, try transmission now! */ 504 /* reservation success, try transmission now! */
586 cp->cth_in_progress++; 505 peer_transmit (cp);
587 cp->cth =
588 GNUNET_CORE_notify_transmit_ready (GSF_core,
589 GNUNET_YES,
590 GNUNET_CORE_PRIO_BACKGROUND,
591 GNUNET_TIME_absolute_get_remaining (pth->timeout),
592 peer,
593 pth->size,
594 &peer_transmit_ready_cb,
595 cp);
596 GNUNET_assert (NULL != cp->cth);
597 GNUNET_assert (0 < cp->cth_in_progress--);
598 } 506 }
599} 507}
600 508
@@ -614,11 +522,13 @@ peer_respect_cb (void *cls,
614 struct GSF_ConnectedPeer *cp = cls; 522 struct GSF_ConnectedPeer *cp = cls;
615 523
616 GNUNET_assert (NULL != cp->respect_iterate_req); 524 GNUNET_assert (NULL != cp->respect_iterate_req);
617 if ((NULL != record) && (sizeof (cp->disk_respect) == record->value_size)) 525 if ( (NULL != record) &&
618 cp->disk_respect = cp->ppd.respect = *((uint32_t *)record->value); 526 (sizeof (cp->disk_respect) == record->value_size))
527 {
528 cp->disk_respect = *((uint32_t *)record->value);
529 cp->ppd.respect += *((uint32_t *)record->value);
530 }
619 GSF_push_start_ (cp); 531 GSF_push_start_ (cp);
620 if (NULL != cp->creation_cb)
621 cp->creation_cb (cp->creation_cb_cls, cp);
622 if (NULL != record) 532 if (NULL != record)
623 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); 533 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
624 cp->respect_iterate_req = NULL; 534 cp->respect_iterate_req = NULL;
@@ -626,25 +536,68 @@ peer_respect_cb (void *cls,
626 536
627 537
628/** 538/**
539 * Function called for each pending request whenever a new
540 * peer connects, giving us a chance to decide about submitting
541 * the existing request to the new peer.
542 *
543 * @param cls the `struct GSF_ConnectedPeer` of the new peer
544 * @param key query for the request
545 * @param pr handle to the pending request
546 * @return #GNUNET_YES to continue to iterate
547 */
548static int
549consider_peer_for_forwarding (void *cls,
550 const struct GNUNET_HashCode *key,
551 struct GSF_PendingRequest *pr)
552{
553 struct GSF_ConnectedPeer *cp = cls;
554 struct GNUNET_PeerIdentity pid;
555
556 if (GNUNET_YES !=
557 GSF_pending_request_test_active_ (pr))
558 return GNUNET_YES; /* request is not actually active, skip! */
559 GSF_connected_peer_get_identity_ (cp, &pid);
560 if (GNUNET_YES !=
561 GSF_pending_request_test_target_ (pr, &pid))
562 {
563 GNUNET_STATISTICS_update (GSF_stats,
564 gettext_noop ("# Loopback routes suppressed"),
565 1,
566 GNUNET_NO);
567 return GNUNET_YES;
568 }
569 GSF_plan_add_ (cp, pr);
570 return GNUNET_YES;
571}
572
573
574/**
629 * A peer connected to us. Setup the connected peer 575 * A peer connected to us. Setup the connected peer
630 * records. 576 * records.
631 * 577 *
578 * @param cls NULL
632 * @param peer identity of peer that connected 579 * @param peer identity of peer that connected
633 * @param creation_cb callback function when the record is created. 580 * @param mq message queue for talking to @a peer
634 * @param creation_cb_cls closure for @creation_cb 581 * @return our internal handle for the peer
635 */ 582 */
636void 583void *
637GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer, 584GSF_peer_connect_handler (void *cls,
638 GSF_ConnectedPeerCreationCallback creation_cb, 585 const struct GNUNET_PeerIdentity *peer,
639 void *creation_cb_cls) 586 struct GNUNET_MQ_Handle *mq)
640{ 587{
641 struct GSF_ConnectedPeer *cp; 588 struct GSF_ConnectedPeer *cp;
642 589
590 if (0 ==
591 GNUNET_CRYPTO_cmp_peer_identity (&GSF_my_id,
592 peer))
593 return NULL;
643 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 594 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644 "Connected to peer %s\n", 595 "Connected to peer %s\n",
645 GNUNET_i2s (peer)); 596 GNUNET_i2s (peer));
646 cp = GNUNET_new (struct GSF_ConnectedPeer); 597 cp = GNUNET_new (struct GSF_ConnectedPeer);
647 cp->ppd.pid = GNUNET_PEER_intern (peer); 598 cp->ppd.pid = GNUNET_PEER_intern (peer);
599 cp->ppd.peer = peer;
600 cp->mq = mq;
648 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO); 601 cp->ppd.transmission_delay = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_ZERO);
649 cp->rc = 602 cp->rc =
650 GNUNET_ATS_reserve_bandwidth (GSF_ats, 603 GNUNET_ATS_reserve_bandwidth (GSF_ats,
@@ -662,14 +615,17 @@ GSF_peer_connect_handler_ (const struct GNUNET_PeerIdentity *peer,
662 gettext_noop ("# peers connected"), 615 gettext_noop ("# peers connected"),
663 GNUNET_CONTAINER_multipeermap_size (cp_map), 616 GNUNET_CONTAINER_multipeermap_size (cp_map),
664 GNUNET_NO); 617 GNUNET_NO);
665 cp->creation_cb = creation_cb; 618 cp->respect_iterate_req
666 cp->creation_cb_cls = creation_cb_cls; 619 = GNUNET_PEERSTORE_iterate (peerstore,
667 cp->respect_iterate_req = 620 "fs",
668 GNUNET_PEERSTORE_iterate (peerstore, "fs", 621 peer,
669 peer, "respect", 622 "respect",
670 GNUNET_TIME_UNIT_FOREVER_REL, 623 GNUNET_TIME_UNIT_FOREVER_REL,
671 &peer_respect_cb, 624 &peer_respect_cb,
672 cp); 625 cp);
626 GSF_iterate_pending_requests_ (&consider_peer_for_forwarding,
627 cp);
628 return cp;
673} 629}
674 630
675 631
@@ -714,38 +670,25 @@ GSF_peer_get_ (const struct GNUNET_PeerIdentity *peer)
714 670
715 671
716/** 672/**
717 * Handle P2P "MIGRATION_STOP" message. 673 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
718 * 674 *
719 * @param cls closure, always NULL 675 * @param cls closure, the `struct GSF_ConnectedPeer`
720 * @param other the other peer involved (sender or receiver, NULL 676 * @param msm the actual message
721 * for loopback messages where we are both sender and receiver)
722 * @param message the actual message
723 * @return #GNUNET_OK to keep the connection open,
724 * #GNUNET_SYSERR to close it (signal serious error)
725 */ 677 */
726int 678void
727GSF_handle_p2p_migration_stop_ (void *cls, 679handle_p2p_migration_stop (void *cls,
728 const struct GNUNET_PeerIdentity *other, 680 const struct MigrationStopMessage *msm)
729 const struct GNUNET_MessageHeader *message)
730{ 681{
731 struct GSF_ConnectedPeer *cp; 682 struct GSF_ConnectedPeer *cp = cls;
732 const struct MigrationStopMessage *msm;
733 struct GNUNET_TIME_Relative bt; 683 struct GNUNET_TIME_Relative bt;
734 684
735 msm = (const struct MigrationStopMessage *) message;
736 cp = GSF_peer_get_ (other);
737 if (NULL == cp)
738 {
739 GNUNET_break (0);
740 return GNUNET_OK;
741 }
742 GNUNET_STATISTICS_update (GSF_stats, 685 GNUNET_STATISTICS_update (GSF_stats,
743 gettext_noop ("# migration stop messages received"), 686 gettext_noop ("# migration stop messages received"),
744 1, GNUNET_NO); 687 1, GNUNET_NO);
745 bt = GNUNET_TIME_relative_ntoh (msm->duration); 688 bt = GNUNET_TIME_relative_ntoh (msm->duration);
746 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 689 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
747 _("Migration of content to peer `%s' blocked for %s\n"), 690 _("Migration of content to peer `%s' blocked for %s\n"),
748 GNUNET_i2s (other), 691 GNUNET_i2s (cp->ppd.peer),
749 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES)); 692 GNUNET_STRINGS_relative_time_to_string (bt, GNUNET_YES));
750 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt); 693 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute (bt);
751 if ( (NULL == cp->mig_revive_task) && 694 if ( (NULL == cp->mig_revive_task) &&
@@ -756,46 +699,6 @@ GSF_handle_p2p_migration_stop_ (void *cls,
756 GNUNET_SCHEDULER_add_delayed (bt, 699 GNUNET_SCHEDULER_add_delayed (bt,
757 &revive_migration, cp); 700 &revive_migration, cp);
758 } 701 }
759 return GNUNET_OK;
760}
761
762
763/**
764 * Copy reply and free put message.
765 *
766 * @param cls the `struct PutMessage`
767 * @param buf_size number of bytes available in @a buf
768 * @param buf where to copy the message, NULL on error (peer disconnect)
769 * @return number of bytes copied to @a buf, can be 0 (without indicating an error)
770 */
771static size_t
772copy_reply (void *cls,
773 size_t buf_size,
774 void *buf)
775{
776 struct PutMessage *pm = cls;
777 size_t size;
778
779 if (NULL != buf)
780 {
781 GNUNET_assert (buf_size >= ntohs (pm->header.size));
782 size = ntohs (pm->header.size);
783 GNUNET_memcpy (buf, pm, size);
784 GNUNET_STATISTICS_update (GSF_stats,
785 gettext_noop ("# replies transmitted to other peers"),
786 1,
787 GNUNET_NO);
788 }
789 else
790 {
791 size = 0;
792 GNUNET_STATISTICS_update (GSF_stats,
793 gettext_noop ("# replies dropped"),
794 1,
795 GNUNET_NO);
796 }
797 GNUNET_free (pm);
798 return size;
799} 702}
800 703
801 704
@@ -886,13 +789,10 @@ transmit_delayed_now (void *cls)
886 cp->delayed_tail, 789 cp->delayed_tail,
887 dh); 790 dh);
888 cp->delay_queue_size--; 791 cp->delay_queue_size--;
889 (void) GSF_peer_transmit_ (cp, 792 GSF_peer_transmit_ (cp,
890 GNUNET_NO, 793 GNUNET_NO,
891 UINT32_MAX, 794 UINT32_MAX,
892 REPLY_TIMEOUT, 795 dh->env);
893 dh->msize,
894 &copy_reply,
895 dh->pm);
896 GNUNET_free (dh); 796 GNUNET_free (dh);
897} 797}
898 798
@@ -954,6 +854,7 @@ handle_p2p_reply (void *cls,
954 struct PeerRequest *peerreq = cls; 854 struct PeerRequest *peerreq = cls;
955 struct GSF_ConnectedPeer *cp = peerreq->cp; 855 struct GSF_ConnectedPeer *cp = peerreq->cp;
956 struct GSF_PendingRequestData *prd; 856 struct GSF_PendingRequestData *prd;
857 struct GNUNET_MQ_Envelope *env;
957 struct PutMessage *pm; 858 struct PutMessage *pm;
958 size_t msize; 859 size_t msize;
959 860
@@ -1000,12 +901,14 @@ handle_p2p_reply (void *cls,
1000 GSF_cover_content_count -= (reply_anonymity_level - 1); 901 GSF_cover_content_count -= (reply_anonymity_level - 1);
1001 } 902 }
1002 903
1003 pm = GNUNET_malloc (msize); 904 env = GNUNET_MQ_msg_extra (pm,
1004 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); 905 data_len,
1005 pm->header.size = htons (msize); 906 GNUNET_MESSAGE_TYPE_FS_PUT);
1006 pm->type = htonl (type); 907 pm->type = htonl (type);
1007 pm->expiration = GNUNET_TIME_absolute_hton (expiration); 908 pm->expiration = GNUNET_TIME_absolute_hton (expiration);
1008 GNUNET_memcpy (&pm[1], data, data_len); 909 GNUNET_memcpy (&pm[1],
910 data,
911 data_len);
1009 if ( (UINT32_MAX != reply_anonymity_level) && 912 if ( (UINT32_MAX != reply_anonymity_level) &&
1010 (0 != reply_anonymity_level) && 913 (0 != reply_anonymity_level) &&
1011 (GNUNET_YES == GSF_enable_randomized_delays) ) 914 (GNUNET_YES == GSF_enable_randomized_delays) )
@@ -1014,7 +917,7 @@ handle_p2p_reply (void *cls,
1014 917
1015 dh = GNUNET_new (struct GSF_DelayedHandle); 918 dh = GNUNET_new (struct GSF_DelayedHandle);
1016 dh->cp = cp; 919 dh->cp = cp;
1017 dh->pm = pm; 920 dh->env = env;
1018 dh->msize = msize; 921 dh->msize = msize;
1019 GNUNET_CONTAINER_DLL_insert (cp->delayed_head, 922 GNUNET_CONTAINER_DLL_insert (cp->delayed_head,
1020 cp->delayed_tail, 923 cp->delayed_tail,
@@ -1027,13 +930,10 @@ handle_p2p_reply (void *cls,
1027 } 930 }
1028 else 931 else
1029 { 932 {
1030 (void) GSF_peer_transmit_ (cp, 933 GSF_peer_transmit_ (cp,
1031 GNUNET_NO, 934 GNUNET_NO,
1032 UINT32_MAX, 935 UINT32_MAX,
1033 REPLY_TIMEOUT, 936 env);
1034 msize,
1035 &copy_reply,
1036 pm);
1037 } 937 }
1038 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval) 938 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
1039 return; 939 return;
@@ -1265,23 +1165,20 @@ test_exist_cb (void *cls,
1265 * process replies properly. Does not initiate forwarding or 1165 * process replies properly. Does not initiate forwarding or
1266 * local database lookups. 1166 * local database lookups.
1267 * 1167 *
1268 * @param other the other peer involved (sender or receiver, NULL 1168 * @param cls the other peer involved (sender of the message)
1269 * for loopback messages where we are both sender and receiver) 1169 * @param gm the GET message
1270 * @param message the actual message
1271 * @return pending request handle, NULL on error
1272 */ 1170 */
1273struct GSF_PendingRequest * 1171void
1274GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other, 1172handle_p2p_get (void *cls,
1275 const struct GNUNET_MessageHeader *message) 1173 const struct GetMessage *gm)
1276{ 1174{
1175 struct GSF_ConnectedPeer *cps = cls;
1277 struct PeerRequest *peerreq; 1176 struct PeerRequest *peerreq;
1278 struct GSF_PendingRequest *pr; 1177 struct GSF_PendingRequest *pr;
1279 struct GSF_ConnectedPeer *cp; 1178 struct GSF_ConnectedPeer *cp;
1280 struct GSF_ConnectedPeer *cps;
1281 const struct GNUNET_PeerIdentity *target; 1179 const struct GNUNET_PeerIdentity *target;
1282 enum GSF_PendingRequestOptions options; 1180 enum GSF_PendingRequestOptions options;
1283 uint16_t msize; 1181 uint16_t msize;
1284 const struct GetMessage *gm;
1285 unsigned int bits; 1182 unsigned int bits;
1286 const struct GNUNET_PeerIdentity *opt; 1183 const struct GNUNET_PeerIdentity *opt;
1287 uint32_t bm; 1184 uint32_t bm;
@@ -1291,18 +1188,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1291 GNUNET_PEER_Id spid; 1188 GNUNET_PEER_Id spid;
1292 const struct GSF_PendingRequestData *prd; 1189 const struct GSF_PendingRequestData *prd;
1293 1190
1294 msize = ntohs (message->size); 1191 msize = ntohs (gm->header.size);
1295 if (msize < sizeof (struct GetMessage))
1296 {
1297 GNUNET_break_op (0);
1298 return NULL;
1299 }
1300 GNUNET_STATISTICS_update (GSF_stats,
1301 gettext_noop
1302 ("# GET requests received (from other peers)"),
1303 1,
1304 GNUNET_NO);
1305 gm = (const struct GetMessage *) message;
1306 tec.type = ntohl (gm->type); 1192 tec.type = ntohl (gm->type);
1307 bm = ntohl (gm->hash_bitmap); 1193 bm = ntohl (gm->hash_bitmap);
1308 bits = 0; 1194 bits = 0;
@@ -1312,32 +1198,16 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1312 bits++; 1198 bits++;
1313 bm >>= 1; 1199 bm >>= 1;
1314 } 1200 }
1315 if (msize < sizeof (struct GetMessage) + bits * sizeof (struct GNUNET_PeerIdentity))
1316 {
1317 GNUNET_break_op (0);
1318 return NULL;
1319 }
1320 opt = (const struct GNUNET_PeerIdentity *) &gm[1]; 1201 opt = (const struct GNUNET_PeerIdentity *) &gm[1];
1321 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity); 1202 bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (struct GNUNET_PeerIdentity);
1322 /* bfsize must be power of 2, check! */ 1203 GNUNET_STATISTICS_update (GSF_stats,
1323 if (0 != ((bfsize - 1) & bfsize)) 1204 gettext_noop
1324 { 1205 ("# GET requests received (from other peers)"),
1325 GNUNET_break_op (0); 1206 1,
1326 return NULL; 1207 GNUNET_NO);
1327 }
1328 GSF_cover_query_count++; 1208 GSF_cover_query_count++;
1329 bm = ntohl (gm->hash_bitmap); 1209 bm = ntohl (gm->hash_bitmap);
1330 bits = 0; 1210 bits = 0;
1331 cps = GSF_peer_get_ (other);
1332 if (NULL == cps)
1333 {
1334 /* peer must have just disconnected */
1335 GNUNET_STATISTICS_update (GSF_stats,
1336 gettext_noop
1337 ("# requests dropped due to initiator not being connected"),
1338 1, GNUNET_NO);
1339 return NULL;
1340 }
1341 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO)) 1211 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1342 cp = GSF_peer_get_ (&opt[bits++]); 1212 cp = GSF_peer_get_ (&opt[bits++]);
1343 else 1213 else
@@ -1352,24 +1222,24 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1352 else 1222 else
1353 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1223 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354 "Failed to find peer `%s' in connection set. Dropping query.\n", 1224 "Failed to find peer `%s' in connection set. Dropping query.\n",
1355 GNUNET_i2s (other)); 1225 GNUNET_i2s (cps->ppd.peer));
1356 GNUNET_STATISTICS_update (GSF_stats, 1226 GNUNET_STATISTICS_update (GSF_stats,
1357 gettext_noop 1227 gettext_noop
1358 ("# requests dropped due to missing reverse route"), 1228 ("# requests dropped due to missing reverse route"),
1359 1, 1229 1,
1360 GNUNET_NO); 1230 GNUNET_NO);
1361 return NULL; 1231 return;
1362 } 1232 }
1363 if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER) 1233 if (cp->ppd.pending_replies + cp->delay_queue_size > MAX_QUEUE_PER_PEER)
1364 { 1234 {
1365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1235 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366 "Peer `%s' has too many replies queued already. Dropping query.\n", 1236 "Peer `%s' has too many replies queued already. Dropping query.\n",
1367 GNUNET_i2s (other)); 1237 GNUNET_i2s (cps->ppd.peer));
1368 GNUNET_STATISTICS_update (GSF_stats, 1238 GNUNET_STATISTICS_update (GSF_stats,
1369 gettext_noop ("# requests dropped due to full reply queue"), 1239 gettext_noop ("# requests dropped due to full reply queue"),
1370 1, 1240 1,
1371 GNUNET_NO); 1241 GNUNET_NO);
1372 return NULL; 1242 return;
1373 } 1243 }
1374 /* note that we can really only check load here since otherwise 1244 /* note that we can really only check load here since otherwise
1375 * peers could find out that we are overloaded by not being 1245 * peers could find out that we are overloaded by not being
@@ -1380,14 +1250,14 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1380 { 1250 {
1381 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1382 "Dropping query from `%s', this peer is too busy.\n", 1252 "Dropping query from `%s', this peer is too busy.\n",
1383 GNUNET_i2s (other)); 1253 GNUNET_i2s (cps->ppd.peer));
1384 return NULL; 1254 return;
1385 } 1255 }
1386 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1256 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387 "Received request for `%s' of type %u from peer `%s' with flags %u\n", 1257 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1388 GNUNET_h2s (&gm->query), 1258 GNUNET_h2s (&gm->query),
1389 (unsigned int) tec.type, 1259 (unsigned int) tec.type,
1390 GNUNET_i2s (other), 1260 GNUNET_i2s (cps->ppd.peer),
1391 (unsigned int) bm); 1261 (unsigned int) bm);
1392 target = 1262 target =
1393 (0 != 1263 (0 !=
@@ -1403,7 +1273,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1403 * so at best indirect the query */ 1273 * so at best indirect the query */
1404 tec.priority = 0; 1274 tec.priority = 0;
1405 options |= GSF_PRO_FORWARD_ONLY; 1275 options |= GSF_PRO_FORWARD_ONLY;
1406 spid = GNUNET_PEER_intern (other); 1276 spid = GNUNET_PEER_intern (cps->ppd.peer);
1407 GNUNET_assert (0 != spid); 1277 GNUNET_assert (0 != spid);
1408 } 1278 }
1409 tec.ttl = bound_ttl (ntohl (gm->ttl), 1279 tec.ttl = bound_ttl (ntohl (gm->ttl),
@@ -1412,11 +1282,12 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1412 ttl_decrement = 1282 ttl_decrement =
1413 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 1283 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1414 TTL_DECREMENT); 1284 TTL_DECREMENT);
1415 if ((tec.ttl < 0) && (((int32_t) (tec.ttl - ttl_decrement)) > 0)) 1285 if ( (tec.ttl < 0) &&
1286 (((int32_t) (tec.ttl - ttl_decrement)) > 0) )
1416 { 1287 {
1417 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1418 "Dropping query from `%s' due to TTL underflow (%d - %u).\n", 1289 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1419 GNUNET_i2s (other), 1290 GNUNET_i2s (cps->ppd.peer),
1420 tec.ttl, 1291 tec.ttl,
1421 ttl_decrement); 1292 ttl_decrement);
1422 GNUNET_STATISTICS_update (GSF_stats, 1293 GNUNET_STATISTICS_update (GSF_stats,
@@ -1424,7 +1295,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1424 ("# requests dropped due TTL underflow"), 1, 1295 ("# requests dropped due TTL underflow"), 1,
1425 GNUNET_NO); 1296 GNUNET_NO);
1426 /* integer underflow => drop (should be very rare)! */ 1297 /* integer underflow => drop (should be very rare)! */
1427 return NULL; 1298 return;
1428 } 1299 }
1429 tec.ttl -= ttl_decrement; 1300 tec.ttl -= ttl_decrement;
1430 1301
@@ -1435,7 +1306,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1435 &test_exist_cb, 1306 &test_exist_cb,
1436 &tec); 1307 &tec);
1437 if (GNUNET_YES == tec.finished) 1308 if (GNUNET_YES == tec.finished)
1438 return NULL; /* merged into existing request, we're done */ 1309 return; /* merged into existing request, we're done */
1439 1310
1440 peerreq = GNUNET_new (struct PeerRequest); 1311 peerreq = GNUNET_new (struct PeerRequest);
1441 peerreq->cp = cp; 1312 peerreq->cp = cp;
@@ -1452,7 +1323,7 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1452 (uint32_t) tec.priority, 1323 (uint32_t) tec.priority,
1453 tec.ttl, 1324 tec.ttl,
1454 spid, 1325 spid,
1455 GNUNET_PEER_intern (other), 1326 GNUNET_PEER_intern (cps->ppd.peer),
1456 NULL, 0, /* replies_seen */ 1327 NULL, 0, /* replies_seen */
1457 &handle_p2p_reply, 1328 &handle_p2p_reply,
1458 peerreq); 1329 peerreq);
@@ -1472,43 +1343,10 @@ GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
1472 gettext_noop ("# P2P searches active"), 1343 gettext_noop ("# P2P searches active"),
1473 1, 1344 1,
1474 GNUNET_NO); 1345 GNUNET_NO);
1475 return pr; 1346 GSF_pending_request_get_data_ (pr)->has_started = GNUNET_YES;
1476} 1347 GSF_local_lookup_ (pr,
1477 1348 &GSF_consider_forwarding,
1478 1349 NULL);
1479/**
1480 * Function called if there has been a timeout trying to satisfy
1481 * a transmission request.
1482 *
1483 * @param cls the `struct GSF_PeerTransmitHandle` of the request
1484 */
1485static void
1486peer_transmit_timeout (void *cls)
1487{
1488 struct GSF_PeerTransmitHandle *pth = cls;
1489 struct GSF_ConnectedPeer *cp;
1490
1491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1492 "Timeout trying to transmit to other peer\n");
1493 pth->timeout_task = NULL;
1494 cp = pth->cp;
1495 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1496 cp->pth_tail,
1497 pth);
1498 if (GNUNET_YES == pth->is_query)
1499 GNUNET_assert (0 < cp->ppd.pending_queries--);
1500 else if (GNUNET_NO == pth->is_query)
1501 GNUNET_assert (0 < cp->ppd.pending_replies--);
1502 GNUNET_LOAD_update (cp->ppd.transmission_delay,
1503 UINT64_MAX);
1504 if (NULL != cp->cth)
1505 {
1506 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1507 cp->cth = NULL;
1508 }
1509 pth->gmc (pth->gmc_cls, 0, NULL);
1510 GNUNET_assert (0 == cp->cth_in_progress);
1511 GNUNET_free (pth);
1512} 1350}
1513 1351
1514 1352
@@ -1520,19 +1358,15 @@ peer_transmit_timeout (void *cls)
1520 * @param cp target peer 1358 * @param cp target peer
1521 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR) 1359 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1522 * @param priority how important is this request? 1360 * @param priority how important is this request?
1523 * @param timeout when does this request timeout (call gmc with error) 1361 * @param timeout when does this request timeout
1524 * @param size number of bytes we would like to send to the peer 1362 * @param size number of bytes we would like to send to the peer
1525 * @param gmc function to call to get the message 1363 * @param env message to send
1526 * @param gmc_cls closure for @a gmc
1527 * @return handle to cancel request
1528 */ 1364 */
1529struct GSF_PeerTransmitHandle * 1365void
1530GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp, 1366GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1531 int is_query, 1367 int is_query,
1532 uint32_t priority, 1368 uint32_t priority,
1533 struct GNUNET_TIME_Relative timeout, 1369 struct GNUNET_MQ_Envelope *env)
1534 size_t size,
1535 GSF_GetMessageCallback gmc, void *gmc_cls)
1536{ 1370{
1537 struct GSF_PeerTransmitHandle *pth; 1371 struct GSF_PeerTransmitHandle *pth;
1538 struct GSF_PeerTransmitHandle *pos; 1372 struct GSF_PeerTransmitHandle *pos;
@@ -1540,10 +1374,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1540 1374
1541 pth = GNUNET_new (struct GSF_PeerTransmitHandle); 1375 pth = GNUNET_new (struct GSF_PeerTransmitHandle);
1542 pth->transmission_request_start_time = GNUNET_TIME_absolute_get (); 1376 pth->transmission_request_start_time = GNUNET_TIME_absolute_get ();
1543 pth->timeout = GNUNET_TIME_relative_to_absolute (timeout); 1377 pth->env = env;
1544 pth->gmc = gmc;
1545 pth->gmc_cls = gmc_cls;
1546 pth->size = size;
1547 pth->is_query = is_query; 1378 pth->is_query = is_query;
1548 pth->priority = priority; 1379 pth->priority = priority;
1549 pth->cp = cp; 1380 pth->cp = cp;
@@ -1563,39 +1394,7 @@ GSF_peer_transmit_ (struct GSF_ConnectedPeer *cp,
1563 cp->ppd.pending_queries++; 1394 cp->ppd.pending_queries++;
1564 else if (GNUNET_NO == is_query) 1395 else if (GNUNET_NO == is_query)
1565 cp->ppd.pending_replies++; 1396 cp->ppd.pending_replies++;
1566 pth->timeout_task
1567 = GNUNET_SCHEDULER_add_delayed (timeout,
1568 &peer_transmit_timeout,
1569 pth);
1570 schedule_transmission (pth); 1397 schedule_transmission (pth);
1571 return pth;
1572}
1573
1574
1575/**
1576 * Cancel an earlier request for transmission.
1577 *
1578 * @param pth request to cancel
1579 */
1580void
1581GSF_peer_transmit_cancel_ (struct GSF_PeerTransmitHandle *pth)
1582{
1583 struct GSF_ConnectedPeer *cp;
1584
1585 if (NULL != pth->timeout_task)
1586 {
1587 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1588 pth->timeout_task = NULL;
1589 }
1590 cp = pth->cp;
1591 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1592 cp->pth_tail,
1593 pth);
1594 if (GNUNET_YES == pth->is_query)
1595 GNUNET_assert (0 < cp->ppd.pending_queries--);
1596 else if (GNUNET_NO == pth->is_query)
1597 GNUNET_assert (0 < cp->ppd.pending_replies--);
1598 GNUNET_free (pth);
1599} 1398}
1600 1399
1601 1400
@@ -1683,7 +1482,9 @@ flush_respect (void *cls,
1683 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect, 1482 GNUNET_PEERSTORE_store (peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1684 sizeof (cp->ppd.respect), 1483 sizeof (cp->ppd.respect),
1685 GNUNET_TIME_UNIT_FOREVER_ABS, 1484 GNUNET_TIME_UNIT_FOREVER_ABS,
1686 GNUNET_PEERSTORE_STOREOPTION_REPLACE, NULL, NULL); 1485 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1486 NULL,
1487 NULL);
1687 return GNUNET_OK; 1488 return GNUNET_OK;
1688} 1489}
1689 1490
@@ -1693,26 +1494,30 @@ flush_respect (void *cls,
1693 * record. 1494 * record.
1694 * 1495 *
1695 * @param cls unused 1496 * @param cls unused
1696 * @param peer identity of peer that connected 1497 * @param peer identity of peer that disconnected
1498 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1697 */ 1499 */
1698void 1500void
1699GSF_peer_disconnect_handler_ (void *cls, 1501GSF_peer_disconnect_handler (void *cls,
1700 const struct GNUNET_PeerIdentity *peer) 1502 const struct GNUNET_PeerIdentity *peer,
1503 void *internal_cls)
1701{ 1504{
1702 struct GSF_ConnectedPeer *cp; 1505 struct GSF_ConnectedPeer *cp = internal_cls;
1703 struct GSF_PeerTransmitHandle *pth; 1506 struct GSF_PeerTransmitHandle *pth;
1704 struct GSF_DelayedHandle *dh; 1507 struct GSF_DelayedHandle *dh;
1705 1508
1706 cp = GSF_peer_get_ (peer);
1707 if (NULL == cp) 1509 if (NULL == cp)
1708 return; /* must have been disconnect from core with 1510 return; /* must have been disconnect from core with
1709 * 'peer' == my_id, ignore */ 1511 * 'peer' == my_id, ignore */
1710 flush_respect (NULL, peer, cp); 1512 flush_respect (NULL,
1513 peer,
1514 cp);
1711 GNUNET_assert (GNUNET_YES == 1515 GNUNET_assert (GNUNET_YES ==
1712 GNUNET_CONTAINER_multipeermap_remove (cp_map, 1516 GNUNET_CONTAINER_multipeermap_remove (cp_map,
1713 peer, 1517 peer,
1714 cp)); 1518 cp));
1715 GNUNET_STATISTICS_set (GSF_stats, gettext_noop ("# peers connected"), 1519 GNUNET_STATISTICS_set (GSF_stats,
1520 gettext_noop ("# peers connected"),
1716 GNUNET_CONTAINER_multipeermap_size (cp_map), 1521 GNUNET_CONTAINER_multipeermap_size (cp_map),
1717 GNUNET_NO); 1522 GNUNET_NO);
1718 if (NULL != cp->respect_iterate_req) 1523 if (NULL != cp->respect_iterate_req)
@@ -1720,11 +1525,6 @@ GSF_peer_disconnect_handler_ (void *cls,
1720 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req); 1525 GNUNET_PEERSTORE_iterate_cancel (cp->respect_iterate_req);
1721 cp->respect_iterate_req = NULL; 1526 cp->respect_iterate_req = NULL;
1722 } 1527 }
1723 if (NULL != cp->migration_pth)
1724 {
1725 GSF_peer_transmit_cancel_ (cp->migration_pth);
1726 cp->migration_pth = NULL;
1727 }
1728 if (NULL != cp->rc) 1528 if (NULL != cp->rc)
1729 { 1529 {
1730 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc); 1530 GNUNET_ATS_reserve_bandwidth_cancel (cp->rc);
@@ -1748,19 +1548,8 @@ GSF_peer_disconnect_handler_ (void *cls,
1748 0, 1548 0,
1749 sizeof (cp->ppd.last_p2p_replies)); 1549 sizeof (cp->ppd.last_p2p_replies));
1750 GSF_push_stop_ (cp); 1550 GSF_push_stop_ (cp);
1751 if (NULL != cp->cth)
1752 {
1753 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1754 cp->cth = NULL;
1755 }
1756 GNUNET_assert (0 == cp->cth_in_progress);
1757 while (NULL != (pth = cp->pth_head)) 1551 while (NULL != (pth = cp->pth_head))
1758 { 1552 {
1759 if (pth->timeout_task != NULL)
1760 {
1761 GNUNET_SCHEDULER_cancel (pth->timeout_task);
1762 pth->timeout_task = NULL;
1763 }
1764 GNUNET_CONTAINER_DLL_remove (cp->pth_head, 1553 GNUNET_CONTAINER_DLL_remove (cp->pth_head,
1765 cp->pth_tail, 1554 cp->pth_tail,
1766 pth); 1555 pth);
@@ -1768,7 +1557,6 @@ GSF_peer_disconnect_handler_ (void *cls,
1768 GNUNET_assert (0 < cp->ppd.pending_queries--); 1557 GNUNET_assert (0 < cp->ppd.pending_queries--);
1769 else if (GNUNET_NO == pth->is_query) 1558 else if (GNUNET_NO == pth->is_query)
1770 GNUNET_assert (0 < cp->ppd.pending_replies--); 1559 GNUNET_assert (0 < cp->ppd.pending_replies--);
1771 pth->gmc (pth->gmc_cls, 0, NULL);
1772 GNUNET_free (pth); 1560 GNUNET_free (pth);
1773 } 1561 }
1774 while (NULL != (dh = cp->delayed_head)) 1562 while (NULL != (dh = cp->delayed_head))
@@ -1776,9 +1564,9 @@ GSF_peer_disconnect_handler_ (void *cls,
1776 GNUNET_CONTAINER_DLL_remove (cp->delayed_head, 1564 GNUNET_CONTAINER_DLL_remove (cp->delayed_head,
1777 cp->delayed_tail, 1565 cp->delayed_tail,
1778 dh); 1566 dh);
1567 GNUNET_MQ_discard (dh->env);
1779 cp->delay_queue_size--; 1568 cp->delay_queue_size--;
1780 GNUNET_SCHEDULER_cancel (dh->delay_task); 1569 GNUNET_SCHEDULER_cancel (dh->delay_task);
1781 GNUNET_free (dh->pm);
1782 GNUNET_free (dh); 1570 GNUNET_free (dh);
1783 } 1571 }
1784 GNUNET_PEER_change_rc (cp->ppd.pid, -1); 1572 GNUNET_PEER_change_rc (cp->ppd.pid, -1);
@@ -1883,40 +1671,6 @@ GSF_connected_peer_get_identity2_ (const struct GSF_ConnectedPeer *cp)
1883 1671
1884 1672
1885/** 1673/**
1886 * Assemble a migration stop message for transmission.
1887 *
1888 * @param cls the `struct GSF_ConnectedPeer` to use
1889 * @param size number of bytes we're allowed to write to @a buf
1890 * @param buf where to copy the message
1891 * @return number of bytes copied to @a buf
1892 */
1893static size_t
1894create_migration_stop_message (void *cls,
1895 size_t size,
1896 void *buf)
1897{
1898 struct GSF_ConnectedPeer *cp = cls;
1899 struct MigrationStopMessage msm;
1900
1901 cp->migration_pth = NULL;
1902 if (NULL == buf)
1903 return 0;
1904 GNUNET_assert (size >= sizeof (struct MigrationStopMessage));
1905 msm.header.size = htons (sizeof (struct MigrationStopMessage));
1906 msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1907 msm.reserved = htonl (0);
1908 msm.duration =
1909 GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1910 (cp->last_migration_block));
1911 GNUNET_memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
1912 GNUNET_STATISTICS_update (GSF_stats,
1913 gettext_noop ("# migration stop messages sent"),
1914 1, GNUNET_NO);
1915 return sizeof (struct MigrationStopMessage);
1916}
1917
1918
1919/**
1920 * Ask a peer to stop migrating data to us until the given point 1674 * Ask a peer to stop migrating data to us until the given point
1921 * in time. 1675 * in time.
1922 * 1676 *
@@ -1927,6 +1681,9 @@ void
1927GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp, 1681GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1928 struct GNUNET_TIME_Absolute block_time) 1682 struct GNUNET_TIME_Absolute block_time)
1929{ 1683{
1684 struct GNUNET_MQ_Envelope *env;
1685 struct MigrationStopMessage *msm;
1686
1930 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us) 1687 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1931 { 1688 {
1932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1689 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1939,13 +1696,20 @@ GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
1939 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time), 1696 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (block_time),
1940 GNUNET_YES)); 1697 GNUNET_YES));
1941 cp->last_migration_block = block_time; 1698 cp->last_migration_block = block_time;
1942 if (NULL != cp->migration_pth) 1699 env = GNUNET_MQ_msg (msm,
1943 GSF_peer_transmit_cancel_ (cp->migration_pth); 1700 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1944 cp->migration_pth = 1701 msm->reserved = htonl (0);
1945 GSF_peer_transmit_ (cp, GNUNET_SYSERR, UINT32_MAX, 1702 msm->duration
1946 GNUNET_TIME_UNIT_FOREVER_REL, 1703 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1947 sizeof (struct MigrationStopMessage), 1704 (cp->last_migration_block));
1948 &create_migration_stop_message, cp); 1705 GNUNET_STATISTICS_update (GSF_stats,
1706 gettext_noop ("# migration stop messages sent"),
1707 1,
1708 GNUNET_NO);
1709 GSF_peer_transmit_ (cp,
1710 GNUNET_SYSERR,
1711 UINT32_MAX,
1712 env);
1949} 1713}
1950 1714
1951 1715
@@ -1998,24 +1762,6 @@ GSF_connected_peer_init_ ()
1998 1762
1999 1763
2000/** 1764/**
2001 * Iterator to free peer entries.
2002 *
2003 * @param cls closure, unused
2004 * @param key current key code
2005 * @param value value in the hash map (peer entry)
2006 * @return #GNUNET_YES (we should continue to iterate)
2007 */
2008static int
2009clean_peer (void *cls,
2010 const struct GNUNET_PeerIdentity *key,
2011 void *value)
2012{
2013 GSF_peer_disconnect_handler_ (NULL, key);
2014 return GNUNET_YES;
2015}
2016
2017
2018/**
2019 * Shutdown peer management subsystem. 1765 * Shutdown peer management subsystem.
2020 */ 1766 */
2021void 1767void
@@ -2024,9 +1770,6 @@ GSF_connected_peer_done_ ()
2024 GNUNET_CONTAINER_multipeermap_iterate (cp_map, 1770 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2025 &flush_respect, 1771 &flush_respect,
2026 NULL); 1772 NULL);
2027 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
2028 &clean_peer,
2029 NULL);
2030 GNUNET_SCHEDULER_cancel (fr_task); 1773 GNUNET_SCHEDULER_cancel (fr_task);
2031 fr_task = NULL; 1774 fr_task = NULL;
2032 GNUNET_CONTAINER_multipeermap_destroy (cp_map); 1775 GNUNET_CONTAINER_multipeermap_destroy (cp_map);
@@ -2072,7 +1815,8 @@ GSF_handle_local_client_disconnect_ (const struct GSF_LocalClient *lc)
2072{ 1815{
2073 if (NULL == cp_map) 1816 if (NULL == cp_map)
2074 return; /* already cleaned up */ 1817 return; /* already cleaned up */
2075 GNUNET_CONTAINER_multipeermap_iterate (cp_map, &clean_local_client, 1818 GNUNET_CONTAINER_multipeermap_iterate (cp_map,
1819 &clean_local_client,
2076 (void *) lc); 1820 (void *) lc);
2077} 1821}
2078 1822