From e1ec52bbdd5faeb0254ff24cd7ec1a9ec45ffb1e Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Fri, 16 Aug 2013 17:26:04 +0000 Subject: - add generic channel buffering --- src/mesh/gnunet-service-mesh-enc.c | 174 ++++++++++++++++++++----------------- 1 file changed, 95 insertions(+), 79 deletions(-) diff --git a/src/mesh/gnunet-service-mesh-enc.c b/src/mesh/gnunet-service-mesh-enc.c index ccb66f75f..75f788895 100644 --- a/src/mesh/gnunet-service-mesh-enc.c +++ b/src/mesh/gnunet-service-mesh-enc.c @@ -27,6 +27,7 @@ * - when sending in-order buffered data, wait for client ACKs * - add signatures * - add encryption + * - set connection IDs independently from tunnel, tunnel has no ID * * TODO: * - relay corking down to core @@ -376,6 +377,11 @@ struct MeshReliableMessage struct MeshReliableMessage *next; struct MeshReliableMessage *prev; + /** + * Type of message (payload, channel management). + */ + int16_t type; + /** * Tunnel Reliability queue this message is in. */ @@ -3091,9 +3097,10 @@ channel_send_data_ack (struct MeshChannel *ch, int fwd) struct GNUNET_MESH_DataACK msg; struct MeshChannelReliability *rel; struct MeshReliableMessage *copy; + unsigned int delta; uint64_t mask; uint32_t *mid; - unsigned int delta; + uint16_t type; if (GNUNET_NO == ch->reliable) { @@ -3106,14 +3113,16 @@ channel_send_data_ack (struct MeshChannel *ch, int fwd) "send_data_ack for %u\n", *mid - 1); - msg.header.type = htons (fwd ? GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK : - GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK); + type = GNUNET_MESSAGE_TYPE_MESH_DATA_ACK; + msg.header.type = htons (type); msg.header.size = htons (sizeof (msg)); msg.chid = htonl (ch->gid); msg.mid = htonl (*mid - 1); msg.futures = 0; for (copy = rel->head_recv; NULL != copy; copy = copy->next) { + if (copy->type != type) + continue; delta = copy->mid - *mid; if (63 < delta) break; @@ -3265,6 +3274,7 @@ channel_send_client_buffered_data (struct MeshChannel *ch, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n"); copy = rel->head_recv; + /* We never buffer channel management messages */ if (NULL != copy) { if (copy->mid == *mid || GNUNET_NO == ch->reliable) @@ -3298,7 +3308,7 @@ channel_send_client_buffered_data (struct MeshChannel *ch, * Buffer it until we receive an ACK from the client or the missing * message from the channel. * - * @param msg Message to buffer. + * @param msg Message to buffer (MUST be of type MESH_DATA). * @param rel Reliability data to the corresponding direction. */ static void @@ -3312,6 +3322,7 @@ channel_rel_add_buffered_data (const struct GNUNET_MESH_Data *msg, size = ntohs (msg->header.size); mid = ntohl (msg->mid); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "add_buffered_data %u\n", mid); copy = GNUNET_malloc (sizeof (*copy) + size); @@ -3673,6 +3684,54 @@ channel_confirm (struct MeshChannel *ch, int fwd) } +/** + * Save a copy to retransmit in case it gets lost. + * + * Initializes all needed callbacks and timers. + * + * @param ch Channel this message goes on. + * @param msg Message to copy. + * @param fwd Is this fwd traffic? + */ +static void +channel_save_copy (struct MeshChannel *ch, + const struct GNUNET_MessageHeader *msg, + int fwd) +{ + struct MeshChannelReliability *rel; + struct MeshReliableMessage *copy; + uint32_t mid; + uint16_t type; + uint16_t size; + + rel = fwd ? ch->fwd_rel : ch->bck_rel; + mid = fwd ? ch->mid_send_fwd : ch->mid_send_bck; + type = ntohs (msg->type); + size = ntohs (msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! SAVE %u\n", mid); + copy = GNUNET_malloc (sizeof (struct MeshReliableMessage) + size); + copy->mid = mid; + copy->timestamp = GNUNET_TIME_absolute_get (); + copy->rel = rel; + copy->type = type; + memcpy (©[1], msg, size); + rel->n_sent++; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " n_sent %u\n", rel->n_sent); + GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy); + if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) + { + rel->retry_timer = + GNUNET_TIME_relative_multiply (rel->expected_delay, + MESH_RETRANSMIT_MARGIN); + rel->retry_task = + GNUNET_SCHEDULER_add_delayed (rel->retry_timer, + &channel_retransmit_message, + rel); + } +} + + /** * Send keepalive packets for a connection. * @@ -4594,8 +4653,7 @@ queue_send (void *cls, size_t size, void *buf) else data_size = send_core_data_raw (queue->cls, size, buf); break; - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: + case GNUNET_MESSAGE_TYPE_MESH_DATA: case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY: /* This should be encapsulted */ @@ -5527,6 +5585,7 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, size_t dsize = size - sizeof (struct GNUNET_MESH_Encrypted); char cbuf[dsize]; struct GNUNET_MessageHeader *msgh; + int r; /* TODO signature verification */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " it's for us!\n"); @@ -5537,52 +5596,41 @@ handle_mesh_encrypted (const struct GNUNET_PeerIdentity *peer, msgh = (struct GNUNET_MessageHeader *) cbuf; switch (ntohs (msgh->type)) { - case GNUNET_MESSAGE_TYPE_MESH_UNICAST_ACK: - if (GNUNET_YES == fwd) - return handle_data_ack (t, (struct GNUNET_MESH_DataACK *) msgh, - GNUNET_YES); - GNUNET_break_op (0); - break; - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIG_ACK: - if (GNUNET_NO == fwd) - return handle_data_ack (t, (struct GNUNET_MESH_DataACK *) msgh, - GNUNET_YES); - GNUNET_break_op (0); - break; - case GNUNET_MESSAGE_TYPE_MESH_UNICAST: - if (GNUNET_YES == fwd) - handle_data (t, (struct GNUNET_MESH_Data *) msgh, GNUNET_YES); - GNUNET_break_op (0); + case GNUNET_MESSAGE_TYPE_MESH_DATA_ACK: + r = handle_data_ack (t, (struct GNUNET_MESH_DataACK *) msgh, fwd); break; - case GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN: - if (GNUNET_NO == fwd) - handle_data (t, (struct GNUNET_MESH_Data *) msgh, GNUNET_NO); - GNUNET_break_op (0); + + case GNUNET_MESSAGE_TYPE_MESH_DATA: + r = handle_data (t, (struct GNUNET_MESH_Data *) msgh, fwd); break; + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE: - return handle_channel_create (t, - (struct GNUNET_MESH_ChannelCreate *) msgh, - fwd); + r = handle_channel_create (t, + (struct GNUNET_MESH_ChannelCreate *) msgh, + fwd); break; + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_ACK: - return handle_channel_ack (t, - (struct GNUNET_MESH_ChannelManage *) msgh, - fwd); + r = handle_channel_ack (t, + (struct GNUNET_MESH_ChannelManage *) msgh, + fwd); break; + case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY: - return handle_channel_destroy (t, - (struct GNUNET_MESH_ChannelManage *) - msgh, - fwd); + r = handle_channel_destroy (t, + (struct GNUNET_MESH_ChannelManage *) msgh, + fwd); break; + default: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "end-to-end message not known (%u)\n", ntohs (msgh->type)); + r = GNUNET_OK; } connection_send_ack (c, fwd); - return GNUNET_OK; + return r; } /* Message not for us: forward to next hop */ @@ -6315,58 +6363,26 @@ handle_local_data (void *cls, struct GNUNET_SERVER_Client *client, else ch->blocked_bck = GNUNET_YES; - /* Ok, everything is correct, send the message - * (pretend we got it from a mesh peer) - */ + /* Ok, everything is correct, send the message. */ { struct GNUNET_MESH_Data *payload; - char cbuf[sizeof(struct GNUNET_MESH_Data) + size]; + uint16_t p2p_size = sizeof(struct GNUNET_MESH_Data) + size; + unsigned char cbuf[p2p_size]; uint32_t *mid; mid = fwd ? &ch->mid_send_fwd : &ch->mid_send_bck; - if (GNUNET_YES == ch->reliable) - { - struct MeshChannelReliability *rel; - struct MeshReliableMessage *copy; - - rel = fwd ? ch->fwd_rel : ch->bck_rel; - copy = GNUNET_malloc (sizeof (struct MeshReliableMessage) - + sizeof(struct GNUNET_MESH_Data) - + size); - copy->mid = *mid; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %u\n", copy->mid); - copy->timestamp = GNUNET_TIME_absolute_get (); - copy->rel = rel; - rel->n_sent++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " n_sent %u\n", rel->n_sent); - GNUNET_CONTAINER_DLL_insert_tail (rel->head_sent, rel->tail_sent, copy); - if (GNUNET_SCHEDULER_NO_TASK == rel->retry_task) - { - rel->retry_timer = - GNUNET_TIME_relative_multiply (rel->expected_delay, - MESH_RETRANSMIT_MARGIN); - rel->retry_task = - GNUNET_SCHEDULER_add_delayed (rel->retry_timer, - &channel_retransmit_message, - rel); - } - payload = (struct GNUNET_MESH_Data *) ©[1]; - } - else - { - payload = (struct GNUNET_MESH_Data *) cbuf; - } + payload = (struct GNUNET_MESH_Data *) cbuf; payload->mid = htonl (*mid); *mid = *mid + 1; memcpy (&payload[1], &msg[1], size); - payload->header.size = htons (sizeof (struct GNUNET_MESH_Data) + size); - payload->header.type = htons (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV ? - GNUNET_MESSAGE_TYPE_MESH_UNICAST : - GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN); + payload->header.size = htons (p2p_size); + payload->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_DATA); payload->chid = htonl (ch->gid); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - " calling generic handler...\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " sending on channel...\n"); send_prebuilt_message_channel (&payload->header, ch, fwd); + + if (GNUNET_YES == ch->reliable) + channel_save_copy (ch, &payload->header, fwd); } if (tunnel_get_buffer (ch->t, fwd) > 0) send_local_ack (ch, c, fwd); -- cgit v1.2.3