diff options
Diffstat (limited to 'src/transport/transport_api.c')
-rw-r--r-- | src/transport/transport_api.c | 1725 |
1 files changed, 707 insertions, 1018 deletions
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c index ae07421d8..84de9cebe 100644 --- a/src/transport/transport_api.c +++ b/src/transport/transport_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009 Christian Grothoff (and other contributing authors) + (C) 2009, 2010 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -25,6 +25,8 @@ */ #include "platform.h" #include "gnunet_client_lib.h" +#include "gnunet_constants.h" +#include "gnunet_container_lib.h" #include "gnunet_arm_service.h" #include "gnunet_hello_lib.h" #include "gnunet_protocols.h" @@ -57,167 +59,221 @@ */ #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) + /** - * Entry in linked list of all of our current neighbours. + * What stage are we in for transmission processing? */ -struct NeighbourList +enum TransmitStage + { + /** + * No active message. + */ + TS_NEW = 0, + + /** + * Message in local queue, not given to service. + */ + TS_QUEUED = 1, + + /** + * Message given to service, not confirmed (no SEND_OK). + */ + TS_TRANSMITTED = 2, + + /** + * One message was given to service and before it was confirmed, + * another one was already queued (waiting for SEND_OK to pass on + * to service). + */ + TS_TRANSMITTED_QUEUED = 3 + }; + + +/** + * Handle for a transmission-ready request. + */ +struct GNUNET_TRANSPORT_TransmitHandle { /** - * This is a linked list. + * Neighbour for this handle, NULL for control-traffic. */ - struct NeighbourList *next; + struct NeighbourList *neighbour; /** - * Active transmit handle, can be NULL. Used to move - * from ready to wait list on disconnect and to block - * two transmissions to the same peer from being scheduled - * at the same time. + * Function to call when notify_size bytes are available + * for transmission. */ - struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle; + GNUNET_CONNECTION_TransmitReadyNotify notify; /** - * Identity of this neighbour. + * Closure for notify. */ - struct GNUNET_PeerIdentity id; + void *notify_cls; /** - * At what time did we reset last_sent last? + * transmit_ready task Id. The task is used to introduce the + * artificial delay that may be required to maintain the bandwidth + * limits. Later, this will be the ID of the "transmit_timeout" + * task which is used to signal a timeout if the transmission could + * not be done in a timely fashion. */ - struct GNUNET_TIME_Absolute last_quota_update; + GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; /** - * How many bytes have we sent since the "last_quota_update" - * timestamp? + * Timeout for this request. */ - uint64_t last_sent; + struct GNUNET_TIME_Absolute timeout; /** - * Quota for outbound traffic to the neighbour in bytes/ms. + * How many bytes is our notify callback waiting for? */ - uint32_t quota_out; + size_t notify_size; /** - * Set to GNUNET_YES if we are currently allowed to - * transmit a message to the transport service for this - * peer, GNUNET_NO otherwise. + * How important is this message? */ - int transmit_ok; + unsigned int priority; -#if ACK - /** - * Set to GNUNET_YES if we have received an ACK for the - * given peer. Peers that receive our HELLO always respond - * with an ACK to let us know that we are successfully - * communicating. Note that a PING can not be used for this - * since PINGs are only send if a HELLO address requires - * confirmation (and also, PINGs are not passed to the - * transport API itself). - */ - int received_ack; -#endif }; /** - * Linked list of requests from clients for our HELLO - * that were deferred. + * Handle for a control message queue entry. */ -struct HelloWaitList +struct ControlMessage { /** - * This is a linked list. + * This is a doubly-linked list. */ - struct HelloWaitList *next; + struct ControlMessage *next; /** - * Reference back to our transport handle. + * This is a doubly-linked list. */ - struct GNUNET_TRANSPORT_Handle *handle; + struct ControlMessage *prev; /** - * Callback to call once we got our HELLO. + * Overall transport handle. */ - GNUNET_TRANSPORT_HelloUpdateCallback rec; + struct GNUNET_TRANSPORT_Handle *h; /** - * Closure for rec. + * Function to call when notify_size bytes are available + * for transmission. */ - void *rec_cls; + GNUNET_CONNECTION_TransmitReadyNotify notify; + + /** + * Closure for notify. + */ + void *notify_cls; + + /** + * transmit_ready task Id. The task is used to introduce the + * artificial delay that may be required to maintain the bandwidth + * limits. Later, this will be the ID of the "transmit_timeout" + * task which is used to signal a timeout if the transmission could + * not be done in a timely fashion. + */ + GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; + + /** + * How many bytes is our notify callback waiting for? + */ + size_t notify_size; }; /** - * Opaque handle for a transmission-ready request. + * Entry in linked list of all of our current neighbours. */ -struct GNUNET_TRANSPORT_TransmitHandle +struct NeighbourList { /** - * We keep the transmit handles that are waiting for - * a transport-level connection in a doubly linked list. + * This is a linked list. */ - struct GNUNET_TRANSPORT_TransmitHandle *next; + struct NeighbourList *next; /** - * We keep the transmit handles that are waiting for - * a transport-level connection in a doubly linked list. + * Overall transport handle. */ - struct GNUNET_TRANSPORT_TransmitHandle *prev; + struct GNUNET_TRANSPORT_Handle *h; /** - * Handle of the main transport data structure. + * Active transmit handle; available if 'transmit_forbidden' + * is GNUNET_NO. */ - struct GNUNET_TRANSPORT_Handle *handle; + struct GNUNET_TRANSPORT_TransmitHandle transmit_handle; /** - * Neighbour for this handle, can be NULL if the service - * is not yet connected to the target. + * Identity of this neighbour. */ - struct NeighbourList *neighbour; + struct GNUNET_PeerIdentity id; /** - * Which peer is this transmission going to be for? All - * zeros if it is control-traffic to the service. + * At what time did we reset last_sent last? */ - struct GNUNET_PeerIdentity target; + struct GNUNET_TIME_Absolute last_quota_update; /** - * Function to call when notify_size bytes are available - * for transmission. + * How many bytes have we sent since the "last_quota_update" + * timestamp? */ - GNUNET_CONNECTION_TransmitReadyNotify notify; + uint64_t last_sent; /** - * Closure for notify. + * Quota for outbound traffic to the neighbour in bytes/ms. */ - void *notify_cls; + uint32_t quota_out; /** - * transmit_ready task Id. The task is used to introduce the - * artificial delay that may be required to maintain the bandwidth - * limits. Later, this will be the ID of the "transmit_timeout" - * task which is used to signal a timeout if the transmission could - * not be done in a timely fashion. + * Set to GNUNET_NO if we are currently allowed to accept a + * message to the transport service for this peer, GNUNET_YES + * if we have one and are waiting for transmission, GNUNET_SYSERR + * if we are waiting for confirmation AND have already accepted + * yet another message. */ - GNUNET_SCHEDULER_TaskIdentifier notify_delay_task; + enum TransmitStage transmit_stage; /** - * Timeout for this request. + * Have we received a notification that this peer is connected + * to us right now? */ - struct GNUNET_TIME_Absolute timeout; + int is_connected; + +}; + + +/** + * Linked list of requests from clients for our HELLO that were + * deferred. + */ +struct HelloWaitList +{ /** - * How many bytes is our notify callback waiting for? + * This is a linked list. */ - size_t notify_size; + struct HelloWaitList *next; /** - * How important is this message? + * Reference back to our transport handle. */ - unsigned int priority; + struct GNUNET_TRANSPORT_Handle *handle; + + /** + * Callback to call once we got our HELLO. + */ + GNUNET_TRANSPORT_HelloUpdateCallback rec; + + /** + * Closure for rec. + */ + void *rec_cls; }; @@ -250,6 +306,16 @@ struct GNUNET_TRANSPORT_Handle GNUNET_TRANSPORT_NotifyDisconnect nd_cb; /** + * Head of DLL of control messages. + */ + struct ControlMessage *control_head; + + /** + * Tail of DLL of control messages. + */ + struct ControlMessage *control_tail; + + /** * The current HELLO message for this peer. Updated * whenever transports change their addresses. */ @@ -266,26 +332,6 @@ struct GNUNET_TRANSPORT_Handle struct GNUNET_CLIENT_TransmitHandle *network_handle; /** - * Linked list of transmit handles that are waiting for the - * transport to connect to the respective peer. When we - * receive notification that the transport connected to a - * peer, we go over this list and check if someone has already - * requested a transmission to the new peer; if so, we trigger - * the next step. - */ - struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head; - - /** - * Linked list of transmit handles that are waiting for the - * transport to be ready for transmission to the respective - * peer. When we - * receive notification that the transport disconnected from - * a peer, we go over this list and move the entry back to - * the connect_wait list. - */ - struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head; - - /** * Linked list of pending requests for our HELLO. */ struct HelloWaitList *hwl_head; @@ -306,27 +352,34 @@ struct GNUNET_TRANSPORT_Handle struct NeighbourList *neighbours; /** - * ID of the task trying to reconnect to the - * service. + * ID of the task trying to reconnect to the service. */ GNUNET_SCHEDULER_TaskIdentifier reconnect_task; /** - * Delay until we try to reconnect. + * ID of the task trying to trigger transmission for a peer + * while maintaining bandwidth quotas. */ - struct GNUNET_TIME_Relative reconnect_delay; + GNUNET_SCHEDULER_TaskIdentifier quota_task; /** - * Do we currently have a transmission pending? - * (schedule transmission was called but has not - * yet succeeded)? + * Delay until we try to reconnect. */ - int transmission_scheduled; + struct GNUNET_TIME_Relative reconnect_delay; + }; +// FIXME: replace with hash map! +/** + * Get the neighbour list entry for the given peer + * + * @param h our context + * @param peer peer to look up + * @return NULL if no such peer entry exists + */ static struct NeighbourList * -find_neighbour (struct GNUNET_TRANSPORT_Handle *h, +neighbour_find (struct GNUNET_TRANSPORT_Handle *h, const struct GNUNET_PeerIdentity *peer) { struct NeighbourList *pos; @@ -340,51 +393,186 @@ find_neighbour (struct GNUNET_TRANSPORT_Handle *h, /** - * Schedule the task to send one message from the - * connect_ready list to the service. + * Schedule the task to send one message, either from the control + * list or the peer message queues to the service. */ static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h); /** - * Transmit message to client... + * Function called by the scheduler when the timeout for bandwidth + * availablility for the target neighbour is reached. + * + * @param cls the 'struct GNUNET_TRANSPORT_Handle*' + * @param tc scheduler context + */ +static void +quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_TRANSPORT_Handle *h = cls; + + h->quota_task = GNUNET_SCHEDULER_NO_TASK; + schedule_transmission (h); +} + + +/** + * Update the quota values for the given neighbour now. + * + * @param n neighbour to update + */ +static void +update_quota (struct NeighbourList *n) +{ + struct GNUNET_TIME_Relative delta; + uint64_t allowed; + uint64_t remaining; + + delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); + allowed = delta.value * n->quota_out; + if (n->last_sent < allowed) + { + remaining = allowed - n->last_sent; + if (n->quota_out > 0) + remaining /= n->quota_out; + else + remaining = 0; + if (remaining > MAX_BANDWIDTH_CARRY) + remaining = MAX_BANDWIDTH_CARRY; + n->last_sent = 0; + n->last_quota_update = GNUNET_TIME_absolute_get (); + n->last_quota_update.value -= remaining; + } + else + { + n->last_sent -= allowed; + n->last_quota_update = GNUNET_TIME_absolute_get (); + } +} + + +/** + * Figure out which transmission to a peer can be done right now. + * If none can, schedule a task to call 'schedule_transmission' + * whenever a peer transmission can be done in the future and + * return NULL. Otherwise return the next transmission to be + * performed. + * + * @param h handle to transport + * @return NULL to wait longer before doing any peer transmissions + */ +static struct GNUNET_TRANSPORT_TransmitHandle * +schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h) +{ + struct GNUNET_TRANSPORT_TransmitHandle *ret; + struct GNUNET_TRANSPORT_TransmitHandle *th; + struct NeighbourList *n; + struct NeighbourList *next; + struct GNUNET_TIME_Relative retry_time; + struct GNUNET_TIME_Relative duration; + uint64_t available; + + if (h->quota_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, + h->quota_task); + h->quota_task = GNUNET_SCHEDULER_NO_TASK; + } + retry_time = GNUNET_TIME_UNIT_FOREVER_REL; + ret = NULL; + next = h->neighbours; + while (NULL != (n = next)) + { + next = n->next; + if (n->transmit_stage != TS_QUEUED) + continue; /* not eligible */ + th = &n->transmit_handle; + /* check outgoing quota */ + duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); + if (duration.value > MIN_QUOTA_REFRESH_TIME) + { + update_quota (n); + duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); + } + available = duration.value * n->quota_out; + if (available < n->last_sent + th->notify_size) + { + /* calculate how much bandwidth we'd still need to + accumulate and based on that how long we'll have + to wait... */ + available = n->last_sent + th->notify_size - available; + duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + available / n->quota_out); + if (th->timeout.value < + GNUNET_TIME_relative_to_absolute (duration).value) + { + /* signal timeout! */ +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", + duration.value, GNUNET_i2s (&n->id)); +#endif + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + n->transmit_stage = TS_NEW; + if (NULL != th->notify) + GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); + continue; + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n", + GNUNET_i2s (&n->id), duration.value); +#endif + retry_time = GNUNET_TIME_relative_min (retry_time, + duration); + continue; + } +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Bandwidth available for transmission to `%4s'\n", + GNUNET_i2s (&n->id)); +#endif + if ( (ret == NULL) || + (ret->priority < th->priority) ) + ret = th; + } + if (ret == NULL) + h->quota_task = GNUNET_SCHEDULER_add_delayed (h->sched, + retry_time, + "a_transmit_ready, + h); + return ret; +} + + +/** + * Transmit message(s) to service. + * + * @param cls handle to transport + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf */ static size_t transport_notify_ready (void *cls, size_t size, void *buf) { struct GNUNET_TRANSPORT_Handle *h = cls; + struct ControlMessage *cm; struct GNUNET_TRANSPORT_TransmitHandle *th; struct NeighbourList *n; + struct OutboundMessage obm; size_t ret; + size_t mret; char *cbuf; h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; if (buf == NULL) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not transmit to transport service, cancelling pending requests\n"); -#endif - th = h->connect_ready_head; - if (th->next != NULL) - th->next->prev = NULL; - h->connect_ready_head = th->next; - if (NULL != (n = th->neighbour)) - { - GNUNET_assert (n->transmit_handle == th); - n->transmit_handle = NULL; - } - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - if (h->connect_ready_head != NULL) - schedule_transmission (h); /* FIXME: is this ok? */ + schedule_transmission (h); return 0; } #if DEBUG_TRANSPORT @@ -393,35 +581,64 @@ transport_notify_ready (void *cls, size_t size, void *buf) #endif cbuf = buf; ret = 0; - h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; - while ((h->connect_ready_head != NULL) && - (h->connect_ready_head->notify_size <= size)) + while ( (NULL != (cm = h->control_head)) && + (cm->notify_size <= size) ) + { + if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (h->sched, cm->notify_delay_task); + cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_CONTAINER_DLL_remove (h->control_head, + h->control_tail, + cm); + ret += cm->notify (cm->notify_cls, size, &cbuf[ret]); + GNUNET_free (cm); + size -= ret; + } + while ( (NULL != (th = schedule_peer_transmission (h))) && + (th->notify_size <= size) ) { - th = h->connect_ready_head; if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; } - GNUNET_assert (th->notify_size <= size); - if (th->next != NULL) - th->next->prev = NULL; - h->connect_ready_head = th->next; - if (NULL != (n = th->neighbour)) - { - GNUNET_assert (n->transmit_handle == th); - n->transmit_handle = NULL; - } - if (NULL != th->notify) - ret += th->notify (th->notify_cls, size, &cbuf[ret]); - GNUNET_free (th); - if (n != NULL) - n->last_sent += ret; - size -= ret; + n = th->neighbour; + switch (n->transmit_stage) + { + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + n->transmit_stage = TS_TRANSMITTED; + break; + case TS_TRANSMITTED: + GNUNET_break (0); + break; + case TS_TRANSMITTED_QUEUED: + GNUNET_break (0); + break; + default: + GNUNET_break (0); + } + GNUNET_assert (size >= sizeof (struct OutboundMessage)); + mret = th->notify (th->notify_cls, + size - sizeof (struct OutboundMessage), + &cbuf[ret + sizeof (struct OutboundMessage)]); + GNUNET_assert (mret <= size - sizeof (struct OutboundMessage)); + if (mret != 0) + { + obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); + obm.header.size = htons (mret + sizeof (struct OutboundMessage)); + obm.priority = htonl (th->priority); + obm.peer = n->id; + memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage)); + ret += (mret + sizeof (struct OutboundMessage)); + size -= (mret + sizeof (struct OutboundMessage)); + } } - if (h->connect_ready_head != NULL) - schedule_transmission (h); + schedule_transmission (h); #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to transport service\n", ret); @@ -431,155 +648,73 @@ transport_notify_ready (void *cls, size_t size, void *buf) /** - * Schedule the task to send one message from the - * connect_ready list to the service. + * Schedule the task to send one message, either from the control + * list or the peer message queues to the service. */ static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h) -{ +{ + size_t size; + struct GNUNET_TIME_Relative timeout; struct GNUNET_TRANSPORT_TransmitHandle *th; - GNUNET_assert (NULL == h->network_handle); + if (NULL != h->network_handle) + return; if (h->client == NULL) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Could not yet schedule transmission: we are not yet connected to the transport service!\n"); return; /* not yet connected */ } - th = h->connect_ready_head; - if (th == NULL) - return; /* no request pending */ - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != h->control_head) { - /* remove existing time out task, will be integrated - with transmit_ready notification! */ - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - h->transmission_scheduled = GNUNET_YES; - h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client, - th->notify_size, - GNUNET_TIME_absolute_get_remaining - (th->timeout), - GNUNET_NO, - &transport_notify_ready, - h); - GNUNET_assert (NULL != h->network_handle); -} - - -/** - * Insert the given transmit handle in the given sorted - * doubly linked list based on timeout. - * - * @param head pointer to the head of the linked list - * @param th element to insert into the list - */ -static void -insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head, - struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_TransmitHandle *pos; - struct GNUNET_TRANSPORT_TransmitHandle *prev; - - pos = *head; - prev = NULL; - while ((pos != NULL) && (pos->timeout.value < th->timeout.value)) - { - prev = pos; - pos = pos->next; - } - if (prev == NULL) - { - th->next = *head; - if (th->next != NULL) - th->next->prev = th; - *head = th; - } - else - { - th->next = pos; - th->prev = prev; - prev->next = th; - if (pos != NULL) - pos->prev = th; - } -} - - -/** - * Cancel a pending notify delay task (if pending) and also remove the - * given transmit handle from whatever list is on. - * - * @param th handle for the transmission request to manipulate - */ -static void -remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_Handle *h; - - h = th->handle; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (th->prev == NULL) - { - if (th == h->connect_wait_head) - h->connect_wait_head = th->next; - else - h->connect_ready_head = th->next; + size = h->control_head->notify_size; + timeout = GNUNET_TIME_UNIT_FOREVER_REL; } else { - th->prev->next = th->next; + th = schedule_peer_transmission (h); + if (th == NULL) + { + /* no transmission ready right now */ + return; + } + size = th->notify_size; + timeout = GNUNET_TIME_absolute_get_remaining (th->timeout); } - if (th->next != NULL) - th->next->prev = th->prev; + h->network_handle = + GNUNET_CLIENT_notify_transmit_ready (h->client, + size, + timeout, + GNUNET_NO, + &transport_notify_ready, + h); + GNUNET_assert (NULL != h->network_handle); } /** - * Schedule a request to connect to the given - * neighbour (and if successful, add the specified - * handle to the wait list). - * - * @param th handle for a request to transmit once we - * have connected - */ -static void try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th); - - -/** * Called when our transmit request timed out before any transport * reported success connecting to the desired peer or before the * transport was ready to receive. Signal error and free * TransmitHandle. */ static void -peer_transmit_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +control_transmit_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + struct ControlMessage *th = cls; th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - if (th->neighbour != NULL) - th->neighbour->transmit_handle = NULL; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Request for transmission to peer `%s' timed out.\n", - GNUNET_i2s (&th->target)); -#endif - remove_from_any_list (th); if (NULL != th->notify) th->notify (th->notify_cls, 0, NULL); + GNUNET_CONTAINER_DLL_remove (th->h->control_head, + th->h->control_tail, + th); GNUNET_free (th); } - - /** * Queue control request for transmission to the transport * service. @@ -600,68 +735,31 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h, GNUNET_CONNECTION_TransmitReadyNotify notify, void *notify_cls) { - struct GNUNET_TRANSPORT_TransmitHandle *th; + struct ControlMessage *th; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Control transmit of %u bytes within %llums requested\n", size, (unsigned long long) timeout.value); #endif - th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); - th->handle = h; + th = GNUNET_malloc (sizeof (struct ControlMessage)); + th->h = h; th->notify = notify; th->notify_cls = notify_cls; - th->timeout = GNUNET_TIME_relative_to_absolute (timeout); th->notify_size = size; th->notify_delay_task = GNUNET_SCHEDULER_add_delayed (h->sched, - timeout, &peer_transmit_timeout, th); - if (at_head) - { - th->next = h->connect_ready_head; - h->connect_ready_head = th; - if (th->next != NULL) - th->next->prev = th; - } + timeout, &control_transmit_timeout, th); + if (at_head) + GNUNET_CONTAINER_DLL_insert (h->control_head, + h->control_tail, + th); else - { - insert_transmit_handle (&h->connect_ready_head, th); - } - if (GNUNET_NO == h->transmission_scheduled) - schedule_transmission (h); -} - - -/** - * Update the quota values for the given neighbour now. - */ -static void -update_quota (struct NeighbourList *n) -{ - struct GNUNET_TIME_Relative delta; - uint64_t allowed; - uint64_t remaining; - - delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - allowed = delta.value * n->quota_out; - if (n->last_sent < allowed) - { - remaining = allowed - n->last_sent; - if (n->quota_out > 0) - remaining /= n->quota_out; - else - remaining = 0; - if (remaining > MAX_BANDWIDTH_CARRY) - remaining = MAX_BANDWIDTH_CARRY; - n->last_sent = 0; - n->last_quota_update = GNUNET_TIME_absolute_get (); - n->last_quota_update.value -= remaining; - } - else - { - n->last_sent -= allowed; - n->last_quota_update = GNUNET_TIME_absolute_get (); - } + GNUNET_CONTAINER_DLL_insert_after (h->control_head, + h->control_tail, + h->control_tail, + th); + schedule_transmission (h); } @@ -681,6 +779,14 @@ struct SetQuotaContext }; +/** + * Send SET_QUOTA message to the service. + * + * @param cls the 'struct SetQuotaContext' + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf + */ static size_t send_set_quota (void *cls, size_t size, void *buf) { @@ -699,7 +805,8 @@ send_set_quota (void *cls, size_t size, void *buf) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting `%s' request with respect to `%4s'.\n", - "SET_QUOTA", GNUNET_i2s (&sqc->target)); + "SET_QUOTA", + GNUNET_i2s (&sqc->target)); #endif GNUNET_assert (size >= sizeof (struct QuotaSetMessage)); msg = buf; @@ -742,7 +849,7 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle, struct NeighbourList *n; struct SetQuotaContext *sqc; - n = find_neighbour (handle, target); + n = neighbour_find (handle, target); if (n != NULL) { update_quota (n); @@ -830,6 +937,14 @@ GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle, } +/** + * Send HELLO message to the service. + * + * @param cls the HELLO message to send + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf + */ static size_t send_hello (void *cls, size_t size, void *buf) { @@ -873,14 +988,6 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, struct GNUNET_MessageHeader *hc; uint16_t size; - if (handle->client == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not connected to transport service, dropping offered HELLO\n"); -#endif - return; - } GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO); size = ntohs (hello->size); GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader)); @@ -893,11 +1000,13 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle, /** - * Function we use for handling incoming messages. + * Transmit START message to service. + * + * @param cls unused + * @param size number of bytes available in buf + * @param buf where to copy the message + * @return number of bytes copied to buf */ -static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg); - - static size_t send_start (void *cls, size_t size, void *buf) { @@ -905,9 +1014,10 @@ send_start (void *cls, size_t size, void *buf) if (buf == NULL) { + /* Can only be shutdown, just give up */ #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Timeout while trying to transmit `%s' request.\n", + "Shutdown while trying to transmit `%s' request.\n", "START"); #endif return 0; @@ -924,186 +1034,101 @@ send_start (void *cls, size_t size, void *buf) /** - * We're ready to transmit the request that the transport service - * should connect to a new peer. In addition to sending the - * request, schedule the next phase for the transmission processing - * that caused the connect request in the first place. - */ -static size_t -request_connect (void *cls, size_t size, void *buf) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - struct TryConnectMessage *tcm; - struct GNUNET_TRANSPORT_Handle *h; - - GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); - h = th->handle; - - if (buf == NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit `%s' request for `%4s' to service.\n", - "TRY_CONNECT", GNUNET_i2s (&th->target)); -#endif - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - return 0; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' message for `%4s' (need connection in %llu ms).\n", - "TRY_CONNECT", GNUNET_i2s (&th->target), - GNUNET_TIME_absolute_get_remaining (th->timeout).value); -#endif - GNUNET_assert (size >= sizeof (struct TryConnectMessage)); - tcm = buf; - tcm->header.size = htons (sizeof (struct TryConnectMessage)); - tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT); - tcm->reserved = htonl (0); - memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity)); - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_TIME_absolute_get_remaining - (th->timeout), - &peer_transmit_timeout, th); - insert_transmit_handle (&h->connect_wait_head, th); - return sizeof (struct TryConnectMessage); -} - - -/** - * Schedule a request to connect to the given - * neighbour (and if successful, add the specified - * handle to the wait list). - * - * @param th handle for a request to transmit once we - * have connected - */ -static void -try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); - schedule_control_transmit (th->handle, - sizeof (struct TryConnectMessage), - GNUNET_NO, - GNUNET_TIME_absolute_get_remaining (th->timeout), - &request_connect, th); -} - - -/** - * Task for delayed attempts to reconnect to a peer. - * - * @param cls must be a transmit handle that determines the peer - * to which we will try to connect - * @param tc scheduler information about why we were triggered (not used) - */ -static void -try_connect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - try_connect (th); -} - - -/** - * Remove neighbour from our list. Will automatically - * trigger a re-connect attempt if we have messages pending - * for this peer. + * Free neighbour. * * @param h our state - * @param peer the peer to remove + * @param n the entry to free */ static void -remove_neighbour (struct GNUNET_TRANSPORT_Handle *h, - const struct GNUNET_PeerIdentity *peer) +neighbour_free (struct NeighbourList *n) { + struct GNUNET_TRANSPORT_Handle *h; struct NeighbourList *prev; struct NeighbourList *pos; - struct GNUNET_TRANSPORT_TransmitHandle *th; + h = n->h; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing neighbour `%s' from list of connected peers.\n", - GNUNET_i2s (peer)); + GNUNET_i2s (&n->id)); #endif + GNUNET_break (n->is_connected == GNUNET_NO); + GNUNET_break (n->transmit_stage == TS_NEW); + prev = NULL; pos = h->neighbours; - while ((pos != NULL) && - (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity)))) + while (pos != n) { prev = pos; pos = pos->next; } - if (pos == NULL) - { - GNUNET_break (0); - return; - } if (prev == NULL) - h->neighbours = pos->next; + h->neighbours = n->next; else - prev->next = pos->next; - if (NULL != (th = pos->transmit_handle)) - { - pos->transmit_handle = NULL; - th->neighbour = NULL; - remove_from_any_list (th); - if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <= - CONNECT_RETRY_TIMEOUT.value) - { - /* signal error */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Connection with `%4s' failed and timeout was in the past, giving up on message delivery.\n"), - GNUNET_i2s (peer)); - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); - peer_transmit_timeout (th, NULL); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _ - ("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"), - GNUNET_i2s (peer), - GNUNET_TIME_absolute_get_remaining (th->timeout).value); - /* try again in a bit */ - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task); - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - CONNECT_RETRY_TIMEOUT, - &try_connect_task, th); - } - } + prev->next = n->next; + GNUNET_free (n); +} + + +/** + * Mark neighbour as disconnected. + * + * @param n the entry to mark as disconnected + */ +static void +neighbour_disconnect (struct NeighbourList *n) +{ + struct GNUNET_TRANSPORT_Handle *h = n->h; +#if DEBUG_TRANSPORT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Removing neighbour `%s' from list of connected peers.\n", + GNUNET_i2s (&n->id)); +#endif + GNUNET_break (n->is_connected == GNUNET_YES); + n->is_connected = GNUNET_NO; if (h->nc_cb != NULL) - h->nd_cb (h->cls, peer); - GNUNET_free (pos); + h->nd_cb (h->cls, &n->id); + if (n->transmit_stage == TS_NEW) + neighbour_free (n); } /** + * Function we use for handling incoming messages. + * + * @param cls closure (struct GNUNET_TRANSPORT_Handle *) + * @param msg message received, NULL on timeout or fatal error + */ +static void demultiplexer (void *cls, + const struct GNUNET_MessageHeader *msg); + + +/** * Try again to connect to transport service. + * + * @param cls the handle to the transport service + * @param tc scheduler context */ static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +reconnect (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_TRANSPORT_Handle *h = cls; - struct GNUNET_TRANSPORT_TransmitHandle *pos; + struct ControlMessage *pos; struct NeighbourList *n; - /* Forget about all neighbours that we used to be connected - to */ - while (NULL != (n = h->neighbours)) - remove_neighbour (h, &n->id); + if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + { + /* shutdown, just give up */ + return; + } + /* Forget about all neighbours that we used to be connected to */ + n = h->neighbours; + while (NULL != n) + { + neighbour_disconnect (n); + n = n->next; + } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); #endif @@ -1111,20 +1136,16 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg); GNUNET_assert (h->client != NULL); - /* make sure we don't send "START" twice, - remove existing entry from queue (if present) */ - pos = h->connect_ready_head; + /* make sure we don't send "START" twice, remove existing entry from + queue (if present) */ + pos = h->control_head; while (pos != NULL) { if (pos->notify == &send_start) { - if (pos->prev == NULL) - h->connect_ready_head = pos->next; - else - pos->prev->next = pos->next; - if (pos->next != NULL) - pos->next->prev = pos->prev; - GNUNET_assert (pos->neighbour == NULL); + GNUNET_CONTAINER_DLL_remove (h->control_head, + h->control_tail, + pos); if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task) { GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); @@ -1147,6 +1168,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** * Function that will schedule the job that will try * to connect us again to the client. + * + * @param h transport service to reconnect */ static void schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) @@ -1161,215 +1184,47 @@ schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h) h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched, h->reconnect_delay, &reconnect, h); - h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS; -} - - -/** - * We are connected to the respective peer, check the - * bandwidth limits and schedule the transmission. - */ -static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th); - - -/** - * Function called by the scheduler when the timeout - * for bandwidth availablility for the target - * neighbour is reached. - */ -static void -transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_TRANSPORT_TransmitHandle *th = cls; - - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - schedule_request (th); -} - - -/** - * Remove the given transmit handle from the wait list. Does NOT free - * it. - */ -static void -remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - if (th->prev == NULL) - th->handle->connect_wait_head = th->next; - else - th->prev->next = th->next; - if (th->next != NULL) - th->next->prev = th->prev; -} - - -/** - * We are connected to the respective peer, check the - * bandwidth limits and schedule the transmission. - */ -static void -schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th) -{ - struct GNUNET_TRANSPORT_Handle *h; - struct GNUNET_TIME_Relative duration; - struct NeighbourList *n; - uint64_t available; - - h = th->handle; - n = th->neighbour; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + if (h->reconnect_delay.value == 0) { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } - /* check outgoing quota */ - duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - if (duration.value > MIN_QUOTA_REFRESH_TIME) + else { - update_quota (n); - duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update); - } - available = duration.value * n->quota_out; - if (available < n->last_sent + th->notify_size) - { - /* calculate how much bandwidth we'd still need to - accumulate and based on that how long we'll have - to wait... */ - available = n->last_sent + th->notify_size - available; - duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - available / n->quota_out); - if (th->timeout.value < - GNUNET_TIME_relative_to_absolute (duration).value) - { - /* signal timeout! */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n", - duration.value, GNUNET_i2s (&th->target)); -#endif - remove_from_wait_list (th); - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - return; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n", - GNUNET_i2s (&th->target), duration.value); -#endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - duration, &transmit_ready, th); - return; + h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2); + h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, + h->reconnect_delay); } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Bandwidth available for transmission to `%4s'\n", - GNUNET_i2s (&n->id)); -#endif - if (GNUNET_NO == n->transmit_ok) - { - /* we may be ready, but transport service is not; - wait for SendOkMessage or timeout */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Need to wait for transport service `%s' message\n", - "SEND_OK"); -#endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_TIME_absolute_get_remaining - (th->timeout), &peer_transmit_timeout, - th); - return; - } - n->transmit_ok = GNUNET_NO; - remove_from_wait_list (th); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Moving message for `%4s' to ready list\n", - GNUNET_i2s (&n->id)); -#endif - insert_transmit_handle (&h->connect_ready_head, th); - if (GNUNET_NO == h->transmission_scheduled) - schedule_transmission (h); } /** * Add neighbour to our list */ -static void -add_neighbour (struct GNUNET_TRANSPORT_Handle *h, - uint32_t quota_out, - struct GNUNET_TIME_Relative latency, - uint16_t distance, +static struct NeighbourList * +neighbour_add (struct GNUNET_TRANSPORT_Handle *h, const struct GNUNET_PeerIdentity *pid) { struct NeighbourList *n; - struct GNUNET_TRANSPORT_TransmitHandle *prev; - struct GNUNET_TRANSPORT_TransmitHandle *pos; - struct GNUNET_TRANSPORT_TransmitHandle *next; /* check for duplicates */ - if (NULL != find_neighbour (h, pid)) + if (NULL != (n = neighbour_find (h, pid))) { GNUNET_break (0); - return; + return n; } #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid)); + "Creating entry for neighbour `%4s'.\n", + GNUNET_i2s (pid)); #endif n = GNUNET_malloc (sizeof (struct NeighbourList)); n->id = *pid; n->last_quota_update = GNUNET_TIME_absolute_get (); - n->quota_out = quota_out; n->next = h->neighbours; - n->transmit_ok = GNUNET_YES; - h->neighbours = n; - if (h->nc_cb != NULL) - h->nc_cb (h->cls, &n->id, latency, distance); - prev = NULL; - pos = h->connect_wait_head; - while (pos != NULL) - { - next = pos->next; - if (0 == memcmp (pid, - &pos->target, sizeof (struct GNUNET_PeerIdentity))) - { - pos->neighbour = n; - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = pos; - if (prev == NULL) - h->connect_wait_head = next; - else - prev->next = next; -#if ACK - if (GNUNET_YES == n->received_ack) - { -#endif -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found pending request for `%4s' will trigger it now.\n", - GNUNET_i2s (&pos->target)); -#endif - if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); - pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - schedule_request (pos); -#if ACK - } -#endif - - break; - } - prev = pos; - pos = next; - } + n->quota_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT; + n->h = h; + h->neighbours = n; + return n; } @@ -1377,7 +1232,6 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h, * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. * - * @param sched scheduler to use * @param cfg configuration to use * @param cls closure for the callbacks @@ -1423,44 +1277,29 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); #endif - while (NULL != (th = handle->connect_ready_head)) - { - handle->connect_ready_head = th->next; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - } - while (NULL != (th = handle->connect_wait_head)) - { - handle->connect_wait_head = th->next; - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - } while (NULL != (n = handle->neighbours)) { handle->neighbours = n->next; - if (NULL != (th = n->transmit_handle)) - { - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != th->notify) - GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - } + switch (n->transmit_stage) + { + case TS_NEW: + case TS_TRANSMITTED: + /* nothing to do */ + break; + case TS_QUEUED: + case TS_TRANSMITTED_QUEUED: + th = &n->transmit_handle; + if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (handle->sched, + th->notify_delay_task); + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL)); + break; + default: + GNUNET_break (0); + } GNUNET_free (n); } while (NULL != (hwl = handle->hwl_head)) @@ -1479,6 +1318,11 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task); handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; } + if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (handle->sched, handle->quota_task); + handle->quota_task = GNUNET_SCHEDULER_NO_TASK; + } GNUNET_free_non_null (handle->my_hello); handle->my_hello = NULL; GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport", @@ -1502,10 +1346,9 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle) /** - * Type of a function to call when we receive a message - * from the service. + * Function we use for handling incoming messages. * - * @param cls closure + * @param cls closure (struct GNUNET_TRANSPORT_Handle *) * @param msg message received, NULL on timeout or fatal error */ static void @@ -1521,59 +1364,29 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) struct HelloWaitList *next_hwl; struct NeighbourList *n; struct GNUNET_PeerIdentity me; - struct GNUNET_TRANSPORT_TransmitHandle *th; - - struct GNUNET_TRANSPORT_TransmitHandle *prev; - struct GNUNET_TRANSPORT_TransmitHandle *pos; - struct GNUNET_TRANSPORT_TransmitHandle *next; uint16_t size; - if ((msg == NULL) || (h->client == NULL)) + if (h->client == NULL) + { + /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect', + finish clean up work! */ + GNUNET_free (h); + return; + } + if (msg == NULL) { - if (h->client != NULL) - { #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Error receiving from transport service, disconnecting temporarily.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Error receiving from transport service, disconnecting temporarily.\n"); #endif - if (h->network_handle != NULL) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); - h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; - th = h->connect_ready_head; - /* add timeout again, we canceled the transmit_ready task! */ - - /*GNUNET_assert (th->notify_delay_task == - GNUNET_SCHEDULER_NO_TASK);*/ - - /* START - somehow we are getting here when th->notify_delay_task is already - * set. Not sure why, so just checking and canceling instead of asserting and - * dying. Probably not a *fix*. */ - if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task); - th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - /* END */ - GNUNET_assert (th->notify_delay_task == - GNUNET_SCHEDULER_NO_TASK); - th->notify_delay_task = - GNUNET_SCHEDULER_add_delayed (h->sched, - GNUNET_TIME_absolute_get_remaining - (th->timeout), - &peer_transmit_timeout, th); - } - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; - schedule_reconnect (h); - } - else - { - /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect', - finish clean up work! */ - GNUNET_free (h); - } + if (h->network_handle != NULL) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); + h->network_handle = NULL; + } + GNUNET_CLIENT_disconnect (h->client); + h->client = NULL; + schedule_reconnect (h); return; } GNUNET_CLIENT_receive (h->client, @@ -1624,81 +1437,16 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) "Receiving `%s' message for `%4s'.\n", "CONNECT", GNUNET_i2s (&cim->id)); #endif - if (NULL == (n = find_neighbour(h, &cim->id))) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Don't know neighbor, adding!\n"); -#endif - add_neighbour (h, - ntohl (cim->quota_out), - GNUNET_TIME_relative_ntoh (cim->latency), ntohs(cim->distance), &cim->id); - } - else - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Do know neighbor, scheduling transmission!\n"); -#endif -#if ACK - n->received_ack = GNUNET_YES; -#endif - if (NULL != n->transmit_handle) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer connected, scheduling delayed message for delivery now.\n"); -#endif - schedule_request (n->transmit_handle); - } - else - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmit handle is null... Checking for pending stuff(?)\n"); -#endif - prev = NULL; - pos = h->connect_wait_head; - while (pos != NULL) - { - next = pos->next; - if (0 == memcmp (&cim->id, - &pos->target, sizeof (struct GNUNET_PeerIdentity))) - { - pos->neighbour = n; - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = pos; - if (prev == NULL) - h->connect_wait_head = next; - else - prev->next = next; -#if ACK - if (GNUNET_YES == n->received_ack) - { -#endif - #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found pending request for `%4s' will trigger it now.\n", - GNUNET_i2s (&pos->target)); - #endif - if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task); - pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - } - schedule_request (pos); -#if ACK - } -#endif - - break; - } - prev = pos; - pos = next; - } - } - } - + n = neighbour_find (h, &cim->id); + if (n == NULL) + n = neighbour_add (h, + &cim->id); + GNUNET_break (n->is_connected == GNUNET_NO); + n->is_connected = GNUNET_YES; + if (h->nc_cb != NULL) + h->nc_cb (h->cls, &n->id, + GNUNET_TIME_relative_ntoh (cim->latency), + ntohs (cim->distance)); break; case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT: if (size != sizeof (struct DisconnectInfoMessage)) @@ -1710,9 +1458,13 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving `%s' message for `%4s'.\n", - "DISCONNECT", GNUNET_i2s (&dim->peer)); + "DISCONNECT", + GNUNET_i2s (&dim->peer)); #endif - remove_neighbour (h, &dim->peer); + n = neighbour_find (h, &cim->id); + GNUNET_break (n != NULL); + if (n != NULL) + neighbour_disconnect (n); break; case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK: if (size != sizeof (struct SendOkMessage)) @@ -1726,21 +1478,26 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) "Receiving `%s' message, transmission %s.\n", "SEND_OK", ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); #endif - n = find_neighbour (h, &okm->peer); + n = neighbour_find (h, &okm->peer); GNUNET_assert (n != NULL); - n->transmit_ok = GNUNET_YES; - if (n->transmit_handle != NULL) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Processing pending message for `%4s'\n", - GNUNET_i2s (&n->id)); -#endif - GNUNET_SCHEDULER_cancel (h->sched, - n->transmit_handle->notify_delay_task); - n->transmit_handle->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; - schedule_request (n->transmit_handle); - } + switch (n->transmit_stage) + { + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + GNUNET_break (0); + break; + case TS_TRANSMITTED: + n->transmit_stage = TS_NEW; + break; + case TS_TRANSMITTED_QUEUED: + n->transmit_stage = TS_QUEUED; + schedule_transmission (h); + break; + default: + GNUNET_break (0); + } break; case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV: #if DEBUG_TRANSPORT @@ -1761,42 +1518,20 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break (0); break; } - switch (ntohs (imm->type)) - { - case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK: -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Receiving `%s' message from `%4s'.\n", - "ACK", GNUNET_i2s (&im->peer)); -#endif - break; - default: #if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u from `%4s'.\n", - ntohs (imm->type), GNUNET_i2s (&im->peer)); -#endif - - n = find_neighbour (h, &im->peer); - if (n == NULL) - { - GNUNET_break (0); - break; - } - - if (NULL != n->transmit_handle) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer connected, scheduling delayed message for delivery now.\n"); -#endif - schedule_request (n->transmit_handle); - } - if (h->rec != NULL) - h->rec (h->cls, &im->peer, imm, - GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance)); - break; - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from `%4s'.\n", + ntohs (imm->type), GNUNET_i2s (&im->peer)); +#endif + n = neighbour_find (h, &im->peer); + if (n == NULL) + { + GNUNET_break (0); + break; + } + if (h->rec != NULL) + h->rec (h->cls, &im->peer, imm, + GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance)); break; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1809,73 +1544,48 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg) } -struct ClientTransmitWrapper -{ - GNUNET_CONNECTION_TransmitReadyNotify notify; - void *notify_cls; - struct GNUNET_TRANSPORT_TransmitHandle *th; -}; - - /** - * Transmit message of a client destined for another - * peer to the service. + * Called when our transmit request timed out before any transport + * reported success connecting to the desired peer or before the + * transport was ready to receive. Signal error and free + * TransmitHandle. + * + * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle*' that is timing out + * @param tc scheduler context */ -static size_t -client_notify_wrapper (void *cls, size_t size, void *buf) +static void +peer_transmit_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct ClientTransmitWrapper *ctw = cls; - struct OutboundMessage *obm; - struct GNUNET_MessageHeader *hdr; - size_t ret; + struct GNUNET_TRANSPORT_TransmitHandle *th = cls; + struct NeighbourList *n; - if (size == 0) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request could not be satisfied.\n"); -#endif - if (NULL != ctw->notify) - GNUNET_assert (0 == ctw->notify (ctw->notify_cls, 0, NULL)); - GNUNET_free (ctw); - return 0; - } - GNUNET_assert (size >= sizeof (struct OutboundMessage)); - obm = buf; - if (ctw->notify != NULL) - ret = ctw->notify (ctw->notify_cls, - size - sizeof (struct OutboundMessage), - (void *) &obm[1]); - else - ret = 0; - if (ret == 0) + th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK; + n = th->neighbour; + switch (n->transmit_stage) { - /* Need to reset flag, no SEND means no SEND_OK! */ - ctw->th->neighbour->transmit_ok = GNUNET_YES; - GNUNET_free (ctw); - return 0; + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + n->transmit_stage = TS_NEW; + if (n->is_connected == GNUNET_NO) + neighbour_free (n); + break; + case TS_TRANSMITTED: + GNUNET_break (0); + break; + case TS_TRANSMITTED_QUEUED: + n->transmit_stage = TS_TRANSMITTED; + break; + default: + GNUNET_break (0); } - GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); - hdr = (struct GNUNET_MessageHeader *) &obm[1]; - GNUNET_assert (ntohs (hdr->size) == ret); - GNUNET_assert (ret + sizeof (struct OutboundMessage) < - GNUNET_SERVER_MAX_MESSAGE_SIZE); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' message with data for `%4s'\n", - "SEND", GNUNET_i2s (&ctw->th->target)); -#endif - ret += sizeof (struct OutboundMessage); - obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND); - obm->header.size = htons (ret); - obm->priority = htonl (ctw->th->priority); - obm->peer = ctw->th->target; - GNUNET_free (ctw); - return ret; + if (NULL != th->notify) + th->notify (th->notify_cls, 0, NULL); } - /** * Check if we could queue a message of the given size for * transmission. The transport service will take both its @@ -1905,10 +1615,8 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle GNUNET_CONNECTION_TransmitReadyNotify notify, void *notify_cls) { - struct GNUNET_TRANSPORT_TransmitHandle *pos; struct GNUNET_TRANSPORT_TransmitHandle *th; struct NeighbourList *n; - struct ClientTransmitWrapper *ctw; if (size + sizeof (struct OutboundMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) @@ -1926,66 +1634,39 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle "Asking transport service for transmission of %u bytes to peer `%4s'.\n", size, GNUNET_i2s (target)); #endif - n = find_neighbour (handle, target); - if ((n != NULL) && (n->transmit_handle != NULL)) - return NULL; /* already have a request pending for this peer! */ - ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper)); - th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle)); - ctw->notify = notify; - ctw->notify_cls = notify_cls; - ctw->th = th; - th->handle = handle; + n = neighbour_find (handle, target); + if (n == NULL) + n = neighbour_add (handle, target); + if (n == NULL) + return NULL; + switch (n->transmit_stage) + { + case TS_NEW: + n->transmit_stage = TS_QUEUED; + break; + case TS_QUEUED: + break; + case TS_TRANSMITTED: + n->transmit_stage = TS_TRANSMITTED_QUEUED; + break; + case TS_TRANSMITTED_QUEUED: + return NULL; + break; + default: + GNUNET_break (0); + return NULL; + } + th = &n->transmit_handle; th->neighbour = n; - th->target = *target; - th->notify = &client_notify_wrapper; - th->notify_cls = ctw; + th->notify = notify; + th->notify_cls = notify_cls; th->timeout = GNUNET_TIME_relative_to_absolute (timeout); th->notify_size = size + sizeof (struct OutboundMessage); th->priority = priority; - if (NULL == n) - { - pos = handle->connect_wait_head; - while (pos != NULL) - { - GNUNET_assert (0 != memcmp (target, - &pos->target, - sizeof (struct GNUNET_PeerIdentity))); - pos = pos->next; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Will now try to connect to `%4s'.\n", GNUNET_i2s (target)); -#endif - try_connect (th); - return th; - } - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission request queued for transmission to transport service.\n"); -#endif - GNUNET_assert (NULL == n->transmit_handle); - n->transmit_handle = th; - if (GNUNET_YES != n->transmit_ok) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n", - GNUNET_i2s (target), timeout.value); -#endif - th->notify_delay_task - = GNUNET_SCHEDULER_add_delayed (handle->sched, - timeout, &peer_transmit_timeout, th); - return th; - } - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer `%4s' is ready to receive, scheduling message for delivery now.\n", - GNUNET_i2s (target)); -#endif th->notify_delay_task - = GNUNET_SCHEDULER_add_now (handle->sched, &transmit_ready, th); + = GNUNET_SCHEDULER_add_delayed (handle->sched, timeout, + &peer_transmit_timeout, th); + schedule_transmission (handle); return th; } @@ -1998,26 +1679,34 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th) { - struct GNUNET_TRANSPORT_Handle *h; + struct NeighbourList *n; + n = th->neighbour; #if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request of %u bytes to `%4s' was cancelled.\n", th->notify_size - sizeof (struct OutboundMessage), - GNUNET_i2s (&th->target)); + GNUNET_i2s (&n->id)); #endif - GNUNET_assert (th->notify == &client_notify_wrapper); - remove_from_any_list (th); - h = th->handle; - if ((h->connect_ready_head == NULL) && (h->network_handle != NULL)) + switch (n->transmit_stage) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle); - h->network_handle = NULL; - h->transmission_scheduled = GNUNET_NO; + case TS_NEW: + GNUNET_break (0); + break; + case TS_QUEUED: + n->transmit_stage = TS_NEW; + if (n->is_connected == GNUNET_NO) + neighbour_free (n); + break; + case TS_TRANSMITTED: + GNUNET_break (0); + break; + case TS_TRANSMITTED_QUEUED: + n->transmit_stage = TS_TRANSMITTED; + break; + default: + GNUNET_break (0); } - GNUNET_free (th->notify_cls); - GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_free (th); } |