aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/gnunet-service-tng.c195
1 files changed, 161 insertions, 34 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 83c057795..471ded644 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -28,7 +28,6 @@
28 * communicators do not offer flow control). 28 * communicators do not offer flow control).
29 * We do transmit FC window sizes now. Left: 29 * We do transmit FC window sizes now. Left:
30 * for SENDING) 30 * for SENDING)
31 * - Increment "outbound_fc_window_size_used" on transmission
32 * - Throttle sending if "outbound_fc_window_size_used" reaches limit 31 * - Throttle sending if "outbound_fc_window_size_used" reaches limit
33 * - Send *new* challenge when we get close to the limit (including 32 * - Send *new* challenge when we get close to the limit (including
34 * at the beginning when the limit is zero!) 33 * at the beginning when the limit is zero!)
@@ -203,6 +202,13 @@
203 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) 202 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
204 203
205/** 204/**
205 * If a DVBox could not be forwarded after this number of
206 * seconds we drop it.
207 */
208#define DV_FORWARD_TIMEOUT \
209 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
210
211/**
206 * We only consider queues as "quality" connections when 212 * We only consider queues as "quality" connections when
207 * suppressing the generation of DV initiation messages if 213 * suppressing the generation of DV initiation messages if
208 * the latency of the queue is below this threshold. 214 * the latency of the queue is below this threshold.
@@ -939,9 +945,9 @@ struct TransportFlowControlMessage
939 * Used to detect one-sided connection drops. On wrap-around, the 945 * Used to detect one-sided connection drops. On wrap-around, the
940 * flow control counters will be reset as if the connection had 946 * flow control counters will be reset as if the connection had
941 * dropped. 947 * dropped.
942 */ 948 */
943 uint32_t seq GNUNET_PACKED; 949 uint32_t seq GNUNET_PACKED;
944 950
945 /** 951 /**
946 * Flow control window size in bytes, in NBO. 952 * Flow control window size in bytes, in NBO.
947 * The receiver can send this many bytes at most. 953 * The receiver can send this many bytes at most.
@@ -974,7 +980,6 @@ struct TransportFlowControlMessage
974 * reset the counters for the number of bytes sent! 980 * reset the counters for the number of bytes sent!
975 */ 981 */
976 struct GNUNET_TIME_AbsoluteNBO sender_time; 982 struct GNUNET_TIME_AbsoluteNBO sender_time;
977
978}; 983};
979 984
980 985
@@ -1300,7 +1305,7 @@ struct VirtualLink
1300 * Distance vector used by this virtual link, NULL if @e n is used. 1305 * Distance vector used by this virtual link, NULL if @e n is used.
1301 */ 1306 */
1302 struct DistanceVector *dv; 1307 struct DistanceVector *dv;
1303 1308
1304 /** 1309 /**
1305 * Last challenge we received from @a n. 1310 * Last challenge we received from @a n.
1306 * FIXME: where do we need this? 1311 * FIXME: where do we need this?
@@ -1308,7 +1313,7 @@ struct VirtualLink
1308 struct ChallengeNonceP n_challenge; 1313 struct ChallengeNonceP n_challenge;
1309 1314
1310 /** 1315 /**
1311 * Last challenge we used with @a n for flow control. 1316 * Last challenge we used with @a n for flow control.
1312 * FIXME: where do we need this? 1317 * FIXME: where do we need this?
1313 */ 1318 */
1314 struct ChallengeNonceP my_challenge; 1319 struct ChallengeNonceP my_challenge;
@@ -1373,7 +1378,7 @@ struct VirtualLink
1373 * Based on the difference between how much the sender sent according 1378 * Based on the difference between how much the sender sent according
1374 * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message 1379 * to the last #GNUNET_MESSAGE_TYPE_TRANSPORT_FLOW_CONTROL message
1375 * (@e outbound_sent field) and how much we actually received at that 1380 * (@e outbound_sent field) and how much we actually received at that
1376 * time (@e incoming_fc_window_size_used). This delta is then 1381 * time (@e incoming_fc_window_size_used). This delta is then
1377 * added onto the @e incoming_fc_window_size when determining the 1382 * added onto the @e incoming_fc_window_size when determining the
1378 * @e outbound_window_size we send to the other peer. Initially zero. 1383 * @e outbound_window_size we send to the other peer. Initially zero.
1379 * May be negative if we (due to out-of-order delivery) actually received 1384 * May be negative if we (due to out-of-order delivery) actually received
@@ -1407,7 +1412,7 @@ struct VirtualLink
1407 * received. 1412 * received.
1408 */ 1413 */
1409 uint32_t last_fc_seq; 1414 uint32_t last_fc_seq;
1410 1415
1411 /** 1416 /**
1412 * How many more messages can we send to CORE before we exhaust 1417 * How many more messages can we send to CORE before we exhaust
1413 * the receive window of CORE for this peer? If this hits zero, 1418 * the receive window of CORE for this peer? If this hits zero,
@@ -2057,7 +2062,12 @@ enum PendingMessageType
2057 /** 2062 /**
2058 * Reliability box. 2063 * Reliability box.
2059 */ 2064 */
2060 PMT_RELIABILITY_BOX = 2 2065 PMT_RELIABILITY_BOX = 2,
2066
2067 /**
2068 * Pending message created during #forward_dv_box().
2069 */
2070 PMT_DV_BOX = 3
2061 2071
2062}; 2072};
2063 2073
@@ -2133,8 +2143,8 @@ struct PendingMessage
2133 struct PendingAcknowledgement *pa_tail; 2143 struct PendingAcknowledgement *pa_tail;
2134 2144
2135 /** 2145 /**
2136 * This message, reliability boxed. Only possibly available if @e pmt is 2146 * This message, reliability *or* DV-boxed. Only possibly available
2137 * #PMT_CORE. 2147 * if @e pmt is #PMT_CORE.
2138 */ 2148 */
2139 struct PendingMessage *bpm; 2149 struct PendingMessage *bpm;
2140 2150
@@ -2949,7 +2959,11 @@ free_pending_message (struct PendingMessage *pm)
2949 GNUNET_assert (pm == pm->qe->pm); 2959 GNUNET_assert (pm == pm->qe->pm);
2950 pm->qe->pm = NULL; 2960 pm->qe->pm = NULL;
2951 } 2961 }
2952 GNUNET_free_non_null (pm->bpm); 2962 if (NULL != pm->bpm)
2963 {
2964 free_fragment_tree (pm->bpm);
2965 GNUNET_free (pm->bpm);
2966 }
2953 GNUNET_free (pm); 2967 GNUNET_free (pm);
2954} 2968}
2955 2969
@@ -4751,16 +4765,18 @@ send_dv_to_neighbour (void *cls,
4751 4765
4752/** 4766/**
4753 * We need to transmit @a hdr to @a target. If necessary, this may 4767 * We need to transmit @a hdr to @a target. If necessary, this may
4754 * involve DV routing. 4768 * involve DV routing. This function routes without applying flow
4769 * control or congestion control and should only be used for control
4770 * traffic.
4755 * 4771 *
4756 * @param target peer to receive @a hdr 4772 * @param target peer to receive @a hdr
4757 * @param hdr header of the message to route and #GNUNET_free() 4773 * @param hdr header of the message to route and #GNUNET_free()
4758 * @param options which transmission channels are allowed 4774 * @param options which transmission channels are allowed
4759 */ 4775 */
4760static void 4776static void
4761route_message (const struct GNUNET_PeerIdentity *target, 4777route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4762 const struct GNUNET_MessageHeader *hdr, 4778 const struct GNUNET_MessageHeader *hdr,
4763 enum RouteMessageOptions options) 4779 enum RouteMessageOptions options)
4764{ 4780{
4765 struct VirtualLink *vl; 4781 struct VirtualLink *vl;
4766 struct Neighbour *n; 4782 struct Neighbour *n;
@@ -4878,7 +4894,7 @@ handle_communicator_backchannel (
4878 isize], 4894 isize],
4879 is, 4895 is,
4880 strlen (is) + 1); 4896 strlen (is) + 1);
4881 route_message (&cb->pid, &be->header, RMO_DV_ALLOWED); 4897 route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
4882 GNUNET_SERVICE_client_continue (tc->client); 4898 GNUNET_SERVICE_client_continue (tc->client);
4883} 4899}
4884 4900
@@ -5303,7 +5319,7 @@ transmit_cummulative_ack_cb (void *cls)
5303 ap[i].ack_delay = GNUNET_TIME_relative_hton ( 5319 ap[i].ack_delay = GNUNET_TIME_relative_hton (
5304 GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time)); 5320 GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
5305 } 5321 }
5306 route_message (&ac->target, &ack->header, RMO_DV_ALLOWED); 5322 route_control_message_without_fc (&ac->target, &ack->header, RMO_DV_ALLOWED);
5307 ac->num_acks = 0; 5323 ac->num_acks = 0;
5308 ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT, 5324 ac->task = GNUNET_SCHEDULER_add_delayed (ACK_CUMMULATOR_TIMEOUT,
5309 &destroy_ack_cummulator, 5325 &destroy_ack_cummulator,
@@ -5742,6 +5758,12 @@ completed_pending_message (struct PendingMessage *pm)
5742 (pos->frag_off == pos->bytes_msg)) 5758 (pos->frag_off == pos->bytes_msg))
5743 client_send_response (pos); 5759 client_send_response (pos);
5744 return; 5760 return;
5761 case PMT_DV_BOX:
5762 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5763 "Completed transmission of message %llu (DV Box)\n",
5764 pm->logging_uuid);
5765 free_pending_message (pm);
5766 return;
5745 } 5767 }
5746} 5768}
5747 5769
@@ -6309,7 +6331,9 @@ forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
6309 &dhp.purpose, 6331 &dhp.purpose,
6310 &dhops[nhops].hop_sig)); 6332 &dhops[nhops].hop_sig));
6311 } 6333 }
6312 route_message (next_hop, &fwd->header, RMO_UNCONFIRMED_ALLOWED); 6334 route_control_message_without_fc (next_hop,
6335 &fwd->header,
6336 RMO_UNCONFIRMED_ALLOWED);
6313} 6337}
6314 6338
6315 6339
@@ -6924,17 +6948,39 @@ forward_dv_box (struct Neighbour *next_hop,
6924 const void *enc_payload, 6948 const void *enc_payload,
6925 uint16_t enc_payload_size) 6949 uint16_t enc_payload_size)
6926{ 6950{
6927 char buf[sizeof (struct TransportDVBoxMessage) + 6951 struct VirtualLink *vl = next_hop->vl;
6928 num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size]; 6952 struct PendingMessage *pm;
6929 struct GNUNET_PeerIdentity *dhops = 6953 size_t msg_size;
6930 (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)]; 6954 char *buf;
6931 6955 struct GNUNET_PeerIdentity *dhops;
6956
6957 GNUNET_assert (NULL != vl);
6958 msg_size = sizeof (struct TransportDVBoxMessage) +
6959 num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size;
6960 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size);
6961 pm->pmt = PMT_DV_BOX;
6962 pm->vl = vl;
6963 pm->timeout = GNUNET_TIME_relative_to_absolute (DV_FORWARD_TIMEOUT);
6964 pm->logging_uuid = logging_uuid_gen++;
6965 pm->prefs = GNUNET_MQ_PRIO_BACKGROUND;
6966 pm->bytes_msg = msg_size;
6967 buf = (char *) &pm[1];
6932 memcpy (buf, hdr, sizeof (*hdr)); 6968 memcpy (buf, hdr, sizeof (*hdr));
6969 dhops =
6970 (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
6933 memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity)); 6971 memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
6934 memcpy (&dhops[num_hops], enc_payload, enc_payload_size); 6972 memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
6935 route_message (&next_hop->pid, 6973 GNUNET_CONTAINER_MDLL_insert (vl,
6936 (const struct GNUNET_MessageHeader *) buf, 6974 vl->pending_msg_head,
6937 RMO_NONE); 6975 vl->pending_msg_tail,
6976 pm);
6977 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6978 "Created pending message %llu for DV Box with next hop %s (%u/%u)\n",
6979 pm->logging_uuid,
6980 GNUNET_i2s (&next_hop->pid),
6981 (unsigned int) num_hops,
6982 (unsigned int) total_hops);
6983 check_vl_transmission (vl);
6938} 6984}
6939 6985
6940 6986
@@ -7696,9 +7742,9 @@ handle_validation_challenge (
7696 &tvp.purpose, 7742 &tvp.purpose,
7697 &tvr.signature)); 7743 &tvr.signature));
7698 } 7744 }
7699 route_message (&cmc->im.sender, 7745 route_control_message_without_fc (&cmc->im.sender,
7700 &tvr.header, 7746 &tvr.header,
7701 RMO_ANYTHING_GOES | RMO_REDUNDANT); 7747 RMO_ANYTHING_GOES | RMO_REDUNDANT);
7702 finish_cmc_handling (cmc); 7748 finish_cmc_handling (cmc);
7703 7749
7704 vl = lookup_virtual_link (&cmc->im.sender); 7750 vl = lookup_virtual_link (&cmc->im.sender);
@@ -8459,6 +8505,8 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
8459 int frag; 8505 int frag;
8460 int relb; 8506 int relb;
8461 8507
8508 if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
8509 continue; /* DV messages must not be DV-routed to next hop! */
8462 if (pos->next_attempt.abs_value_us > now.abs_value_us) 8510 if (pos->next_attempt.abs_value_us > now.abs_value_us)
8463 break; /* too early for all messages, they are sorted by next_attempt */ 8511 break; /* too early for all messages, they are sorted by next_attempt */
8464 if (NULL != pos->qe) 8512 if (NULL != pos->qe)
@@ -8547,6 +8595,41 @@ select_best_pending_from_link (struct PendingMessageScoreContext *sc,
8547 8595
8548 8596
8549/** 8597/**
8598 * Function to call to further operate on the now DV encapsulated
8599 * message @a hdr, forwarding it via @a next_hop under respect of
8600 * @a options.
8601 *
8602 * @param cls a `struct PendingMessageScoreContext`
8603 * @param next_hop next hop of the DV path
8604 * @param hdr encapsulated message, technically a `struct TransportDFBoxMessage`
8605 * @param options options of the original message
8606 */
8607static void
8608extract_box_cb (void *cls,
8609 struct Neighbour *next_hop,
8610 const struct GNUNET_MessageHeader *hdr,
8611 enum RouteMessageOptions options)
8612{
8613 struct PendingMessageScoreContext *sc = cls;
8614 struct PendingMessage *pm = sc->best;
8615 struct PendingMessage *bpm;
8616 uint16_t bsize = ntohs (hdr->size);
8617
8618 GNUNET_assert (NULL == pm->bpm);
8619 bpm = GNUNET_malloc (sizeof (struct PendingMessage) + bsize);
8620 bpm->logging_uuid = logging_uuid_gen++;
8621 bpm->pmt = PMT_DV_BOX;
8622 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
8623 "Creating DV Box %llu for original message %llu (next hop is %s)\n",
8624 bpm->logging_uuid,
8625 pm->logging_uuid,
8626 GNUNET_i2s (&next_hop->pid));
8627 memcpy (&bpm[1], hdr, bsize);
8628 pm->bpm = bpm;
8629}
8630
8631
8632/**
8550 * We believe we are ready to transmit a `struct PendingMessage` on a 8633 * We believe we are ready to transmit a `struct PendingMessage` on a
8551 * queue, the big question is which one! We need to see if there is 8634 * queue, the big question is which one! We need to see if there is
8552 * one pending that is allowed by flow control and congestion control 8635 * one pending that is allowed by flow control and congestion control
@@ -8610,9 +8693,30 @@ transmit_on_queue (void *cls)
8610 8693
8611 /* Given selection in `sc`, do transmission */ 8694 /* Given selection in `sc`, do transmission */
8612 pm = sc.best; 8695 pm = sc.best;
8696 if (NULL != sc.dvh)
8697 {
8698 GNUNET_assert (PMT_DV_BOX != pm->pmt);
8699 if (NULL != sc.best->bpm)
8700 {
8701 /* We did this boxing before, but possibly for a different path!
8702 Discard old DV box! OPTIMIZE-ME: we might want to check if
8703 it is the same and then not re-build the message... */
8704 free_pending_message (sc.best->bpm);
8705 sc.best->bpm = NULL;
8706 }
8707 encapsulate_for_dv (sc.dvh->dv,
8708 1,
8709 &sc.dvh,
8710 (const struct GNUNET_MessageHeader *) &sc.best[1],
8711 &extract_box_cb,
8712 &sc,
8713 RMO_NONE);
8714 GNUNET_assert (NULL != sc.best->bpm);
8715 pm = sc.best->bpm;
8716 }
8613 if (GNUNET_YES == sc.frag) 8717 if (GNUNET_YES == sc.frag)
8614 { 8718 {
8615 pm = fragment_message (queue, sc.dvh, sc.best); 8719 pm = fragment_message (queue, sc.dvh, pm);
8616 if (NULL == pm) 8720 if (NULL == pm)
8617 { 8721 {
8618 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 8722 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -8625,7 +8729,7 @@ transmit_on_queue (void *cls)
8625 } 8729 }
8626 else if (GNUNET_YES == sc.relb) 8730 else if (GNUNET_YES == sc.relb)
8627 { 8731 {
8628 pm = reliability_box_message (queue, sc.dvh, sc.best); 8732 pm = reliability_box_message (queue, sc.dvh, pm);
8629 if (NULL == pm) 8733 if (NULL == pm)
8630 { 8734 {
8631 /* Reliability boxing failed, try next message... */ 8735 /* Reliability boxing failed, try next message... */
@@ -8639,8 +8743,6 @@ transmit_on_queue (void *cls)
8639 return; 8743 return;
8640 } 8744 }
8641 } 8745 }
8642 else
8643 pm = sc.best; /* no boxing required */
8644 8746
8645 /* Pass 'pm' for transission to the communicator */ 8747 /* Pass 'pm' for transission to the communicator */
8646 GNUNET_log ( 8748 GNUNET_log (
@@ -8650,6 +8752,31 @@ transmit_on_queue (void *cls)
8650 queue->address, 8752 queue->address,
8651 GNUNET_i2s (&n->pid), 8753 GNUNET_i2s (&n->pid),
8652 sc.consideration_counter); 8754 sc.consideration_counter);
8755
8756 /* Flow control: increment amount of traffic sent; if we are routing
8757 via DV (and thus the ultimate target of the pending message is for
8758 a different virtual link than the one of the queue), then we need
8759 to use up not only the window of the direct link but also the
8760 flow control window for the DV link! */
8761 pm->vl->outbound_fc_window_size_used += pm->bytes_msg;
8762
8763 if (pm->vl != queue->neighbour->vl)
8764 {
8765 /* If the virtual link of the queue differs, this better be distance
8766 vector routing! */
8767 GNUNET_assert (NULL != sc.dvh);
8768 /* If we do distance vector routing, we better not do this for a
8769 message that was itself DV-routed */
8770 GNUNET_assert (PMT_DV_BOX != sc.best->pmt);
8771 /* We use the size of the unboxed message here, to avoid counting
8772 the DV-Box header which is eaten up on the way by intermediaries */
8773 queue->neighbour->vl->outbound_fc_window_size_used += sc.best->bytes_msg;
8774 }
8775 else
8776 {
8777 GNUNET_assert (NULL == sc.dvh);
8778 }
8779
8653 queue_send_msg (queue, pm, &pm[1], pm->bytes_msg); 8780 queue_send_msg (queue, pm, &pm[1], pm->bytes_msg);
8654 8781
8655 /* Check if this transmission somehow conclusively finished handing 'pm' 8782 /* Check if this transmission somehow conclusively finished handing 'pm'