aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-01-26 17:20:47 +0100
committerChristian Grothoff <christian@grothoff.org>2019-01-26 17:20:47 +0100
commitf8cea0ca2c5a683cdd0209e3027599c56b2ec4f3 (patch)
treec9a562a3bc7bc58c4c1d03163a8382134b3cc538 /src
parentcb26df28be6f46898c34d7e8957baa86fa56ed11 (diff)
downloadgnunet-f8cea0ca2c5a683cdd0209e3027599c56b2ec4f3.tar.gz
gnunet-f8cea0ca2c5a683cdd0209e3027599c56b2ec4f3.zip
reassembly logic
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-tng.c123
1 files changed, 117 insertions, 6 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index ac4a262d7..5a8bf5bc1 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -50,6 +50,9 @@
50 * directions, allow multiple messages per peer simultaneously (tag 50 * directions, allow multiple messages per peer simultaneously (tag
51 * confirmations with unique message ID), and replace quota-out with 51 * confirmations with unique message ID), and replace quota-out with
52 * proper flow control; 52 * proper flow control;
53 * - if messages are below MTU, consider adding ACKs and other stuff
54 * (requires planning at receiver, and additional MST-style demultiplex
55 * at receiver!)
53 * 56 *
54 * Design realizations / discussion: 57 * Design realizations / discussion:
55 * - communicators do flow control by calling MQ "notify sent" 58 * - communicators do flow control by calling MQ "notify sent"
@@ -2800,6 +2803,37 @@ check_fragment_box (void *cls,
2800 2803
2801 2804
2802/** 2805/**
2806 * Generate a fragment acknowledgement for an @a rc.
2807 *
2808 * @param rc context to generate ACK for, @a rc ACK state is reset
2809 */
2810static void
2811send_fragment_ack (struct ReassemblyContext *rc)
2812{
2813 struct TransportFragmentAckMessage *ack;
2814
2815 ack = GNUNET_new (struct TransportFragmentAckMessage);
2816 ack->header.size = htons (sizeof (struct TransportFragmentAckMessage));
2817 ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK);
2818 ack->frag_uuid = htonl (rc->frag_uuid);
2819 ack->extra_acks = GNUNET_htonll (rc->extra_acks);
2820 ack->msg_uuid = rc->msg_uuid;
2821 ack->avg_ack_delay = GNUNET_TIME_relative_hton (rc->avg_ack_delay);
2822 if (0 == rc->msg_missing)
2823 ack->reassembly_timeout
2824 = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_FOREVER_REL); /* signal completion */
2825 else
2826 ack->reassembly_timeout
2827 = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (rc->reassembly_timeout));
2828 route_message (&rc->neighbour->pid,
2829 &ack->header);
2830 rc->avg_ack_delay = GNUNET_TIME_UNIT_ZERO;
2831 rc->num_acks = 0;
2832 rc->extra_acks = 0LLU;
2833}
2834
2835
2836/**
2803 * Communicator gave us a fragment. Process the request. 2837 * Communicator gave us a fragment. Process the request.
2804 * 2838 *
2805 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done) 2839 * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
@@ -2814,6 +2848,12 @@ handle_fragment_box (void *cls,
2814 struct ReassemblyContext *rc; 2848 struct ReassemblyContext *rc;
2815 const struct GNUNET_MessageHeader *msg; 2849 const struct GNUNET_MessageHeader *msg;
2816 uint16_t msize; 2850 uint16_t msize;
2851 uint16_t fsize;
2852 uint16_t frag_off;
2853 uint32_t frag_uuid;
2854 char *target;
2855 struct GNUNET_TIME_Relative cdelay;
2856 int ack_now;
2817 2857
2818 n = GNUNET_CONTAINER_multipeermap_get (neighbours, 2858 n = GNUNET_CONTAINER_multipeermap_get (neighbours,
2819 &cmc->im.sender); 2859 &cmc->im.sender);
@@ -2856,9 +2896,14 @@ handle_fragment_box (void *cls,
2856 &rc->msg_uuid, 2896 &rc->msg_uuid,
2857 rc, 2897 rc,
2858 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 2898 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
2859 rc->bitfield = (uint8_t *) (((char *) &rc[1]) + rc->msg_size); 2899 target = (char *) &rc[1];
2900 rc->bitfield = (uint8_t *) (target + rc->msg_size);
2860 rc->msg_missing = rc->msg_size; 2901 rc->msg_missing = rc->msg_size;
2861 } 2902 }
2903 else
2904 {
2905 target = (char *) &rc[1];
2906 }
2862 if (msize != rc->msg_size) 2907 if (msize != rc->msg_size)
2863 { 2908 {
2864 GNUNET_break (0); 2909 GNUNET_break (0);
@@ -2866,12 +2911,78 @@ handle_fragment_box (void *cls,
2866 return; 2911 return;
2867 } 2912 }
2868 2913
2869 // FIXME: do work: reassemble 2914 /* reassemble */
2870 2915 fsize = ntohs (fb->header.size) - sizeof (*fb);
2916 frag_off = ntohs (fb->frag_off);
2917 memcpy (&target[frag_off],
2918 &fb[1],
2919 fsize);
2920 /* update bitfield and msg_missing */
2921 for (unsigned int i=frag_off;i<frag_off+fsize;i++)
2922 {
2923 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
2924 {
2925 rc->bitfield[i / 8] |= (1 << (i % 8));
2926 rc->msg_missing--;
2927 }
2928 }
2929
2930 /* Compute cummulative ACK */
2931 frag_uuid = ntohl (fb->frag_uuid);
2932 cdelay = GNUNET_TIME_absolute_get_duration (rc->last_frag);
2933 cdelay = GNUNET_TIME_relative_multiply (cdelay,
2934 rc->num_acks);
2935 rc->last_frag = GNUNET_TIME_absolute_get ();
2936 rc->avg_ack_delay = GNUNET_TIME_relative_add (rc->avg_ack_delay,
2937 cdelay);
2938 ack_now = GNUNET_NO;
2939 if (0 == rc->num_acks)
2940 {
2941 /* case one: first ack */
2942 rc->frag_uuid = frag_uuid;
2943 rc->extra_acks = 0LLU;
2944 rc->num_acks = 1;
2945 }
2946 else if ( (frag_uuid >= rc->frag_uuid) &&
2947 (frag_uuid <= rc->frag_uuid + 64) )
2948 {
2949 /* case two: ack fits after existing min UUID */
2950 if ( (frag_uuid == rc->frag_uuid) ||
2951 (0 != (rc->extra_acks & (1LLU << (frag_uuid - rc->frag_uuid - 1)))) )
2952 {
2953 /* duplicate fragment, ack now! */
2954 ack_now = GNUNET_YES;
2955 }
2956 else
2957 {
2958 rc->extra_acks |= (1LLU << (frag_uuid - rc->frag_uuid - 1));
2959 rc->num_acks++;
2960 }
2961 }
2962 else if ( (rc->frag_uuid > frag_uuid) &&
2963 ( ( (rc->frag_uuid == frag_uuid + 64) &&
2964 (0 == rc->extra_acks) ) ||
2965 ( (rc->frag_uuid < frag_uuid + 64) &&
2966 (rc->extra_acks == (rc->extra_acks & ~ ((1LLU << (64 - (rc->frag_uuid - frag_uuid))) - 1LLU))) ) ) )
2967 {
2968 /* can fit ack by shifting extra acks and starting at
2969 frag_uid, test above esured that the bits we will
2970 shift 'extra_acks' by are all zero. */
2971 rc->extra_acks <<= (rc->frag_uuid - frag_uuid);
2972 rc->extra_acks |= (1LLU << (rc->frag_uuid - frag_uuid - 1));
2973 rc->frag_uuid = frag_uuid;
2974 rc->num_acks++;
2975 }
2976 if (65 == rc->num_acks) /* FIXME: maybe use smaller threshold? This is very aggressive. */
2977 ack_now = GNUNET_YES; /* maximum acks received */
2978 // FIXME: possibly also ACK based on RTT (but for that we'd need to
2979 // determine the session used for the ACK first!)
2980
2871 /* is reassembly complete? */ 2981 /* is reassembly complete? */
2872 if (0 != rc->msg_missing) 2982 if (0 != rc->msg_missing)
2873 { 2983 {
2874 /* FIXME: possibly send ACK! */ 2984 if (ack_now)
2985 send_fragment_ack (rc);
2875 finish_cmc_handling (cmc); 2986 finish_cmc_handling (cmc);
2876 return; 2987 return;
2877 } 2988 }
@@ -2885,7 +2996,7 @@ handle_fragment_box (void *cls,
2885 return; 2996 return;
2886 } 2997 }
2887 /* successful reassembly */ 2998 /* successful reassembly */
2888 /* FIXME: definitively send ACK! */ 2999 send_fragment_ack (rc);
2889 demultiplex_with_cmc (cmc, 3000 demultiplex_with_cmc (cmc,
2890 msg); 3001 msg);
2891 /* FIXME: really free here? Might be bad if fragments are still 3002 /* FIXME: really free here? Might be bad if fragments are still
@@ -3146,7 +3257,7 @@ handle_dv_box (void *cls,
3146 const struct TransportDVBox *dvb) 3257 const struct TransportDVBox *dvb)
3147{ 3258{
3148 struct CommunicatorMessageContext *cmc = cls; 3259 struct CommunicatorMessageContext *cmc = cls;
3149 uint16_t size = ntohs (dvb->header.size); 3260 uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
3150 uint16_t num_hops = ntohs (dvb->num_hops); 3261 uint16_t num_hops = ntohs (dvb->num_hops);
3151 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1]; 3262 const struct GNUNET_PeerIdentity *hops = (const struct GNUNET_PeerIdentity *) &dvb[1];
3152 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops]; 3263 const struct GNUNET_MessageHeader *inbox = (const struct GNUNET_MessageHeader *) &hops[num_hops];