summaryrefslogtreecommitdiff
path: root/src/transport/gnunet-service-tng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/gnunet-service-tng.c')
-rw-r--r--src/transport/gnunet-service-tng.c565
1 files changed, 502 insertions, 63 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 8febbdfff..3cccf5173 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,7 +33,7 @@
* transport-to-transport traffic)
*
* Implement:
- * - manage fragmentation/defragmentation, retransmission, track RTT, loss, etc.
+ * - manage defragmentation, retransmission, track RTT, loss, etc.
*
* Easy:
* - use ATS bandwidth allocation callback and schedule transmissions!
@@ -165,8 +165,8 @@ struct TransportBackchannelEncapsulationMessage
/**
- * Message by which a peer confirms that it is using an
- * ephemeral key.
+ * Body by which a peqer confirms that it is using an ephemeral
+ * key.
*/
struct EphemeralConfirmation
{
@@ -192,6 +192,37 @@ struct EphemeralConfirmation
/**
+ * Message by which a peqer confirms that it is using an ephemeral
+ * key.
+ */
+struct EphemeralConfirmationMessage
+{
+
+ /**
+ * Message header, type is #GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Must be zero.
+ */
+ uint32_t reserved;
+
+ /**
+ * How long is this signature over the ephemeral key
+ * valid?
+ */
+ struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+
+ /**
+ * Ephemeral key setup by the sender for @e target, used
+ * to encrypt the payload.
+ */
+ struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+};
+
+
+/**
* Plaintext of the variable-size payload that is encrypted
* within a `struct TransportBackchannelEncapsulationMessage`
*/
@@ -863,7 +894,12 @@ struct PendingMessage
* Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
*/
struct PendingMessage *prev_frag;
-
+
+ /**
+ * This message, reliability boxed. Only possibly available if @e pmt is #PMT_CORE.
+ */
+ struct PendingMessage *bpm;
+
/**
* Target of the request.
*/
@@ -1798,6 +1834,36 @@ free_fragment_tree (struct PendingMessage *root)
/**
+ * Release memory associated with @a pm and remove @a pm from associated
+ * data structures. @a pm must be a top-level pending message and not
+ * a fragment in the tree. The entire tree is freed (if applicable).
+ *
+ * @param pm the pending message to free
+ */
+static void
+free_pending_message (struct PendingMessage *pm)
+{
+ struct TransportClient *tc = pm->client;
+ struct Neighbour *target = pm->target;
+
+ if (NULL != tc)
+ {
+ GNUNET_CONTAINER_MDLL_remove (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ }
+ GNUNET_CONTAINER_MDLL_remove (neighbour,
+ target->pending_msg_head,
+ target->pending_msg_tail,
+ pm);
+ free_fragment_tree (pm);
+ GNUNET_free_non_null (pm->bpm);
+ GNUNET_free (pm);
+}
+
+
+/**
* Send a response to the @a pm that we have processed a
* "send" request with status @a success. We
* transmitted @a bytes_physical on the actual wire.
@@ -1829,17 +1895,8 @@ client_send_response (struct PendingMessage *pm,
som->peer = target->pid;
GNUNET_MQ_send (tc->mq,
env);
- GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
}
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- target->pending_msg_head,
- target->pending_msg_tail,
- pm);
- free_fragment_tree (pm);
- GNUNET_free (pm);
+ free_pending_message (pm);
}
@@ -2176,36 +2233,291 @@ handle_del_address (void *cls,
/**
+ * Context from #handle_incoming_msg(). Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+ /**
+ * Which communicator provided us with the message.
+ */
+ struct TransportClient *tc;
+
+ /**
+ * Additional information for flow control and about the sender.
+ */
+ struct GNUNET_TRANSPORT_IncomingMessage im;
+};
+
+
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+ // FIXME: if (0 != ntohl (im->fc_on)) => send ACK when done to communicator for flow control!
+ GNUNET_SERVICE_client_continue (cmc->tc->client);
+
+ GNUNET_free (cmc);
+}
+
+
+/**
+ * Communicator gave us an unencapsulated message to pass
+ * as-is to CORE. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param mh the message that was received
+ */
+static void
+handle_raw_message (void *cls,
+ const struct GNUNET_MessageHeader *mh)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment box. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param fb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_fragment_box (void *cls,
+ const struct TransportFragmentBox *fb)
+{
+ // FIXME! check that off + size-of-payload <= total-length!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a fragment. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param fb the message that was received
+ */
+static void
+handle_fragment_box (void *cls,
+ const struct TransportFragmentBox *fb)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment acknowledgement. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param fa the message that was received
+ */
+static void
+handle_fragment_ack (void *cls,
+ const struct TransportFragmentAckMessage *fa)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability box. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_reliability_box (void *cls,
+ const struct TransportReliabilityBox *rb)
+{
+ GNUNET_MQ_check_boxed_message (rb);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a reliability box. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param rb the message that was received
+ */
+static void
+handle_reliability_box (void *cls,
+ const struct TransportReliabilityBox *rb)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability ack. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param ra the message that was received
+ */
+static void
+handle_reliability_ack (void *cls,
+ const struct TransportReliabilityAckMessage *ra)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param be the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_backchannel_encapsulation (void *cls,
+ const struct TransportBackchannelEncapsulationMessage *be)
+{
+ // FIXME: do work!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param be the message that was received
+ */
+static void
+handle_backchannel_encapsulation (void *cls,
+ const struct TransportBackchannelEncapsulationMessage *be)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us an ephemeral confirmation. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param ec the message that was received
+ */
+static void
+handle_ephemeral_confirmation (void *cls,
+ const struct EphemeralConfirmationMessage *ec)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV learn message. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvl the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_learn (void *cls,
+ const struct TransportDVLearn *dvl)
+{
+ // FIXME: do work!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV learn message. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param dvl the message that was received
+ */
+static void
+handle_dv_learn (void *cls,
+ const struct TransportDVLearn *dvl)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV box. Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_box (void *cls,
+ const struct TransportDVBox *dvb)
+{
+ // FIXME: do work!
+ return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV box. Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call #finish_cmc_handling() when done)
+ * @param dvb the message that was received
+ */
+static void
+handle_dv_box (void *cls,
+ const struct TransportDVBox *dvb)
+{
+ struct CommunicatorMessageContext *cmc = cls;
+
+ // FIXME: do work!
+ finish_cmc_handling (cmc);
+}
+
+
+/**
* Client notified us about transmission from a peer. Process the request.
*
- * @param cls the client
+ * @param cls a `struct TransportClient` which sent us the message
* @param obm the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
*/
static int
check_incoming_msg (void *cls,
const struct GNUNET_TRANSPORT_IncomingMessage *im)
{
struct TransportClient *tc = cls;
- uint16_t size;
- const struct GNUNET_MessageHeader *obmm;
if (CT_COMMUNICATOR != tc->type)
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
- size = ntohs (im->header.size) - sizeof (*im);
- if (size < sizeof (struct GNUNET_MessageHeader))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- obmm = (const struct GNUNET_MessageHeader *) &im[1];
- if (size != ntohs (obmm->size))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
+ GNUNET_MQ_check_boxed_message (im);
return GNUNET_OK;
}
@@ -2213,7 +2525,6 @@ check_incoming_msg (void *cls,
/**
* Incoming meessage. Process the request.
*
- * @param cls the client
* @param im the send message that was received
*/
static void
@@ -2221,8 +2532,61 @@ handle_incoming_msg (void *cls,
const struct GNUNET_TRANSPORT_IncomingMessage *im)
{
struct TransportClient *tc = cls;
+ struct CommunicatorMessageContext *cmc = GNUNET_new (struct CommunicatorMessageContext);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (fragment_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+ struct TransportFragmentBox,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (fragment_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
+ struct TransportFragmentAckMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (reliability_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+ struct TransportReliabilityBox,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (reliability_ack,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+ struct TransportReliabilityAckMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+ struct TransportBackchannelEncapsulationMessage,
+ &cmc),
+ GNUNET_MQ_hd_fixed_size (ephemeral_confirmation,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION,
+ struct EphemeralConfirmationMessage,
+ &cmc),
+ GNUNET_MQ_hd_var_size (dv_learn,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+ struct TransportDVLearn,
+ &cmc),
+ GNUNET_MQ_hd_var_size (dv_box,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+ struct TransportDVBox,
+ &cmc),
+ GNUNET_MQ_handler_end()
+ };
+ int ret;
- GNUNET_SERVICE_client_continue (tc->client);
+ cmc->tc = tc;
+ cmc->im = *im;
+ ret = GNUNET_MQ_handle_message (handlers,
+ (const struct GNUNET_MessageHeader *) &im[1]);
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ GNUNET_free (cmc);
+ return;
+ }
+ if (GNUNET_NO == ret)
+ {
+ /* unencapsulated 'raw' message */
+ handle_raw_message (&cmc,
+ (const struct GNUNET_MessageHeader *) &im[1]);
+ }
}
@@ -2269,6 +2633,23 @@ tracker_update_in_cb (void *cls)
/**
+ * If necessary, generates the UUID for a @a pm
+ *
+ * @param pm pending message to generate UUID for.
+ */
+static void
+set_pending_message_uuid (struct PendingMessage *pm)
+{
+ if (pm->msg_uuid_set)
+ return;
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &pm->msg_uuid,
+ sizeof (pm->msg_uuid));
+ pm->msg_uuid_set = GNUNET_YES;
+}
+
+
+/**
* Fragment the given @a pm to the given @a mtu. Adds
* additional fragments to the neighbour as well. If the
* @a mtu is too small, generates and error for the @a pm
@@ -2284,13 +2665,7 @@ fragment_message (struct PendingMessage *pm,
{
struct PendingMessage *ff;
- if (GNUNET_NO == pm->msg_uuid_set)
- {
- GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &pm->msg_uuid,
- sizeof (pm->msg_uuid));
- pm->msg_uuid_set = GNUNET_YES;
- }
+ set_pending_message_uuid (pm);
/* This invariant is established in #handle_add_queue_message() */
GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
@@ -2390,24 +2765,50 @@ fragment_message (struct PendingMessage *pm,
static struct PendingMessage *
reliability_box_message (struct PendingMessage *pm)
{
- if (PMT_CORE != pm->pmt)
- {
- /* already fragmented or reliability boxed, or control message: do nothing */
- return pm;
- }
-
- if (0) // FIXME
+ struct TransportReliabilityBox rbox;
+ struct PendingMessage *bpm;
+ char *msg;
+
+ if (PMT_CORE != pm->pmt)
+ return pm; /* already fragmented or reliability boxed, or control message: do nothing */
+ if (NULL != pm->bpm)
+ return pm->bpm; /* already computed earlier: do nothing */
+ GNUNET_assert (NULL == pm->head_frag);
+ if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX)
{
/* failed hard */
- // FIMXE: bitch
+ GNUNET_break (0);
client_send_response (pm,
GNUNET_NO,
0);
return NULL;
}
-
- /* FIXME: return boxed PM here! */
- return NULL;
+ bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (rbox) +
+ pm->bytes_msg);
+ bpm->target = pm->target;
+ bpm->frag_parent = pm;
+ GNUNET_CONTAINER_MDLL_insert (frag,
+ pm->head_frag,
+ pm->tail_frag,
+ bpm);
+ bpm->timeout = pm->timeout;
+ bpm->pmt = PMT_RELIABILITY_BOX;
+ bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
+ set_pending_message_uuid (bpm);
+ rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
+ rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
+ rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
+ rbox.msg_uuid = pm->msg_uuid;
+ msg = (char *) &bpm[1];
+ memcpy (msg,
+ &rbox,
+ sizeof (rbox));
+ memcpy (&msg[sizeof (rbox)],
+ &pm[1],
+ pm->bytes_msg);
+ pm->bpm = bpm;
+ return bpm;
}
@@ -2542,26 +2943,64 @@ transmit_on_queue (void *cls)
else if (PMT_CORE != pm->pmt)
{
/* This was an acknowledgement of some type, always free */
-
- struct Neighbour *neighbour = pm->target;
- GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pm);
- GNUNET_free (pm);
+ free_pending_message (pm);
}
else
{
/* message not finished, waiting for acknowledgement */
- // FIXME: update time by which we might retransmit 's' based on
- // queue characteristics (i.e. RTT)
-
- // FIXME: move 'pm' back in the transmission queue (simplistic: to
- // the end, better: with position depending on type, timeout,
- // etc.)
+ struct Neighbour *neighbour = pm->target;
+ /* Update time by which we might retransmit 's' based on queue
+ characteristics (i.e. RTT); it takes one RTT for the message to
+ arrive and the ACK to come back in the best case; but the other
+ side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
+ retransmitting. Note that in the future this heuristic should
+ likely be improved further (measure RTT stability, consider
+ message urgency and size when delaying ACKs, etc.) */
+ s->next_attempt = GNUNET_TIME_relative_to_absolute
+ (GNUNET_TIME_relative_multiply (queue->rtt,
+ 4));
+ if (s == pm)
+ {
+ struct PendingMessage *pos;
+
+ /* re-insert sort in neighbour list */
+ GNUNET_CONTAINER_MDLL_remove (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pm);
+ pos = neighbour->pending_msg_tail;
+ while ( (NULL != pos) &&
+ (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+ pos = pos->prev_neighbour;
+ GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pos,
+ pm);
+ }
+ else
+ {
+ /* re-insert sort in fragment list */
+ struct PendingMessage *fp = s->frag_parent;
+ struct PendingMessage *pos;
+
+ GNUNET_CONTAINER_MDLL_remove (frag,
+ fp->head_frag,
+ fp->tail_frag,
+ s);
+ pos = fp->tail_frag;
+ while ( (NULL != pos) &&
+ (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+ pos = pos->prev_frag;
+ GNUNET_CONTAINER_MDLL_insert_after (frag,
+ fp->head_frag,
+ fp->tail_frag,
+ pos,
+ s);
+ }
}
- /* finally, re-schedule self */
+ /* finally, re-schedule queue transmission task itself */
schedule_transmit_on_queue (queue);
}