diff options
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 123 |
1 files changed, 117 insertions, 6 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index ac4a262d7..5a8bf5bc1 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -50,6 +50,9 @@ | |||
50 | * directions, allow multiple messages per peer simultaneously (tag | 50 | * directions, allow multiple messages per peer simultaneously (tag |
51 | * confirmations with unique message ID), and replace quota-out with | 51 | * confirmations with unique message ID), and replace quota-out with |
52 | * proper flow control; | 52 | * proper flow control; |
53 | * - if messages are below MTU, consider adding ACKs and other stuff | ||
54 | * (requires planning at receiver, and additional MST-style demultiplex | ||
55 | * at receiver!) | ||
53 | * | 56 | * |
54 | * Design realizations / discussion: | 57 | * Design realizations / discussion: |
55 | * - communicators do flow control by calling MQ "notify sent" | 58 | * - communicators do flow control by calling MQ "notify sent" |
@@ -2800,6 +2803,37 @@ check_fragment_box (void *cls, | |||
2800 | 2803 | ||
2801 | 2804 | ||
2802 | /** | 2805 | /** |
2806 | * Generate a fragment acknowledgement for an @a rc. | ||
2807 | * | ||
2808 | * @param rc context to generate ACK for, @a rc ACK state is reset | ||
2809 | */ | ||
2810 | static void | ||
2811 | send_fragment_ack (struct ReassemblyContext *rc) | ||
2812 | { | ||
2813 | struct TransportFragmentAckMessage *ack; | ||
2814 | |||
2815 | ack = GNUNET_new (struct TransportFragmentAckMessage); | ||
2816 | ack->header.size = htons (sizeof (struct TransportFragmentAckMessage)); | ||
2817 | ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK); | ||
2818 | ack->frag_uuid = htonl (rc->frag_uuid); | ||
2819 | ack->extra_acks = GNUNET_htonll (rc->extra_acks); | ||
2820 | ack->msg_uuid = rc->msg_uuid; | ||
2821 | ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay); | ||
2822 | if (0 == rc->msg_missing) | ||
2823 | ack->reassembly_timeout | ||
2824 | = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */ | ||
2825 | else | ||
2826 | ack->reassembly_timeout | ||
2827 | = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout)); | ||
2828 | route_message (&rc->neighbour->pid, | ||
2829 | &ack->header); | ||
2830 | rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO; | ||
2831 | rc->num_acks = 0; | ||
2832 | rc->extra_acks = 0LLU; | ||
2833 | } | ||
2834 | |||
2835 | |||
2836 | /** | ||
2803 | * Communicator gave us a fragment. Process the request. | 2837 | * Communicator gave us a fragment. Process the request. |
2804 | * | 2838 | * |
2805 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) | 2839 | * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) |
@@ -2814,6 +2848,12 @@ handle_fragment_box (void *cls, | |||
2814 | struct ReassemblyContext *rc; | 2848 | struct ReassemblyContext *rc; |
2815 | const struct GNUNET_MessageHeader *msg; | 2849 | const struct GNUNET_MessageHeader *msg; |
2816 | uint16_t msize; | 2850 | uint16_t msize; |
2851 | uint16_t fsize; | ||
2852 | uint16_t frag_off; | ||
2853 | uint32_t frag_uuid; | ||
2854 | char *target; | ||
2855 | struct GNUNET_TIME_Relative cdelay; | ||
2856 | int ack_now; | ||
2817 | 2857 | ||
2818 | n = GNUNET_CONTAINER_multipeermap_get (neighbours, | 2858 | n = GNUNET_CONTAINER_multipeermap_get (neighbours, |
2819 | &cmc->im.sender); | 2859 | &cmc->im.sender); |
@@ -2856,9 +2896,14 @@ handle_fragment_box (void *cls, | |||
2856 | &rc->msg_uuid, | 2896 | &rc->msg_uuid, |
2857 | rc, | 2897 | rc, |
2858 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 2898 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
2859 | rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size); | 2899 | target = (char *) &rc[1]; |
2900 | rc->bitfield = (uint8_t *) (target + rc->msg_size); | ||
2860 | rc->msg_missing = rc->msg_size; | 2901 | rc->msg_missing = rc->msg_size; |
2861 | } | 2902 | } |
2903 | else | ||
2904 | { | ||
2905 | target = (char *) &rc[1]; | ||
2906 | } | ||
2862 | if (msize != rc->msg_size) | 2907 | if (msize != rc->msg_size) |
2863 | { | 2908 | { |
2864 | GNUNET_break (0); | 2909 | GNUNET_break (0); |
@@ -2866,12 +2911,78 @@ handle_fragment_box (void *cls, | |||
2866 | return; | 2911 | return; |
2867 | } | 2912 | } |
2868 | 2913 | ||
2869 | // FIXME: do work: reassemble | 2914 | /* reassemble */ |
2870 | 2915 | fsize = ntohs (fb->header.size) - sizeof (*fb); | |
2916 | frag_off = ntohs (fb->frag_off); | ||
2917 | memcpy (&target[frag_off], | ||
2918 | &fb[1], | ||
2919 | fsize); | ||
2920 | /* update bitfield and msg_missing */ | ||
2921 | for (unsigned int i=frag_off;i<frag_off+fsize;i++) | ||
2922 | { | ||
2923 | if (0 == (rc->bitfield[i / 8] & (1 << (i % 8)))) | ||
2924 | { | ||
2925 | rc->bitfield[i / 8] |= (1 << (i % 8)); | ||
2926 | rc->msg_missing--; | ||
2927 | } | ||
2928 | } | ||
2929 | |||
2930 | /* Compute cummulative ACK */ | ||
2931 | frag_uuid = ntohl (fb->frag_uuid); | ||
2932 | cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag); | ||
2933 | cdelay = GNUNET_TIME_relative_multiply (cdelay, | ||
2934 | rc->num_acks); | ||
2935 | rc->last_frag = GNUNET_TIME_absolute_get (); | ||
2936 | rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay, | ||
2937 | cdelay); | ||
2938 | ack_now = GNUNET_NO; | ||
2939 | if (0 == rc->num_acks) | ||
2940 | { | ||
2941 | /* case one: first ack */ | ||
2942 | rc->frag_uuid = frag_uuid; | ||
2943 | rc->extra_acks = 0LLU; | ||
2944 | rc->num_acks = 1; | ||
2945 | } | ||
2946 | else if ( (frag_uuid >= rc->frag_uuid) && | ||
2947 | (frag_uuid <= rc->frag_uuid + 64) ) | ||
2948 | { | ||
2949 | /* case two: ack fits after existing min UUID */ | ||
2950 | if ( (frag_uuid == rc->frag_uuid) || | ||
2951 | (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) ) | ||
2952 | { | ||
2953 | /* duplicate fragment, ack now! */ | ||
2954 | ack_now = GNUNET_YES; | ||
2955 | } | ||
2956 | else | ||
2957 | { | ||
2958 | rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1)); | ||
2959 | rc->num_acks++; | ||
2960 | } | ||
2961 | } | ||
2962 | else if ( (rc->frag_uuid > frag_uuid) && | ||
2963 | ( ( (rc->frag_uuid == frag_uuid + 64) && | ||
2964 | (0 == rc->extra_acks) ) || | ||
2965 | ( (rc->frag_uuid < frag_uuid + 64) && | ||
2966 | (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) ) | ||
2967 | { | ||
2968 | /* can fit ack by shifting extra acks and starting at | ||
2969 | frag_uid, test above esured that the bits we will | ||
2970 | shift 'extra_acks' by are all zero. */ | ||
2971 | rc->extra_acks <<= (rc->frag_uuid - frag_uuid); | ||
2972 | rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1)); | ||
2973 | rc->frag_uuid = frag_uuid; | ||
2974 | rc->num_acks++; | ||
2975 | } | ||
2976 | if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */ | ||
2977 | ack_now = GNUNET_YES; /* maximum acks received */ | ||
2978 | // FIXME: possibly also ACK based on RTT (but for that we'd need to | ||
2979 | // determine the session used for the ACK first!) | ||
2980 | |||
2871 | /* is reassembly complete? */ | 2981 | /* is reassembly complete? */ |
2872 | if (0 != rc->msg_missing) | 2982 | if (0 != rc->msg_missing) |
2873 | { | 2983 | { |
2874 | /* FIXME: possibly send ACK! */ | 2984 | if (ack_now) |
2985 | send_fragment_ack (rc); | ||
2875 | finish_cmc_handling (cmc); | 2986 | finish_cmc_handling (cmc); |
2876 | return; | 2987 | return; |
2877 | } | 2988 | } |
@@ -2885,7 +2996,7 @@ handle_fragment_box (void *cls, | |||
2885 | return; | 2996 | return; |
2886 | } | 2997 | } |
2887 | /* successful reassembly */ | 2998 | /* successful reassembly */ |
2888 | /* FIXME: definitively send ACK! */ | 2999 | send_fragment_ack (rc); |
2889 | demultiplex_with_cmc (cmc, | 3000 | demultiplex_with_cmc (cmc, |
2890 | msg); | 3001 | msg); |
2891 | /* FIXME: really free here? Might be bad if fragments are still | 3002 | /* FIXME: really free here? Might be bad if fragments are still |
@@ -3146,7 +3257,7 @@ handle_dv_box (void *cls, | |||
3146 | const struct TransportDVBox *dvb) | 3257 | const struct TransportDVBox *dvb) |
3147 | { | 3258 | { |
3148 | struct CommunicatorMessageContext *cmc = cls; | 3259 | struct CommunicatorMessageContext *cmc = cls; |
3149 | uint16_t size = ntohs (dvb->header.size); | 3260 | uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb); |
3150 | uint16_t num_hops = ntohs (dvb->num_hops); | 3261 | uint16_t num_hops = ntohs (dvb->num_hops); |
3151 | const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; | 3262 | const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; |
3152 | const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; | 3263 | const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; |