diff options
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r-- | src/transport/gnunet-service-tng.c | 234 |
1 files changed, 135 insertions, 99 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 825d45522..a8f70986b 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c | |||
@@ -24,40 +24,31 @@ | |||
24 | * | 24 | * |
25 | * TODO: | 25 | * TODO: |
26 | * Implement next: | 26 | * Implement next: |
27 | * - complete flow control push back from CORE via TRANSPORT to communicators: | 27 | * - add (more) logging |
28 | * + resume communicators in handle_client_recv_ok (see FIXME) | 28 | * - change transport-core API to specify transmission preferences (latency, |
29 | * + count transmissions to CORE and suspend communicators if window is full | ||
30 | * - check flow control push back from TRANSPROT to CORE: | ||
31 | * + check when to send ACKs | ||
32 | * - change transport-core API to provide proper flow control in both | ||
33 | * directions, allow multiple messages per peer simultaneously (tag | ||
34 | * confirmations with unique message ID), and replace quota-out with | ||
35 | * proper flow control; specify transmission preferences (latency, | ||
36 | * reliability, etc.) per message! | 29 | * reliability, etc.) per message! |
37 | * - add logging | ||
38 | * | ||
39 | * Later: | ||
40 | * - review retransmission logic, right now there is no smartness there! | 30 | * - review retransmission logic, right now there is no smartness there! |
41 | * => congestion control, flow control, etc | 31 | * => congestion control, flow control, etc [PERFORMANCE-BASICS] |
42 | * | 32 | * |
43 | * Optimizations: | 33 | * Optimizations: |
44 | * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do) | 34 | * - AcknowledgementUUIDPs are overkill with 256 bits (128 would do) |
45 | * => Need 128 bit hash map though! | 35 | * => Need 128 bit hash map though! [BANDWIDTH, MEMORY] |
46 | * - queue_send_msg and route_message both by API design have to make copies | 36 | * - queue_send_msg and route_message both by API design have to make copies |
47 | * of the payload, and route_message on top of that requires a malloc/free. | 37 | * of the payload, and route_message on top of that requires a malloc/free. |
48 | * Change design to approximate "zero" copy better... | 38 | * Change design to approximate "zero" copy better... [CPU] |
49 | * - could avoid copying body of message into each fragment and keep | 39 | * - could avoid copying body of message into each fragment and keep |
50 | * fragments as just pointers into the original message and only | 40 | * fragments as just pointers into the original message and only |
51 | * fully build fragments just before transmission (optimization, should | 41 | * fully build fragments just before transmission (optimization, should |
52 | * reduce CPU and memory use) | 42 | * reduce CPU and memory use) [CPU, MEMORY] |
53 | * - if messages are below MTU, consider adding ACKs and other stuff | 43 | * - if messages are below MTU, consider adding ACKs and other stuff |
54 | * (requires planning at receiver, and additional MST-style demultiplex | 44 | * to the same transmission to avoid tiny messages (requires planning at |
55 | * at receiver!) | 45 | * receiver, and additional MST-style demultiplex at receiver!) [PACKET COUNT] |
56 | * - When we passively learned DV (with unconfirmed freshness), we | 46 | * - When we passively learned DV (with unconfirmed freshness), we |
57 | * right now add the path to our list but with a zero path_valid_until | 47 | * right now add the path to our list but with a zero path_valid_until |
58 | * time and only use it for unconfirmed routes. However, we could consider | 48 | * time and only use it for unconfirmed routes. However, we could consider |
59 | * triggering an explicit validation mechansim ourselves, specifically routing | 49 | * triggering an explicit validation mechansim ourselves, specifically routing |
60 | * a challenge-response message over the path (OPTIMIZATION-FIXME). | 50 | * a challenge-response message over the path [ROUTING] |
51 | * - Track ACK losses based on ACK-counter [ROUTING] | ||
61 | * | 52 | * |
62 | * Design realizations / discussion: | 53 | * Design realizations / discussion: |
63 | * - communicators do flow control by calling MQ "notify sent" | 54 | * - communicators do flow control by calling MQ "notify sent" |
@@ -1115,6 +1106,44 @@ struct PendingMessage; | |||
1115 | */ | 1106 | */ |
1116 | struct DistanceVectorHop; | 1107 | struct DistanceVectorHop; |
1117 | 1108 | ||
1109 | |||
1110 | /** | ||
1111 | * Context from #handle_incoming_msg(). Closure for many | ||
1112 | * message handlers below. | ||
1113 | */ | ||
1114 | struct CommunicatorMessageContext | ||
1115 | { | ||
1116 | |||
1117 | /** | ||
1118 | * Kept in a DLL of `struct VirtualLink` if waiting for CORE | ||
1119 | * flow control to unchoke. | ||
1120 | */ | ||
1121 | struct CommunicatorMessageContext *next; | ||
1122 | |||
1123 | /** | ||
1124 | * Kept in a DLL of `struct VirtualLink` if waiting for CORE | ||
1125 | * flow control to unchoke. | ||
1126 | */ | ||
1127 | struct CommunicatorMessageContext *prev; | ||
1128 | |||
1129 | /** | ||
1130 | * Which communicator provided us with the message. | ||
1131 | */ | ||
1132 | struct TransportClient *tc; | ||
1133 | |||
1134 | /** | ||
1135 | * Additional information for flow control and about the sender. | ||
1136 | */ | ||
1137 | struct GNUNET_TRANSPORT_IncomingMessage im; | ||
1138 | |||
1139 | /** | ||
1140 | * Number of hops the message has travelled (if DV-routed). | ||
1141 | * FIXME: make use of this in ACK handling! | ||
1142 | */ | ||
1143 | uint16_t total_hops; | ||
1144 | }; | ||
1145 | |||
1146 | |||
1118 | /** | 1147 | /** |
1119 | * A virtual link is another reachable peer that is known to CORE. It | 1148 | * A virtual link is another reachable peer that is known to CORE. It |
1120 | * can be either a `struct Neighbour` with at least one confirmed | 1149 | * can be either a `struct Neighbour` with at least one confirmed |
@@ -1131,6 +1160,18 @@ struct VirtualLink | |||
1131 | struct GNUNET_PeerIdentity target; | 1160 | struct GNUNET_PeerIdentity target; |
1132 | 1161 | ||
1133 | /** | 1162 | /** |
1163 | * Communicators blocked for receiving on @e target as we are waiting | ||
1164 | * on the @e core_recv_window to increase. | ||
1165 | */ | ||
1166 | struct CommunicatorMessageContext *cmc_head; | ||
1167 | |||
1168 | /** | ||
1169 | * Communicators blocked for receiving on @e target as we are waiting | ||
1170 | * on the @e core_recv_window to increase. | ||
1171 | */ | ||
1172 | struct CommunicatorMessageContext *cmc_tail; | ||
1173 | |||
1174 | /** | ||
1134 | * Task scheduled to possibly notfiy core that this peer is no | 1175 | * Task scheduled to possibly notfiy core that this peer is no |
1135 | * longer counting as confirmed. Runs the #core_visibility_check(), | 1176 | * longer counting as confirmed. Runs the #core_visibility_check(), |
1136 | * which checks that some DV-path or a queue exists that is still | 1177 | * which checks that some DV-path or a queue exists that is still |
@@ -1152,9 +1193,11 @@ struct VirtualLink | |||
1152 | * How many more messages can we send to core before we exhaust | 1193 | * How many more messages can we send to core before we exhaust |
1153 | * the receive window of CORE for this peer? If this hits zero, | 1194 | * the receive window of CORE for this peer? If this hits zero, |
1154 | * we must tell communicators to stop providing us more messages | 1195 | * we must tell communicators to stop providing us more messages |
1155 | * for this peer. | 1196 | * for this peer. In fact, the window can go negative as we |
1197 | * have multiple communicators, so per communicator we can go | ||
1198 | * down by one into the negative range. | ||
1156 | */ | 1199 | */ |
1157 | unsigned int core_recv_window; | 1200 | int core_recv_window; |
1158 | }; | 1201 | }; |
1159 | 1202 | ||
1160 | 1203 | ||
@@ -3497,21 +3540,14 @@ free_pending_message (struct PendingMessage *pm) | |||
3497 | 3540 | ||
3498 | 3541 | ||
3499 | /** | 3542 | /** |
3500 | * Send a response to the @a pm that we have processed a | 3543 | * Send a response to the @a pm that we have processed a "send" |
3501 | * "send" request with status @a success. We | 3544 | * request. Sends a confirmation to the "core" client responsible for |
3502 | * transmitted @a bytes_physical on the actual wire. | 3545 | * the original request and free's @a pm. |
3503 | * Sends a confirmation to the "core" client responsible | ||
3504 | * for the original request and free's @a pm. | ||
3505 | * | 3546 | * |
3506 | * @param pm handle to the original pending message | 3547 | * @param pm handle to the original pending message |
3507 | * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR | ||
3508 | * for transmission failure | ||
3509 | * @param bytes_physical amount of bandwidth consumed | ||
3510 | */ | 3548 | */ |
3511 | static void | 3549 | static void |
3512 | client_send_response (struct PendingMessage *pm, | 3550 | client_send_response (struct PendingMessage *pm) |
3513 | int success, | ||
3514 | uint32_t bytes_physical) | ||
3515 | { | 3551 | { |
3516 | struct TransportClient *tc = pm->client; | 3552 | struct TransportClient *tc = pm->client; |
3517 | struct Neighbour *target = pm->target; | 3553 | struct Neighbour *target = pm->target; |
@@ -3523,10 +3559,7 @@ client_send_response (struct PendingMessage *pm, | |||
3523 | env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); | 3559 | env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); |
3524 | som->peer = target->pid; | 3560 | som->peer = target->pid; |
3525 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 3561 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
3526 | "Confirming %s transmission of %u/%u bytes to %s\n", | 3562 | "Confirming transmission to %s\n", |
3527 | (GNUNET_OK == success) ? "successful" : "failed", | ||
3528 | (unsigned int) pm->bytes_msg, | ||
3529 | (unsigned int) bytes_physical, | ||
3530 | GNUNET_i2s (&pm->target->pid)); | 3563 | GNUNET_i2s (&pm->target->pid)); |
3531 | GNUNET_MQ_send (tc->mq, env); | 3564 | GNUNET_MQ_send (tc->mq, env); |
3532 | } | 3565 | } |
@@ -3827,6 +3860,31 @@ check_communicator_available ( | |||
3827 | 3860 | ||
3828 | 3861 | ||
3829 | /** | 3862 | /** |
3863 | * Send ACK to communicator (if requested) and free @a cmc. | ||
3864 | * | ||
3865 | * @param cmc context for which we are done handling the message | ||
3866 | */ | ||
3867 | static void | ||
3868 | finish_cmc_handling (struct CommunicatorMessageContext *cmc) | ||
3869 | { | ||
3870 | if (0 != ntohl (cmc->im.fc_on)) | ||
3871 | { | ||
3872 | /* send ACK when done to communicator for flow control! */ | ||
3873 | struct GNUNET_MQ_Envelope *env; | ||
3874 | struct GNUNET_TRANSPORT_IncomingMessageAck *ack; | ||
3875 | |||
3876 | env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK); | ||
3877 | ack->reserved = htonl (0); | ||
3878 | ack->fc_id = cmc->im.fc_id; | ||
3879 | ack->sender = cmc->im.sender; | ||
3880 | GNUNET_MQ_send (cmc->tc->mq, env); | ||
3881 | } | ||
3882 | GNUNET_SERVICE_client_continue (cmc->tc->client); | ||
3883 | GNUNET_free (cmc); | ||
3884 | } | ||
3885 | |||
3886 | |||
3887 | /** | ||
3830 | * Client confirms that it is done handling message(s) to a particular | 3888 | * Client confirms that it is done handling message(s) to a particular |
3831 | * peer. We may now provide more messages to CORE for this peer. | 3889 | * peer. We may now provide more messages to CORE for this peer. |
3832 | * | 3890 | * |
@@ -3841,6 +3899,7 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) | |||
3841 | struct TransportClient *tc = cls; | 3899 | struct TransportClient *tc = cls; |
3842 | struct VirtualLink *vl; | 3900 | struct VirtualLink *vl; |
3843 | uint32_t delta; | 3901 | uint32_t delta; |
3902 | struct CommunicatorMessageContext *cmc; | ||
3844 | 3903 | ||
3845 | if (CT_CORE != tc->type) | 3904 | if (CT_CORE != tc->type) |
3846 | { | 3905 | { |
@@ -3860,9 +3919,13 @@ handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) | |||
3860 | } | 3919 | } |
3861 | delta = ntohl (rom->increase_window_delta); | 3920 | delta = ntohl (rom->increase_window_delta); |
3862 | vl->core_recv_window += delta; | 3921 | vl->core_recv_window += delta; |
3863 | if (delta == vl->core_recv_window) | 3922 | if (vl->core_recv_window <= 0) |
3923 | return; | ||
3924 | /* resume communicators */ | ||
3925 | while (NULL != (cmc = vl->cmc_tail)) | ||
3864 | { | 3926 | { |
3865 | // FIXME: resume communicators! | 3927 | GNUNET_CONTAINER_DLL_remove (vl->cmc_head, vl->cmc_tail, cmc); |
3928 | finish_cmc_handling (cmc); | ||
3866 | } | 3929 | } |
3867 | } | 3930 | } |
3868 | 3931 | ||
@@ -4684,30 +4747,6 @@ handle_del_address (void *cls, | |||
4684 | 4747 | ||
4685 | 4748 | ||
4686 | /** | 4749 | /** |
4687 | * Context from #handle_incoming_msg(). Closure for many | ||
4688 | * message handlers below. | ||
4689 | */ | ||
4690 | struct CommunicatorMessageContext | ||
4691 | { | ||
4692 | /** | ||
4693 | * Which communicator provided us with the message. | ||
4694 | */ | ||
4695 | struct TransportClient *tc; | ||
4696 | |||
4697 | /** | ||
4698 | * Additional information for flow control and about the sender. | ||
4699 | */ | ||
4700 | struct GNUNET_TRANSPORT_IncomingMessage im; | ||
4701 | |||
4702 | /** | ||
4703 | * Number of hops the message has travelled (if DV-routed). | ||
4704 | * FIXME: make use of this in ACK handling! | ||
4705 | */ | ||
4706 | uint16_t total_hops; | ||
4707 | }; | ||
4708 | |||
4709 | |||
4710 | /** | ||
4711 | * Given an inbound message @a msg from a communicator @a cmc, | 4750 | * Given an inbound message @a msg from a communicator @a cmc, |
4712 | * demultiplex it based on the type calling the right handler. | 4751 | * demultiplex it based on the type calling the right handler. |
4713 | * | 4752 | * |
@@ -4720,31 +4759,6 @@ demultiplex_with_cmc (struct CommunicatorMessageContext *cmc, | |||
4720 | 4759 | ||
4721 | 4760 | ||
4722 | /** | 4761 | /** |
4723 | * Send ACK to communicator (if requested) and free @a cmc. | ||
4724 | * | ||
4725 | * @param cmc context for which we are done handling the message | ||
4726 | */ | ||
4727 | static void | ||
4728 | finish_cmc_handling (struct CommunicatorMessageContext *cmc) | ||
4729 | { | ||
4730 | if (0 != ntohl (cmc->im.fc_on)) | ||
4731 | { | ||
4732 | /* send ACK when done to communicator for flow control! */ | ||
4733 | struct GNUNET_MQ_Envelope *env; | ||
4734 | struct GNUNET_TRANSPORT_IncomingMessageAck *ack; | ||
4735 | |||
4736 | env = GNUNET_MQ_msg (ack, GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK); | ||
4737 | ack->reserved = htonl (0); | ||
4738 | ack->fc_id = cmc->im.fc_id; | ||
4739 | ack->sender = cmc->im.sender; | ||
4740 | GNUNET_MQ_send (cmc->tc->mq, env); | ||
4741 | } | ||
4742 | GNUNET_SERVICE_client_continue (cmc->tc->client); | ||
4743 | GNUNET_free (cmc); | ||
4744 | } | ||
4745 | |||
4746 | |||
4747 | /** | ||
4748 | * Communicator gave us an unencapsulated message to pass as-is to | 4762 | * Communicator gave us an unencapsulated message to pass as-is to |
4749 | * CORE. Process the request. | 4763 | * CORE. Process the request. |
4750 | * | 4764 | * |
@@ -4756,6 +4770,7 @@ static void | |||
4756 | handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | 4770 | handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) |
4757 | { | 4771 | { |
4758 | struct CommunicatorMessageContext *cmc = cls; | 4772 | struct CommunicatorMessageContext *cmc = cls; |
4773 | struct VirtualLink *vl; | ||
4759 | uint16_t size = ntohs (mh->size); | 4774 | uint16_t size = ntohs (mh->size); |
4760 | 4775 | ||
4761 | if ((size > UINT16_MAX - sizeof (struct InboundMessage)) || | 4776 | if ((size > UINT16_MAX - sizeof (struct InboundMessage)) || |
@@ -4768,6 +4783,25 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
4768 | GNUNET_SERVICE_client_drop (client); | 4783 | GNUNET_SERVICE_client_drop (client); |
4769 | return; | 4784 | return; |
4770 | } | 4785 | } |
4786 | vl = GNUNET_CONTAINER_multipeermap_get (links, &cmc->im.sender); | ||
4787 | if (NULL == vl) | ||
4788 | { | ||
4789 | /* FIXME: sender is giving us messages for CORE but we don't have | ||
4790 | the link up yet! I *suspect* this can happen right now (i.e. | ||
4791 | sender has verified us, but we didn't verify sender), but if | ||
4792 | we pass this on, CORE would be confused (link down, messages | ||
4793 | arrive). We should investigate more if this happens often, | ||
4794 | or in a persistent manner, and possibly do "something" about | ||
4795 | it. Thus logging as error for now. */ | ||
4796 | GNUNET_break_op (0); | ||
4797 | GNUNET_STATISTICS_update (GST_stats, | ||
4798 | "# CORE messages droped (virtual link still down)", | ||
4799 | 1, | ||
4800 | GNUNET_NO); | ||
4801 | |||
4802 | finish_cmc_handling (cmc); | ||
4803 | return; | ||
4804 | } | ||
4771 | /* Forward to all CORE clients */ | 4805 | /* Forward to all CORE clients */ |
4772 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) | 4806 | for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next) |
4773 | { | 4807 | { |
@@ -4781,11 +4815,15 @@ handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh) | |||
4781 | memcpy (&im[1], mh, size); | 4815 | memcpy (&im[1], mh, size); |
4782 | GNUNET_MQ_send (tc->mq, env); | 4816 | GNUNET_MQ_send (tc->mq, env); |
4783 | } | 4817 | } |
4784 | /* FIXME: consider doing this _only_ once the message | 4818 | vl->core_recv_window--; |
4785 | was drained from the CORE MQs to extend flow control to CORE! | 4819 | if (vl->core_recv_window > 0) |
4786 | (basically, increment counter in cmc, decrement on MQ send continuation! | 4820 | { |
4787 | */ | 4821 | finish_cmc_handling (cmc); |
4788 | finish_cmc_handling (cmc); | 4822 | return; |
4823 | } | ||
4824 | /* Wait with calling #finish_cmc_handling(cmc) until the message | ||
4825 | was processed by CORE MQs (for CORE flow control)! */ | ||
4826 | GNUNET_CONTAINER_DLL_insert (vl->cmc_head, vl->cmc_tail, cmc); | ||
4789 | } | 4827 | } |
4790 | 4828 | ||
4791 | 4829 | ||
@@ -5345,7 +5383,8 @@ handle_reliability_ack (void *cls, | |||
5345 | } | 5383 | } |
5346 | 5384 | ||
5347 | ack_counter = htonl (ra->ack_counter); | 5385 | ack_counter = htonl (ra->ack_counter); |
5348 | // FIXME: track ACK losses based on ack_counter somewhere! | 5386 | (void) ack_counter; /* silence compiler warning for now */ |
5387 | // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere! | ||
5349 | // (DV and/or Neighbour?) | 5388 | // (DV and/or Neighbour?) |
5350 | finish_cmc_handling (cmc); | 5389 | finish_cmc_handling (cmc); |
5351 | } | 5390 | } |
@@ -7380,7 +7419,7 @@ reliability_box_message (struct Queue *queue, | |||
7380 | { | 7419 | { |
7381 | /* failed hard */ | 7420 | /* failed hard */ |
7382 | GNUNET_break (0); | 7421 | GNUNET_break (0); |
7383 | client_send_response (pm, GNUNET_NO, 0); | 7422 | client_send_response (pm); |
7384 | return NULL; | 7423 | return NULL; |
7385 | } | 7424 | } |
7386 | pa = prepare_pending_acknowledgement (queue, dvh, pm); | 7425 | pa = prepare_pending_acknowledgement (queue, dvh, pm); |
@@ -7531,7 +7570,7 @@ transmit_on_queue (void *cls) | |||
7531 | (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) | 7570 | (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)) |
7532 | { | 7571 | { |
7533 | /* Full message sent, and over reliabile channel */ | 7572 | /* Full message sent, and over reliabile channel */ |
7534 | client_send_response (pm, GNUNET_YES, pm->bytes_msg); | 7573 | client_send_response (pm); |
7535 | } | 7574 | } |
7536 | else if ((GNUNET_TRANSPORT_CC_RELIABLE == | 7575 | else if ((GNUNET_TRANSPORT_CC_RELIABLE == |
7537 | queue->tc->details.communicator.cc) && | 7576 | queue->tc->details.communicator.cc) && |
@@ -7556,10 +7595,7 @@ transmit_on_queue (void *cls) | |||
7556 | 7595 | ||
7557 | /* Was this the last applicable fragmment? */ | 7596 | /* Was this the last applicable fragmment? */ |
7558 | if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg)) | 7597 | if ((NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg)) |
7559 | client_send_response ( | 7598 | client_send_response (pm); |
7560 | pm, | ||
7561 | GNUNET_YES, | ||
7562 | pm->bytes_msg /* FIXME: calculate and add overheads! */); | ||
7563 | } | 7599 | } |
7564 | else if (PMT_CORE != pm->pmt) | 7600 | else if (PMT_CORE != pm->pmt) |
7565 | { | 7601 | { |