From 67342da57d1baeea6a486b9bf9feac670d53c8d2 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 16 Feb 2010 16:26:48 +0000 Subject: making things nice by breaking tons --- src/core/gnunet-service-core.c | 2 +- src/include/gnunet_protocols.h | 24 +- src/include/gnunet_transport_service.h | 4 - src/transport/gnunet-service-transport.c | 244 ++-- src/transport/test_transport_api.c | 64 +- src/transport/test_transport_api_tcp_peer2.conf | 2 +- src/transport/transport.h | 42 +- src/transport/transport_api.c | 1725 ++++++++++------------- 8 files changed, 867 insertions(+), 1240 deletions(-) (limited to 'src') diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c index 47f6b41c2..08c1c5455 100644 --- a/src/core/gnunet-service-core.c +++ b/src/core/gnunet-service-core.c @@ -1096,7 +1096,7 @@ process_encrypted_neighbour_queue (struct Neighbour *n) n); if (n->th == NULL) { - /* message request too large (oops) */ + /* message request too large or duplicate request */ GNUNET_break (0); /* discard encrypted message */ GNUNET_assert (NULL != (m = n->encrypted_head)); diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 249ba3de0..e43143168 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -163,49 +163,37 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA 26 -/** - * Message telling transport to try to connect to the - * given peer. - */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT 27 - -/** - * Response to another peer confirming that communication was - * established. - */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_ACK 28 - /** * Request to look addresses of peers in server. */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP 29 +#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP 27 /** * Response to the address lookup request. */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY 30 +#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY 28 /** * Change in blacklisting status of a peer. */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST 31 +#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST 29 /** * Request to transport to notify us about any blacklisting status * changes on this connection (and to immediately send all * active blacklist entries). */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST_NOTIFY 32 +#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST_NOTIFY 30 /** * Transport PING message */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_PING 33 +#define GNUNET_MESSAGE_TYPE_TRANSPORT_PING 32 /** * Transport PONG message */ -#define GNUNET_MESSAGE_TYPE_TRANSPORT_PONG 34 +#define GNUNET_MESSAGE_TYPE_TRANSPORT_PONG 33 /** * Request addition of a HELLO diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h index 650dd2e68..7a3ccbf7b 100644 --- a/src/include/gnunet_transport_service.h +++ b/src/include/gnunet_transport_service.h @@ -22,10 +22,6 @@ * @file include/gnunet_transport_service.h * @brief low-level P2P IO * @author Christian Grothoff - * - * TODO: - * - define API for blacklisting, un-blacklisting and notifications - * about blacklisted peers */ #ifndef GNUNET_TRANSPORT_SERVICE_H diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index e2616f078..854eef773 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -197,6 +197,12 @@ struct ForeignAddressList */ unsigned int connect_attempts; + /** + * DV distance to this peer (1 if no DV is used). + * FIXME: need to set this from transport plugins! + */ + uint32_t distance; + }; @@ -445,6 +451,11 @@ struct NeighborList */ struct GNUNET_TIME_Relative latency; + /** + * DV distance to this peer (1 if no DV is used). + */ + uint32_t distance; + /** * How many bytes have we received since the "last_quota_update" * timestamp? @@ -465,9 +476,8 @@ struct NeighborList unsigned int quota_violation_count; /** - * Have we seen an ACK from this neighbor in the past? - * (used to make up a fake ACK for clients connecting after - * the neighbor connected to us). + * Have we seen an PONG from this neighbor in the past (and + * not had a disconnect since)? */ int received_pong; @@ -771,7 +781,6 @@ static struct CheckHelloValidatedContext *chvc_tail; static struct GNUNET_CONTAINER_MultiHashMap *validation_map; - /** * The peer specified by the given neighbor has timed-out or a plugin * has disconnected. We may either need to do nothing (other plugins @@ -999,6 +1008,30 @@ transmit_to_client (struct TransportClient *client, } +/** + * Transmit a 'SEND_OK' notification to the given client for the + * given neighbor. + * + * @param client who to notify + * @param n neighbor to notify about + * @param result status code for the transmission request + */ +static void +transmit_send_ok (struct TransportClient *client, + struct NeighborList *n, + int result) +{ + struct SendOkMessage send_ok_msg; + + send_ok_msg.header.size = htons (sizeof (send_ok_msg)); + send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + send_ok_msg.success = htonl (result); + send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency); + send_ok_msg.peer = n->id; + transmit_to_client (client, &send_ok_msg.header, GNUNET_NO); +} + + /** * Function called by the GNUNET_TRANSPORT_TransmitFunction * upon "completion" of a send request. This tells the API @@ -1019,60 +1052,32 @@ transmit_send_continuation (void *cls, int result) { struct MessageQueue *mq = cls; - /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */ - struct SendOkMessage send_ok_msg; struct NeighborList *n; - GNUNET_assert (mq != NULL); n = find_neighbor(&mq->neighbor_id); - if (n == NULL) /* Neighbor must have been removed asynchronously! */ - return; - - /* Otherwise, let's make sure we've got the right peer */ - GNUNET_assert (0 == - memcmp (&n->id, target, - sizeof (struct GNUNET_PeerIdentity))); - - if (result == GNUNET_OK) + GNUNET_assert (n != NULL); + if (mq->specific_address != NULL) { - if (mq->specific_address != NULL) + if (result == GNUNET_OK) { mq->specific_address->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); mq->specific_address->connected = GNUNET_YES; - } - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission to peer `%s' failed, marking connection as down.\n", - GNUNET_i2s (target)); - if (mq->specific_address != NULL) - mq->specific_address->connected = GNUNET_NO; + } + else + { + mq->specific_address->connected = GNUNET_NO; + } + if (! mq->internal_msg) + mq->specific_address->in_transmit = GNUNET_NO; } - if ( (! mq->internal_msg) && - (mq->specific_address != NULL) ) - mq->specific_address->in_transmit = GNUNET_NO; - if (mq->client != NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notifying client %p about transmission to peer `%4s'.\n", - mq->client, GNUNET_i2s (target)); - send_ok_msg.header.size = htons (sizeof (send_ok_msg)); - send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - send_ok_msg.success = htonl (result); - send_ok_msg.peer = n->id; - transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO); - } + transmit_send_ok (mq->client, n, result); GNUNET_free (mq); - /* one plugin just became ready again, try transmitting - another message (if available) */ - if (result == GNUNET_OK) - try_transmission_to_peer (n); - else - disconnect_neighbor (n, GNUNET_YES); + try_transmission_to_peer (n); + if (result != GNUNET_OK) + disconnect_neighbor (n, GNUNET_YES); } @@ -1159,8 +1164,12 @@ try_transmission_to_peer (struct NeighborList *neighbor) min_latency = GNUNET_TIME_UNIT_FOREVER_REL; rl = NULL; mq = neighbor->messages_head; + /* FIXME: support bi-directional use of TCP */ if (mq->specific_address == NULL) mq->specific_address = find_ready_address(neighbor); + GNUNET_CONTAINER_DLL_remove (neighbor->messages_head, + neighbor->messages_tail, + mq); if (mq->specific_address == NULL) { #if DEBUG_TRANSPORT @@ -1169,14 +1178,14 @@ try_transmission_to_peer (struct NeighborList *neighbor) mq->message_buf_size, GNUNET_i2s (&mq->neighbor_id)); #endif + if (mq->client != NULL) + transmit_send_ok (mq->client, neighbor, GNUNET_NO); + GNUNET_free (mq); return; /* nobody ready */ } if (mq->specific_address->connected == GNUNET_NO) mq->specific_address->connect_attempts++; rl = mq->specific_address->ready_list; - GNUNET_CONTAINER_DLL_remove (neighbor->messages_head, - neighbor->messages_tail, - mq); mq->plugin = rl->plugin; if (!mq->internal_msg) mq->specific_address->in_transmit = GNUNET_YES; @@ -1251,7 +1260,6 @@ transmit_to_peer (struct TransportClient *client, memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity)); mq->internal_msg = is_internal; mq->priority = priority; - if (is_internal) GNUNET_CONTAINER_DLL_insert (neighbor->messages_head, neighbor->messages_tail, @@ -1495,14 +1503,20 @@ plugin_env_notify_address (void *cls, */ static void notify_clients_connect (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_TIME_Relative latency) + struct GNUNET_TIME_Relative latency, + uint32_t distance) { struct ConnectInfoMessage cim; struct TransportClient *cpos; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying clients about connection from `%s'\n", + GNUNET_i2s (peer)); +#endif cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); + cim.distance = htonl (distance); cim.latency = GNUNET_TIME_relative_hton (latency); memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity)); cpos = clients; @@ -1523,6 +1537,11 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) struct DisconnectInfoMessage dim; struct TransportClient *cpos; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Notifying clients about lost connection to `%s'\n", + GNUNET_i2s (peer)); +#endif dim.header.size = htons (sizeof (struct DisconnectInfoMessage)); dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); dim.reserved = htonl (0); @@ -1612,6 +1631,7 @@ add_peer_address(struct NeighborList *neighbor, ret->expires = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); ret->latency = GNUNET_TIME_relative_get_forever(); + ret->distance = -1; ret->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); ret->ready_list = head; @@ -1731,6 +1751,12 @@ check_pending_validation (void *cls, n->latency = fal->latency; else n->latency.value = (fal->latency.value + n->latency.value) / 2; + n->distance = fal->distance; + if (GNUNET_NO == n->received_pong) + { + notify_clients_connect (&target, n->latency, n->distance); + n->received_pong = GNUNET_YES; + } } /* clean up validation entry */ @@ -1795,6 +1821,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message, #endif return; } + #if 0 /* FIXME: add given address to potential pool of our addresses (for voting) */ @@ -1809,7 +1836,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message, static void neighbor_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct NeighborList *n = cls; @@ -1861,13 +1888,13 @@ setup_new_neighbor (const struct GNUNET_PeerIdentity *peer) tp = tp->next; } n->latency = GNUNET_TIME_UNIT_FOREVER_REL; + n->distance = -1; n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, &neighbor_timeout_task, n); transmit_to_peer (NULL, NULL, 0, (const char *) our_hello, GNUNET_HELLO_size(our_hello), GNUNET_NO, n); - notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL); return n; } @@ -2266,28 +2293,15 @@ process_hello (struct TransportPlugin *plugin, * disconnect? */ static void -disconnect_neighbor (struct NeighborList *current_handle, int check) +disconnect_neighbor (struct NeighborList *n, int check) { struct ReadyList *rpos; struct NeighborList *npos; struct NeighborList *nprev; - struct NeighborList *n; struct MessageQueue *mq; struct ForeignAddressList *peer_addresses; struct ForeignAddressList *peer_pos; - if (neighbors == NULL) - return; /* We don't have any neighbors, so client has an already removed handle! */ - - npos = neighbors; - while ((npos != NULL) && (current_handle != npos)) - npos = npos->next; - - if (npos == NULL) - return; /* Couldn't find neighbor in existing list, must have been already removed! */ - else - n = npos; - if (GNUNET_YES == check) { rpos = n->plugins; @@ -2303,10 +2317,10 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) rpos = rpos->next; } } - #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id)); + "Disconnecting from `%4s'\n", + GNUNET_i2s (&n->id)); #endif /* remove n from neighbors list */ nprev = NULL; @@ -2323,7 +2337,8 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) nprev->next = n->next; /* notify all clients about disconnect */ - notify_clients_disconnect (&n->id); + if (GNUNET_YES == n->received_pong) + notify_clients_disconnect (&n->id); /* clean up all plugins, cancel connections and pending transmissions */ while (NULL != (rpos = n->plugins)) @@ -2352,7 +2367,10 @@ disconnect_neighbor (struct NeighborList *current_handle, int check) GNUNET_free (mq); } if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + { + GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } /* finally, free n itself */ GNUNET_free (n); } @@ -2504,9 +2522,10 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, peer_address = add_peer_address(n, plugin->short_name, sender_address, - sender_address_len); + sender_address_len); if (peer_address != NULL) { + peer_address->distance = distance; if (peer_address->connected == GNUNET_NO) { peer_address->connected = GNUNET_YES; @@ -2520,10 +2539,12 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, /* update traffic received amount ... */ msize = ntohs (message->size); n->last_received += msize; - GNUNET_SCHEDULER_cancel (sched, n->timeout_task); + n->distance = distance; n->peer_timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_SCHEDULER_cancel (sched, + n->timeout_task); n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, @@ -2551,6 +2572,11 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, handle_pong(plugin, message, peer, sender_address, sender_address_len); break; default: + if (! n->received_pong) + { + GNUNET_break_op (0); + break; + } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %u from `%4s', sending to all clients.\n", @@ -2591,8 +2617,6 @@ handle_start (void *cls, struct TransportClient *c; struct ConnectInfoMessage cim; struct NeighborList *n; - struct InboundMessage *im; - struct GNUNET_MessageHeader *ack; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2626,34 +2650,18 @@ handle_start (void *cls, /* tell new client about all existing connections */ cim.header.size = htons (sizeof (struct ConnectInfoMessage)); cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim.quota_out = - htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000)); - /* FIXME: this ACK stuff is not nice! */ - im = GNUNET_malloc (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.size = htons (sizeof (struct InboundMessage) + - sizeof (struct GNUNET_MessageHeader)); - im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV); - im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO); - ack = (struct GNUNET_MessageHeader *) &im[1]; - ack->size = htons (sizeof (struct GNUNET_MessageHeader)); - ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK); - for (n = neighbors; n != NULL; n = n->next) - { - cim.id = n->id; - cim.latency = GNUNET_TIME_relative_hton (n->latency); - transmit_to_client (c, &cim.header, GNUNET_NO); - if (n->received_pong) - { - im->peer = n->id; - transmit_to_client (c, &im->header, GNUNET_NO); + n = neighbors; + while (n != NULL) + { + if (GNUNET_YES == n->received_pong) + { + cim.id = n->id; + cim.latency = GNUNET_TIME_relative_hton (n->latency); + cim.distance = htonl (n->distance); + transmit_to_client (c, &cim.header, GNUNET_NO); } + n = n->next; } - GNUNET_free (im); - } - else - { - fprintf(stderr, "Our hello is NULL!\n"); } GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2725,7 +2733,7 @@ handle_send (void *cls, } n = find_neighbor (&obm->peer); if (n == NULL) - n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */ + n = setup_new_neighbor (&obm->peer); tc = clients; while ((tc != NULL) && (tc->client != client)) tc = tc->next; @@ -2789,33 +2797,6 @@ handle_set_quota (void *cls, } -/** - * Handle TRY_CONNECT-message. - * - * @param cls closure (always NULL) - * @param client identification of the client - * @param message the actual message - */ -static void -handle_try_connect (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) -{ - const struct TryConnectMessage *tcm; - struct NeighborList *neighbor; - tcm = (const struct TryConnectMessage *) message; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received `%s' request from client %p asking to connect to `%4s'\n", - "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer)); -#endif - neighbor = find_neighbor(&tcm->peer); - if (neighbor == NULL) - setup_new_neighbor (&tcm->peer); - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - static void transmit_address_to_client (void *cls, const char *address) { @@ -2909,9 +2890,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0}, {&handle_set_quota, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)}, - {&handle_try_connect, NULL, - GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT, - sizeof (struct TryConnectMessage)}, {&handle_address_lookup, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP, 0}, diff --git a/src/transport/test_transport_api.c b/src/transport/test_transport_api.c index b7e34f810..a43f971a0 100644 --- a/src/transport/test_transport_api.c +++ b/src/transport/test_transport_api.c @@ -81,7 +81,7 @@ static void end () { /* do work here */ - GNUNET_assert (ok == 8); + GNUNET_assert (ok == 6); GNUNET_SCHEDULER_cancel (sched, die_task); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from transports!\n"); GNUNET_TRANSPORT_disconnect (p1.th); @@ -132,7 +132,7 @@ notify_receive (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %d from peer (%p)!\n", ntohs(message->type), cls); - GNUNET_assert (ok == 7); + GNUNET_assert (ok == 5); OKPP; GNUNET_assert (MTYPE == ntohs (message->type)); @@ -144,23 +144,51 @@ notify_receive (void *cls, } +static size_t +notify_ready (void *cls, size_t size, void *buf) +{ + struct GNUNET_MessageHeader *hdr; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting message to peer (%p) - %u!\n", cls, size); + GNUNET_assert (size >= 256); + GNUNET_assert (ok == 4); + OKPP; + if (buf != NULL) + { + hdr = buf; + hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); + hdr->type = htons (MTYPE); + } + + return sizeof (struct GNUNET_MessageHeader); +} + + static void notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, struct GNUNET_TIME_Relative latency, uint32_t distance) { + if (cls == &p1) + { + GNUNET_TRANSPORT_notify_transmit_ready (p1.th, + &p2.id, + 256, 0, TIMEOUT, ¬ify_ready, + &p1); + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer `%4s' connected to us (%p)!\n", GNUNET_i2s (peer), cls); - GNUNET_assert ((ok >= 1) && (ok <= 6)); - OKPP; } static void notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) { - ok--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer `%4s' disconnected (%p)!\n", + GNUNET_i2s (peer), cls); } @@ -186,27 +214,6 @@ setup_peer (struct PeerContext *p, const char *cfgname) } -static size_t -notify_ready (void *cls, size_t size, void *buf) -{ - struct GNUNET_MessageHeader *hdr; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting message to peer (%p) - %u!\n", cls, size); - GNUNET_assert (size >= 256); - GNUNET_assert ((ok >= 5) && (ok <= 6)); - OKPP; - if (buf != NULL) - { - hdr = buf; - hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); - hdr->type = htons (MTYPE); - } - - return sizeof (struct GNUNET_MessageHeader); -} - - static void exchange_hello_last (void *cls, const struct GNUNET_MessageHeader *message) @@ -231,11 +238,6 @@ exchange_hello_last (void *cls, /* both HELLOs exchanged, get ready to test transmission! */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Finished exchanging HELLOs, now waiting for transmission!\n"); - - GNUNET_TRANSPORT_notify_transmit_ready (p1.th, - &p2.id, - 256, 0, TIMEOUT, ¬ify_ready, - &p1); } static void diff --git a/src/transport/test_transport_api_tcp_peer2.conf b/src/transport/test_transport_api_tcp_peer2.conf index 7f7e78574..870a4809e 100644 --- a/src/transport/test_transport_api_tcp_peer2.conf +++ b/src/transport/test_transport_api_tcp_peer2.conf @@ -49,7 +49,7 @@ MINIMUM-FRIENDS = 0 [transport] PLUGINS = tcp -DEBUG = NO +#DEBUG = YES PREFIX = ALLOW_SHUTDOWN = YES ACCEPT_FROM6 = ::1; diff --git a/src/transport/transport.h b/src/transport/transport.h index 5e92bfb1e..a336cd9b0 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -30,7 +30,7 @@ #include "gnunet_time_lib.h" #include "gnunet_transport_service.h" -#define DEBUG_TRANSPORT GNUNET_NO +#define DEBUG_TRANSPORT GNUNET_YES /** * For how long do we allow unused bandwidth @@ -57,10 +57,9 @@ struct ConnectInfoMessage struct GNUNET_MessageHeader header; /** - * Current quota for outbound traffic in bytes/ms. - * (should be equal to system default) + * Transport distance metric (i.e. hops for DV) */ - uint32_t quota_out GNUNET_PACKED; + uint32_t distance; /** * Latency estimate. @@ -72,11 +71,6 @@ struct ConnectInfoMessage */ struct GNUNET_PeerIdentity id; - /** - * Transport distance metric (i.e. hops for DV) - */ - uint32_t distance; - }; @@ -132,31 +126,6 @@ struct QuotaSetMessage }; -/** - * Message used to ask the transport service to connect - * to a particular peer. - */ -struct TryConnectMessage -{ - - /** - * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT. - */ - struct GNUNET_MessageHeader header; - - /** - * Always zero. - */ - uint32_t reserved GNUNET_PACKED; - - /** - * About which peer are we talking here? - */ - struct GNUNET_PeerIdentity peer; - -}; - - /** * Message used to notify the transport API about a message * received from the network. The actual message follows. @@ -212,6 +181,11 @@ struct SendOkMessage */ uint32_t success GNUNET_PACKED; + /** + * Latency estimate. + */ + struct GNUNET_TIME_RelativeNBO latency; + /** * Which peer can send more now? */ diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index ae07421d8..84de9cebe 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009 Christian Grothoff (and other contributing authors) + (C) 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -25,6 +25,8 @@ */ #include "platform.h" #include "gnunet_client_lib.h" +#include "gnunet_constants.h" +#include "gnunet_container_lib.h" #include "gnunet_arm_service.h" #include "gnunet_hello_lib.h" #include "gnunet_protocols.h" @@ -57,167 +59,221 @@ */ #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) + /** - * Entry in linked list of all of our current neighbours. + * What stage are we in for transmission processing? */ -struct NeighbourList +enum TransmitStage + { + /** + * No active message. + */ + TS_NEW = 0, + + /** + * Message in local queue, not given to service. + */ + TS_QUEUED = 1, + + /** + * Message given to service, not confirmed (no SEND_OK). + */ + TS_TRANSMITTED = 2, + + /** + * One message was given to service and before it was confirmed, + * another one was already queued (waiting for SEND_OK to pass on + * to service). + */ + TS_TRANSMITTED_QUEUED = 3 + }; + + +/** + * Handle for a transmission-ready request. + */ +struct GNUNET_TRANSPORT_TransmitHandle { /** - * This is a linked list. + * Neighbour for this handle, NULL for control-traffic. */ - struct NeighbourList *next; + struct NeighbourList *neighbour; /** - * Active transmit handle, can be NULL. Used to move - * from ready to wait list on disconnect and to block - * two transmissions to the same peer from being scheduled - * at the same time. + * Function to call when notify_size bytes are available + * for transmission. */ - struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle; + GNUNET_CONNECTION_TransmitReadyNotify notify; /** - * Identity of this neighbour. + * Closure for notify. */ - struct GNUNET_PeerIdentity id; + void *notify_cls; /** - * At what time did we reset last_sent last? + * transmit_ready task Id. The task is used to introduce the + * artificial delay that may be required to maintain the bandwidth + * limits. Later, this will be the ID of the "transmit_timeout" + * task which is used to signal a timeout if the transmission could + * not be done in a timely fashion. */ - struct GNUNET_TIME_Absolute last_quota_update; + GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; /** - * How many bytes have we sent since the "last_quota_update" - * timestamp? + * Timeout for this request. */ - uint64_t last_sent; + struct GNUNET_TIME_Absolute timeout; /** - * Quota for outbound traffic to the neighbour in bytes/ms. + * How many bytes is our notify callback waiting for? */ - uint32_t quota_out; + size_t notify_size; /** - * Set to GNUNET_YES if we are currently allowed to - * transmit a message to the transport service for this - * peer, GNUNET_NO otherwise. + * How important is this message? */ - int transmit_ok; + unsigned int priority; -#if ACK - /** - * Set to GNUNET_YES if we have received an ACK for the - * given peer. Peers that receive our HELLO always respond - * with an ACK to let us know that we are successfully - * communicating. Note that a PING can not be used for this - * since PINGs are only send if a HELLO address requires - * confirmation (and also, PINGs are not passed to the - * transport API itself). - */ - int received_ack; -#endif }; /** - * Linked list of requests from clients for our HELLO - * that were deferred. + * Handle for a control message queue entry. */ -struct HelloWaitList +struct ControlMessage { /** - * This is a linked list. + * This is a doubly-linked list. */ - struct HelloWaitList *next; + struct ControlMessage *next; /** - * Reference back to our transport handle. + * This is a doubly-linked list. */ - struct GNUNET_TRANSPORT_Handle *handle; + struct ControlMessage *prev; /** - * Callback to call once we got our HELLO. + * Overall transport handle. */ - GNUNET_TRANSPORT_HelloUpdateCallback rec; + struct GNUNET_TRANSPORT_Handle *h; /** - * Closure for rec. + * Function to call when notify_size bytes are available + * for transmission. */ - void *rec_cls; + GNUNET_CONNECTION_TransmitReadyNotify notify; + + /** + * Closure for notify. + */ + void *notify_cls; + + /** + * transmit_ready task Id. The task is used to introduce the + * artificial delay that may be required to maintain the bandwidth + * limits. Later, this will be the ID of the "transmit_timeout" + * task which is used to signal a timeout if the transmission could + * not be done in a timely fashion. + */ + GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; + + /** + * How many bytes is our notify callback waiting for? + */ + size_t notify_size; }; /** - * Opaque handle for a transmission-ready request. + * Entry in linked list of all of our current neighbours. */ -struct GNUNET_TRANSPORT_TransmitHandle +struct NeighbourList { /** - * We keep the transmit handles that are waiting for - * a transport-level connection in a doubly linked list. + * This is a linked list. */ - struct GNUNET_TRANSPORT_TransmitHandle *next; + struct NeighbourList *next; /** - * We keep the transmit handles that are waiting for - * a transport-level connection in a doubly linked list. + * Overall transport handle. */ - struct GNUNET_TRANSPORT_TransmitHandle *prev; + struct GNUNET_TRANSPORT_Handle *h; /** - * Handle of the main transport data structure. + * Active transmit handle; available if 'transmit_forbidden' + * is GNUNET_NO. */ - struct GNUNET_TRANSPORT_Handle *handle; + struct GNUNET_TRANSPORT_TransmitHandle transmit_handle; /** - * Neighbour for this handle, can be NULL if the service - * is not yet connected to the target. + * Identity of this neighbour. */ - struct NeighbourList *neighbour; + struct GNUNET_PeerIdentity id; /** - * Which peer is this transmission going to be for? All - * zeros if it is control-traffic to the service. + * At what time did we reset last_sent last? */ - struct GNUNET_PeerIdentity target; + struct GNUNET_TIME_Absolute last_quota_update; /** - * Function to call when notify_size bytes are available - * for transmission. + * How many bytes have we sent since the "last_quota_update" + * timestamp? */ - GNUNET_CONNECTION_TransmitReadyNotify notify; + uint64_t last_sent; /** - * Closure for notify. + * Quota for outbound traffic to the neighbour in bytes/ms. */ - void *notify_cls; + uint32_t quota_out; /** - * transmit_ready task Id. The task is used to introduce the - * artificial delay that may be required to maintain the bandwidth - * limits. Later, this will be the ID of the "transmit_timeout" - * task which is used to signal a timeout if the transmission could - * not be done in a timely fashion. + * Set to GNUNET_NO if we are currently allowed to accept a + * message to the transport service for this peer, GNUNET_YES + * if we have one and are waiting for transmission, GNUNET_SYSERR + * if we are waiting for confirmation AND have already accepted + * yet another message. */ - GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; + enum TransmitStage transmit_stage; /** - * Timeout for this request. + * Have we received a notification that this peer is connected + * to us right now? */ - struct GNUNET_TIME_Absolute timeout; + int is_connected; + +}; + + +/** + * Linked list of requests from clients for our HELLO that were + * deferred. + */ +struct HelloWaitList +{ /** - * How many bytes is our notify callback waiting for? + * This is a linked list. */ - size_t notify_size; + struct HelloWaitList *next; /** - * How important is this message? + * Reference back to our transport handle. */ - unsigned int priority; + struct GNUNET_TRANSPORT_Handle *handle; + + /** + * Callback to call once we got our HELLO. + */ + GNUNET_TRANSPORT_HelloUpdateCallback rec; + + /** + * Closure for rec. + */ + void *rec_cls; }; @@ -249,6 +305,16 @@ struct GNUNET_TRANSPORT_Handle */ GNUNET_TRANSPORT_NotifyDisconnect nd_cb; + /** + * Head of DLL of control messages. + */ + struct ControlMessage *control_head; + + /** + * Tail of DLL of control messages. + */ + struct ControlMessage *control_tail; + /** * The current HELLO message for this peer. Updated * whenever transports change their addresses. @@ -265,26 +331,6 @@ struct GNUNET_TRANSPORT_Handle */ struct GNUNET_CLIENT_TransmitHandle *network_handle; - /** - * Linked list of transmit handles that are waiting for the - * transport to connect to the respective peer. When we - * receive notification that the transport connected to a - * peer, we go over this list and check if someone has already - * requested a transmission to the new peer; if so, we trigger - * the next step. - */ - struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head; - - /** - * Linked list of transmit handles that are waiting for the - * transport to be ready for transmission to the respective - * peer. When we - * receive notification that the transport disconnected from - * a peer, we go over this list and move the entry back to - * the connect_wait list. - */ - struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head; - /** * Linked list of pending requests for our HELLO. */ @@ -306,27 +352,34 @@ struct GNUNET_TRANSPORT_Handle struct NeighbourList *neighbours; /** - * ID of the task trying to reconnect to the - * service. + * ID of the task trying to reconnect to the service. */ GNUNET_SCHEDULER_TaskIdentifier reconnect_task; /** - * Delay until we try to reconnect. + * ID of the task trying to trigger transmission for a peer + * while maintaining bandwidth quotas. */ - struct GNUNET_TIME_Relative reconnect_delay; + GNUNET_SCHEDULER_TaskIdentifier quota_task; /** - * Do we currently have a transmission pending? - * (schedule transmission was called but has not - * yet succeeded)? + * Delay until we try to reconnect. */ - int transmission_scheduled; + struct GNUNET_TIME_Relative reconnect_delay; + }; +// FIXME: replace with hash map! +/** + * Get the neighbour list entry for the given peer + * + * @param h our context + * @param peer peer to look up + * @return NULL if no such peer entry exists + */ static struct NeighbourList * -find_neighbour (struct GNUNET_TRANSPORT_Handle *h, +neighbour_find (struct GNUNET_TRANSPORT_Handle *h, const struct GNUNET_PeerIdentity *peer) { struct NeighbourList *pos; @@ -340,51 +393,186 @@ find_neighbour (struct GNUNET_TRANSPORT_Handle *h, /** - * Schedule the task to send one message from the - * connect_ready list to the service. + * Schedule the task to send one message, either from the control + * list or the peer message queues to the service. */ static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h); /** - * Transmit message to client... + * Function called by the scheduler when the timeout for bandwidth + * availablility for the target neighbour is reached. + * + * @param cls the 'struct GNUNET_TRANSPORT_Handle*' + * @param tc scheduler context + */ +static void +quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + + h->quota_task = GNUNET_SCHEDULER_NO_TASK; + schedule_transmission (h); +} + + +/** + * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + */ +static void +update_quota (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative delta; + uint64_t allowed; + uint64_t remaining; + + delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); + allowed = delta.value * n->quota_out; + if (n->last_sent < allowed) + { + remaining = allowed - n->last_sent; + if (n->quota_out > 0) + remaining /= n->quota_out; + else + remaining = 0; + if (remaining > MAX_BANDWIDTH_CARRY) + remaining = MAX_BANDWIDTH_CARRY; + n->last_sent = 0; + n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update.value -= remaining; + } + else + { + n->last_sent -= allowed; + n->last_quota_update = GNUNET_TIME_absolute_get (); + } +} + + +/** + * Figure out which transmission to a peer can be done right now. + * If none can, schedule a task to call 'schedule_transmission' + * whenever a peer transmission can be done in the future and + * return NULL. Otherwise return the next transmission to be + * performed. + * + * @param h handle to transport + * @return NULL to wait longer before doing any peer transmissions + */ +static struct GNUNET_TRANSPORT_TransmitHandle * +schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h) +{ + struct GNUNET_TRANSPORT_TransmitHandle *ret; + struct GNUNET_TRANSPORT_TransmitHandle *th; + struct NeighbourList *n; + struct NeighbourList *next; + struct GNUNET_TIME_Relative retry_time; + struct GNUNET_TIME_Relative duration; + uint64_t available; + + if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, + h->quota_task); + h->quota_task = GNUNET_SCHEDULER_NO_TASK; + } + retry_time = GNUNET_TIME_UNIT_FOREVER_REL; + ret = NULL; + next = h->neighbours; + while (NULL != (n = next)) + { + next = n->next; + if (n->transmit_stage != TS_QUEUED) + continue; /* not eligible */ + th = &n->transmit_handle; + /* check outgoing quota */ + duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); + if (duration.value > MIN_QUOTA_REFRESH_TIME) + { + update_quota (n); + duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); + } + available = duration.value * n->quota_out; + if (available < n->last_sent + th->notify_size) + { + /* calculate how much bandwidth we'd still need to + accumulate and based on that how long we'll have + to wait... */ + available = n->last_sent + th->notify_size - available; + duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + available / n->quota_out); + if (th->timeout.value < + GNUNET_TIME_relative_to_absolute (duration).value) + { + /* signal timeout! */ +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", + duration.value, GNUNET_i2s (&n->id)); +#endif + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + n->transmit_stage = TS_NEW; + if (NULL != th->notify) + GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); + continue; + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n", + GNUNET_i2s (&n->id), duration.value); +#endif + retry_time = GNUNET_TIME_relative_min (retry_time, + duration); + continue; + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bandwidth available for transmission to `%4s'\n", + GNUNET_i2s (&n->id)); +#endif + if ( (ret == NULL) || + (ret->priority < th->priority) ) + ret = th; + } + if (ret == NULL) + h->quota_task = GNUNET_SCHEDULER_add_delayed (h->sched, + retry_time, + "a_transmit_ready, + h); + return ret; +} + + +/** + * Transmit message(s) to service. + * + * @param cls handle to transport + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf */ static size_t transport_notify_ready (void *cls, size_t size, void *buf) { struct GNUNET_TRANSPORT_Handle *h = cls; + struct ControlMessage *cm; struct GNUNET_TRANSPORT_TransmitHandle *th; struct NeighbourList *n; + struct OutboundMessage obm; size_t ret; + size_t mret; char *cbuf; h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; if (buf == NULL) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not transmit to transport service, cancelling pending requests\n"); -#endif - th = h->connect_ready_head; - if (th->next != NULL) - th->next->prev = NULL; - h->connect_ready_head = th->next; - if (NULL != (n = th->neighbour)) - { - GNUNET_assert (n->transmit_handle == th); - n->transmit_handle = NULL; - } - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - if (h->connect_ready_head != NULL) - schedule_transmission (h); /* FIXME: is this ok? */ + schedule_transmission (h); return 0; } #if DEBUG_TRANSPORT @@ -393,35 +581,64 @@ transport_notify_ready (void *cls, size_t size, void *buf) #endif cbuf = buf; ret = 0; - h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; - while ((h->connect_ready_head != NULL) && - (h->connect_ready_head->notify_size <= size)) + while ( (NULL != (cm = h->control_head)) && + (cm->notify_size <= size) ) + { + if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, cm->notify_delay_task); + cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_CONTAINER_DLL_remove (h->control_head, + h->control_tail, + cm); + ret += cm->notify (cm->notify_cls, size, &cbuf[ret]); + GNUNET_free (cm); + size -= ret; + } + while ( (NULL != (th = schedule_peer_transmission (h))) && + (th->notify_size <= size) ) { - th = h->connect_ready_head; if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; } - GNUNET_assert (th->notify_size <= size); - if (th->next != NULL) - th->next->prev = NULL; - h->connect_ready_head = th->next; - if (NULL != (n = th->neighbour)) - { - GNUNET_assert (n->transmit_handle == th); - n->transmit_handle = NULL; - } - if (NULL != th->notify) - ret += th->notify (th->notify_cls, size, &cbuf[ret]); - GNUNET_free (th); - if (n != NULL) - n->last_sent += ret; - size -= ret; + n = th->neighbour; + switch (n->transmit_stage) + { + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + n->transmit_stage = TS_TRANSMITTED; + break; + case TS_TRANSMITTED: + GNUNET_break (0); + break; + case TS_TRANSMITTED_QUEUED: + GNUNET_break (0); + break; + default: + GNUNET_break (0); + } + GNUNET_assert (size >= sizeof (struct OutboundMessage)); + mret = th->notify (th->notify_cls, + size - sizeof (struct OutboundMessage), + &cbuf[ret + sizeof (struct OutboundMessage)]); + GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); + if (mret != 0) + { + obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); + obm.header.size = htons (mret + sizeof (struct OutboundMessage)); + obm.priority = htonl (th->priority); + obm.peer = n->id; + memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage)); + ret += (mret + sizeof (struct OutboundMessage)); + size -= (mret + sizeof (struct OutboundMessage)); + } } - if (h->connect_ready_head != NULL) - schedule_transmission (h); + schedule_transmission (h); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to transport service\n", ret); @@ -431,126 +648,51 @@ transport_notify_ready (void *cls, size_t size, void *buf) /** - * Schedule the task to send one message from the - * connect_ready list to the service. + * Schedule the task to send one message, either from the control + * list or the peer message queues to the service. */ static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) -{ +{ + size_t size; + struct GNUNET_TIME_Relative timeout; struct GNUNET_TRANSPORT_TransmitHandle *th; - GNUNET_assert (NULL == h->network_handle); + if (NULL != h->network_handle) + return; if (h->client == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Could not yet schedule transmission: we are not yet connected to the transport service!\n"); return; /* not yet connected */ } - th = h->connect_ready_head; - if (th == NULL) - return; /* no request pending */ - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - /* remove existing time out task, will be integrated - with transmit_ready notification! */ - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - h->transmission_scheduled = GNUNET_YES; - h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client, - th->notify_size, - GNUNET_TIME_absolute_get_remaining - (th->timeout), - GNUNET_NO, - &transport_notify_ready, - h); - GNUNET_assert (NULL != h->network_handle); -} - - -/** - * Insert the given transmit handle in the given sorted - * doubly linked list based on timeout. - * - * @param head pointer to the head of the linked list - * @param th element to insert into the list - */ -static void -insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head, - struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_TransmitHandle *pos; - struct GNUNET_TRANSPORT_TransmitHandle *prev; - - pos = *head; - prev = NULL; - while ((pos != NULL) && (pos->timeout.value < th->timeout.value)) - { - prev = pos; - pos = pos->next; - } - if (prev == NULL) - { - th->next = *head; - if (th->next != NULL) - th->next->prev = th; - *head = th; - } - else - { - th->next = pos; - th->prev = prev; - prev->next = th; - if (pos != NULL) - pos->prev = th; - } -} - - -/** - * Cancel a pending notify delay task (if pending) and also remove the - * given transmit handle from whatever list is on. - * - * @param th handle for the transmission request to manipulate - */ -static void -remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_Handle *h; - - h = th->handle; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (th->prev == NULL) + if (NULL != h->control_head) { - if (th == h->connect_wait_head) - h->connect_wait_head = th->next; - else - h->connect_ready_head = th->next; + size = h->control_head->notify_size; + timeout = GNUNET_TIME_UNIT_FOREVER_REL; } else { - th->prev->next = th->next; + th = schedule_peer_transmission (h); + if (th == NULL) + { + /* no transmission ready right now */ + return; + } + size = th->notify_size; + timeout = GNUNET_TIME_absolute_get_remaining (th->timeout); } - if (th->next != NULL) - th->next->prev = th->prev; + h->network_handle = + GNUNET_CLIENT_notify_transmit_ready (h->client, + size, + timeout, + GNUNET_NO, + &transport_notify_ready, + h); + GNUNET_assert (NULL != h->network_handle); } -/** - * Schedule a request to connect to the given - * neighbour (and if successful, add the specified - * handle to the wait list). - * - * @param th handle for a request to transmit once we - * have connected - */ -static void try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th); - - /** * Called when our transmit request timed out before any transport * reported success connecting to the desired peer or before the @@ -558,28 +700,21 @@ static void try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th); * TransmitHandle. */ static void -peer_transmit_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +control_transmit_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + struct ControlMessage *th = cls; th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - if (th->neighbour != NULL) - th->neighbour->transmit_handle = NULL; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Request for transmission to peer `%s' timed out.\n", - GNUNET_i2s (&th->target)); -#endif - remove_from_any_list (th); if (NULL != th->notify) th->notify (th->notify_cls, 0, NULL); + GNUNET_CONTAINER_DLL_remove (th->h->control_head, + th->h->control_tail, + th); GNUNET_free (th); } - - /** * Queue control request for transmission to the transport * service. @@ -600,68 +735,31 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, GNUNET_CONNECTION_TransmitReadyNotify notify, void *notify_cls) { - struct GNUNET_TRANSPORT_TransmitHandle *th; + struct ControlMessage *th; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Control transmit of %u bytes within %llums requested\n", size, (unsigned long long) timeout.value); #endif - th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); - th->handle = h; + th = GNUNET_malloc (sizeof (struct ControlMessage)); + th->h = h; th->notify = notify; th->notify_cls = notify_cls; - th->timeout = GNUNET_TIME_relative_to_absolute (timeout); th->notify_size = size; th->notify_delay_task = GNUNET_SCHEDULER_add_delayed (h->sched, - timeout, &peer_transmit_timeout, th); - if (at_head) - { - th->next = h->connect_ready_head; - h->connect_ready_head = th; - if (th->next != NULL) - th->next->prev = th; - } + timeout, &control_transmit_timeout, th); + if (at_head) + GNUNET_CONTAINER_DLL_insert (h->control_head, + h->control_tail, + th); else - { - insert_transmit_handle (&h->connect_ready_head, th); - } - if (GNUNET_NO == h->transmission_scheduled) - schedule_transmission (h); -} - - -/** - * Update the quota values for the given neighbour now. - */ -static void -update_quota (struct NeighbourList *n) -{ - struct GNUNET_TIME_Relative delta; - uint64_t allowed; - uint64_t remaining; - - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - allowed = delta.value * n->quota_out; - if (n->last_sent < allowed) - { - remaining = allowed - n->last_sent; - if (n->quota_out > 0) - remaining /= n->quota_out; - else - remaining = 0; - if (remaining > MAX_BANDWIDTH_CARRY) - remaining = MAX_BANDWIDTH_CARRY; - n->last_sent = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->last_quota_update.value -= remaining; - } - else - { - n->last_sent -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); - } + GNUNET_CONTAINER_DLL_insert_after (h->control_head, + h->control_tail, + h->control_tail, + th); + schedule_transmission (h); } @@ -681,6 +779,14 @@ struct SetQuotaContext }; +/** + * Send SET_QUOTA message to the service. + * + * @param cls the 'struct SetQuotaContext' + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf + */ static size_t send_set_quota (void *cls, size_t size, void *buf) { @@ -699,7 +805,8 @@ send_set_quota (void *cls, size_t size, void *buf) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' request with respect to `%4s'.\n", - "SET_QUOTA", GNUNET_i2s (&sqc->target)); + "SET_QUOTA", + GNUNET_i2s (&sqc->target)); #endif GNUNET_assert (size >= sizeof (struct QuotaSetMessage)); msg = buf; @@ -742,7 +849,7 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle, struct NeighbourList *n; struct SetQuotaContext *sqc; - n = find_neighbour (handle, target); + n = neighbour_find (handle, target); if (n != NULL) { update_quota (n); @@ -830,6 +937,14 @@ GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle, } +/** + * Send HELLO message to the service. + * + * @param cls the HELLO message to send + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf + */ static size_t send_hello (void *cls, size_t size, void *buf) { @@ -873,14 +988,6 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, struct GNUNET_MessageHeader *hc; uint16_t size; - if (handle->client == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not connected to transport service, dropping offered HELLO\n"); -#endif - return; - } GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); size = ntohs (hello->size); GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); @@ -893,11 +1000,13 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, /** - * Function we use for handling incoming messages. + * Transmit START message to service. + * + * @param cls unused + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf */ -static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg); - - static size_t send_start (void *cls, size_t size, void *buf) { @@ -905,9 +1014,10 @@ send_start (void *cls, size_t size, void *buf) if (buf == NULL) { + /* Can only be shutdown, just give up */ #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout while trying to transmit `%s' request.\n", + "Shutdown while trying to transmit `%s' request.\n", "START"); #endif return 0; @@ -924,186 +1034,101 @@ send_start (void *cls, size_t size, void *buf) /** - * We're ready to transmit the request that the transport service - * should connect to a new peer. In addition to sending the - * request, schedule the next phase for the transmission processing - * that caused the connect request in the first place. - */ -static size_t -request_connect (void *cls, size_t size, void *buf) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - struct TryConnectMessage *tcm; - struct GNUNET_TRANSPORT_Handle *h; - - GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); - h = th->handle; - - if (buf == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit `%s' request for `%4s' to service.\n", - "TRY_CONNECT", GNUNET_i2s (&th->target)); -#endif - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - return 0; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' message for `%4s' (need connection in %llu ms).\n", - "TRY_CONNECT", GNUNET_i2s (&th->target), - GNUNET_TIME_absolute_get_remaining (th->timeout).value); -#endif - GNUNET_assert (size >= sizeof (struct TryConnectMessage)); - tcm = buf; - tcm->header.size = htons (sizeof (struct TryConnectMessage)); - tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT); - tcm->reserved = htonl (0); - memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity)); - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_TIME_absolute_get_remaining - (th->timeout), - &peer_transmit_timeout, th); - insert_transmit_handle (&h->connect_wait_head, th); - return sizeof (struct TryConnectMessage); -} - - -/** - * Schedule a request to connect to the given - * neighbour (and if successful, add the specified - * handle to the wait list). - * - * @param th handle for a request to transmit once we - * have connected - */ -static void -try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); - schedule_control_transmit (th->handle, - sizeof (struct TryConnectMessage), - GNUNET_NO, - GNUNET_TIME_absolute_get_remaining (th->timeout), - &request_connect, th); -} - - -/** - * Task for delayed attempts to reconnect to a peer. - * - * @param cls must be a transmit handle that determines the peer - * to which we will try to connect - * @param tc scheduler information about why we were triggered (not used) - */ -static void -try_connect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - try_connect (th); -} - - -/** - * Remove neighbour from our list. Will automatically - * trigger a re-connect attempt if we have messages pending - * for this peer. + * Free neighbour. * * @param h our state - * @param peer the peer to remove + * @param n the entry to free */ static void -remove_neighbour (struct GNUNET_TRANSPORT_Handle *h, - const struct GNUNET_PeerIdentity *peer) +neighbour_free (struct NeighbourList *n) { + struct GNUNET_TRANSPORT_Handle *h; struct NeighbourList *prev; struct NeighbourList *pos; - struct GNUNET_TRANSPORT_TransmitHandle *th; + h = n->h; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing neighbour `%s' from list of connected peers.\n", - GNUNET_i2s (peer)); + GNUNET_i2s (&n->id)); #endif + GNUNET_break (n->is_connected == GNUNET_NO); + GNUNET_break (n->transmit_stage == TS_NEW); + prev = NULL; pos = h->neighbours; - while ((pos != NULL) && - (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity)))) + while (pos != n) { prev = pos; pos = pos->next; } - if (pos == NULL) - { - GNUNET_break (0); - return; - } if (prev == NULL) - h->neighbours = pos->next; + h->neighbours = n->next; else - prev->next = pos->next; - if (NULL != (th = pos->transmit_handle)) - { - pos->transmit_handle = NULL; - th->neighbour = NULL; - remove_from_any_list (th); - if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <= - CONNECT_RETRY_TIMEOUT.value) - { - /* signal error */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Connection with `%4s' failed and timeout was in the past, giving up on message delivery.\n"), - GNUNET_i2s (peer)); - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); - peer_transmit_timeout (th, NULL); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"), - GNUNET_i2s (peer), - GNUNET_TIME_absolute_get_remaining (th->timeout).value); - /* try again in a bit */ - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - CONNECT_RETRY_TIMEOUT, - &try_connect_task, th); - } - } + prev->next = n->next; + GNUNET_free (n); +} + + +/** + * Mark neighbour as disconnected. + * + * @param n the entry to mark as disconnected + */ +static void +neighbour_disconnect (struct NeighbourList *n) +{ + struct GNUNET_TRANSPORT_Handle *h = n->h; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Removing neighbour `%s' from list of connected peers.\n", + GNUNET_i2s (&n->id)); +#endif + GNUNET_break (n->is_connected == GNUNET_YES); + n->is_connected = GNUNET_NO; if (h->nc_cb != NULL) - h->nd_cb (h->cls, peer); - GNUNET_free (pos); + h->nd_cb (h->cls, &n->id); + if (n->transmit_stage == TS_NEW) + neighbour_free (n); } +/** + * Function we use for handling incoming messages. + * + * @param cls closure (struct GNUNET_TRANSPORT_Handle *) + * @param msg message received, NULL on timeout or fatal error + */ +static void demultiplexer (void *cls, + const struct GNUNET_MessageHeader *msg); + + /** * Try again to connect to transport service. + * + * @param cls the handle to the transport service + * @param tc scheduler context */ static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +reconnect (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_TRANSPORT_Handle *h = cls; - struct GNUNET_TRANSPORT_TransmitHandle *pos; + struct ControlMessage *pos; struct NeighbourList *n; - /* Forget about all neighbours that we used to be connected - to */ - while (NULL != (n = h->neighbours)) - remove_neighbour (h, &n->id); + if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + { + /* shutdown, just give up */ + return; + } + /* Forget about all neighbours that we used to be connected to */ + n = h->neighbours; + while (NULL != n) + { + neighbour_disconnect (n); + n = n->next; + } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); #endif @@ -1111,20 +1136,16 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg); GNUNET_assert (h->client != NULL); - /* make sure we don't send "START" twice, - remove existing entry from queue (if present) */ - pos = h->connect_ready_head; + /* make sure we don't send "START" twice, remove existing entry from + queue (if present) */ + pos = h->control_head; while (pos != NULL) { if (pos->notify == &send_start) { - if (pos->prev == NULL) - h->connect_ready_head = pos->next; - else - pos->prev->next = pos->next; - if (pos->next != NULL) - pos->next->prev = pos->prev; - GNUNET_assert (pos->neighbour == NULL); + GNUNET_CONTAINER_DLL_remove (h->control_head, + h->control_tail, + pos); if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task) { GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); @@ -1147,6 +1168,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** * Function that will schedule the job that will try * to connect us again to the client. + * + * @param h transport service to reconnect */ static void schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) @@ -1161,215 +1184,47 @@ schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched, h->reconnect_delay, &reconnect, h); - h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS; -} - - -/** - * We are connected to the respective peer, check the - * bandwidth limits and schedule the transmission. - */ -static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th); - - -/** - * Function called by the scheduler when the timeout - * for bandwidth availablility for the target - * neighbour is reached. - */ -static void -transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - schedule_request (th); -} - - -/** - * Remove the given transmit handle from the wait list. Does NOT free - * it. - */ -static void -remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - if (th->prev == NULL) - th->handle->connect_wait_head = th->next; - else - th->prev->next = th->next; - if (th->next != NULL) - th->next->prev = th->prev; -} - - -/** - * We are connected to the respective peer, check the - * bandwidth limits and schedule the transmission. - */ -static void -schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_Handle *h; - struct GNUNET_TIME_Relative duration; - struct NeighbourList *n; - uint64_t available; - - h = th->handle; - n = th->neighbour; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - /* check outgoing quota */ - duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (duration.value > MIN_QUOTA_REFRESH_TIME) + if (h->reconnect_delay.value == 0) { - update_quota (n); - duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - } - available = duration.value * n->quota_out; - if (available < n->last_sent + th->notify_size) - { - /* calculate how much bandwidth we'd still need to - accumulate and based on that how long we'll have - to wait... */ - available = n->last_sent + th->notify_size - available; - duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - available / n->quota_out); - if (th->timeout.value < - GNUNET_TIME_relative_to_absolute (duration).value) - { - /* signal timeout! */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", - duration.value, GNUNET_i2s (&th->target)); -#endif - remove_from_wait_list (th); - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - return; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n", - GNUNET_i2s (&th->target), duration.value); -#endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - duration, &transmit_ready, th); - return; + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Bandwidth available for transmission to `%4s'\n", - GNUNET_i2s (&n->id)); -#endif - if (GNUNET_NO == n->transmit_ok) + else { - /* we may be ready, but transport service is not; - wait for SendOkMessage or timeout */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need to wait for transport service `%s' message\n", - "SEND_OK"); -#endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_TIME_absolute_get_remaining - (th->timeout), &peer_transmit_timeout, - th); - return; + h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2); + h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, + h->reconnect_delay); } - n->transmit_ok = GNUNET_NO; - remove_from_wait_list (th); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Moving message for `%4s' to ready list\n", - GNUNET_i2s (&n->id)); -#endif - insert_transmit_handle (&h->connect_ready_head, th); - if (GNUNET_NO == h->transmission_scheduled) - schedule_transmission (h); } /** * Add neighbour to our list */ -static void -add_neighbour (struct GNUNET_TRANSPORT_Handle *h, - uint32_t quota_out, - struct GNUNET_TIME_Relative latency, - uint16_t distance, +static struct NeighbourList * +neighbour_add (struct GNUNET_TRANSPORT_Handle *h, const struct GNUNET_PeerIdentity *pid) { struct NeighbourList *n; - struct GNUNET_TRANSPORT_TransmitHandle *prev; - struct GNUNET_TRANSPORT_TransmitHandle *pos; - struct GNUNET_TRANSPORT_TransmitHandle *next; /* check for duplicates */ - if (NULL != find_neighbour (h, pid)) + if (NULL != (n = neighbour_find (h, pid))) { GNUNET_break (0); - return; + return n; } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid)); + "Creating entry for neighbour `%4s'.\n", + GNUNET_i2s (pid)); #endif n = GNUNET_malloc (sizeof (struct NeighbourList)); n->id = *pid; n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_out = quota_out; n->next = h->neighbours; - n->transmit_ok = GNUNET_YES; - h->neighbours = n; - if (h->nc_cb != NULL) - h->nc_cb (h->cls, &n->id, latency, distance); - prev = NULL; - pos = h->connect_wait_head; - while (pos != NULL) - { - next = pos->next; - if (0 == memcmp (pid, - &pos->target, sizeof (struct GNUNET_PeerIdentity))) - { - pos->neighbour = n; - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = pos; - if (prev == NULL) - h->connect_wait_head = next; - else - prev->next = next; -#if ACK - if (GNUNET_YES == n->received_ack) - { -#endif -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found pending request for `%4s' will trigger it now.\n", - GNUNET_i2s (&pos->target)); -#endif - if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); - pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - schedule_request (pos); -#if ACK - } -#endif - - break; - } - prev = pos; - pos = next; - } + n->quota_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT; + n->h = h; + h->neighbours = n; + return n; } @@ -1377,7 +1232,6 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h, * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. * - * @param sched scheduler to use * @param cfg configuration to use * @param cls closure for the callbacks @@ -1423,44 +1277,29 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); #endif - while (NULL != (th = handle->connect_ready_head)) - { - handle->connect_ready_head = th->next; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - } - while (NULL != (th = handle->connect_wait_head)) - { - handle->connect_wait_head = th->next; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - } while (NULL != (n = handle->neighbours)) { handle->neighbours = n->next; - if (NULL != (th = n->transmit_handle)) - { - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - } + switch (n->transmit_stage) + { + case TS_NEW: + case TS_TRANSMITTED: + /* nothing to do */ + break; + case TS_QUEUED: + case TS_TRANSMITTED_QUEUED: + th = &n->transmit_handle; + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (handle->sched, + th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); + break; + default: + GNUNET_break (0); + } GNUNET_free (n); } while (NULL != (hwl = handle->hwl_head)) @@ -1479,6 +1318,11 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task); handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; } + if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (handle->sched, handle->quota_task); + handle->quota_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_free_non_null (handle->my_hello); handle->my_hello = NULL; GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport", @@ -1502,10 +1346,9 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) /** - * Type of a function to call when we receive a message - * from the service. + * Function we use for handling incoming messages. * - * @param cls closure + * @param cls closure (struct GNUNET_TRANSPORT_Handle *) * @param msg message received, NULL on timeout or fatal error */ static void @@ -1521,59 +1364,29 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) struct HelloWaitList *next_hwl; struct NeighbourList *n; struct GNUNET_PeerIdentity me; - struct GNUNET_TRANSPORT_TransmitHandle *th; - - struct GNUNET_TRANSPORT_TransmitHandle *prev; - struct GNUNET_TRANSPORT_TransmitHandle *pos; - struct GNUNET_TRANSPORT_TransmitHandle *next; uint16_t size; - if ((msg == NULL) || (h->client == NULL)) + if (h->client == NULL) + { + /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect', + finish clean up work! */ + GNUNET_free (h); + return; + } + if (msg == NULL) { - if (h->client != NULL) - { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Error receiving from transport service, disconnecting temporarily.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Error receiving from transport service, disconnecting temporarily.\n"); #endif - if (h->network_handle != NULL) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); - h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; - th = h->connect_ready_head; - /* add timeout again, we canceled the transmit_ready task! */ - - /*GNUNET_assert (th->notify_delay_task == - GNUNET_SCHEDULER_NO_TASK);*/ - - /* START - somehow we are getting here when th->notify_delay_task is already - * set. Not sure why, so just checking and canceling instead of asserting and - * dying. Probably not a *fix*. */ - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - /* END */ - GNUNET_assert (th->notify_delay_task == - GNUNET_SCHEDULER_NO_TASK); - th->notify_delay_task = - GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_TIME_absolute_get_remaining - (th->timeout), - &peer_transmit_timeout, th); - } - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; - schedule_reconnect (h); - } - else - { - /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect', - finish clean up work! */ - GNUNET_free (h); - } + if (h->network_handle != NULL) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); + h->network_handle = NULL; + } + GNUNET_CLIENT_disconnect (h->client); + h->client = NULL; + schedule_reconnect (h); return; } GNUNET_CLIENT_receive (h->client, @@ -1624,81 +1437,16 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) "Receiving `%s' message for `%4s'.\n", "CONNECT", GNUNET_i2s (&cim->id)); #endif - if (NULL == (n = find_neighbour(h, &cim->id))) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Don't know neighbor, adding!\n"); -#endif - add_neighbour (h, - ntohl (cim->quota_out), - GNUNET_TIME_relative_ntoh (cim->latency), ntohs(cim->distance), &cim->id); - } - else - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Do know neighbor, scheduling transmission!\n"); -#endif -#if ACK - n->received_ack = GNUNET_YES; -#endif - if (NULL != n->transmit_handle) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer connected, scheduling delayed message for delivery now.\n"); -#endif - schedule_request (n->transmit_handle); - } - else - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmit handle is null... Checking for pending stuff(?)\n"); -#endif - prev = NULL; - pos = h->connect_wait_head; - while (pos != NULL) - { - next = pos->next; - if (0 == memcmp (&cim->id, - &pos->target, sizeof (struct GNUNET_PeerIdentity))) - { - pos->neighbour = n; - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = pos; - if (prev == NULL) - h->connect_wait_head = next; - else - prev->next = next; -#if ACK - if (GNUNET_YES == n->received_ack) - { -#endif - #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found pending request for `%4s' will trigger it now.\n", - GNUNET_i2s (&pos->target)); - #endif - if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); - pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - schedule_request (pos); -#if ACK - } -#endif - - break; - } - prev = pos; - pos = next; - } - } - } - + n = neighbour_find (h, &cim->id); + if (n == NULL) + n = neighbour_add (h, + &cim->id); + GNUNET_break (n->is_connected == GNUNET_NO); + n->is_connected = GNUNET_YES; + if (h->nc_cb != NULL) + h->nc_cb (h->cls, &n->id, + GNUNET_TIME_relative_ntoh (cim->latency), + ntohs (cim->distance)); break; case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: if (size != sizeof (struct DisconnectInfoMessage)) @@ -1710,9 +1458,13 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message for `%4s'.\n", - "DISCONNECT", GNUNET_i2s (&dim->peer)); + "DISCONNECT", + GNUNET_i2s (&dim->peer)); #endif - remove_neighbour (h, &dim->peer); + n = neighbour_find (h, &cim->id); + GNUNET_break (n != NULL); + if (n != NULL) + neighbour_disconnect (n); break; case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: if (size != sizeof (struct SendOkMessage)) @@ -1726,21 +1478,26 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) "Receiving `%s' message, transmission %s.\n", "SEND_OK", ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); #endif - n = find_neighbour (h, &okm->peer); + n = neighbour_find (h, &okm->peer); GNUNET_assert (n != NULL); - n->transmit_ok = GNUNET_YES; - if (n->transmit_handle != NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing pending message for `%4s'\n", - GNUNET_i2s (&n->id)); -#endif - GNUNET_SCHEDULER_cancel (h->sched, - n->transmit_handle->notify_delay_task); - n->transmit_handle->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - schedule_request (n->transmit_handle); - } + switch (n->transmit_stage) + { + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + GNUNET_break (0); + break; + case TS_TRANSMITTED: + n->transmit_stage = TS_NEW; + break; + case TS_TRANSMITTED_QUEUED: + n->transmit_stage = TS_QUEUED; + schedule_transmission (h); + break; + default: + GNUNET_break (0); + } break; case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: #if DEBUG_TRANSPORT @@ -1761,42 +1518,20 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break (0); break; } - switch (ntohs (imm->type)) - { - case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", - "ACK", GNUNET_i2s (&im->peer)); -#endif - break; - default: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s'.\n", - ntohs (imm->type), GNUNET_i2s (&im->peer)); -#endif - - n = find_neighbour (h, &im->peer); - if (n == NULL) - { - GNUNET_break (0); - break; - } - - if (NULL != n->transmit_handle) - { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer connected, scheduling delayed message for delivery now.\n"); -#endif - schedule_request (n->transmit_handle); - } - if (h->rec != NULL) - h->rec (h->cls, &im->peer, imm, - GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance)); - break; - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s'.\n", + ntohs (imm->type), GNUNET_i2s (&im->peer)); +#endif + n = neighbour_find (h, &im->peer); + if (n == NULL) + { + GNUNET_break (0); + break; + } + if (h->rec != NULL) + h->rec (h->cls, &im->peer, imm, + GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance)); break; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1809,73 +1544,48 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) } -struct ClientTransmitWrapper -{ - GNUNET_CONNECTION_TransmitReadyNotify notify; - void *notify_cls; - struct GNUNET_TRANSPORT_TransmitHandle *th; -}; - - /** - * Transmit message of a client destined for another - * peer to the service. + * Called when our transmit request timed out before any transport + * reported success connecting to the desired peer or before the + * transport was ready to receive. Signal error and free + * TransmitHandle. + * + * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle*' that is timing out + * @param tc scheduler context */ -static size_t -client_notify_wrapper (void *cls, size_t size, void *buf) +static void +peer_transmit_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct ClientTransmitWrapper *ctw = cls; - struct OutboundMessage *obm; - struct GNUNET_MessageHeader *hdr; - size_t ret; + struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + struct NeighbourList *n; - if (size == 0) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request could not be satisfied.\n"); -#endif - if (NULL != ctw->notify) - GNUNET_assert (0 == ctw->notify (ctw->notify_cls, 0, NULL)); - GNUNET_free (ctw); - return 0; - } - GNUNET_assert (size >= sizeof (struct OutboundMessage)); - obm = buf; - if (ctw->notify != NULL) - ret = ctw->notify (ctw->notify_cls, - size - sizeof (struct OutboundMessage), - (void *) &obm[1]); - else - ret = 0; - if (ret == 0) + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + n = th->neighbour; + switch (n->transmit_stage) { - /* Need to reset flag, no SEND means no SEND_OK! */ - ctw->th->neighbour->transmit_ok = GNUNET_YES; - GNUNET_free (ctw); - return 0; + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + n->transmit_stage = TS_NEW; + if (n->is_connected == GNUNET_NO) + neighbour_free (n); + break; + case TS_TRANSMITTED: + GNUNET_break (0); + break; + case TS_TRANSMITTED_QUEUED: + n->transmit_stage = TS_TRANSMITTED; + break; + default: + GNUNET_break (0); } - GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); - hdr = (struct GNUNET_MessageHeader *) &obm[1]; - GNUNET_assert (ntohs (hdr->size) == ret); - GNUNET_assert (ret + sizeof (struct OutboundMessage) < - GNUNET_SERVER_MAX_MESSAGE_SIZE); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' message with data for `%4s'\n", - "SEND", GNUNET_i2s (&ctw->th->target)); -#endif - ret += sizeof (struct OutboundMessage); - obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); - obm->header.size = htons (ret); - obm->priority = htonl (ctw->th->priority); - obm->peer = ctw->th->target; - GNUNET_free (ctw); - return ret; + if (NULL != th->notify) + th->notify (th->notify_cls, 0, NULL); } - /** * Check if we could queue a message of the given size for * transmission. The transport service will take both its @@ -1905,10 +1615,8 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle GNUNET_CONNECTION_TransmitReadyNotify notify, void *notify_cls) { - struct GNUNET_TRANSPORT_TransmitHandle *pos; struct GNUNET_TRANSPORT_TransmitHandle *th; struct NeighbourList *n; - struct ClientTransmitWrapper *ctw; if (size + sizeof (struct OutboundMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) @@ -1926,66 +1634,39 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle "Asking transport service for transmission of %u bytes to peer `%4s'.\n", size, GNUNET_i2s (target)); #endif - n = find_neighbour (handle, target); - if ((n != NULL) && (n->transmit_handle != NULL)) - return NULL; /* already have a request pending for this peer! */ - ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper)); - th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); - ctw->notify = notify; - ctw->notify_cls = notify_cls; - ctw->th = th; - th->handle = handle; + n = neighbour_find (handle, target); + if (n == NULL) + n = neighbour_add (handle, target); + if (n == NULL) + return NULL; + switch (n->transmit_stage) + { + case TS_NEW: + n->transmit_stage = TS_QUEUED; + break; + case TS_QUEUED: + break; + case TS_TRANSMITTED: + n->transmit_stage = TS_TRANSMITTED_QUEUED; + break; + case TS_TRANSMITTED_QUEUED: + return NULL; + break; + default: + GNUNET_break (0); + return NULL; + } + th = &n->transmit_handle; th->neighbour = n; - th->target = *target; - th->notify = &client_notify_wrapper; - th->notify_cls = ctw; + th->notify = notify; + th->notify_cls = notify_cls; th->timeout = GNUNET_TIME_relative_to_absolute (timeout); th->notify_size = size + sizeof (struct OutboundMessage); th->priority = priority; - if (NULL == n) - { - pos = handle->connect_wait_head; - while (pos != NULL) - { - GNUNET_assert (0 != memcmp (target, - &pos->target, - sizeof (struct GNUNET_PeerIdentity))); - pos = pos->next; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Will now try to connect to `%4s'.\n", GNUNET_i2s (target)); -#endif - try_connect (th); - return th; - } - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request queued for transmission to transport service.\n"); -#endif - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = th; - if (GNUNET_YES != n->transmit_ok) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n", - GNUNET_i2s (target), timeout.value); -#endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (handle->sched, - timeout, &peer_transmit_timeout, th); - return th; - } - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer `%4s' is ready to receive, scheduling message for delivery now.\n", - GNUNET_i2s (target)); -#endif th->notify_delay_task - = GNUNET_SCHEDULER_add_now (handle->sched, &transmit_ready, th); + = GNUNET_SCHEDULER_add_delayed (handle->sched, timeout, + &peer_transmit_timeout, th); + schedule_transmission (handle); return th; } @@ -1998,26 +1679,34 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th) { - struct GNUNET_TRANSPORT_Handle *h; + struct NeighbourList *n; + n = th->neighbour; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request of %u bytes to `%4s' was cancelled.\n", th->notify_size - sizeof (struct OutboundMessage), - GNUNET_i2s (&th->target)); + GNUNET_i2s (&n->id)); #endif - GNUNET_assert (th->notify == &client_notify_wrapper); - remove_from_any_list (th); - h = th->handle; - if ((h->connect_ready_head == NULL) && (h->network_handle != NULL)) + switch (n->transmit_stage) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); - h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + n->transmit_stage = TS_NEW; + if (n->is_connected == GNUNET_NO) + neighbour_free (n); + break; + case TS_TRANSMITTED: + GNUNET_break (0); + break; + case TS_TRANSMITTED_QUEUED: + n->transmit_stage = TS_TRANSMITTED; + break; + default: + GNUNET_break (0); } - GNUNET_free (th->notify_cls); - GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_free (th); } -- cgit v1.2.3