From 0373e8a441feb05359b12be0e1803423783b55b2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 2 Jun 2019 14:14:50 +0200 Subject: work on DV logic and FC --- src/transport/gnunet-service-tng.c | 195 ++++++++++++++++++++++++++++++------- 1 file changed, 161 insertions(+), 34 deletions(-) (limited to 'src/transport/gnunet-service-tng.c') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 83c057795..471ded644 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -28,7 +28,6 @@ * communicators do not offer flow control). * We do transmit FC window sizes now. Left: * for SENDING) - * - Increment "outbound_fc_window_size_used" on transmission * - Throttle sending if "outbound_fc_window_size_used" reaches limit * - Send *new* challenge when we get close to the limit (including * at the beginning when the limit is zero!) @@ -202,6 +201,13 @@ #define DELAY_WARN_THRESHOLD \ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) +/** + * If a DVBox could not be forwarded after this number of + * seconds we drop it. + */ +#define DV_FORWARD_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60) + /** * We only consider queues as "quality" connections when * suppressing the generation of DV initiation messages if @@ -939,9 +945,9 @@ struct TransportFlowControlMessage * Used to detect one-sided connection drops. On wrap-around, the * flow control counters will be reset as if the connection had * dropped. - */ + */ uint32_t seq GNUNET_PACKED; - + /** * Flow control window size in bytes, in NBO. * The receiver can send this many bytes at most. @@ -974,7 +980,6 @@ struct TransportFlowControlMessage * reset the counters for the number of bytes sent! */ struct GNUNET_TIME_AbsoluteNBO sender_time; - }; @@ -1300,7 +1305,7 @@ struct VirtualLink * Distance vector used by this virtual link, NULL if @e n is used. */ struct DistanceVector *dv; - + /** * Last challenge we received from @a n. * FIXME: where do we need this? @@ -1308,7 +1313,7 @@ struct VirtualLink struct ChallengeNonceP n_challenge; /** - * Last challenge we used with @a n for flow control. + * Last challenge we used with @a n for flow control. * FIXME: where do we need this? */ struct ChallengeNonceP my_challenge; @@ -1373,7 +1378,7 @@ struct VirtualLink * Based on the difference between how much the sender sent according * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message * (@e outbound_sent field) and how much we actually received at that - * time (@e incoming_fc_window_size_used). This delta is then + * time (@e incoming_fc_window_size_used). This delta is then * added onto the @e incoming_fc_window_size when determining the * @e outbound_window_size we send to the other peer. Initially zero. * May be negative if we (due to out-of-order delivery) actually received @@ -1407,7 +1412,7 @@ struct VirtualLink * received. */ uint32_t last_fc_seq; - + /** * How many more messages can we send to CORE before we exhaust * the receive window of CORE for this peer? If this hits zero, @@ -2057,7 +2062,12 @@ enum PendingMessageType /** * Reliability box. */ - PMT_RELIABILITY_BOX = 2 + PMT_RELIABILITY_BOX = 2, + + /** + * Pending message created during #forward_dv_box(). + */ + PMT_DV_BOX = 3 }; @@ -2133,8 +2143,8 @@ struct PendingMessage struct PendingAcknowledgement *pa_tail; /** - * This message, reliability boxed. Only possibly available if @e pmt is - * #PMT_CORE. + * This message, reliability *or* DV-boxed. Only possibly available + * if @e pmt is #PMT_CORE. */ struct PendingMessage *bpm; @@ -2949,7 +2959,11 @@ free_pending_message (struct PendingMessage *pm) GNUNET_assert (pm == pm->qe->pm); pm->qe->pm = NULL; } - GNUNET_free_non_null (pm->bpm); + if (NULL != pm->bpm) + { + free_fragment_tree (pm->bpm); + GNUNET_free (pm->bpm); + } GNUNET_free (pm); } @@ -4751,16 +4765,18 @@ send_dv_to_neighbour (void *cls, /** * We need to transmit @a hdr to @a target. If necessary, this may - * involve DV routing. + * involve DV routing. This function routes without applying flow + * control or congestion control and should only be used for control + * traffic. * * @param target peer to receive @a hdr * @param hdr header of the message to route and #GNUNET_free() * @param options which transmission channels are allowed */ static void -route_message (const struct GNUNET_PeerIdentity *target, - const struct GNUNET_MessageHeader *hdr, - enum RouteMessageOptions options) +route_control_message_without_fc (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_MessageHeader *hdr, + enum RouteMessageOptions options) { struct VirtualLink *vl; struct Neighbour *n; @@ -4878,7 +4894,7 @@ handle_communicator_backchannel ( isize], is, strlen (is) + 1); - route_message (&cb->pid, &be->header, RMO_DV_ALLOWED); + route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED); GNUNET_SERVICE_client_continue (tc->client); } @@ -5303,7 +5319,7 @@ transmit_cummulative_ack_cb (void *cls) ap[i].ack_delay = GNUNET_TIME_relative_hton ( GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time)); } - route_message (&ac->target, &ack->header, RMO_DV_ALLOWED); + route_control_message_without_fc (&ac->target, &ack->header, RMO_DV_ALLOWED); ac->num_acks = 0; ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT, &destroy_ack_cummulator, @@ -5742,6 +5758,12 @@ completed_pending_message (struct PendingMessage *pm) (pos->frag_off == pos->bytes_msg)) client_send_response (pos); return; + case PMT_DV_BOX: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Completed transmission of message %llu (DV Box)\n", + pm->logging_uuid); + free_pending_message (pm); + return; } } @@ -6309,7 +6331,9 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop, &dhp.purpose, &dhops[nhops].hop_sig)); } - route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED); + route_control_message_without_fc (next_hop, + &fwd->header, + RMO_UNCONFIRMED_ALLOWED); } @@ -6924,17 +6948,39 @@ forward_dv_box (struct Neighbour *next_hop, const void *enc_payload, uint16_t enc_payload_size) { - char buf[sizeof (struct TransportDVBoxMessage) + - num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size]; - struct GNUNET_PeerIdentity *dhops = - (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)]; - + struct VirtualLink *vl = next_hop->vl; + struct PendingMessage *pm; + size_t msg_size; + char *buf; + struct GNUNET_PeerIdentity *dhops; + + GNUNET_assert (NULL != vl); + msg_size = sizeof (struct TransportDVBoxMessage) + + num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size; + pm = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size); + pm->pmt = PMT_DV_BOX; + pm->vl = vl; + pm->timeout = GNUNET_TIME_relative_to_absolute (DV_FORWARD_TIMEOUT); + pm->logging_uuid = logging_uuid_gen++; + pm->prefs = GNUNET_MQ_PRIO_BACKGROUND; + pm->bytes_msg = msg_size; + buf = (char *) &pm[1]; memcpy (buf, hdr, sizeof (*hdr)); + dhops = + (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)]; memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity)); memcpy (&dhops[num_hops], enc_payload, enc_payload_size); - route_message (&next_hop->pid, - (const struct GNUNET_MessageHeader *) buf, - RMO_NONE); + GNUNET_CONTAINER_MDLL_insert (vl, + vl->pending_msg_head, + vl->pending_msg_tail, + pm); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Created pending message %llu for DV Box with next hop %s (%u/%u)\n", + pm->logging_uuid, + GNUNET_i2s (&next_hop->pid), + (unsigned int) num_hops, + (unsigned int) total_hops); + check_vl_transmission (vl); } @@ -7696,9 +7742,9 @@ handle_validation_challenge ( &tvp.purpose, &tvr.signature)); } - route_message (&cmc->im.sender, - &tvr.header, - RMO_ANYTHING_GOES | RMO_REDUNDANT); + route_control_message_without_fc (&cmc->im.sender, + &tvr.header, + RMO_ANYTHING_GOES | RMO_REDUNDANT); finish_cmc_handling (cmc); vl = lookup_virtual_link (&cmc->im.sender); @@ -8459,6 +8505,8 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, int frag; int relb; + if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt)) + continue; /* DV messages must not be DV-routed to next hop! */ if (pos->next_attempt.abs_value_us > now.abs_value_us) break; /* too early for all messages, they are sorted by next_attempt */ if (NULL != pos->qe) @@ -8546,6 +8594,41 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc, } +/** + * Function to call to further operate on the now DV encapsulated + * message @a hdr, forwarding it via @a next_hop under respect of + * @a options. + * + * @param cls a `struct PendingMessageScoreContext` + * @param next_hop next hop of the DV path + * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage` + * @param options options of the original message + */ +static void +extract_box_cb (void *cls, + struct Neighbour *next_hop, + const struct GNUNET_MessageHeader *hdr, + enum RouteMessageOptions options) +{ + struct PendingMessageScoreContext *sc = cls; + struct PendingMessage *pm = sc->best; + struct PendingMessage *bpm; + uint16_t bsize = ntohs (hdr->size); + + GNUNET_assert (NULL == pm->bpm); + bpm = GNUNET_malloc (sizeof (struct PendingMessage) + bsize); + bpm->logging_uuid = logging_uuid_gen++; + bpm->pmt = PMT_DV_BOX; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating DV Box %llu for original message %llu (next hop is %s)\n", + bpm->logging_uuid, + pm->logging_uuid, + GNUNET_i2s (&next_hop->pid)); + memcpy (&bpm[1], hdr, bsize); + pm->bpm = bpm; +} + + /** * We believe we are ready to transmit a `struct PendingMessage` on a * queue, the big question is which one! We need to see if there is @@ -8610,9 +8693,30 @@ transmit_on_queue (void *cls) /* Given selection in `sc`, do transmission */ pm = sc.best; + if (NULL != sc.dvh) + { + GNUNET_assert (PMT_DV_BOX != pm->pmt); + if (NULL != sc.best->bpm) + { + /* We did this boxing before, but possibly for a different path! + Discard old DV box! OPTIMIZE-ME: we might want to check if + it is the same and then not re-build the message... */ + free_pending_message (sc.best->bpm); + sc.best->bpm = NULL; + } + encapsulate_for_dv (sc.dvh->dv, + 1, + &sc.dvh, + (const struct GNUNET_MessageHeader *) &sc.best[1], + &extract_box_cb, + &sc, + RMO_NONE); + GNUNET_assert (NULL != sc.best->bpm); + pm = sc.best->bpm; + } if (GNUNET_YES == sc.frag) { - pm = fragment_message (queue, sc.dvh, sc.best); + pm = fragment_message (queue, sc.dvh, pm); if (NULL == pm) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -8625,7 +8729,7 @@ transmit_on_queue (void *cls) } else if (GNUNET_YES == sc.relb) { - pm = reliability_box_message (queue, sc.dvh, sc.best); + pm = reliability_box_message (queue, sc.dvh, pm); if (NULL == pm) { /* Reliability boxing failed, try next message... */ @@ -8639,8 +8743,6 @@ transmit_on_queue (void *cls) return; } } - else - pm = sc.best; /* no boxing required */ /* Pass 'pm' for transission to the communicator */ GNUNET_log ( @@ -8650,6 +8752,31 @@ transmit_on_queue (void *cls) queue->address, GNUNET_i2s (&n->pid), sc.consideration_counter); + + /* Flow control: increment amount of traffic sent; if we are routing + via DV (and thus the ultimate target of the pending message is for + a different virtual link than the one of the queue), then we need + to use up not only the window of the direct link but also the + flow control window for the DV link! */ + pm->vl->outbound_fc_window_size_used += pm->bytes_msg; + + if (pm->vl != queue->neighbour->vl) + { + /* If the virtual link of the queue differs, this better be distance + vector routing! */ + GNUNET_assert (NULL != sc.dvh); + /* If we do distance vector routing, we better not do this for a + message that was itself DV-routed */ + GNUNET_assert (PMT_DV_BOX != sc.best->pmt); + /* We use the size of the unboxed message here, to avoid counting + the DV-Box header which is eaten up on the way by intermediaries */ + queue->neighbour->vl->outbound_fc_window_size_used += sc.best->bytes_msg; + } + else + { + GNUNET_assert (NULL == sc.dvh); + } + queue_send_msg (queue, pm, &pm[1], pm->bytes_msg); /* Check if this transmission somehow conclusively finished handing 'pm' -- cgit v1.2.3