aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-30 10:58:56 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-30 10:59:05 +0200
commita563820a60326473e74ac2431431e86e1963fd31 (patch)
tree5c69c43a9ff2867a83cd8a1b604e8bd6ff1892eb /src
parenta7bb6a804cb3a598ad9957f4e1c0a7a716731fd2 (diff)
downloadgnunet-a563820a60326473e74ac2431431e86e1963fd31.tar.gz
gnunet-a563820a60326473e74ac2431431e86e1963fd31.zip
complete CORE flow control loop
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-tng.c234
1 files changed, 135 insertions, 99 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 825d45522..a8f70986b 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,40 +24,31 @@
24 * 24 *
25 * TODO: 25 * TODO:
26 * Implement next: 26 * Implement next:
27 * - complete flow control push back from CORE via TRANSPORT to communicators: 27 * - add (more) logging
28 * + resume communicators in handle_client_recv_ok (see FIXME) 28 * - change transport-core API to specify transmission preferences (latency,
29 * + count transmissions to CORE and suspend communicators if window is full
30 * - check flow control push back from TRANSPROT to CORE:
31 * + check when to send ACKs
32 * - change transport-core API to provide proper flow control in both
33 * directions, allow multiple messages per peer simultaneously (tag
34 * confirmations with unique message ID), and replace quota-out with
35 * proper flow control; specify transmission preferences (latency,
36 * reliability, etc.) per message! 29 * reliability, etc.) per message!
37 * - add logging
38 *
39 * Later:
40 * - review retransmission logic, right now there is no smartness there! 30 * - review retransmission logic, right now there is no smartness there!
41 * => congestion control, flow control, etc 31 * => congestion control, flow control, etc [PERFORMANCE-BASICS]
42 * 32 *
43 * Optimizations: 33 * Optimizations:
44 * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do) 34 * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do)
45 * => Need 128 bit hash map though! 35 * => Need 128 bit hash map though! [BANDWIDTH, MEMORY]
46 * - queue_send_msg and route_message both by API design have to make copies 36 * - queue_send_msg and route_message both by API design have to make copies
47 * of the payload, and route_message on top of that requires a malloc/free. 37 * of the payload, and route_message on top of that requires a malloc/free.
48 * Change design to approximate "zero" copy better... 38 * Change design to approximate "zero" copy better... [CPU]
49 * - could avoid copying body of message into each fragment and keep 39 * - could avoid copying body of message into each fragment and keep
50 * fragments as just pointers into the original message and only 40 * fragments as just pointers into the original message and only
51 * fully build fragments just before transmission (optimization, should 41 * fully build fragments just before transmission (optimization, should
52 * reduce CPU and memory use) 42 * reduce CPU and memory use) [CPU, MEMORY]
53 * - if messages are below MTU, consider adding ACKs and other stuff 43 * - if messages are below MTU, consider adding ACKs and other stuff
54 * (requires planning at receiver, and additional MST-style demultiplex 44 * to the same transmission to avoid tiny messages (requires planning at
55 * at receiver!) 45 * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT]
56 * - When we passively learned DV (with unconfirmed freshness), we 46 * - When we passively learned DV (with unconfirmed freshness), we
57 * right now add the path to our list but with a zero path_valid_until 47 * right now add the path to our list but with a zero path_valid_until
58 * time and only use it for unconfirmed routes. However, we could consider 48 * time and only use it for unconfirmed routes. However, we could consider
59 * triggering an explicit validation mechansim ourselves, specifically routing 49 * triggering an explicit validation mechansim ourselves, specifically routing
60 * a challenge-response message over the path (OPTIMIZATION-FIXME). 50 * a challenge-response message over the path [ROUTING]
51 * - Track ACK losses based on ACK-counter [ROUTING]
61 * 52 *
62 * Design realizations / discussion: 53 * Design realizations / discussion:
63 * - communicators do flow control by calling MQ "notify sent" 54 * - communicators do flow control by calling MQ "notify sent"
@@ -1115,6 +1106,44 @@ struct PendingMessage;
1115 */ 1106 */
1116struct DistanceVectorHop; 1107struct DistanceVectorHop;
1117 1108
1109
1110/**
1111 * Context from #handle_incoming_msg(). Closure for many
1112 * message handlers below.
1113 */
1114struct CommunicatorMessageContext
1115{
1116
1117 /**
1118 * Kept in a DLL of `struct VirtualLink` if waiting for CORE
1119 * flow control to unchoke.
1120 */
1121 struct CommunicatorMessageContext *next;
1122
1123 /**
1124 * Kept in a DLL of `struct VirtualLink` if waiting for CORE
1125 * flow control to unchoke.
1126 */
1127 struct CommunicatorMessageContext *prev;
1128
1129 /**
1130 * Which communicator provided us with the message.
1131 */
1132 struct TransportClient *tc;
1133
1134 /**
1135 * Additional information for flow control and about the sender.
1136 */
1137 struct GNUNET_TRANSPORT_IncomingMessage im;
1138
1139 /**
1140 * Number of hops the message has travelled (if DV-routed).
1141 * FIXME: make use of this in ACK handling!
1142 */
1143 uint16_t total_hops;
1144};
1145
1146
1118/** 1147/**
1119 * A virtual link is another reachable peer that is known to CORE. It 1148 * A virtual link is another reachable peer that is known to CORE. It
1120 * can be either a `struct Neighbour` with at least one confirmed 1149 * can be either a `struct Neighbour` with at least one confirmed
@@ -1131,6 +1160,18 @@ struct VirtualLink
1131 struct GNUNET_PeerIdentity target; 1160 struct GNUNET_PeerIdentity target;
1132 1161
1133 /** 1162 /**
1163 * Communicators blocked for receiving on @e target as we are waiting
1164 * on the @e core_recv_window to increase.
1165 */
1166 struct CommunicatorMessageContext *cmc_head;
1167
1168 /**
1169 * Communicators blocked for receiving on @e target as we are waiting
1170 * on the @e core_recv_window to increase.
1171 */
1172 struct CommunicatorMessageContext *cmc_tail;
1173
1174 /**
1134 * Task scheduled to possibly notfiy core that this peer is no 1175 * Task scheduled to possibly notfiy core that this peer is no
1135 * longer counting as confirmed. Runs the #core_visibility_check(), 1176 * longer counting as confirmed. Runs the #core_visibility_check(),
1136 * which checks that some DV-path or a queue exists that is still 1177 * which checks that some DV-path or a queue exists that is still
@@ -1152,9 +1193,11 @@ struct VirtualLink
1152 * How many more messages can we send to core before we exhaust 1193 * How many more messages can we send to core before we exhaust
1153 * the receive window of CORE for this peer? If this hits zero, 1194 * the receive window of CORE for this peer? If this hits zero,
1154 * we must tell communicators to stop providing us more messages 1195 * we must tell communicators to stop providing us more messages
1155 * for this peer. 1196 * for this peer. In fact, the window can go negative as we
1197 * have multiple communicators, so per communicator we can go
1198 * down by one into the negative range.
1156 */ 1199 */
1157 unsigned int core_recv_window; 1200 int core_recv_window;
1158}; 1201};
1159 1202
1160 1203
@@ -3497,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm)
3497 3540
3498 3541
3499/** 3542/**
3500 * Send a response to the @a pm that we have processed a 3543 * Send a response to the @a pm that we have processed a "send"
3501 * "send" request with status @a success. We 3544 * request. Sends a confirmation to the "core" client responsible for
3502 * transmitted @a bytes_physical on the actual wire. 3545 * the original request and free's @a pm.
3503 * Sends a confirmation to the "core" client responsible
3504 * for the original request and free's @a pm.
3505 * 3546 *
3506 * @param pm handle to the original pending message 3547 * @param pm handle to the original pending message
3507 * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
3508 * for transmission failure
3509 * @param bytes_physical amount of bandwidth consumed
3510 */ 3548 */
3511static void 3549static void
3512client_send_response (struct PendingMessage *pm, 3550client_send_response (struct PendingMessage *pm)
3513 int success,
3514 uint32_t bytes_physical)
3515{ 3551{
3516 struct TransportClient *tc = pm->client; 3552 struct TransportClient *tc = pm->client;
3517 struct Neighbour *target = pm->target; 3553 struct Neighbour *target = pm->target;
@@ -3523,10 +3559,7 @@ client_send_response (struct PendingMessage *pm,
3523 env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); 3559 env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
3524 som->peer = target->pid; 3560 som->peer = target->pid;
3525 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 3561 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3526 "Confirming %s transmission of %u/%u bytes to %s\n", 3562 "Confirming transmission to %s\n",
3527 (GNUNET_OK == success) ? "successful" : "failed",
3528 (unsigned int) pm->bytes_msg,
3529 (unsigned int) bytes_physical,
3530 GNUNET_i2s (&pm->target->pid)); 3563 GNUNET_i2s (&pm->target->pid));
3531 GNUNET_MQ_send (tc->mq, env); 3564 GNUNET_MQ_send (tc->mq, env);
3532 } 3565 }
@@ -3827,6 +3860,31 @@ check_communicator_available (
3827 3860
3828 3861
3829/** 3862/**
3863 * Send ACK to communicator (if requested) and free @a cmc.
3864 *
3865 * @param cmc context for which we are done handling the message
3866 */
3867static void
3868finish_cmc_handling (struct CommunicatorMessageContext *cmc)
3869{
3870 if (0 != ntohl (cmc->im.fc_on))
3871 {
3872 /* send ACK when done to communicator for flow control! */
3873 struct GNUNET_MQ_Envelope *env;
3874 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
3875
3876 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
3877 ack->reserved = htonl (0);
3878 ack->fc_id = cmc->im.fc_id;
3879 ack->sender = cmc->im.sender;
3880 GNUNET_MQ_send (cmc->tc->mq, env);
3881 }
3882 GNUNET_SERVICE_client_continue (cmc->tc->client);
3883 GNUNET_free (cmc);
3884}
3885
3886
3887/**
3830 * Client confirms that it is done handling message(s) to a particular 3888 * Client confirms that it is done handling message(s) to a particular
3831 * peer. We may now provide more messages to CORE for this peer. 3889 * peer. We may now provide more messages to CORE for this peer.
3832 * 3890 *
@@ -3841,6 +3899,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
3841 struct TransportClient *tc = cls; 3899 struct TransportClient *tc = cls;
3842 struct VirtualLink *vl; 3900 struct VirtualLink *vl;
3843 uint32_t delta; 3901 uint32_t delta;
3902 struct CommunicatorMessageContext *cmc;
3844 3903
3845 if (CT_CORE != tc->type) 3904 if (CT_CORE != tc->type)
3846 { 3905 {
@@ -3860,9 +3919,13 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
3860 } 3919 }
3861 delta = ntohl (rom->increase_window_delta); 3920 delta = ntohl (rom->increase_window_delta);
3862 vl->core_recv_window += delta; 3921 vl->core_recv_window += delta;
3863 if (delta == vl->core_recv_window) 3922 if (vl->core_recv_window <= 0)
3923 return;
3924 /* resume communicators */
3925 while (NULL != (cmc = vl->cmc_tail))
3864 { 3926 {
3865 // FIXME: resume communicators! 3927 GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc);
3928 finish_cmc_handling (cmc);
3866 } 3929 }
3867} 3930}
3868 3931
@@ -4684,30 +4747,6 @@ handle_del_address (void *cls,
4684 4747
4685 4748
4686/** 4749/**
4687 * Context from #handle_incoming_msg(). Closure for many
4688 * message handlers below.
4689 */
4690struct CommunicatorMessageContext
4691{
4692 /**
4693 * Which communicator provided us with the message.
4694 */
4695 struct TransportClient *tc;
4696
4697 /**
4698 * Additional information for flow control and about the sender.
4699 */
4700 struct GNUNET_TRANSPORT_IncomingMessage im;
4701
4702 /**
4703 * Number of hops the message has travelled (if DV-routed).
4704 * FIXME: make use of this in ACK handling!
4705 */
4706 uint16_t total_hops;
4707};
4708
4709
4710/**
4711 * Given an inbound message @a msg from a communicator @a cmc, 4750 * Given an inbound message @a msg from a communicator @a cmc,
4712 * demultiplex it based on the type calling the right handler. 4751 * demultiplex it based on the type calling the right handler.
4713 * 4752 *
@@ -4720,31 +4759,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc,
4720 4759
4721 4760
4722/** 4761/**
4723 * Send ACK to communicator (if requested) and free @a cmc.
4724 *
4725 * @param cmc context for which we are done handling the message
4726 */
4727static void
4728finish_cmc_handling (struct CommunicatorMessageContext *cmc)
4729{
4730 if (0 != ntohl (cmc->im.fc_on))
4731 {
4732 /* send ACK when done to communicator for flow control! */
4733 struct GNUNET_MQ_Envelope *env;
4734 struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
4735
4736 env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK);
4737 ack->reserved = htonl (0);
4738 ack->fc_id = cmc->im.fc_id;
4739 ack->sender = cmc->im.sender;
4740 GNUNET_MQ_send (cmc->tc->mq, env);
4741 }
4742 GNUNET_SERVICE_client_continue (cmc->tc->client);
4743 GNUNET_free (cmc);
4744}
4745
4746
4747/**
4748 * Communicator gave us an unencapsulated message to pass as-is to 4762 * Communicator gave us an unencapsulated message to pass as-is to
4749 * CORE. Process the request. 4763 * CORE. Process the request.
4750 * 4764 *
@@ -4756,6 +4770,7 @@ static void
4756handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) 4770handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
4757{ 4771{
4758 struct CommunicatorMessageContext *cmc = cls; 4772 struct CommunicatorMessageContext *cmc = cls;
4773 struct VirtualLink *vl;
4759 uint16_t size = ntohs (mh->size); 4774 uint16_t size = ntohs (mh->size);
4760 4775
4761 if ((size > UINT16_MAX - sizeof (struct InboundMessage)) || 4776 if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
@@ -4768,6 +4783,25 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
4768 GNUNET_SERVICE_client_drop (client); 4783 GNUNET_SERVICE_client_drop (client);
4769 return; 4784 return;
4770 } 4785 }
4786 vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender);
4787 if (NULL == vl)
4788 {
4789 /* FIXME: sender is giving us messages for CORE but we don't have
4790 the link up yet! I *suspect* this can happen right now (i.e.
4791 sender has verified us, but we didn't verify sender), but if
4792 we pass this on, CORE would be confused (link down, messages
4793 arrive). We should investigate more if this happens often,
4794 or in a persistent manner, and possibly do "something" about
4795 it. Thus logging as error for now. */
4796 GNUNET_break_op (0);
4797 GNUNET_STATISTICS_update (GST_stats,
4798 "# CORE messages droped (virtual link still down)",
4799 1,
4800 GNUNET_NO);
4801
4802 finish_cmc_handling (cmc);
4803 return;
4804 }
4771 /* Forward to all CORE clients */ 4805 /* Forward to all CORE clients */
4772 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) 4806 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
4773 { 4807 {
@@ -4781,11 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
4781 memcpy (&im[1], mh, size); 4815 memcpy (&im[1], mh, size);
4782 GNUNET_MQ_send (tc->mq, env); 4816 GNUNET_MQ_send (tc->mq, env);
4783 } 4817 }
4784 /* FIXME: consider doing this _only_ once the message 4818 vl->core_recv_window--;
4785 was drained from the CORE MQs to extend flow control to CORE! 4819 if (vl->core_recv_window > 0)
4786 (basically, increment counter in cmc, decrement on MQ send continuation! 4820 {
4787 */ 4821 finish_cmc_handling (cmc);
4788 finish_cmc_handling (cmc); 4822 return;
4823 }
4824 /* Wait with calling #finish_cmc_handling(cmc) until the message
4825 was processed by CORE MQs (for CORE flow control)! */
4826 GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc);
4789} 4827}
4790 4828
4791 4829
@@ -5345,7 +5383,8 @@ handle_reliability_ack (void *cls,
5345 } 5383 }
5346 5384
5347 ack_counter = htonl (ra->ack_counter); 5385 ack_counter = htonl (ra->ack_counter);
5348 // FIXME: track ACK losses based on ack_counter somewhere! 5386 (void) ack_counter; /* silence compiler warning for now */
5387 // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
5349 // (DV and/or Neighbour?) 5388 // (DV and/or Neighbour?)
5350 finish_cmc_handling (cmc); 5389 finish_cmc_handling (cmc);
5351} 5390}
@@ -7380,7 +7419,7 @@ reliability_box_message (struct Queue *queue,
7380 { 7419 {
7381 /* failed hard */ 7420 /* failed hard */
7382 GNUNET_break (0); 7421 GNUNET_break (0);
7383 client_send_response (pm, GNUNET_NO, 0); 7422 client_send_response (pm);
7384 return NULL; 7423 return NULL;
7385 } 7424 }
7386 pa = prepare_pending_acknowledgement (queue, dvh, pm); 7425 pa = prepare_pending_acknowledgement (queue, dvh, pm);
@@ -7531,7 +7570,7 @@ transmit_on_queue (void *cls)
7531 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) 7570 (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc))
7532 { 7571 {
7533 /* Full message sent, and over reliabile channel */ 7572 /* Full message sent, and over reliabile channel */
7534 client_send_response (pm, GNUNET_YES, pm->bytes_msg); 7573 client_send_response (pm);
7535 } 7574 }
7536 else if ((GNUNET_TRANSPORT_CC_RELIABLE == 7575 else if ((GNUNET_TRANSPORT_CC_RELIABLE ==
7537 queue->tc->details.communicator.cc) && 7576 queue->tc->details.communicator.cc) &&
@@ -7556,10 +7595,7 @@ transmit_on_queue (void *cls)
7556 7595
7557 /* Was this the last applicable fragmment? */ 7596 /* Was this the last applicable fragmment? */
7558 if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg)) 7597 if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg))
7559 client_send_response ( 7598 client_send_response (pm);
7560 pm,
7561 GNUNET_YES,
7562 pm->bytes_msg /* FIXME: calculate and add overheads! */);
7563 } 7599 }
7564 else if (PMT_CORE != pm->pmt) 7600 else if (PMT_CORE != pm->pmt)
7565 { 7601 {