summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/gnunet-service-core.c2
-rw-r--r--src/include/gnunet_protocols.h24
-rw-r--r--src/include/gnunet_transport_service.h4
-rw-r--r--src/transport/gnunet-service-transport.c244
-rw-r--r--src/transport/test_transport_api.c64
-rw-r--r--src/transport/test_transport_api_tcp_peer2.conf2
-rw-r--r--src/transport/transport.h42
-rw-r--r--src/transport/transport_api.c1725
8 files changed, 867 insertions, 1240 deletions
diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c
index 47f6b41c2..08c1c5455 100644
--- a/src/core/gnunet-service-core.c
+++ b/src/core/gnunet-service-core.c
@@ -1096,7 +1096,7 @@ process_encrypted_neighbour_queue (struct Neighbour *n)
n);
if (n->th == NULL)
{
- /* message request too large (oops) */
+ /* message request too large or duplicate request */
GNUNET_break (0);
/* discard encrypted message */
GNUNET_assert (NULL != (m = n->encrypted_head));
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 249ba3de0..e43143168 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -164,48 +164,36 @@ extern "C"
#define GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA 26
/**
- * Message telling transport to try to connect to the
- * given peer.
- */
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT 27
-
-/**
- * Response to another peer confirming that communication was
- * established.
- */
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_ACK 28
-
-/**
* Request to look addresses of peers in server.
*/
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP 29
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP 27
/**
* Response to the address lookup request.
*/
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY 30
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_REPLY 28
/**
* Change in blacklisting status of a peer.
*/
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST 31
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST 29
/**
* Request to transport to notify us about any blacklisting status
* changes on this connection (and to immediately send all
* active blacklist entries).
*/
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST_NOTIFY 32
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_BLACKLIST_NOTIFY 30
/**
* Transport PING message
*/
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_PING 33
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_PING 32
/**
* Transport PONG message
*/
-#define GNUNET_MESSAGE_TYPE_TRANSPORT_PONG 34
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_PONG 33
/**
* Request addition of a HELLO
diff --git a/src/include/gnunet_transport_service.h b/src/include/gnunet_transport_service.h
index 650dd2e68..7a3ccbf7b 100644
--- a/src/include/gnunet_transport_service.h
+++ b/src/include/gnunet_transport_service.h
@@ -22,10 +22,6 @@
* @file include/gnunet_transport_service.h
* @brief low-level P2P IO
* @author Christian Grothoff
- *
- * TODO:
- * - define API for blacklisting, un-blacklisting and notifications
- * about blacklisted peers
*/
#ifndef GNUNET_TRANSPORT_SERVICE_H
diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c
index e2616f078..854eef773 100644
--- a/src/transport/gnunet-service-transport.c
+++ b/src/transport/gnunet-service-transport.c
@@ -197,6 +197,12 @@ struct ForeignAddressList
*/
unsigned int connect_attempts;
+ /**
+ * DV distance to this peer (1 if no DV is used).
+ * FIXME: need to set this from transport plugins!
+ */
+ uint32_t distance;
+
};
@@ -446,6 +452,11 @@ struct NeighborList
struct GNUNET_TIME_Relative latency;
/**
+ * DV distance to this peer (1 if no DV is used).
+ */
+ uint32_t distance;
+
+ /**
* How many bytes have we received since the "last_quota_update"
* timestamp?
*/
@@ -465,9 +476,8 @@ struct NeighborList
unsigned int quota_violation_count;
/**
- * Have we seen an ACK from this neighbor in the past?
- * (used to make up a fake ACK for clients connecting after
- * the neighbor connected to us).
+ * Have we seen an PONG from this neighbor in the past (and
+ * not had a disconnect since)?
*/
int received_pong;
@@ -771,7 +781,6 @@ static struct CheckHelloValidatedContext *chvc_tail;
static struct GNUNET_CONTAINER_MultiHashMap *validation_map;
-
/**
* The peer specified by the given neighbor has timed-out or a plugin
* has disconnected. We may either need to do nothing (other plugins
@@ -1000,6 +1009,30 @@ transmit_to_client (struct TransportClient *client,
/**
+ * Transmit a 'SEND_OK' notification to the given client for the
+ * given neighbor.
+ *
+ * @param client who to notify
+ * @param n neighbor to notify about
+ * @param result status code for the transmission request
+ */
+static void
+transmit_send_ok (struct TransportClient *client,
+ struct NeighborList *n,
+ int result)
+{
+ struct SendOkMessage send_ok_msg;
+
+ send_ok_msg.header.size = htons (sizeof (send_ok_msg));
+ send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+ send_ok_msg.success = htonl (result);
+ send_ok_msg.latency = GNUNET_TIME_relative_hton (n->latency);
+ send_ok_msg.peer = n->id;
+ transmit_to_client (client, &send_ok_msg.header, GNUNET_NO);
+}
+
+
+/**
* Function called by the GNUNET_TRANSPORT_TransmitFunction
* upon "completion" of a send request. This tells the API
* that it is now legal to send another message to the given
@@ -1019,60 +1052,32 @@ transmit_send_continuation (void *cls,
int result)
{
struct MessageQueue *mq = cls;
- /*struct ReadyList *rl;*/ /* We no longer use the ReadyList for anything here, safe to remove? */
- struct SendOkMessage send_ok_msg;
struct NeighborList *n;
- GNUNET_assert (mq != NULL);
n = find_neighbor(&mq->neighbor_id);
- if (n == NULL) /* Neighbor must have been removed asynchronously! */
- return;
-
- /* Otherwise, let's make sure we've got the right peer */
- GNUNET_assert (0 ==
- memcmp (&n->id, target,
- sizeof (struct GNUNET_PeerIdentity)));
-
- if (result == GNUNET_OK)
+ GNUNET_assert (n != NULL);
+ if (mq->specific_address != NULL)
{
- if (mq->specific_address != NULL)
+ if (result == GNUNET_OK)
{
mq->specific_address->timeout =
GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
mq->specific_address->connected = GNUNET_YES;
- }
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission to peer `%s' failed, marking connection as down.\n",
- GNUNET_i2s (target));
- if (mq->specific_address != NULL)
- mq->specific_address->connected = GNUNET_NO;
+ }
+ else
+ {
+ mq->specific_address->connected = GNUNET_NO;
+ }
+ if (! mq->internal_msg)
+ mq->specific_address->in_transmit = GNUNET_NO;
}
- if ( (! mq->internal_msg) &&
- (mq->specific_address != NULL) )
- mq->specific_address->in_transmit = GNUNET_NO;
-
if (mq->client != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying client %p about transmission to peer `%4s'.\n",
- mq->client, GNUNET_i2s (target));
- send_ok_msg.header.size = htons (sizeof (send_ok_msg));
- send_ok_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- send_ok_msg.success = htonl (result);
- send_ok_msg.peer = n->id;
- transmit_to_client (mq->client, &send_ok_msg.header, GNUNET_NO);
- }
+ transmit_send_ok (mq->client, n, result);
GNUNET_free (mq);
- /* one plugin just became ready again, try transmitting
- another message (if available) */
- if (result == GNUNET_OK)
- try_transmission_to_peer (n);
- else
- disconnect_neighbor (n, GNUNET_YES);
+ try_transmission_to_peer (n);
+ if (result != GNUNET_OK)
+ disconnect_neighbor (n, GNUNET_YES);
}
@@ -1159,8 +1164,12 @@ try_transmission_to_peer (struct NeighborList *neighbor)
min_latency = GNUNET_TIME_UNIT_FOREVER_REL;
rl = NULL;
mq = neighbor->messages_head;
+ /* FIXME: support bi-directional use of TCP */
if (mq->specific_address == NULL)
mq->specific_address = find_ready_address(neighbor);
+ GNUNET_CONTAINER_DLL_remove (neighbor->messages_head,
+ neighbor->messages_tail,
+ mq);
if (mq->specific_address == NULL)
{
#if DEBUG_TRANSPORT
@@ -1169,14 +1178,14 @@ try_transmission_to_peer (struct NeighborList *neighbor)
mq->message_buf_size,
GNUNET_i2s (&mq->neighbor_id));
#endif
+ if (mq->client != NULL)
+ transmit_send_ok (mq->client, neighbor, GNUNET_NO);
+ GNUNET_free (mq);
return; /* nobody ready */
}
if (mq->specific_address->connected == GNUNET_NO)
mq->specific_address->connect_attempts++;
rl = mq->specific_address->ready_list;
- GNUNET_CONTAINER_DLL_remove (neighbor->messages_head,
- neighbor->messages_tail,
- mq);
mq->plugin = rl->plugin;
if (!mq->internal_msg)
mq->specific_address->in_transmit = GNUNET_YES;
@@ -1251,7 +1260,6 @@ transmit_to_peer (struct TransportClient *client,
memcpy(&mq->neighbor_id, &neighbor->id, sizeof(struct GNUNET_PeerIdentity));
mq->internal_msg = is_internal;
mq->priority = priority;
-
if (is_internal)
GNUNET_CONTAINER_DLL_insert (neighbor->messages_head,
neighbor->messages_tail,
@@ -1495,14 +1503,20 @@ plugin_env_notify_address (void *cls,
*/
static void
notify_clients_connect (const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_TIME_Relative latency)
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
struct ConnectInfoMessage cim;
struct TransportClient *cpos;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying clients about connection from `%s'\n",
+ GNUNET_i2s (peer));
+#endif
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out = htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
+ cim.distance = htonl (distance);
cim.latency = GNUNET_TIME_relative_hton (latency);
memcpy (&cim.id, peer, sizeof (struct GNUNET_PeerIdentity));
cpos = clients;
@@ -1523,6 +1537,11 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
struct DisconnectInfoMessage dim;
struct TransportClient *cpos;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Notifying clients about lost connection to `%s'\n",
+ GNUNET_i2s (peer));
+#endif
dim.header.size = htons (sizeof (struct DisconnectInfoMessage));
dim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
dim.reserved = htonl (0);
@@ -1612,6 +1631,7 @@ add_peer_address(struct NeighborList *neighbor,
ret->expires = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
ret->latency = GNUNET_TIME_relative_get_forever();
+ ret->distance = -1;
ret->timeout = GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
ret->ready_list = head;
@@ -1731,6 +1751,12 @@ check_pending_validation (void *cls,
n->latency = fal->latency;
else
n->latency.value = (fal->latency.value + n->latency.value) / 2;
+ n->distance = fal->distance;
+ if (GNUNET_NO == n->received_pong)
+ {
+ notify_clients_connect (&target, n->latency, n->distance);
+ n->received_pong = GNUNET_YES;
+ }
}
/* clean up validation entry */
@@ -1795,6 +1821,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
#endif
return;
}
+
#if 0
/* FIXME: add given address to potential pool of our addresses
(for voting) */
@@ -1809,7 +1836,7 @@ handle_pong (void *cls, const struct GNUNET_MessageHeader *message,
static void
neighbor_timeout_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct NeighborList *n = cls;
@@ -1861,13 +1888,13 @@ setup_new_neighbor (const struct GNUNET_PeerIdentity *peer)
tp = tp->next;
}
n->latency = GNUNET_TIME_UNIT_FOREVER_REL;
+ n->distance = -1;
n->timeout_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
&neighbor_timeout_task, n);
transmit_to_peer (NULL, NULL, 0,
(const char *) our_hello, GNUNET_HELLO_size(our_hello),
GNUNET_NO, n);
- notify_clients_connect (peer, GNUNET_TIME_UNIT_FOREVER_REL);
return n;
}
@@ -2266,28 +2293,15 @@ process_hello (struct TransportPlugin *plugin,
* disconnect?
*/
static void
-disconnect_neighbor (struct NeighborList *current_handle, int check)
+disconnect_neighbor (struct NeighborList *n, int check)
{
struct ReadyList *rpos;
struct NeighborList *npos;
struct NeighborList *nprev;
- struct NeighborList *n;
struct MessageQueue *mq;
struct ForeignAddressList *peer_addresses;
struct ForeignAddressList *peer_pos;
- if (neighbors == NULL)
- return; /* We don't have any neighbors, so client has an already removed handle! */
-
- npos = neighbors;
- while ((npos != NULL) && (current_handle != npos))
- npos = npos->next;
-
- if (npos == NULL)
- return; /* Couldn't find neighbor in existing list, must have been already removed! */
- else
- n = npos;
-
if (GNUNET_YES == check)
{
rpos = n->plugins;
@@ -2303,10 +2317,10 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
rpos = rpos->next;
}
}
-
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
- "Disconnecting from `%4s'\n", GNUNET_i2s (&n->id));
+ "Disconnecting from `%4s'\n",
+ GNUNET_i2s (&n->id));
#endif
/* remove n from neighbors list */
nprev = NULL;
@@ -2323,7 +2337,8 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
nprev->next = n->next;
/* notify all clients about disconnect */
- notify_clients_disconnect (&n->id);
+ if (GNUNET_YES == n->received_pong)
+ notify_clients_disconnect (&n->id);
/* clean up all plugins, cancel connections and pending transmissions */
while (NULL != (rpos = n->plugins))
@@ -2352,7 +2367,10 @@ disconnect_neighbor (struct NeighborList *current_handle, int check)
GNUNET_free (mq);
}
if (n->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ {
+ GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ n->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
/* finally, free n itself */
GNUNET_free (n);
}
@@ -2504,9 +2522,10 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
peer_address = add_peer_address(n,
plugin->short_name,
sender_address,
- sender_address_len);
+ sender_address_len);
if (peer_address != NULL)
{
+ peer_address->distance = distance;
if (peer_address->connected == GNUNET_NO)
{
peer_address->connected = GNUNET_YES;
@@ -2520,10 +2539,12 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
/* update traffic received amount ... */
msize = ntohs (message->size);
n->last_received += msize;
- GNUNET_SCHEDULER_cancel (sched, n->timeout_task);
+ n->distance = distance;
n->peer_timeout =
GNUNET_TIME_relative_to_absolute
(GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+ GNUNET_SCHEDULER_cancel (sched,
+ n->timeout_task);
n->timeout_task =
GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
@@ -2551,6 +2572,11 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
handle_pong(plugin, message, peer, sender_address, sender_address_len);
break;
default:
+ if (! n->received_pong)
+ {
+ GNUNET_break_op (0);
+ break;
+ }
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received message of type %u from `%4s', sending to all clients.\n",
@@ -2591,8 +2617,6 @@ handle_start (void *cls,
struct TransportClient *c;
struct ConnectInfoMessage cim;
struct NeighborList *n;
- struct InboundMessage *im;
- struct GNUNET_MessageHeader *ack;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2626,34 +2650,18 @@ handle_start (void *cls,
/* tell new client about all existing connections */
cim.header.size = htons (sizeof (struct ConnectInfoMessage));
cim.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim.quota_out =
- htonl (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT / (60 * 1000));
- /* FIXME: this ACK stuff is not nice! */
- im = GNUNET_malloc (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.size = htons (sizeof (struct InboundMessage) +
- sizeof (struct GNUNET_MessageHeader));
- im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
- im->latency = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
- ack = (struct GNUNET_MessageHeader *) &im[1];
- ack->size = htons (sizeof (struct GNUNET_MessageHeader));
- ack->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_ACK);
- for (n = neighbors; n != NULL; n = n->next)
- {
- cim.id = n->id;
- cim.latency = GNUNET_TIME_relative_hton (n->latency);
- transmit_to_client (c, &cim.header, GNUNET_NO);
- if (n->received_pong)
- {
- im->peer = n->id;
- transmit_to_client (c, &im->header, GNUNET_NO);
+ n = neighbors;
+ while (n != NULL)
+ {
+ if (GNUNET_YES == n->received_pong)
+ {
+ cim.id = n->id;
+ cim.latency = GNUNET_TIME_relative_hton (n->latency);
+ cim.distance = htonl (n->distance);
+ transmit_to_client (c, &cim.header, GNUNET_NO);
}
+ n = n->next;
}
- GNUNET_free (im);
- }
- else
- {
- fprintf(stderr, "Our hello is NULL!\n");
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -2725,7 +2733,7 @@ handle_send (void *cls,
}
n = find_neighbor (&obm->peer);
if (n == NULL)
- n = setup_new_neighbor (&obm->peer); /* But won't ever add address, we have none! */
+ n = setup_new_neighbor (&obm->peer);
tc = clients;
while ((tc != NULL) && (tc->client != client))
tc = tc->next;
@@ -2789,33 +2797,6 @@ handle_set_quota (void *cls,
}
-/**
- * Handle TRY_CONNECT-message.
- *
- * @param cls closure (always NULL)
- * @param client identification of the client
- * @param message the actual message
- */
-static void
-handle_try_connect (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
-{
- const struct TryConnectMessage *tcm;
- struct NeighborList *neighbor;
- tcm = (const struct TryConnectMessage *) message;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received `%s' request from client %p asking to connect to `%4s'\n",
- "TRY_CONNECT", client, GNUNET_i2s (&tcm->peer));
-#endif
- neighbor = find_neighbor(&tcm->peer);
- if (neighbor == NULL)
- setup_new_neighbor (&tcm->peer);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
static void
transmit_address_to_client (void *cls, const char *address)
{
@@ -2909,9 +2890,6 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = {
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, 0},
{&handle_set_quota, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, sizeof (struct QuotaSetMessage)},
- {&handle_try_connect, NULL,
- GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT,
- sizeof (struct TryConnectMessage)},
{&handle_address_lookup, NULL,
GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_LOOKUP,
0},
diff --git a/src/transport/test_transport_api.c b/src/transport/test_transport_api.c
index b7e34f810..a43f971a0 100644
--- a/src/transport/test_transport_api.c
+++ b/src/transport/test_transport_api.c
@@ -81,7 +81,7 @@ static void
end ()
{
/* do work here */
- GNUNET_assert (ok == 8);
+ GNUNET_assert (ok == 6);
GNUNET_SCHEDULER_cancel (sched, die_task);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from transports!\n");
GNUNET_TRANSPORT_disconnect (p1.th);
@@ -132,7 +132,7 @@ notify_receive (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %d from peer (%p)!\n",
ntohs(message->type), cls);
- GNUNET_assert (ok == 7);
+ GNUNET_assert (ok == 5);
OKPP;
GNUNET_assert (MTYPE == ntohs (message->type));
@@ -144,23 +144,51 @@ notify_receive (void *cls,
}
+static size_t
+notify_ready (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_MessageHeader *hdr;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting message to peer (%p) - %u!\n", cls, size);
+ GNUNET_assert (size >= 256);
+ GNUNET_assert (ok == 4);
+ OKPP;
+ if (buf != NULL)
+ {
+ hdr = buf;
+ hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
+ hdr->type = htons (MTYPE);
+ }
+
+ return sizeof (struct GNUNET_MessageHeader);
+}
+
+
static void
notify_connect (void *cls,
const struct GNUNET_PeerIdentity *peer,
struct GNUNET_TIME_Relative latency,
uint32_t distance)
{
+ if (cls == &p1)
+ {
+ GNUNET_TRANSPORT_notify_transmit_ready (p1.th,
+ &p2.id,
+ 256, 0, TIMEOUT, &notify_ready,
+ &p1);
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer `%4s' connected to us (%p)!\n", GNUNET_i2s (peer), cls);
- GNUNET_assert ((ok >= 1) && (ok <= 6));
- OKPP;
}
static void
notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
{
- ok--;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%4s' disconnected (%p)!\n",
+ GNUNET_i2s (peer), cls);
}
@@ -186,27 +214,6 @@ setup_peer (struct PeerContext *p, const char *cfgname)
}
-static size_t
-notify_ready (void *cls, size_t size, void *buf)
-{
- struct GNUNET_MessageHeader *hdr;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting message to peer (%p) - %u!\n", cls, size);
- GNUNET_assert (size >= 256);
- GNUNET_assert ((ok >= 5) && (ok <= 6));
- OKPP;
- if (buf != NULL)
- {
- hdr = buf;
- hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
- hdr->type = htons (MTYPE);
- }
-
- return sizeof (struct GNUNET_MessageHeader);
-}
-
-
static void
exchange_hello_last (void *cls,
const struct GNUNET_MessageHeader *message)
@@ -231,11 +238,6 @@ exchange_hello_last (void *cls,
/* both HELLOs exchanged, get ready to test transmission! */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Finished exchanging HELLOs, now waiting for transmission!\n");
-
- GNUNET_TRANSPORT_notify_transmit_ready (p1.th,
- &p2.id,
- 256, 0, TIMEOUT, &notify_ready,
- &p1);
}
static void
diff --git a/src/transport/test_transport_api_tcp_peer2.conf b/src/transport/test_transport_api_tcp_peer2.conf
index 7f7e78574..870a4809e 100644
--- a/src/transport/test_transport_api_tcp_peer2.conf
+++ b/src/transport/test_transport_api_tcp_peer2.conf
@@ -49,7 +49,7 @@ MINIMUM-FRIENDS = 0
[transport]
PLUGINS = tcp
-DEBUG = NO
+#DEBUG = YES
PREFIX =
ALLOW_SHUTDOWN = YES
ACCEPT_FROM6 = ::1;
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 5e92bfb1e..a336cd9b0 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -30,7 +30,7 @@
#include "gnunet_time_lib.h"
#include "gnunet_transport_service.h"
-#define DEBUG_TRANSPORT GNUNET_NO
+#define DEBUG_TRANSPORT GNUNET_YES
/**
* For how long do we allow unused bandwidth
@@ -57,10 +57,9 @@ struct ConnectInfoMessage
struct GNUNET_MessageHeader header;
/**
- * Current quota for outbound traffic in bytes/ms.
- * (should be equal to system default)
+ * Transport distance metric (i.e. hops for DV)
*/
- uint32_t quota_out GNUNET_PACKED;
+ uint32_t distance;
/**
* Latency estimate.
@@ -72,11 +71,6 @@ struct ConnectInfoMessage
*/
struct GNUNET_PeerIdentity id;
- /**
- * Transport distance metric (i.e. hops for DV)
- */
- uint32_t distance;
-
};
@@ -133,31 +127,6 @@ struct QuotaSetMessage
/**
- * Message used to ask the transport service to connect
- * to a particular peer.
- */
-struct TryConnectMessage
-{
-
- /**
- * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Always zero.
- */
- uint32_t reserved GNUNET_PACKED;
-
- /**
- * About which peer are we talking here?
- */
- struct GNUNET_PeerIdentity peer;
-
-};
-
-
-/**
* Message used to notify the transport API about a message
* received from the network. The actual message follows.
*/
@@ -213,6 +182,11 @@ struct SendOkMessage
uint32_t success GNUNET_PACKED;
/**
+ * Latency estimate.
+ */
+ struct GNUNET_TIME_RelativeNBO latency;
+
+ /**
* Which peer can send more now?
*/
struct GNUNET_PeerIdentity peer;
diff --git a/src/transport/transport_api.c b/src/transport/transport_api.c
index ae07421d8..84de9cebe 100644
--- a/src/transport/transport_api.c
+++ b/src/transport/transport_api.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2009 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -25,6 +25,8 @@
*/
#include "platform.h"
#include "gnunet_client_lib.h"
+#include "gnunet_constants.h"
+#include "gnunet_container_lib.h"
#include "gnunet_arm_service.h"
#include "gnunet_hello_lib.h"
#include "gnunet_protocols.h"
@@ -57,167 +59,221 @@
*/
#define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
/**
- * Entry in linked list of all of our current neighbours.
+ * What stage are we in for transmission processing?
*/
-struct NeighbourList
+enum TransmitStage
+ {
+ /**
+ * No active message.
+ */
+ TS_NEW = 0,
+
+ /**
+ * Message in local queue, not given to service.
+ */
+ TS_QUEUED = 1,
+
+ /**
+ * Message given to service, not confirmed (no SEND_OK).
+ */
+ TS_TRANSMITTED = 2,
+
+ /**
+ * One message was given to service and before it was confirmed,
+ * another one was already queued (waiting for SEND_OK to pass on
+ * to service).
+ */
+ TS_TRANSMITTED_QUEUED = 3
+ };
+
+
+/**
+ * Handle for a transmission-ready request.
+ */
+struct GNUNET_TRANSPORT_TransmitHandle
{
/**
- * This is a linked list.
+ * Neighbour for this handle, NULL for control-traffic.
*/
- struct NeighbourList *next;
+ struct NeighbourList *neighbour;
/**
- * Active transmit handle, can be NULL. Used to move
- * from ready to wait list on disconnect and to block
- * two transmissions to the same peer from being scheduled
- * at the same time.
+ * Function to call when notify_size bytes are available
+ * for transmission.
*/
- struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
+ GNUNET_CONNECTION_TransmitReadyNotify notify;
/**
- * Identity of this neighbour.
+ * Closure for notify.
*/
- struct GNUNET_PeerIdentity id;
+ void *notify_cls;
/**
- * At what time did we reset last_sent last?
+ * transmit_ready task Id. The task is used to introduce the
+ * artificial delay that may be required to maintain the bandwidth
+ * limits. Later, this will be the ID of the "transmit_timeout"
+ * task which is used to signal a timeout if the transmission could
+ * not be done in a timely fashion.
*/
- struct GNUNET_TIME_Absolute last_quota_update;
+ GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
/**
- * How many bytes have we sent since the "last_quota_update"
- * timestamp?
+ * Timeout for this request.
*/
- uint64_t last_sent;
+ struct GNUNET_TIME_Absolute timeout;
/**
- * Quota for outbound traffic to the neighbour in bytes/ms.
+ * How many bytes is our notify callback waiting for?
*/
- uint32_t quota_out;
+ size_t notify_size;
/**
- * Set to GNUNET_YES if we are currently allowed to
- * transmit a message to the transport service for this
- * peer, GNUNET_NO otherwise.
+ * How important is this message?
*/
- int transmit_ok;
+ unsigned int priority;
-#if ACK
- /**
- * Set to GNUNET_YES if we have received an ACK for the
- * given peer. Peers that receive our HELLO always respond
- * with an ACK to let us know that we are successfully
- * communicating. Note that a PING can not be used for this
- * since PINGs are only send if a HELLO address requires
- * confirmation (and also, PINGs are not passed to the
- * transport API itself).
- */
- int received_ack;
-#endif
};
/**
- * Linked list of requests from clients for our HELLO
- * that were deferred.
+ * Handle for a control message queue entry.
*/
-struct HelloWaitList
+struct ControlMessage
{
/**
- * This is a linked list.
+ * This is a doubly-linked list.
*/
- struct HelloWaitList *next;
+ struct ControlMessage *next;
/**
- * Reference back to our transport handle.
+ * This is a doubly-linked list.
*/
- struct GNUNET_TRANSPORT_Handle *handle;
+ struct ControlMessage *prev;
/**
- * Callback to call once we got our HELLO.
+ * Overall transport handle.
*/
- GNUNET_TRANSPORT_HelloUpdateCallback rec;
+ struct GNUNET_TRANSPORT_Handle *h;
/**
- * Closure for rec.
+ * Function to call when notify_size bytes are available
+ * for transmission.
*/
- void *rec_cls;
+ GNUNET_CONNECTION_TransmitReadyNotify notify;
+
+ /**
+ * Closure for notify.
+ */
+ void *notify_cls;
+
+ /**
+ * transmit_ready task Id. The task is used to introduce the
+ * artificial delay that may be required to maintain the bandwidth
+ * limits. Later, this will be the ID of the "transmit_timeout"
+ * task which is used to signal a timeout if the transmission could
+ * not be done in a timely fashion.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
+
+ /**
+ * How many bytes is our notify callback waiting for?
+ */
+ size_t notify_size;
};
/**
- * Opaque handle for a transmission-ready request.
+ * Entry in linked list of all of our current neighbours.
*/
-struct GNUNET_TRANSPORT_TransmitHandle
+struct NeighbourList
{
/**
- * We keep the transmit handles that are waiting for
- * a transport-level connection in a doubly linked list.
+ * This is a linked list.
*/
- struct GNUNET_TRANSPORT_TransmitHandle *next;
+ struct NeighbourList *next;
/**
- * We keep the transmit handles that are waiting for
- * a transport-level connection in a doubly linked list.
+ * Overall transport handle.
*/
- struct GNUNET_TRANSPORT_TransmitHandle *prev;
+ struct GNUNET_TRANSPORT_Handle *h;
/**
- * Handle of the main transport data structure.
+ * Active transmit handle; available if 'transmit_forbidden'
+ * is GNUNET_NO.
*/
- struct GNUNET_TRANSPORT_Handle *handle;
+ struct GNUNET_TRANSPORT_TransmitHandle transmit_handle;
/**
- * Neighbour for this handle, can be NULL if the service
- * is not yet connected to the target.
+ * Identity of this neighbour.
*/
- struct NeighbourList *neighbour;
+ struct GNUNET_PeerIdentity id;
/**
- * Which peer is this transmission going to be for? All
- * zeros if it is control-traffic to the service.
+ * At what time did we reset last_sent last?
*/
- struct GNUNET_PeerIdentity target;
+ struct GNUNET_TIME_Absolute last_quota_update;
/**
- * Function to call when notify_size bytes are available
- * for transmission.
+ * How many bytes have we sent since the "last_quota_update"
+ * timestamp?
*/
- GNUNET_CONNECTION_TransmitReadyNotify notify;
+ uint64_t last_sent;
/**
- * Closure for notify.
+ * Quota for outbound traffic to the neighbour in bytes/ms.
*/
- void *notify_cls;
+ uint32_t quota_out;
/**
- * transmit_ready task Id. The task is used to introduce the
- * artificial delay that may be required to maintain the bandwidth
- * limits. Later, this will be the ID of the "transmit_timeout"
- * task which is used to signal a timeout if the transmission could
- * not be done in a timely fashion.
+ * Set to GNUNET_NO if we are currently allowed to accept a
+ * message to the transport service for this peer, GNUNET_YES
+ * if we have one and are waiting for transmission, GNUNET_SYSERR
+ * if we are waiting for confirmation AND have already accepted
+ * yet another message.
*/
- GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
+ enum TransmitStage transmit_stage;
/**
- * Timeout for this request.
+ * Have we received a notification that this peer is connected
+ * to us right now?
*/
- struct GNUNET_TIME_Absolute timeout;
+ int is_connected;
+
+};
+
+
+/**
+ * Linked list of requests from clients for our HELLO that were
+ * deferred.
+ */
+struct HelloWaitList
+{
/**
- * How many bytes is our notify callback waiting for?
+ * This is a linked list.
*/
- size_t notify_size;
+ struct HelloWaitList *next;
/**
- * How important is this message?
+ * Reference back to our transport handle.
*/
- unsigned int priority;
+ struct GNUNET_TRANSPORT_Handle *handle;
+
+ /**
+ * Callback to call once we got our HELLO.
+ */
+ GNUNET_TRANSPORT_HelloUpdateCallback rec;
+
+ /**
+ * Closure for rec.
+ */
+ void *rec_cls;
};
@@ -250,6 +306,16 @@ struct GNUNET_TRANSPORT_Handle
GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
/**
+ * Head of DLL of control messages.
+ */
+ struct ControlMessage *control_head;
+
+ /**
+ * Tail of DLL of control messages.
+ */
+ struct ControlMessage *control_tail;
+
+ /**
* The current HELLO message for this peer. Updated
* whenever transports change their addresses.
*/
@@ -266,26 +332,6 @@ struct GNUNET_TRANSPORT_Handle
struct GNUNET_CLIENT_TransmitHandle *network_handle;
/**
- * Linked list of transmit handles that are waiting for the
- * transport to connect to the respective peer. When we
- * receive notification that the transport connected to a
- * peer, we go over this list and check if someone has already
- * requested a transmission to the new peer; if so, we trigger
- * the next step.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
-
- /**
- * Linked list of transmit handles that are waiting for the
- * transport to be ready for transmission to the respective
- * peer. When we
- * receive notification that the transport disconnected from
- * a peer, we go over this list and move the entry back to
- * the connect_wait list.
- */
- struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
-
- /**
* Linked list of pending requests for our HELLO.
*/
struct HelloWaitList *hwl_head;
@@ -306,27 +352,34 @@ struct GNUNET_TRANSPORT_Handle
struct NeighbourList *neighbours;
/**
- * ID of the task trying to reconnect to the
- * service.
+ * ID of the task trying to reconnect to the service.
*/
GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
/**
- * Delay until we try to reconnect.
+ * ID of the task trying to trigger transmission for a peer
+ * while maintaining bandwidth quotas.
*/
- struct GNUNET_TIME_Relative reconnect_delay;
+ GNUNET_SCHEDULER_TaskIdentifier quota_task;
/**
- * Do we currently have a transmission pending?
- * (schedule transmission was called but has not
- * yet succeeded)?
+ * Delay until we try to reconnect.
*/
- int transmission_scheduled;
+ struct GNUNET_TIME_Relative reconnect_delay;
+
};
+// FIXME: replace with hash map!
+/**
+ * Get the neighbour list entry for the given peer
+ *
+ * @param h our context
+ * @param peer peer to look up
+ * @return NULL if no such peer entry exists
+ */
static struct NeighbourList *
-find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
+neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
const struct GNUNET_PeerIdentity *peer)
{
struct NeighbourList *pos;
@@ -340,51 +393,186 @@ find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
/**
- * Schedule the task to send one message from the
- * connect_ready list to the service.
+ * Schedule the task to send one message, either from the control
+ * list or the peer message queues to the service.
*/
static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
/**
- * Transmit message to client...
+ * Function called by the scheduler when the timeout for bandwidth
+ * availablility for the target neighbour is reached.
+ *
+ * @param cls the 'struct GNUNET_TRANSPORT_Handle*'
+ * @param tc scheduler context
+ */
+static void
+quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_TRANSPORT_Handle *h = cls;
+
+ h->quota_task = GNUNET_SCHEDULER_NO_TASK;
+ schedule_transmission (h);
+}
+
+
+/**
+ * Update the quota values for the given neighbour now.
+ *
+ * @param n neighbour to update
+ */
+static void
+update_quota (struct NeighbourList *n)
+{
+ struct GNUNET_TIME_Relative delta;
+ uint64_t allowed;
+ uint64_t remaining;
+
+ delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
+ allowed = delta.value * n->quota_out;
+ if (n->last_sent < allowed)
+ {
+ remaining = allowed - n->last_sent;
+ if (n->quota_out > 0)
+ remaining /= n->quota_out;
+ else
+ remaining = 0;
+ if (remaining > MAX_BANDWIDTH_CARRY)
+ remaining = MAX_BANDWIDTH_CARRY;
+ n->last_sent = 0;
+ n->last_quota_update = GNUNET_TIME_absolute_get ();
+ n->last_quota_update.value -= remaining;
+ }
+ else
+ {
+ n->last_sent -= allowed;
+ n->last_quota_update = GNUNET_TIME_absolute_get ();
+ }
+}
+
+
+/**
+ * Figure out which transmission to a peer can be done right now.
+ * If none can, schedule a task to call 'schedule_transmission'
+ * whenever a peer transmission can be done in the future and
+ * return NULL. Otherwise return the next transmission to be
+ * performed.
+ *
+ * @param h handle to transport
+ * @return NULL to wait longer before doing any peer transmissions
+ */
+static struct GNUNET_TRANSPORT_TransmitHandle *
+schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h)
+{
+ struct GNUNET_TRANSPORT_TransmitHandle *ret;
+ struct GNUNET_TRANSPORT_TransmitHandle *th;
+ struct NeighbourList *n;
+ struct NeighbourList *next;
+ struct GNUNET_TIME_Relative retry_time;
+ struct GNUNET_TIME_Relative duration;
+ uint64_t available;
+
+ if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched,
+ h->quota_task);
+ h->quota_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
+ ret = NULL;
+ next = h->neighbours;
+ while (NULL != (n = next))
+ {
+ next = n->next;
+ if (n->transmit_stage != TS_QUEUED)
+ continue; /* not eligible */
+ th = &n->transmit_handle;
+ /* check outgoing quota */
+ duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
+ if (duration.value > MIN_QUOTA_REFRESH_TIME)
+ {
+ update_quota (n);
+ duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
+ }
+ available = duration.value * n->quota_out;
+ if (available < n->last_sent + th->notify_size)
+ {
+ /* calculate how much bandwidth we'd still need to
+ accumulate and based on that how long we'll have
+ to wait... */
+ available = n->last_sent + th->notify_size - available;
+ duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ available / n->quota_out);
+ if (th->timeout.value <
+ GNUNET_TIME_relative_to_absolute (duration).value)
+ {
+ /* signal timeout! */
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n",
+ duration.value, GNUNET_i2s (&n->id));
+#endif
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ n->transmit_stage = TS_NEW;
+ if (NULL != th->notify)
+ GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
+ continue;
+ }
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
+ GNUNET_i2s (&n->id), duration.value);
+#endif
+ retry_time = GNUNET_TIME_relative_min (retry_time,
+ duration);
+ continue;
+ }
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Bandwidth available for transmission to `%4s'\n",
+ GNUNET_i2s (&n->id));
+#endif
+ if ( (ret == NULL) ||
+ (ret->priority < th->priority) )
+ ret = th;
+ }
+ if (ret == NULL)
+ h->quota_task = GNUNET_SCHEDULER_add_delayed (h->sched,
+ retry_time,
+ &quota_transmit_ready,
+ h);
+ return ret;
+}
+
+
+/**
+ * Transmit message(s) to service.
+ *
+ * @param cls handle to transport
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
*/
static size_t
transport_notify_ready (void *cls, size_t size, void *buf)
{
struct GNUNET_TRANSPORT_Handle *h = cls;
+ struct ControlMessage *cm;
struct GNUNET_TRANSPORT_TransmitHandle *th;
struct NeighbourList *n;
+ struct OutboundMessage obm;
size_t ret;
+ size_t mret;
char *cbuf;
h->network_handle = NULL;
- h->transmission_scheduled = GNUNET_NO;
if (buf == NULL)
{
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Could not transmit to transport service, cancelling pending requests\n");
-#endif
- th = h->connect_ready_head;
- if (th->next != NULL)
- th->next->prev = NULL;
- h->connect_ready_head = th->next;
- if (NULL != (n = th->neighbour))
- {
- GNUNET_assert (n->transmit_handle == th);
- n->transmit_handle = NULL;
- }
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != th->notify)
- GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
- GNUNET_free (th);
- if (h->connect_ready_head != NULL)
- schedule_transmission (h); /* FIXME: is this ok? */
+ schedule_transmission (h);
return 0;
}
#if DEBUG_TRANSPORT
@@ -393,35 +581,64 @@ transport_notify_ready (void *cls, size_t size, void *buf)
#endif
cbuf = buf;
ret = 0;
- h->network_handle = NULL;
- h->transmission_scheduled = GNUNET_NO;
- while ((h->connect_ready_head != NULL) &&
- (h->connect_ready_head->notify_size <= size))
+ while ( (NULL != (cm = h->control_head)) &&
+ (cm->notify_size <= size) )
+ {
+ if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (h->sched, cm->notify_delay_task);
+ cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_CONTAINER_DLL_remove (h->control_head,
+ h->control_tail,
+ cm);
+ ret += cm->notify (cm->notify_cls, size, &cbuf[ret]);
+ GNUNET_free (cm);
+ size -= ret;
+ }
+ while ( (NULL != (th = schedule_peer_transmission (h))) &&
+ (th->notify_size <= size) )
{
- th = h->connect_ready_head;
if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
}
- GNUNET_assert (th->notify_size <= size);
- if (th->next != NULL)
- th->next->prev = NULL;
- h->connect_ready_head = th->next;
- if (NULL != (n = th->neighbour))
- {
- GNUNET_assert (n->transmit_handle == th);
- n->transmit_handle = NULL;
- }
- if (NULL != th->notify)
- ret += th->notify (th->notify_cls, size, &cbuf[ret]);
- GNUNET_free (th);
- if (n != NULL)
- n->last_sent += ret;
- size -= ret;
+ n = th->neighbour;
+ switch (n->transmit_stage)
+ {
+ case TS_NEW:
+ GNUNET_break (0);
+ break;
+ case TS_QUEUED:
+ n->transmit_stage = TS_TRANSMITTED;
+ break;
+ case TS_TRANSMITTED:
+ GNUNET_break (0);
+ break;
+ case TS_TRANSMITTED_QUEUED:
+ GNUNET_break (0);
+ break;
+ default:
+ GNUNET_break (0);
+ }
+ GNUNET_assert (size >= sizeof (struct OutboundMessage));
+ mret = th->notify (th->notify_cls,
+ size - sizeof (struct OutboundMessage),
+ &cbuf[ret + sizeof (struct OutboundMessage)]);
+ GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
+ if (mret != 0)
+ {
+ obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
+ obm.header.size = htons (mret + sizeof (struct OutboundMessage));
+ obm.priority = htonl (th->priority);
+ obm.peer = n->id;
+ memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
+ ret += (mret + sizeof (struct OutboundMessage));
+ size -= (mret + sizeof (struct OutboundMessage));
+ }
}
- if (h->connect_ready_head != NULL)
- schedule_transmission (h);
+ schedule_transmission (h);
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting %u bytes to transport service\n", ret);
@@ -431,155 +648,73 @@ transport_notify_ready (void *cls, size_t size, void *buf)
/**
- * Schedule the task to send one message from the
- * connect_ready list to the service.
+ * Schedule the task to send one message, either from the control
+ * list or the peer message queues to the service.
*/
static void
schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
-{
+{
+ size_t size;
+ struct GNUNET_TIME_Relative timeout;
struct GNUNET_TRANSPORT_TransmitHandle *th;
- GNUNET_assert (NULL == h->network_handle);
+ if (NULL != h->network_handle)
+ return;
if (h->client == NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Could not yet schedule transmission: we are not yet connected to the transport service!\n");
return; /* not yet connected */
}
- th = h->connect_ready_head;
- if (th == NULL)
- return; /* no request pending */
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ if (NULL != h->control_head)
{
- /* remove existing time out task, will be integrated
- with transmit_ready notification! */
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- h->transmission_scheduled = GNUNET_YES;
- h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
- th->notify_size,
- GNUNET_TIME_absolute_get_remaining
- (th->timeout),
- GNUNET_NO,
- &transport_notify_ready,
- h);
- GNUNET_assert (NULL != h->network_handle);
-}
-
-
-/**
- * Insert the given transmit handle in the given sorted
- * doubly linked list based on timeout.
- *
- * @param head pointer to the head of the linked list
- * @param th element to insert into the list
- */
-static void
-insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
- struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *pos;
- struct GNUNET_TRANSPORT_TransmitHandle *prev;
-
- pos = *head;
- prev = NULL;
- while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
- {
- prev = pos;
- pos = pos->next;
- }
- if (prev == NULL)
- {
- th->next = *head;
- if (th->next != NULL)
- th->next->prev = th;
- *head = th;
- }
- else
- {
- th->next = pos;
- th->prev = prev;
- prev->next = th;
- if (pos != NULL)
- pos->prev = th;
- }
-}
-
-
-/**
- * Cancel a pending notify delay task (if pending) and also remove the
- * given transmit handle from whatever list is on.
- *
- * @param th handle for the transmission request to manipulate
- */
-static void
-remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- struct GNUNET_TRANSPORT_Handle *h;
-
- h = th->handle;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (th->prev == NULL)
- {
- if (th == h->connect_wait_head)
- h->connect_wait_head = th->next;
- else
- h->connect_ready_head = th->next;
+ size = h->control_head->notify_size;
+ timeout = GNUNET_TIME_UNIT_FOREVER_REL;
}
else
{
- th->prev->next = th->next;
+ th = schedule_peer_transmission (h);
+ if (th == NULL)
+ {
+ /* no transmission ready right now */
+ return;
+ }
+ size = th->notify_size;
+ timeout = GNUNET_TIME_absolute_get_remaining (th->timeout);
}
- if (th->next != NULL)
- th->next->prev = th->prev;
+ h->network_handle =
+ GNUNET_CLIENT_notify_transmit_ready (h->client,
+ size,
+ timeout,
+ GNUNET_NO,
+ &transport_notify_ready,
+ h);
+ GNUNET_assert (NULL != h->network_handle);
}
/**
- * Schedule a request to connect to the given
- * neighbour (and if successful, add the specified
- * handle to the wait list).
- *
- * @param th handle for a request to transmit once we
- * have connected
- */
-static void try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th);
-
-
-/**
* Called when our transmit request timed out before any transport
* reported success connecting to the desired peer or before the
* transport was ready to receive. Signal error and free
* TransmitHandle.
*/
static void
-peer_transmit_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+control_transmit_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+ struct ControlMessage *th = cls;
th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- if (th->neighbour != NULL)
- th->neighbour->transmit_handle = NULL;
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Request for transmission to peer `%s' timed out.\n",
- GNUNET_i2s (&th->target));
-#endif
- remove_from_any_list (th);
if (NULL != th->notify)
th->notify (th->notify_cls, 0, NULL);
+ GNUNET_CONTAINER_DLL_remove (th->h->control_head,
+ th->h->control_tail,
+ th);
GNUNET_free (th);
}
-
-
/**
* Queue control request for transmission to the transport
* service.
@@ -600,68 +735,31 @@ schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
GNUNET_CONNECTION_TransmitReadyNotify notify,
void *notify_cls)
{
- struct GNUNET_TRANSPORT_TransmitHandle *th;
+ struct ControlMessage *th;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Control transmit of %u bytes within %llums requested\n",
size, (unsigned long long) timeout.value);
#endif
- th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
- th->handle = h;
+ th = GNUNET_malloc (sizeof (struct ControlMessage));
+ th->h = h;
th->notify = notify;
th->notify_cls = notify_cls;
- th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
th->notify_size = size;
th->notify_delay_task
= GNUNET_SCHEDULER_add_delayed (h->sched,
- timeout, &peer_transmit_timeout, th);
- if (at_head)
- {
- th->next = h->connect_ready_head;
- h->connect_ready_head = th;
- if (th->next != NULL)
- th->next->prev = th;
- }
+ timeout, &control_transmit_timeout, th);
+ if (at_head)
+ GNUNET_CONTAINER_DLL_insert (h->control_head,
+ h->control_tail,
+ th);
else
- {
- insert_transmit_handle (&h->connect_ready_head, th);
- }
- if (GNUNET_NO == h->transmission_scheduled)
- schedule_transmission (h);
-}
-
-
-/**
- * Update the quota values for the given neighbour now.
- */
-static void
-update_quota (struct NeighbourList *n)
-{
- struct GNUNET_TIME_Relative delta;
- uint64_t allowed;
- uint64_t remaining;
-
- delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- allowed = delta.value * n->quota_out;
- if (n->last_sent < allowed)
- {
- remaining = allowed - n->last_sent;
- if (n->quota_out > 0)
- remaining /= n->quota_out;
- else
- remaining = 0;
- if (remaining > MAX_BANDWIDTH_CARRY)
- remaining = MAX_BANDWIDTH_CARRY;
- n->last_sent = 0;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->last_quota_update.value -= remaining;
- }
- else
- {
- n->last_sent -= allowed;
- n->last_quota_update = GNUNET_TIME_absolute_get ();
- }
+ GNUNET_CONTAINER_DLL_insert_after (h->control_head,
+ h->control_tail,
+ h->control_tail,
+ th);
+ schedule_transmission (h);
}
@@ -681,6 +779,14 @@ struct SetQuotaContext
};
+/**
+ * Send SET_QUOTA message to the service.
+ *
+ * @param cls the 'struct SetQuotaContext'
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
static size_t
send_set_quota (void *cls, size_t size, void *buf)
{
@@ -699,7 +805,8 @@ send_set_quota (void *cls, size_t size, void *buf)
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting `%s' request with respect to `%4s'.\n",
- "SET_QUOTA", GNUNET_i2s (&sqc->target));
+ "SET_QUOTA",
+ GNUNET_i2s (&sqc->target));
#endif
GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
msg = buf;
@@ -742,7 +849,7 @@ GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
struct NeighbourList *n;
struct SetQuotaContext *sqc;
- n = find_neighbour (handle, target);
+ n = neighbour_find (handle, target);
if (n != NULL)
{
update_quota (n);
@@ -830,6 +937,14 @@ GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
}
+/**
+ * Send HELLO message to the service.
+ *
+ * @param cls the HELLO message to send
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
static size_t
send_hello (void *cls, size_t size, void *buf)
{
@@ -873,14 +988,6 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
struct GNUNET_MessageHeader *hc;
uint16_t size;
- if (handle->client == NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Not connected to transport service, dropping offered HELLO\n");
-#endif
- return;
- }
GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
size = ntohs (hello->size);
GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
@@ -893,11 +1000,13 @@ GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
/**
- * Function we use for handling incoming messages.
+ * Transmit START message to service.
+ *
+ * @param cls unused
+ * @param size number of bytes available in buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
*/
-static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
-
-
static size_t
send_start (void *cls, size_t size, void *buf)
{
@@ -905,9 +1014,10 @@ send_start (void *cls, size_t size, void *buf)
if (buf == NULL)
{
+ /* Can only be shutdown, just give up */
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout while trying to transmit `%s' request.\n",
+ "Shutdown while trying to transmit `%s' request.\n",
"START");
#endif
return 0;
@@ -924,186 +1034,101 @@ send_start (void *cls, size_t size, void *buf)
/**
- * We're ready to transmit the request that the transport service
- * should connect to a new peer. In addition to sending the
- * request, schedule the next phase for the transmission processing
- * that caused the connect request in the first place.
- */
-static size_t
-request_connect (void *cls, size_t size, void *buf)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
- struct TryConnectMessage *tcm;
- struct GNUNET_TRANSPORT_Handle *h;
-
- GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
- h = th->handle;
-
- if (buf == NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Failed to transmit `%s' request for `%4s' to service.\n",
- "TRY_CONNECT", GNUNET_i2s (&th->target));
-#endif
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != th->notify)
- GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
- GNUNET_free (th);
- return 0;
- }
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting `%s' message for `%4s' (need connection in %llu ms).\n",
- "TRY_CONNECT", GNUNET_i2s (&th->target),
- GNUNET_TIME_absolute_get_remaining (th->timeout).value);
-#endif
- GNUNET_assert (size >= sizeof (struct TryConnectMessage));
- tcm = buf;
- tcm->header.size = htons (sizeof (struct TryConnectMessage));
- tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
- tcm->reserved = htonl (0);
- memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (h->sched,
- GNUNET_TIME_absolute_get_remaining
- (th->timeout),
- &peer_transmit_timeout, th);
- insert_transmit_handle (&h->connect_wait_head, th);
- return sizeof (struct TryConnectMessage);
-}
-
-
-/**
- * Schedule a request to connect to the given
- * neighbour (and if successful, add the specified
- * handle to the wait list).
- *
- * @param th handle for a request to transmit once we
- * have connected
- */
-static void
-try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
- schedule_control_transmit (th->handle,
- sizeof (struct TryConnectMessage),
- GNUNET_NO,
- GNUNET_TIME_absolute_get_remaining (th->timeout),
- &request_connect, th);
-}
-
-
-/**
- * Task for delayed attempts to reconnect to a peer.
- *
- * @param cls must be a transmit handle that determines the peer
- * to which we will try to connect
- * @param tc scheduler information about why we were triggered (not used)
- */
-static void
-try_connect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
-
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- try_connect (th);
-}
-
-
-/**
- * Remove neighbour from our list. Will automatically
- * trigger a re-connect attempt if we have messages pending
- * for this peer.
+ * Free neighbour.
*
* @param h our state
- * @param peer the peer to remove
+ * @param n the entry to free
*/
static void
-remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
- const struct GNUNET_PeerIdentity *peer)
+neighbour_free (struct NeighbourList *n)
{
+ struct GNUNET_TRANSPORT_Handle *h;
struct NeighbourList *prev;
struct NeighbourList *pos;
- struct GNUNET_TRANSPORT_TransmitHandle *th;
+ h = n->h;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing neighbour `%s' from list of connected peers.\n",
- GNUNET_i2s (peer));
+ GNUNET_i2s (&n->id));
#endif
+ GNUNET_break (n->is_connected == GNUNET_NO);
+ GNUNET_break (n->transmit_stage == TS_NEW);
+
prev = NULL;
pos = h->neighbours;
- while ((pos != NULL) &&
- (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
+ while (pos != n)
{
prev = pos;
pos = pos->next;
}
- if (pos == NULL)
- {
- GNUNET_break (0);
- return;
- }
if (prev == NULL)
- h->neighbours = pos->next;
+ h->neighbours = n->next;
else
- prev->next = pos->next;
- if (NULL != (th = pos->transmit_handle))
- {
- pos->transmit_handle = NULL;
- th->neighbour = NULL;
- remove_from_any_list (th);
- if (GNUNET_TIME_absolute_get_remaining (th->timeout).value <=
- CONNECT_RETRY_TIMEOUT.value)
- {
- /* signal error */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Connection with `%4s' failed and timeout was in the past, giving up on message delivery.\n"),
- GNUNET_i2s (peer));
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
- peer_transmit_timeout (th, NULL);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Connection with `%4s' failed, will keep trying for %llu ms to deliver message\n"),
- GNUNET_i2s (peer),
- GNUNET_TIME_absolute_get_remaining (th->timeout).value);
- /* try again in a bit */
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == th->notify_delay_task);
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (h->sched,
- CONNECT_RETRY_TIMEOUT,
- &try_connect_task, th);
- }
- }
+ prev->next = n->next;
+ GNUNET_free (n);
+}
+
+
+/**
+ * Mark neighbour as disconnected.
+ *
+ * @param n the entry to mark as disconnected
+ */
+static void
+neighbour_disconnect (struct NeighbourList *n)
+{
+ struct GNUNET_TRANSPORT_Handle *h = n->h;
+#if DEBUG_TRANSPORT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Removing neighbour `%s' from list of connected peers.\n",
+ GNUNET_i2s (&n->id));
+#endif
+ GNUNET_break (n->is_connected == GNUNET_YES);
+ n->is_connected = GNUNET_NO;
if (h->nc_cb != NULL)
- h->nd_cb (h->cls, peer);
- GNUNET_free (pos);
+ h->nd_cb (h->cls, &n->id);
+ if (n->transmit_stage == TS_NEW)
+ neighbour_free (n);
}
/**
+ * Function we use for handling incoming messages.
+ *
+ * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void demultiplexer (void *cls,
+ const struct GNUNET_MessageHeader *msg);
+
+
+/**
* Try again to connect to transport service.
+ *
+ * @param cls the handle to the transport service
+ * @param tc scheduler context
*/
static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+reconnect (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_TRANSPORT_Handle *h = cls;
- struct GNUNET_TRANSPORT_TransmitHandle *pos;
+ struct ControlMessage *pos;
struct NeighbourList *n;
- /* Forget about all neighbours that we used to be connected
- to */
- while (NULL != (n = h->neighbours))
- remove_neighbour (h, &n->id);
+ if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+ {
+ /* shutdown, just give up */
+ return;
+ }
+ /* Forget about all neighbours that we used to be connected to */
+ n = h->neighbours;
+ while (NULL != n)
+ {
+ neighbour_disconnect (n);
+ n = n->next;
+ }
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
#endif
@@ -1111,20 +1136,16 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
GNUNET_assert (h->client != NULL);
- /* make sure we don't send "START" twice,
- remove existing entry from queue (if present) */
- pos = h->connect_ready_head;
+ /* make sure we don't send "START" twice, remove existing entry from
+ queue (if present) */
+ pos = h->control_head;
while (pos != NULL)
{
if (pos->notify == &send_start)
{
- if (pos->prev == NULL)
- h->connect_ready_head = pos->next;
- else
- pos->prev->next = pos->next;
- if (pos->next != NULL)
- pos->next->prev = pos->prev;
- GNUNET_assert (pos->neighbour == NULL);
+ GNUNET_CONTAINER_DLL_remove (h->control_head,
+ h->control_tail,
+ pos);
if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task)
{
GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
@@ -1147,6 +1168,8 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
/**
* Function that will schedule the job that will try
* to connect us again to the client.
+ *
+ * @param h transport service to reconnect
*/
static void
schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
@@ -1161,215 +1184,47 @@ schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
h->reconnect_task
= GNUNET_SCHEDULER_add_delayed (h->sched,
h->reconnect_delay, &reconnect, h);
- h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
-}
-
-
-/**
- * We are connected to the respective peer, check the
- * bandwidth limits and schedule the transmission.
- */
-static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
-
-
-/**
- * Function called by the scheduler when the timeout
- * for bandwidth availablility for the target
- * neighbour is reached.
- */
-static void
-transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
-
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- schedule_request (th);
-}
-
-
-/**
- * Remove the given transmit handle from the wait list. Does NOT free
- * it.
- */
-static void
-remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- if (th->prev == NULL)
- th->handle->connect_wait_head = th->next;
- else
- th->prev->next = th->next;
- if (th->next != NULL)
- th->next->prev = th->prev;
-}
-
-
-/**
- * We are connected to the respective peer, check the
- * bandwidth limits and schedule the transmission.
- */
-static void
-schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
-{
- struct GNUNET_TRANSPORT_Handle *h;
- struct GNUNET_TIME_Relative duration;
- struct NeighbourList *n;
- uint64_t available;
-
- h = th->handle;
- n = th->neighbour;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ if (h->reconnect_delay.value == 0)
{
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
- /* check outgoing quota */
- duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- if (duration.value > MIN_QUOTA_REFRESH_TIME)
+ else
{
- update_quota (n);
- duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
- }
- available = duration.value * n->quota_out;
- if (available < n->last_sent + th->notify_size)
- {
- /* calculate how much bandwidth we'd still need to
- accumulate and based on that how long we'll have
- to wait... */
- available = n->last_sent + th->notify_size - available;
- duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
- available / n->quota_out);
- if (th->timeout.value <
- GNUNET_TIME_relative_to_absolute (duration).value)
- {
- /* signal timeout! */
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long. Signaling timeout.\n",
- duration.value, GNUNET_i2s (&th->target));
-#endif
- remove_from_wait_list (th);
- if (NULL != th->notify)
- GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
- GNUNET_free (th);
- return;
- }
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
- GNUNET_i2s (&th->target), duration.value);
-#endif
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (h->sched,
- duration, &transmit_ready, th);
- return;
+ h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
+ h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
+ h->reconnect_delay);
}
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Bandwidth available for transmission to `%4s'\n",
- GNUNET_i2s (&n->id));
-#endif
- if (GNUNET_NO == n->transmit_ok)
- {
- /* we may be ready, but transport service is not;
- wait for SendOkMessage or timeout */
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Need to wait for transport service `%s' message\n",
- "SEND_OK");
-#endif
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (h->sched,
- GNUNET_TIME_absolute_get_remaining
- (th->timeout), &peer_transmit_timeout,
- th);
- return;
- }
- n->transmit_ok = GNUNET_NO;
- remove_from_wait_list (th);
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Moving message for `%4s' to ready list\n",
- GNUNET_i2s (&n->id));
-#endif
- insert_transmit_handle (&h->connect_ready_head, th);
- if (GNUNET_NO == h->transmission_scheduled)
- schedule_transmission (h);
}
/**
* Add neighbour to our list
*/
-static void
-add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
- uint32_t quota_out,
- struct GNUNET_TIME_Relative latency,
- uint16_t distance,
+static struct NeighbourList *
+neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
const struct GNUNET_PeerIdentity *pid)
{
struct NeighbourList *n;
- struct GNUNET_TRANSPORT_TransmitHandle *prev;
- struct GNUNET_TRANSPORT_TransmitHandle *pos;
- struct GNUNET_TRANSPORT_TransmitHandle *next;
/* check for duplicates */
- if (NULL != find_neighbour (h, pid))
+ if (NULL != (n = neighbour_find (h, pid)))
{
GNUNET_break (0);
- return;
+ return n;
}
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
+ "Creating entry for neighbour `%4s'.\n",
+ GNUNET_i2s (pid));
#endif
n = GNUNET_malloc (sizeof (struct NeighbourList));
n->id = *pid;
n->last_quota_update = GNUNET_TIME_absolute_get ();
- n->quota_out = quota_out;
n->next = h->neighbours;
- n->transmit_ok = GNUNET_YES;
- h->neighbours = n;
- if (h->nc_cb != NULL)
- h->nc_cb (h->cls, &n->id, latency, distance);
- prev = NULL;
- pos = h->connect_wait_head;
- while (pos != NULL)
- {
- next = pos->next;
- if (0 == memcmp (pid,
- &pos->target, sizeof (struct GNUNET_PeerIdentity)))
- {
- pos->neighbour = n;
- GNUNET_assert (NULL == n->transmit_handle);
- n->transmit_handle = pos;
- if (prev == NULL)
- h->connect_wait_head = next;
- else
- prev->next = next;
-#if ACK
- if (GNUNET_YES == n->received_ack)
- {
-#endif
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found pending request for `%4s' will trigger it now.\n",
- GNUNET_i2s (&pos->target));
-#endif
- if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
- pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- schedule_request (pos);
-#if ACK
- }
-#endif
-
- break;
- }
- prev = pos;
- pos = next;
- }
+ n->quota_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
+ n->h = h;
+ h->neighbours = n;
+ return n;
}
@@ -1377,7 +1232,6 @@ add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
* Connect to the transport service. Note that the connection may
* complete (or fail) asynchronously.
*
-
* @param sched scheduler to use
* @param cfg configuration to use
* @param cls closure for the callbacks
@@ -1423,44 +1277,29 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
#endif
- while (NULL != (th = handle->connect_ready_head))
- {
- handle->connect_ready_head = th->next;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != th->notify)
- GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
- GNUNET_free (th);
- }
- while (NULL != (th = handle->connect_wait_head))
- {
- handle->connect_wait_head = th->next;
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != th->notify)
- GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
- GNUNET_free (th);
- }
while (NULL != (n = handle->neighbours))
{
handle->neighbours = n->next;
- if (NULL != (th = n->transmit_handle))
- {
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != th->notify)
- GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
- GNUNET_free (th);
- }
+ switch (n->transmit_stage)
+ {
+ case TS_NEW:
+ case TS_TRANSMITTED:
+ /* nothing to do */
+ break;
+ case TS_QUEUED:
+ case TS_TRANSMITTED_QUEUED:
+ th = &n->transmit_handle;
+ if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (handle->sched,
+ th->notify_delay_task);
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
+ break;
+ default:
+ GNUNET_break (0);
+ }
GNUNET_free (n);
}
while (NULL != (hwl = handle->hwl_head))
@@ -1479,6 +1318,11 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
}
+ if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (handle->sched, handle->quota_task);
+ handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
+ }
GNUNET_free_non_null (handle->my_hello);
handle->my_hello = NULL;
GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport",
@@ -1502,10 +1346,9 @@ GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
/**
- * Type of a function to call when we receive a message
- * from the service.
+ * Function we use for handling incoming messages.
*
- * @param cls closure
+ * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
* @param msg message received, NULL on timeout or fatal error
*/
static void
@@ -1521,59 +1364,29 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
struct HelloWaitList *next_hwl;
struct NeighbourList *n;
struct GNUNET_PeerIdentity me;
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-
- struct GNUNET_TRANSPORT_TransmitHandle *prev;
- struct GNUNET_TRANSPORT_TransmitHandle *pos;
- struct GNUNET_TRANSPORT_TransmitHandle *next;
uint16_t size;
- if ((msg == NULL) || (h->client == NULL))
+ if (h->client == NULL)
+ {
+ /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
+ finish clean up work! */
+ GNUNET_free (h);
+ return;
+ }
+ if (msg == NULL)
{
- if (h->client != NULL)
- {
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Error receiving from transport service, disconnecting temporarily.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Error receiving from transport service, disconnecting temporarily.\n");
#endif
- if (h->network_handle != NULL)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
- h->network_handle = NULL;
- h->transmission_scheduled = GNUNET_NO;
- th = h->connect_ready_head;
- /* add timeout again, we canceled the transmit_ready task! */
-
- /*GNUNET_assert (th->notify_delay_task ==
- GNUNET_SCHEDULER_NO_TASK);*/
-
- /* START - somehow we are getting here when th->notify_delay_task is already
- * set. Not sure why, so just checking and canceling instead of asserting and
- * dying. Probably not a *fix*. */
- if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
- th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- /* END */
- GNUNET_assert (th->notify_delay_task ==
- GNUNET_SCHEDULER_NO_TASK);
- th->notify_delay_task =
- GNUNET_SCHEDULER_add_delayed (h->sched,
- GNUNET_TIME_absolute_get_remaining
- (th->timeout),
- &peer_transmit_timeout, th);
- }
- GNUNET_CLIENT_disconnect (h->client);
- h->client = NULL;
- schedule_reconnect (h);
- }
- else
- {
- /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
- finish clean up work! */
- GNUNET_free (h);
- }
+ if (h->network_handle != NULL)
+ {
+ GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
+ h->network_handle = NULL;
+ }
+ GNUNET_CLIENT_disconnect (h->client);
+ h->client = NULL;
+ schedule_reconnect (h);
return;
}
GNUNET_CLIENT_receive (h->client,
@@ -1624,81 +1437,16 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
"Receiving `%s' message for `%4s'.\n",
"CONNECT", GNUNET_i2s (&cim->id));
#endif
- if (NULL == (n = find_neighbour(h, &cim->id)))
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Don't know neighbor, adding!\n");
-#endif
- add_neighbour (h,
- ntohl (cim->quota_out),
- GNUNET_TIME_relative_ntoh (cim->latency), ntohs(cim->distance), &cim->id);
- }
- else
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Do know neighbor, scheduling transmission!\n");
-#endif
-#if ACK
- n->received_ack = GNUNET_YES;
-#endif
- if (NULL != n->transmit_handle)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer connected, scheduling delayed message for delivery now.\n");
-#endif
- schedule_request (n->transmit_handle);
- }
- else
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmit handle is null... Checking for pending stuff(?)\n");
-#endif
- prev = NULL;
- pos = h->connect_wait_head;
- while (pos != NULL)
- {
- next = pos->next;
- if (0 == memcmp (&cim->id,
- &pos->target, sizeof (struct GNUNET_PeerIdentity)))
- {
- pos->neighbour = n;
- GNUNET_assert (NULL == n->transmit_handle);
- n->transmit_handle = pos;
- if (prev == NULL)
- h->connect_wait_head = next;
- else
- prev->next = next;
-#if ACK
- if (GNUNET_YES == n->received_ack)
- {
-#endif
- #if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found pending request for `%4s' will trigger it now.\n",
- GNUNET_i2s (&pos->target));
- #endif
- if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
- pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- }
- schedule_request (pos);
-#if ACK
- }
-#endif
-
- break;
- }
- prev = pos;
- pos = next;
- }
- }
- }
-
+ n = neighbour_find (h, &cim->id);
+ if (n == NULL)
+ n = neighbour_add (h,
+ &cim->id);
+ GNUNET_break (n->is_connected == GNUNET_NO);
+ n->is_connected = GNUNET_YES;
+ if (h->nc_cb != NULL)
+ h->nc_cb (h->cls, &n->id,
+ GNUNET_TIME_relative_ntoh (cim->latency),
+ ntohs (cim->distance));
break;
case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
if (size != sizeof (struct DisconnectInfoMessage))
@@ -1710,9 +1458,13 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Receiving `%s' message for `%4s'.\n",
- "DISCONNECT", GNUNET_i2s (&dim->peer));
+ "DISCONNECT",
+ GNUNET_i2s (&dim->peer));
#endif
- remove_neighbour (h, &dim->peer);
+ n = neighbour_find (h, &cim->id);
+ GNUNET_break (n != NULL);
+ if (n != NULL)
+ neighbour_disconnect (n);
break;
case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
if (size != sizeof (struct SendOkMessage))
@@ -1726,21 +1478,26 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
"Receiving `%s' message, transmission %s.\n", "SEND_OK",
ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
#endif
- n = find_neighbour (h, &okm->peer);
+ n = neighbour_find (h, &okm->peer);
GNUNET_assert (n != NULL);
- n->transmit_ok = GNUNET_YES;
- if (n->transmit_handle != NULL)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Processing pending message for `%4s'\n",
- GNUNET_i2s (&n->id));
-#endif
- GNUNET_SCHEDULER_cancel (h->sched,
- n->transmit_handle->notify_delay_task);
- n->transmit_handle->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
- schedule_request (n->transmit_handle);
- }
+ switch (n->transmit_stage)
+ {
+ case TS_NEW:
+ GNUNET_break (0);
+ break;
+ case TS_QUEUED:
+ GNUNET_break (0);
+ break;
+ case TS_TRANSMITTED:
+ n->transmit_stage = TS_NEW;
+ break;
+ case TS_TRANSMITTED_QUEUED:
+ n->transmit_stage = TS_QUEUED;
+ schedule_transmission (h);
+ break;
+ default:
+ GNUNET_break (0);
+ }
break;
case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
#if DEBUG_TRANSPORT
@@ -1761,42 +1518,20 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
GNUNET_break (0);
break;
}
- switch (ntohs (imm->type))
- {
- case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving `%s' message from `%4s'.\n",
- "ACK", GNUNET_i2s (&im->peer));
-#endif
- break;
- default:
#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %u from `%4s'.\n",
- ntohs (imm->type), GNUNET_i2s (&im->peer));
-#endif
-
- n = find_neighbour (h, &im->peer);
- if (n == NULL)
- {
- GNUNET_break (0);
- break;
- }
-
- if (NULL != n->transmit_handle)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer connected, scheduling delayed message for delivery now.\n");
-#endif
- schedule_request (n->transmit_handle);
- }
- if (h->rec != NULL)
- h->rec (h->cls, &im->peer, imm,
- GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance));
- break;
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s'.\n",
+ ntohs (imm->type), GNUNET_i2s (&im->peer));
+#endif
+ n = neighbour_find (h, &im->peer);
+ if (n == NULL)
+ {
+ GNUNET_break (0);
+ break;
+ }
+ if (h->rec != NULL)
+ h->rec (h->cls, &im->peer, imm,
+ GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance));
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1809,73 +1544,48 @@ demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
}
-struct ClientTransmitWrapper
-{
- GNUNET_CONNECTION_TransmitReadyNotify notify;
- void *notify_cls;
- struct GNUNET_TRANSPORT_TransmitHandle *th;
-};
-
-
/**
- * Transmit message of a client destined for another
- * peer to the service.
+ * Called when our transmit request timed out before any transport
+ * reported success connecting to the desired peer or before the
+ * transport was ready to receive. Signal error and free
+ * TransmitHandle.
+ *
+ * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle*' that is timing out
+ * @param tc scheduler context
*/
-static size_t
-client_notify_wrapper (void *cls, size_t size, void *buf)
+static void
+peer_transmit_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct ClientTransmitWrapper *ctw = cls;
- struct OutboundMessage *obm;
- struct GNUNET_MessageHeader *hdr;
- size_t ret;
+ struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
+ struct NeighbourList *n;
- if (size == 0)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission request could not be satisfied.\n");
-#endif
- if (NULL != ctw->notify)
- GNUNET_assert (0 == ctw->notify (ctw->notify_cls, 0, NULL));
- GNUNET_free (ctw);
- return 0;
- }
- GNUNET_assert (size >= sizeof (struct OutboundMessage));
- obm = buf;
- if (ctw->notify != NULL)
- ret = ctw->notify (ctw->notify_cls,
- size - sizeof (struct OutboundMessage),
- (void *) &obm[1]);
- else
- ret = 0;
- if (ret == 0)
+ th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
+ n = th->neighbour;
+ switch (n->transmit_stage)
{
- /* Need to reset flag, no SEND means no SEND_OK! */
- ctw->th->neighbour->transmit_ok = GNUNET_YES;
- GNUNET_free (ctw);
- return 0;
+ case TS_NEW:
+ GNUNET_break (0);
+ break;
+ case TS_QUEUED:
+ n->transmit_stage = TS_NEW;
+ if (n->is_connected == GNUNET_NO)
+ neighbour_free (n);
+ break;
+ case TS_TRANSMITTED:
+ GNUNET_break (0);
+ break;
+ case TS_TRANSMITTED_QUEUED:
+ n->transmit_stage = TS_TRANSMITTED;
+ break;
+ default:
+ GNUNET_break (0);
}
- GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
- hdr = (struct GNUNET_MessageHeader *) &obm[1];
- GNUNET_assert (ntohs (hdr->size) == ret);
- GNUNET_assert (ret + sizeof (struct OutboundMessage) <
- GNUNET_SERVER_MAX_MESSAGE_SIZE);
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting `%s' message with data for `%4s'\n",
- "SEND", GNUNET_i2s (&ctw->th->target));
-#endif
- ret += sizeof (struct OutboundMessage);
- obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
- obm->header.size = htons (ret);
- obm->priority = htonl (ctw->th->priority);
- obm->peer = ctw->th->target;
- GNUNET_free (ctw);
- return ret;
+ if (NULL != th->notify)
+ th->notify (th->notify_cls, 0, NULL);
}
-
/**
* Check if we could queue a message of the given size for
* transmission. The transport service will take both its
@@ -1905,10 +1615,8 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
GNUNET_CONNECTION_TransmitReadyNotify
notify, void *notify_cls)
{
- struct GNUNET_TRANSPORT_TransmitHandle *pos;
struct GNUNET_TRANSPORT_TransmitHandle *th;
struct NeighbourList *n;
- struct ClientTransmitWrapper *ctw;
if (size + sizeof (struct OutboundMessage) >=
GNUNET_SERVER_MAX_MESSAGE_SIZE)
@@ -1926,66 +1634,39 @@ GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
"Asking transport service for transmission of %u bytes to peer `%4s'.\n",
size, GNUNET_i2s (target));
#endif
- n = find_neighbour (handle, target);
- if ((n != NULL) && (n->transmit_handle != NULL))
- return NULL; /* already have a request pending for this peer! */
- ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
- th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
- ctw->notify = notify;
- ctw->notify_cls = notify_cls;
- ctw->th = th;
- th->handle = handle;
+ n = neighbour_find (handle, target);
+ if (n == NULL)
+ n = neighbour_add (handle, target);
+ if (n == NULL)
+ return NULL;
+ switch (n->transmit_stage)
+ {
+ case TS_NEW:
+ n->transmit_stage = TS_QUEUED;
+ break;
+ case TS_QUEUED:
+ break;
+ case TS_TRANSMITTED:
+ n->transmit_stage = TS_TRANSMITTED_QUEUED;
+ break;
+ case TS_TRANSMITTED_QUEUED:
+ return NULL;
+ break;
+ default:
+ GNUNET_break (0);
+ return NULL;
+ }
+ th = &n->transmit_handle;
th->neighbour = n;
- th->target = *target;
- th->notify = &client_notify_wrapper;
- th->notify_cls = ctw;
+ th->notify = notify;
+ th->notify_cls = notify_cls;
th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
th->notify_size = size + sizeof (struct OutboundMessage);
th->priority = priority;
- if (NULL == n)
- {
- pos = handle->connect_wait_head;
- while (pos != NULL)
- {
- GNUNET_assert (0 != memcmp (target,
- &pos->target,
- sizeof (struct GNUNET_PeerIdentity)));
- pos = pos->next;
- }
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
-#endif
- try_connect (th);
- return th;
- }
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission request queued for transmission to transport service.\n");
-#endif
- GNUNET_assert (NULL == n->transmit_handle);
- n->transmit_handle = th;
- if (GNUNET_YES != n->transmit_ok)
- {
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
- GNUNET_i2s (target), timeout.value);
-#endif
- th->notify_delay_task
- = GNUNET_SCHEDULER_add_delayed (handle->sched,
- timeout, &peer_transmit_timeout, th);
- return th;
- }
-
-#if DEBUG_TRANSPORT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
- GNUNET_i2s (target));
-#endif
th->notify_delay_task
- = GNUNET_SCHEDULER_add_now (handle->sched, &transmit_ready, th);
+ = GNUNET_SCHEDULER_add_delayed (handle->sched, timeout,
+ &peer_transmit_timeout, th);
+ schedule_transmission (handle);
return th;
}
@@ -1998,26 +1679,34 @@ GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
GNUNET_TRANSPORT_TransmitHandle
*th)
{
- struct GNUNET_TRANSPORT_Handle *h;
+ struct NeighbourList *n;
+ n = th->neighbour;
#if DEBUG_TRANSPORT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmission request of %u bytes to `%4s' was cancelled.\n",
th->notify_size - sizeof (struct OutboundMessage),
- GNUNET_i2s (&th->target));
+ GNUNET_i2s (&n->id));
#endif
- GNUNET_assert (th->notify == &client_notify_wrapper);
- remove_from_any_list (th);
- h = th->handle;
- if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
+ switch (n->transmit_stage)
{
- GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
- h->network_handle = NULL;
- h->transmission_scheduled = GNUNET_NO;
+ case TS_NEW:
+ GNUNET_break (0);
+ break;
+ case TS_QUEUED:
+ n->transmit_stage = TS_NEW;
+ if (n->is_connected == GNUNET_NO)
+ neighbour_free (n);
+ break;
+ case TS_TRANSMITTED:
+ GNUNET_break (0);
+ break;
+ case TS_TRANSMITTED_QUEUED:
+ n->transmit_stage = TS_TRANSMITTED;
+ break;
+ default:
+ GNUNET_break (0);
}
- GNUNET_free (th->notify_cls);
- GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_TASK);
- GNUNET_free (th);
}