diff options
-rw-r--r-- | src/transport/gnunet-service-tng.c | 176 |
1 files changed, 141 insertions, 35 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 568e5b1d7..6a8a3fc4d 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -34,8 +34,6 @@ | |||
34 | * | 34 | * |
35 | * Implement next: | 35 | * Implement next: |
36 | * - DV data structures: | 36 | * - DV data structures: |
37 | * + initiation of DV learn (incl. RTT measurement logic!) | ||
38 | * - security considerations? add signatures to routes? initiator signature? | ||
39 | * + using DV routes! | 37 | * + using DV routes! |
40 | * - handling of DV-boxed messages that need to be forwarded | 38 | * - handling of DV-boxed messages that need to be forwarded |
41 | * - route_message implementation, including using DV data structures | 39 | * - route_message implementation, including using DV data structures |
@@ -133,8 +131,8 @@ | |||
133 | #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) | 131 | #define DELAY_WARN_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) |
134 | 132 | ||
135 | /** | 133 | /** |
136 | * We only consider queues as "quality" connections when | 134 | * We only consider queues as "quality" connections when |
137 | * suppressing the generation of DV initiation messages if | 135 | * suppressing the generation of DV initiation messages if |
138 | * the latency of the queue is below this threshold. | 136 | * the latency of the queue is below this threshold. |
139 | */ | 137 | */ |
140 | #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) | 138 | #define DV_QUALITY_RTT_THRESHOLD GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) |
@@ -183,7 +181,7 @@ | |||
183 | * do we need to have to suppress initiating DV learn messages? | 181 | * do we need to have to suppress initiating DV learn messages? |
184 | */ | 182 | */ |
185 | #define DV_LEARN_QUALITY_THRESHOLD 100 | 183 | #define DV_LEARN_QUALITY_THRESHOLD 100 |
186 | 184 | ||
187 | /** | 185 | /** |
188 | * When do we forget an invalid address for sure? | 186 | * When do we forget an invalid address for sure? |
189 | */ | 187 | */ |
@@ -817,7 +815,7 @@ enum ClientType | |||
817 | 815 | ||
818 | 816 | ||
819 | /** | 817 | /** |
820 | * When did we launch this DV learning activity? | 818 | * When did we launch this DV learning activity? |
821 | */ | 819 | */ |
822 | struct LearnLaunchEntry | 820 | struct LearnLaunchEntry |
823 | { | 821 | { |
@@ -3738,6 +3736,49 @@ handle_fragment_box (void *cls, | |||
3738 | 3736 | ||
3739 | 3737 | ||
3740 | /** | 3738 | /** |
3739 | * Check the @a fa against the fragments associated with @a pm. | ||
3740 | * If it matches, remove the matching fragments from the transmission | ||
3741 | * list. | ||
3742 | * | ||
3743 | * @param pm pending message to check against the ack | ||
3744 | * @param fa the ack that was received | ||
3745 | * @return #GNUNET_YES if @a fa matched, #GNUNET_NO if not | ||
3746 | */ | ||
3747 | static int | ||
3748 | check_ack_against_pm (struct PendingMessage *pm, | ||
3749 | const struct TransportFragmentAckMessage *fa) | ||
3750 | { | ||
3751 | int match; | ||
3752 | struct PendingMessage *nxt; | ||
3753 | uint32_t fs = ntohl (fa->frag_uuid); | ||
3754 | uint64_t xtra = GNUNET_ntohll (fa->extra_acks); | ||
3755 | |||
3756 | match = GNUNET_NO; | ||
3757 | for (struct PendingMessage *frag = pm->head_frag; | ||
3758 | NULL != frag; | ||
3759 | frag = nxt) | ||
3760 | { | ||
3761 | const struct TransportFragmentBox *tfb | ||
3762 | = (const struct TransportFragmentBox *) &pm[1]; | ||
3763 | uint32_t fu = ntohl (tfb->frag_uuid); | ||
3764 | |||
3765 | GNUNET_assert (PMT_FRAGMENT_BOX == frag->pmt); | ||
3766 | nxt = frag->next_frag; | ||
3767 | /* Check for exact match or match in the 'xtra' bitmask */ | ||
3768 | if ( (fu == fs) || | ||
3769 | ( (fu > fs) && | ||
3770 | (fu <= fs + 64) && | ||
3771 | (0 != (1LLU << (fu - fs - 1) & xtra)) ) ) | ||
3772 | { | ||
3773 | match = GNUNET_YES; | ||
3774 | free_fragment_tree (frag); | ||
3775 | } | ||
3776 | } | ||
3777 | return match; | ||
3778 | } | ||
3779 | |||
3780 | |||
3781 | /** | ||
3741 | * Communicator gave us a fragment acknowledgement. Process the request. | 3782 | * Communicator gave us a fragment acknowledgement. Process the request. |
3742 | * | 3783 | * |
3743 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) | 3784 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) |
@@ -3748,11 +3789,76 @@ handle_fragment_ack (void *cls, | |||
3748 | const struct TransportFragmentAckMessage *fa) | 3789 | const struct TransportFragmentAckMessage *fa) |
3749 | { | 3790 | { |
3750 | struct CommunicatorMessageContext *cmc = cls; | 3791 | struct CommunicatorMessageContext *cmc = cls; |
3792 | struct Neighbour *n; | ||
3793 | int matched; | ||
3751 | 3794 | ||
3752 | // FIXME: do work: identify original message; then identify fragments being acked; | 3795 | n = GNUNET_CONTAINER_multipeermap_get (neighbours, |
3753 | // remove those from the tree to prevent retransmission; | 3796 | &cmc->im.sender); |
3754 | // compute RTT | 3797 | if (NULL == n) |
3755 | // if entire message is ACKed, handle that as well. | 3798 | { |
3799 | struct GNUNET_SERVICE_Client *client = cmc->tc->client; | ||
3800 | |||
3801 | GNUNET_break (0); | ||
3802 | finish_cmc_handling (cmc); | ||
3803 | GNUNET_SERVICE_client_drop (client); | ||
3804 | return; | ||
3805 | } | ||
3806 | /* FIXME-OPTIMIZE: maybe use another hash map here? */ | ||
3807 | matched = GNUNET_NO; | ||
3808 | for (struct PendingMessage *pm = n->pending_msg_head; | ||
3809 | NULL != pm; | ||
3810 | pm = pm->prev_neighbour) | ||
3811 | { | ||
3812 | if (0 != | ||
3813 | GNUNET_memcmp (&fa->msg_uuid, | ||
3814 | &pm->msg_uuid)) | ||
3815 | continue; | ||
3816 | matched = GNUNET_YES; | ||
3817 | if (GNUNET_YES == | ||
3818 | check_ack_against_pm (pm, | ||
3819 | fa)) | ||
3820 | { | ||
3821 | struct GNUNET_TIME_Relative avg_ack_delay | ||
3822 | = GNUNET_TIME_relative_ntoh (fa->avg_ack_delay); | ||
3823 | // FIXME: update RTT and other reliability data! | ||
3824 | // ISSUE: we don't know which of n's queues the message(s) | ||
3825 | // took (and in fact the different messages might have gone | ||
3826 | // over different queues and possibly over multiple). | ||
3827 | // => track queues with PendingMessages, and update RTT only if | ||
3828 | // the queue used is unique? | ||
3829 | // -> how can we get loss rates? | ||
3830 | // -> or, add extra state to Box and ACK to identify queue? | ||
3831 | (void) avg_ack_delay; | ||
3832 | } | ||
3833 | else | ||
3834 | { | ||
3835 | GNUNET_STATISTICS_update (GST_stats, | ||
3836 | "# FRAGMENT_ACKS dropped, no matching fragment", | ||
3837 | 1, | ||
3838 | GNUNET_NO); | ||
3839 | } | ||
3840 | if (NULL == pm->head_frag) | ||
3841 | { | ||
3842 | // if entire message is ACKed, handle that as well. | ||
3843 | // => clean up PM, any post actions? | ||
3844 | free_pending_message (pm); | ||
3845 | } | ||
3846 | else | ||
3847 | { | ||
3848 | struct GNUNET_TIME_Relative reassembly_timeout | ||
3849 | = GNUNET_TIME_relative_ntoh (fa->reassembly_timeout); | ||
3850 | // OPTIMIZE-FIXME: adjust retransmission strategy based on reassembly_timeout! | ||
3851 | (void) reassembly_timeout; | ||
3852 | } | ||
3853 | break; | ||
3854 | } | ||
3855 | if (GNUNET_NO == matched) | ||
3856 | { | ||
3857 | GNUNET_STATISTICS_update (GST_stats, | ||
3858 | "# FRAGMENT_ACKS dropped, no matching pending message", | ||
3859 | 1, | ||
3860 | GNUNET_NO); | ||
3861 | } | ||
3756 | finish_cmc_handling (cmc); | 3862 | finish_cmc_handling (cmc); |
3757 | } | 3863 | } |
3758 | 3864 | ||
@@ -4921,8 +5027,8 @@ set_pending_message_uuid (struct PendingMessage *pm) | |||
4921 | if (pm->msg_uuid_set) | 5027 | if (pm->msg_uuid_set) |
4922 | return; | 5028 | return; |
4923 | GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, | 5029 | GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, |
4924 | &pm->msg_uuid, | 5030 | &pm->msg_uuid, |
4925 | sizeof (pm->msg_uuid)); | 5031 | sizeof (pm->msg_uuid)); |
4926 | pm->msg_uuid_set = GNUNET_YES; | 5032 | pm->msg_uuid_set = GNUNET_YES; |
4927 | } | 5033 | } |
4928 | 5034 | ||
@@ -4939,7 +5045,7 @@ set_pending_message_uuid (struct PendingMessage *pm) | |||
4939 | */ | 5045 | */ |
4940 | static struct PendingMessage * | 5046 | static struct PendingMessage * |
4941 | fragment_message (struct PendingMessage *pm, | 5047 | fragment_message (struct PendingMessage *pm, |
4942 | uint16_t mtu) | 5048 | uint16_t mtu) |
4943 | { | 5049 | { |
4944 | struct PendingMessage *ff; | 5050 | struct PendingMessage *ff; |
4945 | 5051 | ||
@@ -4952,15 +5058,15 @@ fragment_message (struct PendingMessage *pm, | |||
4952 | been expanded until we are at a leaf or at a fragment that is small enough */ | 5058 | been expanded until we are at a leaf or at a fragment that is small enough */ |
4953 | ff = pm; | 5059 | ff = pm; |
4954 | while ( ( (ff->bytes_msg > mtu) || | 5060 | while ( ( (ff->bytes_msg > mtu) || |
4955 | (pm == ff) ) && | 5061 | (pm == ff) ) && |
4956 | (ff->frag_off == ff->bytes_msg) && | 5062 | (ff->frag_off == ff->bytes_msg) && |
4957 | (NULL != ff->head_frag) ) | 5063 | (NULL != ff->head_frag) ) |
4958 | { | 5064 | { |
4959 | ff = ff->head_frag; /* descent into fragmented fragments */ | 5065 | ff = ff->head_frag; /* descent into fragmented fragments */ |
4960 | } | 5066 | } |
4961 | 5067 | ||
4962 | if ( ( (ff->bytes_msg > mtu) || | 5068 | if ( ( (ff->bytes_msg > mtu) || |
4963 | (pm == ff) ) && | 5069 | (pm == ff) ) && |
4964 | (pm->frag_off < pm->bytes_msg) ) | 5070 | (pm->frag_off < pm->bytes_msg) ) |
4965 | { | 5071 | { |
4966 | /* Did not yet calculate all fragments, calculate next fragment */ | 5072 | /* Did not yet calculate all fragments, calculate next fragment */ |
@@ -4986,10 +5092,10 @@ fragment_message (struct PendingMessage *pm, | |||
4986 | } | 5092 | } |
4987 | fragmax = mtu - sizeof (struct TransportFragmentBox); | 5093 | fragmax = mtu - sizeof (struct TransportFragmentBox); |
4988 | fragsize = GNUNET_MIN (msize - ff->frag_off, | 5094 | fragsize = GNUNET_MIN (msize - ff->frag_off, |
4989 | fragmax); | 5095 | fragmax); |
4990 | frag = GNUNET_malloc (sizeof (struct PendingMessage) + | 5096 | frag = GNUNET_malloc (sizeof (struct PendingMessage) + |
4991 | sizeof (struct TransportFragmentBox) + | 5097 | sizeof (struct TransportFragmentBox) + |
4992 | fragsize); | 5098 | fragsize); |
4993 | frag->target = pm->target; | 5099 | frag->target = pm->target; |
4994 | frag->frag_parent = ff; | 5100 | frag->frag_parent = ff; |
4995 | frag->timeout = pm->timeout; | 5101 | frag->timeout = pm->timeout; |
@@ -4998,21 +5104,21 @@ fragment_message (struct PendingMessage *pm, | |||
4998 | msg = (char *) &frag[1]; | 5104 | msg = (char *) &frag[1]; |
4999 | tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); | 5105 | tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT); |
5000 | tfb.header.size = htons (sizeof (struct TransportFragmentBox) + | 5106 | tfb.header.size = htons (sizeof (struct TransportFragmentBox) + |
5001 | fragsize); | 5107 | fragsize); |
5002 | tfb.frag_uuid = htonl (pm->frag_uuidgen++); | 5108 | tfb.frag_uuid = htonl (pm->frag_uuidgen++); |
5003 | tfb.msg_uuid = pm->msg_uuid; | 5109 | tfb.msg_uuid = pm->msg_uuid; |
5004 | tfb.frag_off = htons (ff->frag_off + xoff); | 5110 | tfb.frag_off = htons (ff->frag_off + xoff); |
5005 | tfb.msg_size = htons (pm->bytes_msg); | 5111 | tfb.msg_size = htons (pm->bytes_msg); |
5006 | memcpy (msg, | 5112 | memcpy (msg, |
5007 | &tfb, | 5113 | &tfb, |
5008 | sizeof (tfb)); | 5114 | sizeof (tfb)); |
5009 | memcpy (&msg[sizeof (tfb)], | 5115 | memcpy (&msg[sizeof (tfb)], |
5010 | &orig[ff->frag_off], | 5116 | &orig[ff->frag_off], |
5011 | fragsize); | 5117 | fragsize); |
5012 | GNUNET_CONTAINER_MDLL_insert (frag, | 5118 | GNUNET_CONTAINER_MDLL_insert (frag, |
5013 | ff->head_frag, | 5119 | ff->head_frag, |
5014 | ff->tail_frag, | 5120 | ff->tail_frag, |
5015 | frag); | 5121 | frag); |
5016 | ff->frag_off += fragsize; | 5122 | ff->frag_off += fragsize; |
5017 | ff = frag; | 5123 | ff = frag; |
5018 | } | 5124 | } |
@@ -5322,7 +5428,7 @@ static void | |||
5322 | tracker_excess_out_cb (void *cls) | 5428 | tracker_excess_out_cb (void *cls) |
5323 | { | 5429 | { |
5324 | (void) cls; | 5430 | (void) cls; |
5325 | 5431 | ||
5326 | /* FIXME: trigger excess bandwidth report to core? Right now, | 5432 | /* FIXME: trigger excess bandwidth report to core? Right now, |
5327 | this is done internally within transport_api2_core already, | 5433 | this is done internally within transport_api2_core already, |
5328 | but we probably want to change the logic and trigger it | 5434 | but we probably want to change the logic and trigger it |
@@ -5719,7 +5825,7 @@ struct QueueQualityContext | |||
5719 | { | 5825 | { |
5720 | /** | 5826 | /** |
5721 | * Set to the @e k'th queue encountered. | 5827 | * Set to the @e k'th queue encountered. |
5722 | */ | 5828 | */ |
5723 | struct Queue *q; | 5829 | struct Queue *q; |
5724 | 5830 | ||
5725 | /** | 5831 | /** |
@@ -5729,7 +5835,7 @@ struct QueueQualityContext | |||
5729 | 5835 | ||
5730 | /** | 5836 | /** |
5731 | * Set to the total number of queues encountered. | 5837 | * Set to the total number of queues encountered. |
5732 | */ | 5838 | */ |
5733 | unsigned int num_queues; | 5839 | unsigned int num_queues; |
5734 | 5840 | ||
5735 | /** | 5841 | /** |
@@ -5784,7 +5890,7 @@ check_connection_quality (void *cls, | |||
5784 | 5890 | ||
5785 | 5891 | ||
5786 | /** | 5892 | /** |
5787 | * Task run when we CONSIDER initiating a DV learn | 5893 | * Task run when we CONSIDER initiating a DV learn |
5788 | * process. We first check that sending out a message is | 5894 | * process. We first check that sending out a message is |
5789 | * even possible (queues exist), then that it is desirable | 5895 | * even possible (queues exist), then that it is desirable |
5790 | * (if not, reschedule the task for later), and finally | 5896 | * (if not, reschedule the task for later), and finally |
@@ -5882,15 +5988,15 @@ start_dv_learn (void *cls) | |||
5882 | &check_connection_quality, | 5988 | &check_connection_quality, |
5883 | &qqc); | 5989 | &qqc); |
5884 | GNUNET_assert (NULL != qqc.q); | 5990 | GNUNET_assert (NULL != qqc.q); |
5885 | 5991 | ||
5886 | /* Do this as close to transmission time as possible! */ | 5992 | /* Do this as close to transmission time as possible! */ |
5887 | lle->launch_time = GNUNET_TIME_absolute_get (); | 5993 | lle->launch_time = GNUNET_TIME_absolute_get (); |
5888 | // FIXME: not so easy, need to BOX this message | 5994 | // FIXME: not so easy, need to BOX this message |
5889 | // in a transmission request! (mistake also done elsewhere!) | 5995 | // in a transmission request! (mistake also done elsewhere!) |
5890 | GNUNET_MQ_send (qqc.q->tc->mq, | 5996 | GNUNET_MQ_send (qqc.q->tc->mq, |
5891 | env); | 5997 | env); |
5892 | 5998 | ||
5893 | /* reschedule this job, randomizing the time it runs (but no | 5999 | /* reschedule this job, randomizing the time it runs (but no |
5894 | actual backoff!) */ | 6000 | actual backoff!) */ |
5895 | dvlearn_task | 6001 | dvlearn_task |
5896 | = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY), | 6002 | = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_randomize (DV_LEARN_BASE_FREQUENCY), |