summaryrefslogtreecommitdiff
path: root/src/transport
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-04-28 19:32:10 +0200
committerChristian Grothoff <christian@grothoff.org>2019-04-28 19:32:20 +0200
commit3f945e6798d8d736ceb104b59ea1269a7abdfe8a (patch)
treeb93e3dc99deda0987e85cb256b3903de8bd74853 /src/transport
parent1227fc30369a55b82e77d35d8d128090e37dd437 (diff)
towards flow control in TNG
Diffstat (limited to 'src/transport')
-rw-r--r--src/transport/gnunet-service-tng.c575
-rw-r--r--src/transport/gnunet-transport-profiler.c232
-rw-r--r--src/transport/gnunet-transport.c571
-rw-r--r--src/transport/transport-testing.h201
-rw-r--r--src/transport/transport.h56
-rw-r--r--src/transport/transport_api2_core.c506
-rw-r--r--src/transport/transport_api_core.c260
7 files changed, 1055 insertions, 1346 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index c2922dd7e..825d45522 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -24,6 +24,11 @@
*
* TODO:
* Implement next:
+ * - complete flow control push back from CORE via TRANSPORT to communicators:
+ * + resume communicators in handle_client_recv_ok (see FIXME)
+ * + count transmissions to CORE and suspend communicators if window is full
+ * - check flow control push back from TRANSPROT to CORE:
+ * + check when to send ACKs
* - change transport-core API to provide proper flow control in both
* directions, allow multiple messages per peer simultaneously (tag
* confirmations with unique message ID), and replace quota-out with
@@ -114,6 +119,16 @@
#define MAX_DV_DISCOVERY_SELECTION 16
/**
+ * Window size. How many messages to the same target do we pass
+ * to CORE without a RECV_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define RECV_WINDOW_SIZE 4
+
+/**
* Minimum number of hops we should forward DV learn messages
* even if they are NOT useful for us in hope of looping
* back to the initiator?
@@ -1100,6 +1115,48 @@ struct PendingMessage;
*/
struct DistanceVectorHop;
+/**
+ * A virtual link is another reachable peer that is known to CORE. It
+ * can be either a `struct Neighbour` with at least one confirmed
+ * `struct Queue`, or a `struct DistanceVector` with at least one
+ * confirmed `struct DistanceVectorHop`. With a virtual link we track
+ * data that is per neighbour that is not specific to how the
+ * connectivity is established.
+ */
+struct VirtualLink
+{
+ /**
+ * Identity of the peer at the other end of the link.
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Task scheduled to possibly notfiy core that this peer is no
+ * longer counting as confirmed. Runs the #core_visibility_check(),
+ * which checks that some DV-path or a queue exists that is still
+ * considered confirmed.
+ */
+ struct GNUNET_SCHEDULER_Task *visibility_task;
+
+ /**
+ * Neighbour used by this virtual link, NULL if @e dv is used.
+ */
+ struct Neighbour *n;
+
+ /**
+ * Distance vector used by this virtual link, NULL if @e n is used.
+ */
+ struct DistanceVector *dv;
+
+ /**
+ * How many more messages can we send to core before we exhaust
+ * the receive window of CORE for this peer? If this hits zero,
+ * we must tell communicators to stop providing us more messages
+ * for this peer.
+ */
+ unsigned int core_recv_window;
+};
+
/**
* Data structure kept when we are waiting for an acknowledgement.
@@ -1316,31 +1373,10 @@ struct DistanceVector
struct GNUNET_SCHEDULER_Task *timeout_task;
/**
- * Task scheduled to possibly notfiy core that this queue is no longer
- * counting as confirmed. Runs the #core_queue_visibility_check().
- */
- struct GNUNET_SCHEDULER_Task *visibility_task;
-
- /**
- * Quota at which CORE is allowed to transmit to this peer
- * (note that the value CORE should actually be told is this
- * value plus the respective value in `struct Neighbour`).
- * Should match the sum of the quotas of all of the paths.
- *
- * FIXME: not yet set, tricky to get right given multiple paths,
- * many of which may be inactive! (=> Idea: measure???)
- * FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero?
- * (=> Current thought: clean would be zero!)
- */
- struct GNUNET_BANDWIDTH_Value32NBO quota_out;
-
- /**
- * Is one of the DV paths in this struct 'confirmed' and thus
- * the cause for CORE to see this peer as connected? (Note that
- * the same may apply to a `struct Neighbour` at the same time.)
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE? If so, this is the virtual link, otherwise NULL.
*/
- int core_visible;
+ struct VirtualLink *link;
};
@@ -1451,12 +1487,6 @@ struct Queue
struct GNUNET_SCHEDULER_Task *transmit_task;
/**
- * Task scheduled to possibly notfiy core that this queue is no longer
- * counting as confirmed. Runs the #core_queue_visibility_check().
- */
- struct GNUNET_SCHEDULER_Task *visibility_task;
-
- /**
* How long do *we* consider this @e address to be valid? In the past or
* zero if we have not yet validated it. Can be updated based on
* challenge-response validations (via address validation logic), or when we
@@ -1643,11 +1673,6 @@ struct Neighbour
struct Queue *queue_tail;
/**
- * Task run to cleanup pending messages that have exceeded their timeout.
- */
- struct GNUNET_SCHEDULER_Task *timeout_task;
-
- /**
* Handle for an operation to fetch @e last_dv_learn_monotime information from
* the PEERSTORE, or NULL.
*/
@@ -1660,18 +1685,10 @@ struct Neighbour
struct GNUNET_PEERSTORE_StoreContext *sc;
/**
- * Quota at which CORE is allowed to transmit to this peer
- * (note that the value CORE should actually be told is this
- * value plus the respective value in `struct DistanceVector`).
- * Should match the sum of the quotas of all of the queues.
- *
- * FIXME: not yet set, tricky to get right given multiple queues!
- * (=> Idea: measure???)
- * FIXME: how do we set this value initially when we tell CORE?
- * Options: start at a minimum value or at literally zero?
- * (=> Current thought: clean would be zero!)
+ * Do we have a confirmed working queue and are thus visible to
+ * CORE? If so, this is the virtual link, otherwise NULL.
*/
- struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+ struct VirtualLink *link;
/**
* Latest DVLearn monotonic time seen from this peer. Initialized only
@@ -1680,17 +1697,6 @@ struct Neighbour
struct GNUNET_TIME_Absolute last_dv_learn_monotime;
/**
- * What is the earliest timeout of any message in @e pending_msg_tail?
- */
- struct GNUNET_TIME_Absolute earliest_timeout;
-
- /**
- * Do we have a confirmed working queue and are thus visible to
- * CORE?
- */
- int core_visible;
-
- /**
* Do we have the lastest value for @e last_dv_learn_monotime from
* PEERSTORE yet, or are we still waiting for a reply of PEERSTORE?
*/
@@ -2417,6 +2423,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes;
static struct GNUNET_CONTAINER_MultiPeerMap *validation_map;
/**
+ * Map from PIDs to `struct VirtualLink` entries describing
+ * links CORE knows to exist.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *links;
+
+/**
* Map from challenges to `struct LearnLaunchEntry` values.
*/
static struct GNUNET_CONTAINER_MultiShortmap *dvlearn_map;
@@ -2564,6 +2576,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece)
/**
+ * Free virtual link.
+ *
+ * @param vl link data to free
+ */
+static void
+free_virtual_link (struct VirtualLink *vl)
+{
+ GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl);
+ if (NULL != vl->visibility_task)
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ vl->visibility_task = NULL;
+ }
+ GNUNET_break (NULL == vl->n);
+ GNUNET_break (NULL == vl->dv);
+ GNUNET_free (vl);
+}
+
+
+/**
* Free validation state.
*
* @param vs validation state to free
@@ -2684,8 +2716,6 @@ free_dv_route (struct DistanceVector *dv)
GNUNET_assert (
GNUNET_YES ==
GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
- if (NULL != dv->visibility_task)
- GNUNET_SCHEDULER_cancel (dv->visibility_task);
if (NULL != dv->timeout_task)
GNUNET_SCHEDULER_cancel (dv->timeout_task);
GNUNET_free (dv);
@@ -2873,8 +2903,6 @@ free_neighbour (struct Neighbour *neighbour)
GNUNET_CONTAINER_multipeermap_remove (neighbours,
&neighbour->pid,
neighbour));
- if (NULL != neighbour->timeout_task)
- GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
if (NULL != neighbour->reassembly_map)
{
GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map,
@@ -2917,19 +2945,16 @@ free_neighbour (struct Neighbour *neighbour)
*
* @param tc client to inform (must be CORE client)
* @param pid peer the connection is for
- * @param quota_out current quota for the peer
*/
static void
core_send_connect_info (struct TransportClient *tc,
- const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+ const struct GNUNET_PeerIdentity *pid)
{
struct GNUNET_MQ_Envelope *env;
struct ConnectInfoMessage *cim;
GNUNET_assert (CT_CORE == tc->type);
env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
- cim->quota_out = quota_out;
cim->id = *pid;
GNUNET_MQ_send (tc->mq, env);
}
@@ -2939,11 +2964,9 @@ core_send_connect_info (struct TransportClient *tc,
* Send message to CORE clients that we gained a connection
*
* @param pid peer the queue was for
- * @param quota_out current quota for the peer
*/
static void
-cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+cores_send_connect_info (const struct GNUNET_PeerIdentity *pid)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Informing CORE clients about connection to %s\n",
@@ -2952,7 +2975,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
{
if (CT_CORE != tc->type)
continue;
- core_send_connect_info (tc, pid, quota_out);
+ core_send_connect_info (tc, pid);
}
}
@@ -3059,13 +3082,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job)
/**
- * Check whether the CORE visibility of @a n changed. If so,
- * check whether we need to notify CORE.
+ * Task run to check whether the hops of the @a cls still
+ * are validated, or if we need to core about disconnection.
*
- * @param n neighbour to perform the check for
+ * @param cls a `struct VirtualLink`
*/
static void
-update_neighbour_core_visibility (struct Neighbour *n);
+check_link_down (void *cls)
+{
+ struct VirtualLink *vl = cls;
+ struct DistanceVector *dv = vl->dv;
+ struct Neighbour *n = vl->n;
+ struct GNUNET_TIME_Absolute dvh_timeout;
+ struct GNUNET_TIME_Absolute q_timeout;
+
+ vl->visibility_task = NULL;
+ dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+ pos = pos->next_dv)
+ dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
+ if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ vl->dv = NULL;
+ q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
+ for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
+ q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
+ if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
+ vl->n = NULL;
+ if ((NULL == vl->n) && (NULL == vl->dv))
+ {
+ cores_send_disconnect_info (&dv->target);
+ free_virtual_link (vl);
+ return;
+ }
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
+ &check_link_down,
+ vl);
+}
/**
@@ -3083,17 +3136,13 @@ free_queue (struct Queue *queue)
struct QueueEntry *qe;
int maxxed;
struct PendingAcknowledgement *pa;
+ struct VirtualLink *vl;
if (NULL != queue->transmit_task)
{
GNUNET_SCHEDULER_cancel (queue->transmit_task);
queue->transmit_task = NULL;
}
- if (NULL != queue->visibility_task)
- {
- GNUNET_SCHEDULER_cancel (queue->visibility_task);
- queue->visibility_task = NULL;
- }
while (NULL != (pa = queue->pa_head))
{
GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
@@ -3139,9 +3188,12 @@ free_queue (struct Queue *queue)
notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
GNUNET_free (queue);
- update_neighbour_core_visibility (neighbour);
- cores_send_disconnect_info (&neighbour->pid);
-
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid);
+ if ((NULL != vl) && (neighbour == vl->n))
+ {
+ GNUNET_SCHEDULER_cancel (vl->visibility_task);
+ check_link_down (vl);
+ }
if (NULL == neighbour->queue_head)
{
free_neighbour (neighbour);
@@ -3281,12 +3333,12 @@ notify_client_connect_info (void *cls,
void *value)
{
struct TransportClient *tc = cls;
- struct Neighbour *neighbour = value;
+ (void) value;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Telling new CORE client about existing connection to %s\n",
GNUNET_i2s (pid));
- core_send_connect_info (tc, pid, neighbour->quota_out);
+ core_send_connect_info (tc, pid);
return GNUNET_OK;
}
@@ -3469,9 +3521,6 @@ client_send_response (struct PendingMessage *pm,
if (NULL != tc)
{
env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->success = htonl ((uint32_t) success);
- som->bytes_msg = htons (pm->bytes_msg);
- som->bytes_physical = htonl (bytes_physical);
som->peer = target->pid;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Confirming %s transmission of %u/%u bytes to %s\n",
@@ -3486,45 +3535,6 @@ client_send_response (struct PendingMessage *pm,
/**
- * Checks the message queue for a neighbour for messages that have timed
- * out and purges them.
- *
- * @param cls a `struct Neighbour`
- */
-static void
-check_queue_timeouts (void *cls)
-{
- struct Neighbour *n = cls;
- struct PendingMessage *pm;
- struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute earliest_timeout;
-
- n->timeout_task = NULL;
- earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
- now = GNUNET_TIME_absolute_get ();
- for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm)
- {
- pm = pos->next_neighbour;
- if (pos->timeout.abs_value_us <= now.abs_value_us)
- {
- GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (timeout before confirmation)",
- 1,
- GNUNET_NO);
- client_send_response (pm, GNUNET_NO, 0);
- continue;
- }
- earliest_timeout =
- GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout);
- }
- n->earliest_timeout = earliest_timeout;
- if (NULL != n->pending_msg_head)
- n->timeout_task =
- GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n);
-}
-
-
-/**
* Create a DV Box message.
*
* @param total_hops how many hops did the message take so far
@@ -3689,30 +3699,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
const void *payload;
size_t payload_size;
struct TransportDVBoxMessage *dvb;
+ struct VirtualLink *vl;
GNUNET_assert (CT_CORE == tc->type);
obmm = (const struct GNUNET_MessageHeader *) &obm[1];
bytes_msg = ntohs (obmm->size);
- target = lookup_neighbour (&obm->peer);
- if (NULL == target)
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
- else
- dv = NULL;
- if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible)))
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer);
+ if (NULL == vl)
{
/* Failure: don't have this peer as a neighbour (anymore).
Might have gone down asynchronously, so this is NOT
a protocol violation by CORE. Still count the event,
as this should be rare. */
- struct GNUNET_MQ_Envelope *env;
- struct SendOkMessage *som;
-
- env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
- som->success = htonl (GNUNET_SYSERR);
- som->bytes_msg = htonl (bytes_msg);
- som->bytes_physical = htonl (0);
- som->peer = obm->peer;
- GNUNET_MQ_send (tc->mq, env);
GNUNET_SERVICE_client_continue (tc->client);
GNUNET_STATISTICS_update (GST_stats,
"# messages dropped (neighbour unknown)",
@@ -3720,6 +3718,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
GNUNET_NO);
return;
}
+ target = lookup_neighbour (&obm->peer);
+ if (NULL == target)
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer);
+ else
+ dv = NULL;
+ GNUNET_assert ((NULL != target) || (NULL != dv));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending %u bytes to %s using %s\n",
bytes_msg,
@@ -3756,8 +3760,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
pm->client = tc;
pm->target = target;
pm->bytes_msg = payload_size;
- pm->timeout =
- GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
memcpy (&pm[1], payload, payload_size);
GNUNET_free_non_null (dvb);
dvb = NULL;
@@ -3777,15 +3779,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm)
tc->details.core.pending_msg_head,
tc->details.core.pending_msg_tail,
pm);
- if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
- {
- target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
- if (NULL != target->timeout_task)
- GNUNET_SCHEDULER_cancel (target->timeout_task);
- target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout,
- &check_queue_timeouts,
- target);
- }
if (! was_empty)
return; /* all queues must already be busy */
for (struct Queue *queue = target->queue_head; NULL != queue;
@@ -3834,6 +3827,47 @@ check_communicator_available (
/**
+ * Client confirms that it is done handling message(s) to a particular
+ * peer. We may now provide more messages to CORE for this peer.
+ *
+ * Notifies the respective queues that more messages can now be received.
+ *
+ * @param cls the client
+ * @param rom the message that was sent
+ */
+static void
+handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
+{
+ struct TransportClient *tc = cls;
+ struct VirtualLink *vl;
+ uint32_t delta;
+
+ if (CT_CORE != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer);
+ if (NULL == vl)
+ {
+ GNUNET_STATISTICS_update (GST_stats,
+ "# RECV_OK dropped: virtual link unknown",
+ 1,
+ GNUNET_NO);
+ GNUNET_SERVICE_client_continue (tc->client);
+ return;
+ }
+ delta = ntohl (rom->increase_window_delta);
+ vl->core_recv_window += delta;
+ if (delta == vl->core_recv_window)
+ {
+ // FIXME: resume communicators!
+ }
+}
+
+
+/**
* Communicator started. Process the request.
*
* @param cls the client
@@ -4090,20 +4124,18 @@ route_via_neighbour (const struct Neighbour *n,
for (struct Queue *pos = n->queue_head; NULL != pos;
pos = pos->next_neighbour)
{
- /* Count the queue with the visibility task in all cases, as
- otherwise we may end up with no queues just because the
- time for the visibility task just expired but the scheduler
- just ran this task first */
if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
- (pos->validated_until.abs_value_us > now.abs_value_us) ||
- (NULL != pos->visibility_task))
+ (pos->validated_until.abs_value_us > now.abs_value_us))
candidates++;
}
if (0 == candidates)
{
- /* Given that we above check for pos->visibility task,
- this should be strictly impossible. */
- GNUNET_break (0);
+ /* This can happen rarely if the last confirmed queue timed
+ out just as we were beginning to process this message. */
+ GNUNET_STATISTICS_update (GST_stats,
+ "# route selection failed (all no valid queue)",
+ 1,
+ GNUNET_NO);
return;
}
sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates);
@@ -4115,12 +4147,8 @@ route_via_neighbour (const struct Neighbour *n,
for (struct Queue *pos = n->queue_head; NULL != pos;
pos = pos->next_neighbour)
{
- /* Count the queue with the visibility task in all cases, as
- otherwise we may end up with no queues just because the
- time for the visibility task just expired but the scheduler
- just ran this task first */
- if ((pos->validated_until.abs_value_us > now.abs_value_us) ||
- (NULL != pos->visibility_task))
+ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
+ (pos->validated_until.abs_value_us > now.abs_value_us))
{
if ((sel1 == candidates) || (sel2 == candidates))
queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
@@ -4197,21 +4225,21 @@ route_message (const struct GNUNET_PeerIdentity *target,
struct GNUNET_MessageHeader *hdr,
enum RouteMessageOptions options)
{
+ struct VirtualLink *vl;
struct Neighbour *n;
struct DistanceVector *dv;
- n = lookup_neighbour (target);
- dv = (0 != (options & RMO_DV_ALLOWED))
- ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target)
- : NULL;
+ vl = GNUNET_CONTAINER_multipeermap_get (links, target);
+ n = vl->n;
+ dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
{
/* if confirmed is required, and we do not have anything
confirmed, drop respective options */
- if ((NULL != n) && (GNUNET_NO == n->core_visible))
- n = NULL;
- if ((NULL != dv) && (GNUNET_NO == dv->core_visible))
- dv = NULL;
+ if (NULL == n)
+ n = lookup_neighbour (target);
+ if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
+ dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
}
if ((NULL == n) && (NULL == dv))
{
@@ -5758,40 +5786,6 @@ path_cleanup_cb (void *cls)
GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv);
}
-/**
- * Task run to check whether the hops of the @a cls still
- * are validated, or if we need to core about disconnection.
- *
- * @param cls a `struct DistanceVector` (with core_visible set!)
- */
-static void
-check_dv_path_down (void *cls)
-{
- struct DistanceVector *dv = cls;
- struct Neighbour *n;
-
- dv->visibility_task = NULL;
- GNUNET_assert (GNUNET_YES == dv->core_visible);
- for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
- pos = pos->next_dv)
- {
- if (0 <
- GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us)
- {
- dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until,
- &check_dv_path_down,
- dv);
- return;
- }
- }
- /* all paths invalid, make dv core-invisible */
- dv->core_visible = GNUNET_NO;
- n = lookup_neighbour (&dv->target);
- if ((NULL != n) && (GNUNET_YES == n->core_visible))
- return; /* no need to tell core, connection still up! */
- cores_send_disconnect_info (&dv->target);
-}
-
/**
* The @a hop is a validated path to the respective target
@@ -5804,22 +5798,30 @@ static void
activate_core_visible_dv_path (struct DistanceVectorHop *hop)
{
struct DistanceVector *dv = hop->dv;
- struct Neighbour *n;
-
- GNUNET_assert (GNUNET_NO == dv->core_visible);
- GNUNET_assert (NULL == dv->visibility_task);
+ struct VirtualLink *vl;
- dv->core_visible = GNUNET_YES;
- dv->visibility_task =
- GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv);
- n = lookup_neighbour (&dv->target);
- if ((NULL != n) && (GNUNET_YES == n->core_visible))
- return; /* no need to tell core, connection already up! */
- cores_send_connect_info (&dv->target,
- (NULL != n)
- ? GNUNET_BANDWIDTH_value_sum (n->quota_out,
- dv->quota_out)
- : dv->quota_out);
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target);
+ if (NULL != vl)
+ {
+ /* Link was already up, remember dv is also now available and we are done */
+ vl->dv = dv;
+ return;
+ }
+ vl = GNUNET_new (struct VirtualLink);
+ vl->target = dv->target;
+ vl->dv = dv;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ /* We lacked a confirmed connection to the target
+ before, so tell CORE about it (finally!) */
+ cores_send_connect_info (&dv->target);
}
@@ -5934,9 +5936,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
- if ((GNUNET_NO == dv->core_visible) &&
- (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until)
- .rel_value_us))
+ if (0 <
+ GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
activate_core_visible_dv_path (pos);
if (last_timeout.rel_value_us <
GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT,
@@ -5976,8 +5977,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
next_hop->dv_head,
next_hop->dv_tail,
hop);
- if ((GNUNET_NO == dv->core_visible) &&
- (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us))
+ if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
activate_core_visible_dv_path (hop);
return GNUNET_YES;
}
@@ -6943,75 +6943,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
/**
- * Task run periodically to check whether the validity of the given queue has
- * run its course. If so, finds either another queue to take over, or clears
- * the neighbour's `core_visible` flag. In the latter case, gives DV routes a
- * chance to take over, and if that fails, notifies CORE about the disconnect.
- *
- * @param cls a `struct Queue`
- */
-static void
-core_queue_visibility_check (void *cls)
-{
- struct Queue *q = cls;
-
- q->visibility_task = NULL;
- if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
- {
- q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- return;
- }
- update_neighbour_core_visibility (q->neighbour);
-}
-
-
-/**
- * Check whether the CORE visibility of @a n should change. Finds either a
- * queue to preserve the visibility, or clears the neighbour's `core_visible`
- * flag. In the latter case, gives DV routes a chance to take over, and if
- * that fails, notifies CORE about the disconnect. If so, check whether we
- * need to notify CORE.
- *
- * @param n neighbour to perform the check for
- */
-static void
-update_neighbour_core_visibility (struct Neighbour *n)
-{
- struct DistanceVector *dv;
-
- GNUNET_assert (GNUNET_YES == n->core_visible);
- /* Check if _any_ queue of this neighbour is still valid, if so, schedule
- the #core_queue_visibility_check() task for that queue */
- for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
- {
- if (0 !=
- GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us)
- {
- /* found a valid queue, use this one */
- q->visibility_task =
- GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- return;
- }
- }
- n->core_visible = GNUNET_NO;
-
- /* Check if _any_ DV route to this neighbour is currently
- valid, if so, do NOT tell core about the loss of direct
- connectivity (DV still counts!) */
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
- if (GNUNET_YES == dv->core_visible)
- return;
- /* Nothing works anymore, need to tell CORE about the loss of
- connectivity! */
- cores_send_disconnect_info (&n->pid);
-}
-
-
-/**
* Communicator gave us a transport address validation response. Process the
* request.
*
@@ -7030,8 +6961,8 @@ handle_validation_response (
.vs = NULL};
struct GNUNET_TIME_Absolute origin_time;
struct Queue *q;
- struct DistanceVector *dv;
struct Neighbour *n;
+ struct VirtualLink *vl;
/* check this is one of our challenges */
(void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
@@ -7129,24 +7060,28 @@ handle_validation_response (
q->validated_until = vs->validated_until;
q->pd.aged_rtt = vs->validation_rtt;
n = q->neighbour;
- if (GNUNET_NO != n->core_visible)
- return; /* nothing changed, we are done here */
- n->core_visible = GNUNET_YES;
- q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until,
- &core_queue_visibility_check,
- q);
- /* Check if _any_ DV route to this neighbour is
- currently valid, if so, do NOT tell core anything! */
- dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid);
- if ((NULL != dv) && (GNUNET_YES == dv->core_visible))
- return; /* nothing changed, done */
- /* We lacked a confirmed connection to the neighbour
+ vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid);
+ if (NULL != vl)
+ {
+ /* Link was already up, remember n is also now available and we are done */
+ vl->n = n;
+ return;
+ }
+ vl = GNUNET_new (struct VirtualLink);
+ vl->target = n->pid;
+ vl->n = n;
+ vl->core_recv_window = RECV_WINDOW_SIZE;
+ vl->visibility_task =
+ GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+ GNUNET_break (GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_put (
+ links,
+ &vl->target,
+ vl,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ /* We lacked a confirmed connection to the target
before, so tell CORE about it (finally!) */
- cores_send_connect_info (&n->pid,
- (NULL != dv)
- ? GNUNET_BANDWIDTH_value_sum (dv->quota_out,
- n->quota_out)
- : n->quota_out);
+ cores_send_connect_info (&n->pid);
}
@@ -8256,7 +8191,6 @@ handle_add_queue_message (void *cls,
if (NULL == neighbour)
{
neighbour = GNUNET_new (struct Neighbour);
- neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
neighbour->pid = aqm->receiver;
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multipeermap_put (
@@ -8872,8 +8806,12 @@ do_shutdown (void *cls)
NULL);
GNUNET_CONTAINER_multishortmap_destroy (pending_acks);
pending_acks = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours));
GNUNET_CONTAINER_multipeermap_destroy (neighbours);
neighbours = NULL;
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links));
+ GNUNET_CONTAINER_multipeermap_destroy (links);
+ links = NULL;
GNUNET_CONTAINER_multipeermap_iterate (backtalkers,
&free_backtalker_cb,
NULL);
@@ -8926,6 +8864,7 @@ run (void *cls,
pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES);
ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES);
neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
+ links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES);
dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES);
ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES);
ephemeral_heap =
@@ -8995,6 +8934,10 @@ GNUNET_SERVICE_MAIN (
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
struct OutboundMessage,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_recv_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK,
+ struct RecvOkMessage,
+ NULL),
/* communication with communicators */
GNUNET_MQ_hd_var_size (communicator_available,
GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
diff --git a/src/transport/gnunet-transport-profiler.c b/src/transport/gnunet-transport-profiler.c
index 9160a78b2..89f5b4108 100644
--- a/src/transport/gnunet-transport-profiler.c
+++ b/src/transport/gnunet-transport-profiler.c
@@ -32,7 +32,6 @@
#include "gnunet_protocols.h"
#include "gnunet_ats_service.h"
#include "gnunet_transport_service.h"
-#include "gnunet_transport_core_service.h"
struct Iteration
@@ -54,7 +53,8 @@ struct Iteration
/**
* Timeout for a connections
*/
-#define CONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+#define CONNECT_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
/**
* Benchmarking block size in bye
@@ -214,15 +214,16 @@ shutdown_task (void *cls)
{
inext = icur->next;
icur->rate = ((benchmark_count * benchmark_size) / 1024) /
- ((float) icur->dur.rel_value_us / (1000 * 1000));
+ ((float) icur->dur.rel_value_us / (1000 * 1000));
if (verbosity > 0)
- FPRINTF (stdout, _("%llu B in %llu ms == %.2f KB/s!\n"),
- ((long long unsigned int) benchmark_count * benchmark_size),
- ((long long unsigned int) icur->dur.rel_value_us / 1000),
- (float) icur->rate);
+ FPRINTF (stdout,
+ _ ("%llu B in %llu ms == %.2f KB/s!\n"),
+ ((long long unsigned int) benchmark_count * benchmark_size),
+ ((long long unsigned int) icur->dur.rel_value_us / 1000),
+ (float) icur->rate);
avg_duration += icur->dur.rel_value_us / (1000);
- avg_rate += icur->rate;
+ avg_rate += icur->rate;
iterations++;
}
if (0 == iterations)
@@ -238,19 +239,17 @@ shutdown_task (void *cls)
while (NULL != (icur = inext))
{
inext = icur->next;
- stddev_rate += ((icur->rate-avg_rate) *
- (icur->rate-avg_rate));
+ stddev_rate += ((icur->rate - avg_rate) * (icur->rate - avg_rate));
stddev_duration += (((icur->dur.rel_value_us / 1000) - avg_duration) *
- ((icur->dur.rel_value_us / 1000) - avg_duration));
-
+ ((icur->dur.rel_value_us / 1000) - avg_duration));
}
/* Calculate standard deviation rate */
stddev_rate = stddev_rate / iterations;
- stddev_rate = sqrtf(stddev_rate);
+ stddev_rate = sqrtf (stddev_rate);
/* Calculate standard deviation duration */
stddev_duration = stddev_duration / iterations;
- stddev_duration = sqrtf(stddev_duration);
+ stddev_duration = sqrtf (stddev_duration);
/* Output */
FPRINTF (stdout,
@@ -266,9 +265,7 @@ shutdown_task (void *cls)
while (NULL != (icur = inext))
{
inext = icur->next;
- GNUNET_CONTAINER_DLL_remove (ihead,
- itail,
- icur);
+ GNUNET_CONTAINER_DLL_remove (ihead, itail, icur);
FPRINTF (stdout,
";%llu;%.2f",
@@ -316,27 +313,19 @@ send_msg (void *cls)
if (NULL == mq)
return;
- env = GNUNET_MQ_msg_extra (m,
- benchmark_size,
- GNUNET_MESSAGE_TYPE_DUMMY);
- memset (&m[1],
- 52,
- benchmark_size - sizeof(struct GNUNET_MessageHeader));
-
+ env = GNUNET_MQ_msg_extra (m, benchmark_size, GNUNET_MESSAGE_TYPE_DUMMY);
+ memset (&m[1], 52, benchmark_size - sizeof (struct GNUNET_MessageHeader));
+
if (itail->msgs_sent < benchmark_count)
{
- GNUNET_MQ_notify_sent (env,
- &send_msg,
- NULL);
+ GNUNET_MQ_notify_sent (env, &send_msg, NULL);
}
else
{
iteration_done ();
}
- GNUNET_MQ_send (mq,
- env);
- if ( (verbosity > 0) &&
- (0 == itail->msgs_sent % 10) )
+ GNUNET_MQ_send (mq, env);
+ if ((verbosity > 0) && (0 == itail->msgs_sent % 10))
FPRINTF (stdout, ".");
}
@@ -351,15 +340,14 @@ iteration_start ()
return;
benchmark_running = GNUNET_YES;
icur = GNUNET_new (struct Iteration);
- GNUNET_CONTAINER_DLL_insert_tail (ihead,
- itail,
- icur);
- icur->start = GNUNET_TIME_absolute_get();
+ GNUNET_CONTAINER_DLL_insert_tail (ihead, itail, icur);
+ icur->start = GNUNET_TIME_absolute_get ();
if (verbosity > 0)
- FPRINTF (stdout,
- "\nStarting benchmark, starting to send %u messages in %u byte blocks\n",
- benchmark_count,
- benchmark_size);
+ FPRINTF (
+ stdout,
+ "\nStarting benchmark, starting to send %u messages in %u byte blocks\n",
+ benchmark_count,
+ benchmark_size);
send_msg (NULL);
}
@@ -393,22 +381,16 @@ iteration_done ()
static void *
notify_connect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_MQ_Handle *m)
+ struct GNUNET_MQ_Handle *m)
{
- if (0 != memcmp (&pid,
- peer,
- sizeof(struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
{
- FPRINTF (stdout,
- "Connected to different peer `%s'\n",
- GNUNET_i2s (&pid));
+ FPRINTF (stdout, "Connected to different peer `%s'\n", GNUNET_i2s (&pid));
return NULL;
}
if (verbosity > 0)
- FPRINTF (stdout,
- "Successfully connected to `%s'\n",
- GNUNET_i2s (&pid));
+ FPRINTF (stdout, "Successfully connected to `%s'\n", GNUNET_i2s (&pid));
mq = m;
iteration_start ();
return NULL;
@@ -426,18 +408,16 @@ notify_connect (void *cls,
static void
notify_disconnect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- void *internal_cls)
+ void *internal_cls)
{
- if (0 != memcmp (&pid,
- peer,
- sizeof(struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
mq = NULL;
if (GNUNET_YES == benchmark_running)
{
FPRINTF (stdout,
"Disconnected from peer `%s' while benchmarking\n",
- GNUNET_i2s (&pid));
+ GNUNET_i2s (&pid));
return;
}
}
@@ -451,8 +431,7 @@ notify_disconnect (void *cls,
* @return #GNUNET_OK
*/
static int
-check_dummy (void *cls,
- const struct GNUNET_MessageHeader *message)
+check_dummy (void *cls, const struct GNUNET_MessageHeader *message)
{
return GNUNET_OK; /* all messages are fine */
}
@@ -465,30 +444,24 @@ check_dummy (void *cls,
* @param message the message
*/
static void
-handle_dummy (void *cls,
- const struct GNUNET_MessageHeader *message)
+handle_dummy (void *cls, const struct GNUNET_MessageHeader *message)
{
if (! benchmark_receive)
return;
if (verbosity > 0)
FPRINTF (stdout,
- "Received %u bytes\n",
- (unsigned int) ntohs (message->size));
+ "Received %u bytes\n",
+ (unsigned int) ntohs (message->size));
}
static int
-blacklist_cb (void *cls,
- const struct GNUNET_PeerIdentity *peer)
+blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *peer)
{
- if (0 != memcmp (&pid,
- peer,
- sizeof(struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
{
if (verbosity > 0)
- FPRINTF (stdout,
- "Denying connection to `%s'\n",
- GNUNET_i2s (peer));
+ FPRINTF (stdout, "Denying connection to `%s'\n", GNUNET_i2s (peer));
return GNUNET_SYSERR;
}
return GNUNET_OK;
@@ -509,38 +482,32 @@ run (void *cls,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *mycfg)
{
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_var_size (dummy,
- GNUNET_MESSAGE_TYPE_DUMMY,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_handler_end ()
- };
-
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_var_size (dummy,
+ GNUNET_MESSAGE_TYPE_DUMMY,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_handler_end ()};
+
cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg;
ret = 1;
if (GNUNET_MAX_MESSAGE_SIZE <= benchmark_size)
{
- FPRINTF (stderr,
- "Message size too big!\n");
+ FPRINTF (stderr, "Message size too big!\n");
return;
}
if (NULL == cpid)
{
- FPRINTF (stderr,
- "No peer identity given\n");
+ FPRINTF (stderr, "No peer identity given\n");
return;
}
- if (GNUNET_OK !=
- GNUNET_CRYPTO_eddsa_public_key_from_string (cpid,
- strlen (cpid),
- &pid.public_key))
+ if (GNUNET_OK != GNUNET_CRYPTO_eddsa_public_key_from_string (cpid,
+ strlen (cpid),
+ &pid.public_key))
{
- FPRINTF (stderr,
- "Failed to parse peer identity `%s'\n",
- cpid);
+ FPRINTF (stderr, "Failed to parse peer identity `%s'\n", cpid);
return;
}
if (1 == benchmark_send)
@@ -548,7 +515,8 @@ run (void *cls,
if (verbosity > 0)
FPRINTF (stderr,
"Trying to send %u messages with size %u to peer `%s'\n",
- benchmark_count, benchmark_size,
+ benchmark_count,
+ benchmark_size,
GNUNET_i2s (&pid));
}
else if (1 == benchmark_receive)
@@ -559,50 +527,42 @@ run (void *cls,
}
else
{
- FPRINTF (stderr,
- "No operation given\n");
+ FPRINTF (stderr, "No operation given\n");
return;
}
ats = GNUNET_ATS_connectivity_init (cfg);
if (NULL == ats)
{
- FPRINTF (stderr,
- "Failed to connect to ATS service\n");
+ FPRINTF (stderr, "Failed to connect to ATS service\n");
ret = 1;
return;
}
handle = GNUNET_TRANSPORT_core_connect (cfg,
- NULL,
- handlers,
- NULL,
- &notify_connect,
- &notify_disconnect,
- NULL);
+ NULL,
+ handlers,
+ NULL,
+ &notify_connect,
+ &notify_disconnect,
+ NULL);
if (NULL == handle)
{
- FPRINTF (stderr,
- "Failed to connect to transport service\n");
+ FPRINTF (stderr, "Failed to connect to transport service\n");
GNUNET_ATS_connectivity_done (ats);
ats = NULL;
ret = 1;
return;
}
- bl_handle = GNUNET_TRANSPORT_blacklist (cfg,
- &blacklist_cb,
- NULL);
- ats_sh = GNUNET_ATS_connectivity_suggest (ats,
- &pid,
- 1);
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- NULL);
+ bl_handle = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL);
+ ats_sh = GNUNET_ATS_connectivity_suggest (ats, &pid, 1);
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
}
int
-main (int argc, char * const *argv)
+main (int argc, char *const *argv)
{
int res;
benchmark_count = DEFAULT_MESSAGE_COUNT;
@@ -613,46 +573,48 @@ main (int argc, char * const *argv)
struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_option_flag ('s',
- "send",
- gettext_noop ("send data to peer"),
- &benchmark_send),
+ "send",
+ gettext_noop ("send data to peer"),
+ &benchmark_send),
GNUNET_GETOPT_option_flag ('r',
- "receive",
- gettext_noop ("receive data from peer"),
- &benchmark_receive),
+ "receive",
+ gettext_noop ("receive data from peer"),
+ &benchmark_receive),
GNUNET_GETOPT_option_uint ('i',
- "iterations",
- NULL,
- gettext_noop ("iterations"),
- &benchmark_iterations),
+ "iterations",
+ NULL,
+ gettext_noop ("iterations"),
+ &benchmark_iterations),
GNUNET_GETOPT_option_uint ('n',
- "number",
- NULL,
- gettext_noop ("number of messages to send"),
- &benchmark_count),
+ "number",
+ NULL,
+ gettext_noop ("number of messages to send"),
+ &benchmark_count),
GNUNET_GETOPT_option_uint ('m',
- "messagesize",
- NULL,
- gettext_noop ("message size to use"),
- &benchmark_size),
+ "messagesize",
+ NULL,
+ gettext_noop ("message size to use"),
+ &benchmark_size),
GNUNET_GETOPT_option_string ('p',
"peer",
"PEER",
gettext_noop ("peer identity"),
&cpid),
GNUNET_GETOPT_option_verbose (&verbosity),
- GNUNET_GETOPT_OPTION_END
- };
+ GNUNET_GETOPT_OPTION_END};
if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
return 2;
- res = GNUNET_PROGRAM_run (argc, argv,
- "gnunet-transport",
- gettext_noop ("Direct access to transport service."),
- options,
- &run, NULL);
- GNUNET_free((void *) argv);
+ res =
+ GNUNET_PROGRAM_run (argc,
+ argv,
+ "gnunet-transport",
+ gettext_noop ("Direct access to transport service."),
+ options,
+ &run,
+ NULL);
+ GNUNET_free ((void *) argv);
if (GNUNET_OK == res)
return ret;
return 1;
diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c
index c3c1afc38..36c8fc451 100644
--- a/src/transport/gnunet-transport.c
+++ b/src/transport/gnunet-transport.c
@@ -29,12 +29,12 @@
#include "gnunet_resolver_service.h"
#include "gnunet_protocols.h"
#include "gnunet_transport_service.h"
-#include "gnunet_transport_core_service.h"
/**
* Timeout for a name resolution
*/
-#define RESOLUTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+#define RESOLUTION_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
/**
* Timeout for an operation
@@ -332,16 +332,13 @@ static struct PeerResolutionContext *rc_tail;
* @return #GNUNET_OK (continue to iterate)
*/
static int
-destroy_it (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+destroy_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
{
struct MonitoredPeer *m = value;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_remove (monitored_peers,
- key,
- value));
+ GNUNET_assert (
+ GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_remove (monitored_peers, key, value));
GNUNET_free_non_null (m->address);
GNUNET_free (value);
return GNUNET_OK;
@@ -384,18 +381,14 @@ shutdown_task (void *cls)
next = cur->next;
GNUNET_TRANSPORT_address_to_string_cancel (cur->asc);
- GNUNET_CONTAINER_DLL_remove (vc_head,
- vc_tail,
- cur);
+ GNUNET_CONTAINER_DLL_remove (vc_head, vc_tail, cur);
GNUNET_free (cur->transport);
GNUNET_HELLO_address_free (cur->addrcp);
GNUNET_free (cur);
}
while (NULL != (rc = rc_head))
{
- GNUNET_CONTAINER_DLL_remove (rc_head,
- rc_tail,
- rc);
+ GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
GNUNET_TRANSPORT_address_to_string_cancel (rc->asc);
GNUNET_free (rc->transport);
GNUNET_free (rc->addrcp);
@@ -410,35 +403,30 @@ shutdown_task (void *cls)
{
duration = GNUNET_TIME_absolute_get_duration (start_time);
FPRINTF (stdout,
- _("Transmitted %llu bytes/s (%llu bytes in %s)\n"),
+ _ ("Transmitted %llu bytes/s (%llu bytes in %s)\n"),
1000LL * 1000LL * traffic_sent / (1 + duration.rel_value_us),
traffic_sent,
- GNUNET_STRINGS_relative_time_to_string (duration,
- GNUNET_YES));
+ GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES));
}
if (benchmark_receive)
{
duration = GNUNET_TIME_absolute_get_duration (start_time);
FPRINTF (stdout,
- _("Received %llu bytes/s (%llu bytes in %s)\n"),
+ _ ("Received %llu bytes/s (%llu bytes in %s)\n"),
1000LL * 1000LL * traffic_received / (1 + duration.rel_value_us),
traffic_received,
- GNUNET_STRINGS_relative_time_to_string (duration,
- GNUNET_YES));
+ GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES));
}
if (NULL != monitored_peers)
{
- GNUNET_CONTAINER_multipeermap_iterate (monitored_peers,
- &destroy_it,
- NULL);
+ GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, &destroy_it, NULL);
GNUNET_CONTAINER_multipeermap_destroy (monitored_peers);
monitored_peers = NULL;
}
if (NULL != monitored_plugins)
{
- GNUNET_break (0 ==
- GNUNET_CONTAINER_multipeermap_size (monitored_plugins));
+ GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (monitored_plugins));
GNUNET_CONTAINER_multipeermap_destroy (monitored_plugins);
monitored_plugins = NULL;
}
@@ -463,9 +451,7 @@ operation_timeout (void *cls)
op_timeout = NULL;
if ((benchmark_send) || (benchmark_receive))
{
- FPRINTF (stdout,
- _("Failed to connect to `%s'\n"),
- GNUNET_i2s_full (&pid));
+ FPRINTF (stdout, _ ("Failed to connect to `%s'\n"), GNUNET_i2s_full (&pid));
GNUNET_SCHEDULER_shutdown ();
ret = 1;
return;
@@ -477,21 +463,18 @@ operation_timeout (void *cls)
{
next = cur->next;
FPRINTF (stdout,
- _("Failed to resolve address for peer `%s'\n"),
+ _ ("Failed to resolve address for peer `%s'\n"),
GNUNET_i2s (&cur->addrcp->peer));
- GNUNET_CONTAINER_DLL_remove(rc_head,
- rc_tail,
- cur);
+ GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, cur);
GNUNET_TRANSPORT_address_to_string_cancel (cur->asc);
GNUNET_free (cur->transport);
GNUNET_free (cur->addrcp);
GNUNET_free (cur);
-
}
FPRINTF (stdout,
"%s",
- _("Failed to list connections, timeout occurred\n"));
+ _ ("Failed to list connections, timeout occurred\n"));
GNUNET_SCHEDULER_shutdown ();
ret = 1;
return;
@@ -512,22 +495,15 @@ do_send (void *cls)
struct GNUNET_MessageHeader *m;
struct GNUNET_MQ_Envelope *env;
- env = GNUNET_MQ_msg_extra (m,
- BLOCKSIZE * 1024,
- GNUNET_MESSAGE_TYPE_DUMMY);
- memset (&m[1],
- 52,
- BLOCKSIZE * 1024 - sizeof(struct GNUNET_MessageHeader));
+ env = GNUNET_MQ_msg_extra (m, BLOCKSIZE * 1024, GNUNET_MESSAGE_TYPE_DUMMY);
+ memset (&m[1], 52, BLOCKSIZE * 1024 - sizeof (struct GNUNET_MessageHeader));
traffic_sent += BLOCKSIZE * 1024;
- GNUNET_MQ_notify_sent (env,
- &do_send,
- mq);
+ GNUNET_MQ_notify_sent (env, &do_send, mq);
if (verbosity > 0)
FPRINTF (stdout,
- _("Transmitting %u bytes\n"),
- (unsigned int) BLOCKSIZE * 1024);
- GNUNET_MQ_send (mq,
- env);
+ _ ("Transmitting %u bytes\n"),
+ (unsigned int) BLOCKSIZE * 1024);
+ GNUNET_MQ_send (mq, env);
}
@@ -542,11 +518,9 @@ do_send (void *cls)
static void *
notify_connect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_MQ_Handle *mq)
+ struct GNUNET_MQ_Handle *mq)
{
- if (0 != memcmp (&pid,
- peer,
- sizeof(struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
return NULL;
ret = 0;
if (! benchmark_send)
@@ -557,10 +531,12 @@ notify_connect (void *cls,
op_timeout = NULL;
}
if (verbosity > 0)
- FPRINTF (stdout,
- _("Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"),
- GNUNET_i2s (peer),
- BLOCKSIZE);
+ FPRINTF (
+ stdout,
+ _ (
+ "Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"),
+ GNUNET_i2s (peer),
+ BLOCKSIZE);
start_time = GNUNET_TIME_absolute_get ();
do_send (mq);
return mq;
@@ -578,19 +554,17 @@ notify_connect (void *cls,
static void
notify_disconnect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- void *internal_cls)
+ void *internal_cls)
{
- if (0 != memcmp (&pid,
- peer,
- sizeof(struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
if (NULL == internal_cls)
return; /* not about target peer */
if (! benchmark_send)
return; /* not transmitting */
FPRINTF (stdout,
- _("Disconnected from peer `%s' while benchmarking\n"),
- GNUNET_i2s (&pid));
+ _ ("Disconnected from peer `%s' while benchmarking\n"),
+ GNUNET_i2s (&pid));
}
@@ -606,16 +580,16 @@ notify_disconnect (void *cls,
static void *
monitor_notify_connect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- struct GNUNET_MQ_Handle *mq)
+ struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now);
monitor_connect_counter++;
FPRINTF (stdout,
- _("%24s: %-17s %4s (%u connections in total)\n"),
+ _ ("%24s: %-17s %4s (%u connections in total)\n"),
now_str,
- _("Connected to"),
+ _ ("Connected to"),
GNUNET_i2s (peer),
monitor_connect_counter);
return NULL;
@@ -633,18 +607,18 @@ monitor_notify_connect (void *cls,
static void
monitor_notify_disconnect (void *cls,
const struct GNUNET_PeerIdentity *peer,
- void *internal_cls)
+ void *internal_cls)
{
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now);
- GNUNET_assert(monitor_connect_counter > 0);
+ GNUNET_assert (monitor_connect_counter > 0);
monitor_connect_counter--;
FPRINTF (stdout,
- _("%24s: %-17s %4s (%u connections in total)\n"),
+ _ ("%24s: %-17s %4s (%u connections in total)\n"),
now_str,
- _("Disconnected from"),
+ _ ("Disconnected from"),
GNUNET_i2s (peer),
monitor_connect_counter);
}
@@ -658,8 +632,7 @@ monitor_notify_disconnect (void *cls,
* @return #GNUNET_OK
*/
static int
-check_dummy (void *cls,
- const struct GNUNET_MessageHeader *message)
+check_dummy (void *cls, const struct GNUNET_MessageHeader *message)
{
return GNUNET_OK; /* all messages are fine */
}
@@ -672,15 +645,14 @@ check_dummy (void *cls,
* @param message the message
*/
static void
-handle_dummy (void *cls,
- const struct GNUNET_MessageHeader *message)
+handle_dummy (void *cls, const struct GNUNET_MessageHeader *message)
{
if (! benchmark_receive)
return;
if (verbosity > 0)
FPRINTF (stdout,
- _("Received %u bytes\n"),
- (unsigned int) ntohs (message->size));
+ _ ("Received %u bytes\n"),
+ (unsigned int) ntohs (message->size));
if (0 == traffic_received)
start_time = GNUNET_TIME_absolute_get ();
traffic_received += ntohs (message->size);
@@ -711,24 +683,23 @@ print_info (const struct GNUNET_PeerIdentity *id,
struct GNUNET_TIME_Absolute state_timeout)
{
- if ( ((GNUNET_YES == iterate_connections) &&
- (GNUNET_YES == iterate_all)) ||
- (GNUNET_YES == monitor_connections))
+ if (((GNUNET_YES == iterate_connections) && (GNUNET_YES == iterate_all)) ||
+ (GNUNET_YES == monitor_connections))
{
FPRINTF (stdout,
- _("Peer `%s': %s %s in state `%s' until %s\n"),
+ _ ("Peer `%s': %s %s in state `%s' until %s\n"),
GNUNET_i2s (id),
(NULL == transport) ? "<none>" : transport,
(NULL == transport) ? "<none>" : addr,
GNUNET_TRANSPORT_ps2s (state),
GNUNET_STRINGS_absolute_time_to_string (state_timeout));
}
- else if ( (GNUNET_YES == iterate_connections) &&
- (GNUNET_TRANSPORT_is_connected(state)) )
+ else if ((GNUNET_YES == iterate_connections) &&
+ (GNUNET_TRANSPORT_is_connected (state)))
{
/* Only connected peers, skip state */
FPRINTF (stdout,
- _("Peer `%s': %s %s\n"),
+ _ ("Peer `%s': %s %s\n"),
GNUNET_i2s (id),
transport,
addr);
@@ -753,9 +724,7 @@ print_info (const struct GNUNET_PeerIdentity *id,
* if #GNUNET_SYSERR: communication error (IPC error)
*/
static void
-process_peer_string (void *cls,
- const char *address,
- int res)
+process_peer_string (void *cls, const char *address, int res)
{
struct PeerResolutionContext *rc = cls;
@@ -763,11 +732,12 @@ process_peer_string (void *cls,
{
if (GNUNET_SYSERR == res)
{
- FPRINTF (stderr,
- "Failed to convert address for peer `%s' plugin `%s' length %u to string \n",
- GNUNET_i2s (&rc->addrcp->peer),
- rc->addrcp->transport_name,
- (unsigned int) rc->addrcp->address_length);
+ FPRINTF (
+ stderr,
+ "Failed to convert address for peer `%s' plugin `%s' length %u to string \n",
+ GNUNET_i2s (&rc->addrcp->peer),
+ rc->addrcp->transport_name,
+ (unsigned int) rc->addrcp->address_length);
print_info (&rc->addrcp->peer,
rc->transport,
NULL,
@@ -818,9 +788,7 @@ process_peer_string (void *cls,
}
GNUNET_free (rc->transport);
GNUNET_free (rc->addrcp);
- GNUNET_CONTAINER_DLL_remove (rc_head,
- rc_tail,
- rc);
+ GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
GNUNET_free (rc);
if ((0 == address_resolutions) && (iterate_connections))
{
@@ -854,9 +822,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address,
struct PeerResolutionContext *rc;
rc = GNUNET_new (struct PeerResolutionContext);
- GNUNET_CONTAINER_DLL_insert (rc_head,
- rc_tail,
- rc);
+ GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
address_resolutions++;
rc->transport = GNUNET_strdup (address->transport_name);
rc->addrcp = GNUNET_HELLO_address_copy (address);
@@ -869,7 +835,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address,
numeric,
RESOLUTION_TIMEOUT,
&process_peer_string,
- rc);
+ rc);
}
@@ -897,15 +863,14 @@ process_peer_iteration_cb (void *cls,
return;
}
- if ( (GNUNET_NO == iterate_all) &&
- (GNUNET_NO == GNUNET_TRANSPORT_is_connected(state)))
- return; /* Display only connected peers */
+ if ((GNUNET_NO == iterate_all) &&
+ (GNUNET_NO == GNUNET_TRANSPORT_is_connected (state)))
+ return; /* Display only connected peers */
if (NULL != op_timeout)
GNUNET_SCHEDULER_cancel (op_timeout);
- op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT,
- &operation_timeout,
- NULL);
+ op_timeout =
+ GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received address for peer `%s': %s\n",
@@ -913,16 +878,9 @@ process_peer_iteration_cb (void *cls,
address ? address->transport_name : "");
if (NULL != address)
- resolve_peer_address (address,
- numeric,
- state,
- state_timeout);
+ resolve_peer_address (address, numeric, state, state_timeout);
else
- print_info (peer,
- NULL,
- NULL,
- state,
- state_timeout);
+ print_info (peer, NULL, NULL, state, state_timeout);
}
@@ -958,7 +916,7 @@ struct PluginMonitorAddress
*/
static void
print_plugin_event_info (struct PluginMonitorAddress *addr,
- const struct GNUNET_TRANSPORT_SessionInfo *info)
+ const struct GNUNET_TRANSPORT_SessionInfo *info)
{
const char *state;
@@ -987,20 +945,22 @@ print_plugin_event_info (struct PluginMonitorAddress *addr,
"%s: state %s timeout in %s @ %s%s\n",
GNUNET_i2s (&info->address->peer),
state,
- GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (info->session_timeout),
- GNUNET_YES),
- addr->str,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_remaining (info->session_timeout),
+ GNUNET_YES),
+ addr->str,
(info->is_inbound == GNUNET_YES) ? " (INBOUND)" : "");
fprintf (stdout,
"%s: queue has %3u messages and %6u bytes\n",
GNUNET_i2s (&info->address->peer),
info->num_msg_pending,
info->num_bytes_pending);
- if (0 != GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us)
+ if (0 !=
+ GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us)
fprintf (stdout,
- "%s: receiving blocked until %s\n",
- GNUNET_i2s (&info->address->peer),
- GNUNET_STRINGS_absolute_time_to_string (info->receive_delay));
+ "%s: receiving blocked until %s\n",
+ GNUNET_i2s (&info->address->peer),
+ GNUNET_STRINGS_absolute_time_to_string (info->receive_delay));
}
@@ -1021,9 +981,7 @@ print_plugin_event_info (struct PluginMonitorAddress *addr,
* if #GNUNET_SYSERR: communication error (IPC error)
*/
static void
-address_cb (void *cls,
- const char *address,
- int res)
+address_cb (void *cls, const char *address, int res)
{
struct PluginMonitorAddress *addr = cls;
@@ -1035,8 +993,7 @@ address_cb (void *cls,
if (NULL != addr->str)
return;
addr->str = GNUNET_strdup (address);
- print_plugin_event_info (addr,
- &addr->si);
+ print_plugin_event_info (addr, &addr->si);
}
@@ -1065,8 +1022,7 @@ plugin_monitoring_cb (void *cls,
{
struct PluginMonitorAddress *addr;
- if ( (NULL == info) &&
- (NULL == session) )
+ if ((NULL == info) && (NULL == session))
return; /* in sync with transport service */
addr = *session_ctx;
if (NULL == info)
@@ -1084,26 +1040,25 @@ plugin_monitoring_cb (void *cls,
}
return; /* shutdown */
}
- if (0 != memcmp (&info->address->peer,
- &pid,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 !=
+ memcmp (&info->address->peer, &pid, sizeof (struct GNUNET_PeerIdentity)))
return; /* filtered */
if (NULL == addr)
{
addr = GNUNET_new (struct PluginMonitorAddress);
- addr->asc = GNUNET_TRANSPORT_address_to_string (cfg,
- info->address,
- numeric,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &address_cb,
- addr);
+ addr->asc =
+ GNUNET_TRANSPORT_address_to_string (cfg,
+ info->address,
+ numeric,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &address_cb,
+ addr);
*session_ctx = addr;
}
if (NULL == addr->str)
addr->si = *info;
else
- print_plugin_event_info (addr,
- info);
+ print_plugin_event_info (addr, info);
if (GNUNET_TRANSPORT_SS_DONE == info->state)
{
if (NULL != addr->asc)
@@ -1141,38 +1096,35 @@ process_peer_monitoring_cb (void *cls,
{
FPRINTF (stdout,
"%s",
- _("Monitor disconnected from transport service. Reconnecting.\n"));
+ _ (
+ "Monitor disconnected from transport service. Reconnecting.\n"));
return;
}
if (NULL != op_timeout)
GNUNET_SCHEDULER_cancel (op_timeout);
- op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT,
- &operation_timeout,
- NULL);
+ op_timeout =
+ GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
- if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers,
- peer)))
+ if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, peer)))
{
m = GNUNET_new (struct MonitoredPeer);
- GNUNET_CONTAINER_multipeermap_put (monitored_peers,
- peer,
- m,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ GNUNET_CONTAINER_multipeermap_put (
+ monitored_peers,
+ peer,
+ m,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
else
{
- if ( (m->state == state) &&
- (m->state_timeout.abs_value_us == state_timeout.abs_value_us) &&
- (NULL == address) &&
- (NULL == m->address) )
+ if ((m->state == state) &&
+ (m->state_timeout.abs_value_us == state_timeout.abs_value_us) &&
+ (NULL == address) && (NULL == m->address))
{
return; /* No real change */
}
- if ( (m->state == state) &&
- (NULL != address) &&
- (NULL != m->address) &&
- (0 == GNUNET_HELLO_address_cmp(m->address, address)) )
+ if ((m->state == state) && (NULL != address) && (NULL != m->address) &&
+ (0 == GNUNET_HELLO_address_cmp (m->address, address)))
return; /* No real change */
}
@@ -1187,16 +1139,9 @@ process_peer_monitoring_cb (void *cls,
m->state_timeout = state_timeout;
if (NULL != address)
- resolve_peer_address (m->address,
- numeric,
- m->state,
- m->state_timeout);
+ resolve_peer_address (m->address, numeric, m->state, m->state_timeout);
else
- print_info (peer,
- NULL,
- NULL,
- m->state,
- m->state_timeout);
+ print_info (peer, NULL, NULL, m->state, m->state_timeout);
}
@@ -1210,12 +1155,9 @@ process_peer_monitoring_cb (void *cls,
* @return #GNUNET_OK if the connection is allowed, #GNUNET_SYSERR if not
*/
static int
-blacklist_cb (void *cls,
- const struct GNUNET_PeerIdentity *cpid)
+blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *cpid)
{
- if (0 == memcmp (cpid,
- &pid,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 == memcmp (cpid, &pid, sizeof (struct GNUNET_PeerIdentity)))
return GNUNET_SYSERR;
return GNUNET_OK;
}
@@ -1231,7 +1173,7 @@ blacklist_cb (void *cls,
*/
static void
run (void *cls,
- char * const *args,
+ char *const *args,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *mycfg)
{
@@ -1241,127 +1183,119 @@ run (void *cls,
cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg;
- counter = benchmark_send + benchmark_receive + iterate_connections
- + monitor_connections + monitor_connects + do_disconnect +
- monitor_plugins;
+ counter = benchmark_send + benchmark_receive + iterate_connections +
+ monitor_connections + monitor_connects + do_disconnect +
+ monitor_plugins;
if (1 < counter)
{
- FPRINTF (stderr,
- _("Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"),
- "disconnect",
- "benchmark send",
- "benchmark receive",
- "information",
- "monitor",
- "events",
- "plugins");
+ FPRINTF (
+ stderr,
+ _ (
+ "Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"),
+ "disconnect",
+ "benchmark send",
+ "benchmark receive",
+ "information",
+ "monitor",
+ "events",
+ "plugins");
return;
}
if (0 == counter)
{
- FPRINTF (stderr,
- _("No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"),
- "disconnect",
- "benchmark send",
- "benchmark receive",
- "information",
- "monitor",
- "events",
- "plugins");
+ FPRINTF (
+ stderr,
+ _ (
+ "No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"),
+ "disconnect",
+ "benchmark send",
+ "benchmark receive",
+ "information",
+ "monitor",
+ "events",
+ "plugins");
return;
}
if (do_disconnect) /* -D: Disconnect from peer */
{
- if (0 == memcmp (&zero_pid,
- &pid,
- sizeof (pid)))
+ if (0 == memcmp (&zero_pid, &pid, sizeof (pid)))
{
FPRINTF (stderr,
- _("Option `%s' makes no sense without option `%s'.\n"),
- "-D", "-p");
+ _ ("Option `%s' makes no sense without option `%s'.\n"),
+ "-D",
+ "-p");
ret = 1;
return;
}
- blacklist = GNUNET_TRANSPORT_blacklist (cfg,
- &blacklist_cb,
- NULL);
+ blacklist = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL);
if (NULL == blacklist)
{
FPRINTF (stderr,
"%s",
- _("Failed to connect to transport service for disconnection\n"));
+ _ (
+ "Failed to connect to transport service for disconnection\n"));
ret = 1;
return;
}
FPRINTF (stdout,
"%s",
- _("Blacklisting request in place, stop with CTRL-C\n"));
+ _ ("Blacklisting request in place, stop with CTRL-C\n"));
}
else if (benchmark_send) /* -s: Benchmark sending */
{
- if (0 == memcmp (&zero_pid,
- &pid,
- sizeof (pid)))
+ if (0 == memcmp (&zero_pid, &pid, sizeof (pid)))
{
FPRINTF (stderr,
- _("Option `%s' makes no sense without option `%s'.\n"),
- "-s", "-p");
+ _ ("Option `%s' makes no sense without option `%s'.\n"),
+ "-s",
+ "-p");
ret = 1;
return;
}
handle = GNUNET_TRANSPORT_core_connect (cfg,
- NULL,
- NULL,
- NULL,
- &notify_connect,
- &notify_disconnect,
- NULL);
+ NULL,
+ NULL,
+ NULL,
+ &notify_connect,
+ &notify_disconnect,
+ NULL);
if (NULL == handle)
{
- FPRINTF (stderr,
- "%s",
- _("Failed to connect to transport service\n"));
+ FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n"));
ret = 1;
return;
}
start_time = GNUNET_TIME_absolute_get ();
- op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT,
- &operation_timeout,
- NULL);
+ op_timeout =
+ GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
}
else if (benchmark_receive) /* -b: Benchmark receiving */
{
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_var_size (dummy,
- GNUNET_MESSAGE_TYPE_DUMMY,
- struct GNUNET_MessageHeader,
- NULL),
- GNUNET_MQ_handler_end ()
- };
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_var_size (dummy,
+ GNUNET_MESSAGE_TYPE_DUMMY,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_handler_end ()};
handle = GNUNET_TRANSPORT_core_connect (cfg,
- NULL,
- handlers,
- NULL,
- NULL,
- NULL,
- NULL);
+ NULL,
+ handlers,
+ NULL,
+ NULL,
+ NULL,
+ NULL);
if (NULL == handle)
{
- FPRINTF (stderr,
- "%s",
- _("Failed to connect to transport service\n"));
+ FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n"));
ret = 1;
return;
}
if (verbosity > 0)
- FPRINTF (stdout,
- "%s",
- _("Starting to receive benchmark data\n"));
+ FPRINTF (stdout, "%s", _ ("Starting to receive benchmark data\n"));
start_time = GNUNET_TIME_absolute_get ();
-
}
else if (iterate_connections) /* -i: List information about peers once */
{
@@ -1370,42 +1304,38 @@ run (void *cls,
GNUNET_YES,
&process_peer_iteration_cb,
(void *) cfg);
- op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT,
- &operation_timeout,
- NULL);
+ op_timeout =
+ GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL);
}
- else if (monitor_connections) /* -m: List information about peers continuously */
+ else if (monitor_connections) /* -m: List information about peers continuously
+ */
{
- monitored_peers = GNUNET_CONTAINER_multipeermap_create (10,
- GNUNET_NO);
+ monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
pic = GNUNET_TRANSPORT_monitor_peers (cfg,
- &pid,
+ &pid,
GNUNET_NO,
&process_peer_monitoring_cb,
NULL);
}
- else if (monitor_plugins) /* -P: List information about plugins continuously */
+ else if (monitor_plugins) /* -P: List information about plugins continuously
+ */
{
monitored_plugins = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO);
- pm = GNUNET_TRANSPORT_monitor_plugins (cfg,
- &plugin_monitoring_cb,
- NULL);
+ pm = GNUNET_TRANSPORT_monitor_plugins (cfg, &plugin_monitoring_cb, NULL);
}
else if (monitor_connects) /* -e : Monitor (dis)connect events continuously */
{
monitor_connect_counter = 0;
handle = GNUNET_TRANSPORT_core_connect (cfg,
- NULL,
- NULL,
- NULL,
- &monitor_notify_connect,
- &monitor_notify_disconnect,
- NULL);
+ NULL,
+ NULL,
+ NULL,
+ &monitor_notify_connect,
+ &monitor_notify_disconnect,
+ NULL);
if (NULL == handle)
{
- FPRINTF (stderr,
- "%s",
- _("Failed to connect to transport service\n"));
+ FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n"));
ret = 1;
return;
}
@@ -1413,75 +1343,86 @@ run (void *cls,
}
else
{
- GNUNET_break(0);
+ GNUNET_break (0);
return;
}
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- NULL);
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
}
int
-main (int argc,
- char * const *argv)
+main (int argc, char *const *argv)
{
int res;
- struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_option_flag ('a',
- "all",
- gettext_noop ("print information for all peers (instead of only connected peers)"),
- &iterate_all),
- GNUNET_GETOPT_option_flag ('b',
- "benchmark",
- gettext_noop ("measure how fast we are receiving data from all peers (until CTRL-C)"),
- &benchmark_receive),
- GNUNET_GETOPT_option_flag ('D',
- "disconnect",
- gettext_noop ("disconnect from a peer"),
- &do_disconnect),
- GNUNET_GETOPT_option_flag ('i',
- "information",
- gettext_noop ("provide information about all current connections (once)"),
- &iterate_connections),
- GNUNET_GETOPT_option_flag ('m',
- "monitor",
- gettext_noop ("provide information about all current connections (continuously)"),
- &monitor_connections),
- GNUNET_GETOPT_option_flag ('e',
- "events",
- gettext_noop ("provide information about all connects and disconnect events (continuously)"),
- &monitor_connects),
- GNUNET_GETOPT_option_flag ('n',
- "numeric",
- gettext_noop ("do not resolve hostnames"),
- &numeric),
- GNUNET_GETOPT_option_base32_auto ('p',
- "peer",
- "PEER",
- gettext_noop ("peer identity"),
- &pid),
- GNUNET_GETOPT_option_flag ('P',
- "plugins",
- gettext_noop ("monitor plugin sessions"),
- &monitor_plugins),
- GNUNET_GETOPT_option_flag ('s',
- "send",
- gettext_noop
- ("send data for benchmarking to the other peer (until CTRL-C)"),
- &benchmark_send),
- GNUNET_GETOPT_option_verbose (&verbosity),
- GNUNET_GETOPT_OPTION_END
- };
+ struct GNUNET_GETOPT_CommandLineOption options[] =
+ {GNUNET_GETOPT_option_flag (
+ 'a',
+ "all",
+ gettext_noop (
+ "print information for all peers (instead of only connected peers)"),
+ &iterate_all),
+ GNUNET_GETOPT_option_flag (
+ 'b',
+ "benchmark",
+ gettext_noop (
+ "measure how fast we are receiving data from all peers (until CTRL-C)"),
+ &benchmark_receive),
+ GNUNET_GETOPT_option_flag ('D',
+ "disconnect",
+ gettext_noop ("disconnect from a peer"),
+ &do_disconnect),
+ GNUNET_GETOPT_option_flag (
+ 'i',
+ "information",
+ gettext_noop (
+ "provide information about all current connections (once)"),
+ &iterate_connections),
+ GNUNET_GETOPT_option_flag (
+ 'm',
+ "monitor",
+ gettext_noop (
+ "provide information about all current connections (continuously)"),
+ &monitor_connections),
+ GNUNET_GETOPT_option_flag (
+ 'e',
+ "events",
+ gettext_noop (
+ "provide information about all connects and disconnect events (continuously)"),
+ &monitor_connects),
+ GNUNET_GETOPT_option_flag ('n',
+ "numeric",
+ gettext_noop ("do not resolve hostnames"),
+ &numeric),
+ GNUNET_GETOPT_option_base32_auto ('p',
+ "peer",
+ "PEER",
+ gettext_noop ("peer identity"),
+ &pid),
+ GNUNET_GETOPT_option_flag ('P',
+ "plugins",
+ gettext_noop ("monitor plugin sessions"),
+ &monitor_plugins),
+ GNUNET_GETOPT_option_flag (
+ 's',
+ "send",
+ gettext_noop (
+ "send data for benchmarking to the other peer (until CTRL-C)"),
+ &benchmark_send),
+ GNUNET_GETOPT_option_verbose (&verbosity),
+ GNUNET_GETOPT_OPTION_END};
if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv))
return 2;
- res = GNUNET_PROGRAM_run (argc, argv,
- "gnunet-transport",
- gettext_noop ("Direct access to transport service."),
- options,
- &run, NULL);
+ res =
+ GNUNET_PROGRAM_run (argc,
+ argv,
+ "gnunet-transport",
+ gettext_noop ("Direct access to transport service."),
+ options,
+ &run,
+ NULL);
GNUNET_free ((void *) argv);
if (GNUNET_OK == res)
return ret;
diff --git a/src/transport/transport-testing.h b/src/transport/transport-testing.h
index 4629d6125..83bbf277b 100644
--- a/src/transport/transport-testing.h
+++ b/src/transport/transport-testing.h
@@ -11,7 +11,7 @@
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
@@ -30,7 +30,6 @@
#include "gnunet_util_lib.h"
#include "gnunet_hello_lib.h"
#include "gnunet_transport_service.h"
-#include "gnunet_transport_core_service.h"
#include "gnunet_transport_hello_service.h"
#include "gnunet_transport_manipulation_service.h"
#include "gnunet_testing_lib.h"
@@ -143,7 +142,7 @@ struct GNUNET_TRANSPORT_TESTING_PeerContext
* Closure for @e start_cb.
*/
void *start_cb_cls;
-
+
/**
* An unique number to identify the peer
*/
@@ -207,12 +206,12 @@ struct GNUNET_TRANSPORT_TESTING_ConnectRequest
*/
struct GNUNET_MQ_Handle *mq;
- /**
+ /**
* Set if peer1 says the connection is up to peer2.
*/
int p1_c;
- /**
+ /**
* Set if peer2 says the connection is up to peer1.
*/
int p2_c;
@@ -289,15 +288,16 @@ GNUNET_TRANSPORT_TESTING_done (struct GNUNET_TRANSPORT_TESTING_Handle *tth);
* @return the peer context
*/
struct GNUNET_TRANSPORT_TESTING_PeerContext *
-GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth,
- const char *cfgname,
- int peer_id,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_TRANSPORT_NotifyConnect nc,
- GNUNET_TRANSPORT_NotifyDisconnect nd,
- void *cb_cls,
- GNUNET_SCHEDULER_TaskCallback start_cb,
- void *start_cb_cls);
+GNUNET_TRANSPORT_TESTING_start_peer (
+ struct GNUNET_TRANSPORT_TESTING_Handle *tth,
+ const char *cfgname,
+ int peer_id,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_TRANSPORT_NotifyConnect nc,
+ GNUNET_TRANSPORT_NotifyDisconnect nd,
+ void *cb_cls,
+ GNUNET_SCHEDULER_TaskCallback start_cb,
+ void *start_cb_cls);
/**
@@ -306,7 +306,8 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth
* @param p the peer
*/
void
-GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *pc);
+GNUNET_TRANSPORT_TESTING_stop_peer (
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *pc);
/**
@@ -318,10 +319,10 @@ GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext
* @return #GNUNET_OK in success otherwise #GNUNET_SYSERR
*/
int
-GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *p,
- GNUNET_SCHEDULER_TaskCallback restart_cb,
- void *restart_cb_cls);
-
+GNUNET_TRANSPORT_TESTING_restart_peer (
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *p,
+ GNUNET_SCHEDULER_TaskCallback restart_cb,
+ void *restart_cb_cls);
/**
@@ -331,15 +332,17 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerConte
*
* @param p1 peer 1
* @param p2 peer 2
- * @param cb the callback to call when both peers notified that they are connected
+ * @param cb the callback to call when both peers notified that they are
+ * connected
* @param cls callback cls
* @return a connect request handle
*/
struct GNUNET_TRANSPORT_TESTING_ConnectRequest *
-GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *p2,
- GNUNET_SCHEDULER_TaskCallback cb,
- void *cls);
+GNUNET_TRANSPORT_TESTING_connect_peers (
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *p1,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *p2,
+ GNUNET_SCHEDULER_TaskCallback cb,
+ void *cls);
/**
@@ -350,7 +353,8 @@ GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerCont
* @param cc a connect request handle
*/
void
-GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc);
+GNUNET_TRANSPORT_TESTING_connect_peers_cancel (
+ struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc);
/**
@@ -359,9 +363,9 @@ GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_C
* @param cls closure
* @param cc request matching the query
*/
-typedef void
-(*GNUNET_TRANSPORT_TESTING_ConnectContextCallback)(void *cls,
- struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc);
+typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContextCallback) (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc);
/**
@@ -369,14 +373,15 @@ typedef void
*
* @param p1 first peer
* @param p2 second peer
- * @param cb function to call
+ * @param cb function to call
* @param cb_cls closure for @a cb
*/
void
-GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *p2,
- GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb,
- void *cb_cls);
+GNUNET_TRANSPORT_TESTING_find_connecting_context (
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *p1,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *p2,
+ GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb,
+ void *cb_cls);
/* ********************** high-level process functions *************** */
@@ -390,10 +395,10 @@ GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTIN
* @param num_peers size of the @a p array
* @param p the peers that were launched
*/
-typedef void
-(*GNUNET_TRANSPORT_TESTING_ConnectContinuation)(void *cls,
- unsigned int num_peers,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]);
+typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContinuation) (
+ void *cls,
+ unsigned int num_peers,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]);
/**
@@ -423,7 +428,6 @@ struct GNUNET_TRANSPORT_TESTING_TestMessage
GNUNET_NETWORK_STRUCT_END
-
/**
* Function called by the transport for each received message.
*
@@ -432,11 +436,11 @@ GNUNET_NETWORK_STRUCT_END
* @param sender sender of the message
* @param message the message
*/
-typedef void
-(*GNUNET_TRANSPORT_TESTING_ReceiveCallback) (void *cls,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_TRANSPORT_TESTING_TestMessage *message);
+typedef void (*GNUNET_TRANSPORT_TESTING_ReceiveCallback) (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_TRANSPORT_TESTING_TestMessage *message);
/**
@@ -447,10 +451,10 @@ typedef void
* @param me peer experiencing the event
* @param other peer that connected to @a me
*/
-typedef void
-(*GNUNET_TRANSPORT_TESTING_NotifyConnect) (void *cls,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
- const struct GNUNET_PeerIdentity *other);
+typedef void (*GNUNET_TRANSPORT_TESTING_NotifyConnect) (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
+ const struct GNUNET_PeerIdentity *other);
/**
@@ -461,10 +465,10 @@ typedef void
* @param me peer experiencing the event
* @param other peer that disconnected from @a me
*/
-typedef void
-(*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) (void *cls,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
- const struct GNUNET_PeerIdentity *other);
+typedef void (*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
+ const struct GNUNET_PeerIdentity *other);
/**
@@ -593,7 +597,7 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext
* message.
*/
uint32_t send_num_gen;
-
+
/* ******* internal state, clients should not mess with this **** */
/**
@@ -625,7 +629,6 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext
* Array with @e num_peers entries.
*/
struct GNUNET_TRANSPORT_TESTING_InternalPeerContext *ip;
-
};
@@ -637,8 +640,9 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext
* @return NULL if @a peer was not found
*/
struct GNUNET_TRANSPORT_TESTING_PeerContext *
-GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc,
- const struct GNUNET_PeerIdentity *peer);
+GNUNET_TRANSPORT_TESTING_find_peer (
+ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc,
+ const struct GNUNET_PeerIdentity *peer);
/**
@@ -648,7 +652,8 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck
* abort the test, and a shutdown handler to clean up properly
* on exit.
*
- * @param cls closure of type `struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext`
+ * @param cls closure of type `struct
+ * GNUNET_TRANSPORT_TESTING_ConnectCheckContext`
* @param tth_ initialized testing handle
* @param test_plugin_ name of the plugin
* @param test_name_ name of the test
@@ -657,12 +662,13 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck
* @return #GNUNET_SYSERR on error
*/
int
-GNUNET_TRANSPORT_TESTING_connect_check (void *cls,
- struct GNUNET_TRANSPORT_TESTING_Handle *tth_,
- const char *test_plugin_,
- const char *test_name_,
- unsigned int num_peers,
- char *cfg_files[]);
+GNUNET_TRANSPORT_TESTING_connect_check (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_Handle *tth_,
+ const char *test_plugin_,
+ const char *test_name_,
+ unsigned int num_peers,
+ char *cfg_files[]);
/**
@@ -677,13 +683,13 @@ GNUNET_TRANSPORT_TESTING_connect_check (void *cls,
* @param cfg_files array of names of configuration files for the peers
* @return #GNUNET_SYSERR on error
*/
-typedef int
-(*GNUNET_TRANSPORT_TESTING_CheckCallback)(void *cls,
- struct GNUNET_TRANSPORT_TESTING_Handle *tth_,
- const char *test_plugin_,
- const char *test_name_,
- unsigned int num_peers,
- char *cfg_files[]);
+typedef int (*GNUNET_TRANSPORT_TESTING_CheckCallback) (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_Handle *tth_,
+ const char *test_plugin_,
+ const char *test_name_,
+ unsigned int num_peers,
+ char *cfg_files[]);
/**
@@ -712,8 +718,12 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0,
* @param check_cls closure for @a check
* @return #GNUNET_OK on success
*/
-#define GNUNET_TRANSPORT_TESTING_main(num_peers,check,check_cls) \
- GNUNET_TRANSPORT_TESTING_main_ (argv[0], __FILE__, num_peers, check, check_cls)
+#define GNUNET_TRANSPORT_TESTING_main(num_peers, check, check_cls) \
+ GNUNET_TRANSPORT_TESTING_main_ (argv[0], \
+ __FILE__, \
+ num_peers, \
+ check, \
+ check_cls)
/* ***************** Convenience functions for sending ********* */
@@ -725,7 +735,8 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0,
* @param sender the sending peer
* @param receiver the receiving peer
* @param mtype message type to use
- * @param msize size of the message, at least `sizeof (struct GNUNET_TRANSPORT_TESTING_TestMessage)`
+ * @param msize size of the message, at least `sizeof (struct
+ * GNUNET_TRANSPORT_TESTING_TestMessage)`
* @param num unique message number
* @param cont continuation to call after transmission
* @param cont_cls closure for @a cont
@@ -734,13 +745,14 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0,
* #GNUNET_SYSERR if @a msize is illegal
*/
int
-GNUNET_TRANSPORT_TESTING_send (struct GNUNET_TRANSPORT_TESTING_PeerContext *sender,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver,
- uint16_t mtype,
- uint16_t msize,
- uint32_t num,
- GNUNET_SCHEDULER_TaskCallback cont,
- void *cont_cls);
+GNUNET_TRANSPORT_TESTING_send (
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *sender,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver,
+ uint16_t mtype,
+ uint16_t msize,
+ uint32_t num,
+ GNUNET_SCHEDULER_TaskCallback cont,
+ void *cont_cls);
/**
@@ -771,14 +783,14 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure
* the message size, can be NULL in which case the message
* size is the default.
*/
- size_t (*get_size_cb)(unsigned int n);
-
+ size_t (*get_size_cb) (unsigned int n);
+
/**
* Number of messages to be transmitted in a loop.
* Use zero for "forever" (until external shutdown).
*/
unsigned int num_messages;
-
+
/**
* Function to call after all transmissions, can be NULL.
*/
@@ -788,12 +800,11 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure
* Closure for @e cont.
*/
void *cont_cls;
-
};
/**
- * Task that sends a minimalistic test message from the
+ * Task that sends a minimalistic test message from the
* first peer to the second peer.
*
* @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure`
@@ -804,14 +815,14 @@ void
GNUNET_TRANSPORT_TESTING_simple_send (void *cls);
/**
- * Size of a message sent with
+ * Size of a message sent with
* #GNUNET_TRANSPORT_TESTING_large_send(). Big enough
* to usually force defragmentation.
*/
#define GNUNET_TRANSPORT_TESTING_LARGE_MESSAGE_SIZE 2600
/**
- * Task that sends a large test message from the
+ * Task that sends a large test message from the
* first peer to the second peer.
*
* @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure`
@@ -833,9 +844,10 @@ GNUNET_TRANSPORT_TESTING_large_send (void *cls);
* @param other peer that connected.
*/
void
-GNUNET_TRANSPORT_TESTING_log_connect (void *cls,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
- const struct GNUNET_PeerIdentity *other);
+GNUNET_TRANSPORT_TESTING_log_connect (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
+ const struct GNUNET_PeerIdentity *other);
/**
@@ -846,10 +858,10 @@ GNUNET_TRANSPORT_TESTING_log_connect (void *cls,
* @param other peer that disconnected.
*/
void
-GNUNET_TRANSPORT_TESTING_log_disconnect (void *cls,
- struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
- const struct GNUNET_PeerIdentity *other);
-
+GNUNET_TRANSPORT_TESTING_log_disconnect (
+ void *cls,
+ struct GNUNET_TRANSPORT_TESTING_PeerContext *me,
+ const struct GNUNET_PeerIdentity *other);
/* ********************** low-level filename functions *************** */
@@ -875,8 +887,7 @@ GNUNET_TRANSPORT_TESTING_get_test_name (const char *file);
* @return configuration name to use
*/
char *
-GNUNET_TRANSPORT_TESTING_get_config_name (const char *file,
- int count);
+GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, int count);
/**
diff --git a/src/transport/transport.h b/src/transport/transport.h
index d2a3a262b..ed89940cc 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -123,10 +123,21 @@ struct ConnectInfoMessage
*/
struct GNUNET_MessageHeader header;
+#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \
+ defined(GNUNET_TRANSPORT_CORE_VERSION))
+
+ /**
+ * Always zero, for alignment.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+#else
+
/**
* Current outbound quota for this peer
*/
struct GNUNET_BANDWIDTH_Value32NBO quota_out;
+#endif
/**
* Identity of the new neighbour.
@@ -163,6 +174,8 @@ struct DisconnectInfoMessage
* Message used to set a particular bandwidth quota. Sent TO the
* service to set an incoming quota, sent FROM the service to update
* an outgoing quota.
+ *
+ * NOTE: no longer used in TNG!
*/
struct QuotaSetMessage
{
@@ -215,6 +228,13 @@ struct SendOkMessage
*/
struct GNUNET_MessageHeader header;
+#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \
+ defined(GNUNET_TRANSPORT_CORE_VERSION))
+
+ uint32_t reserved GNUNET_PACKED;
+
+#else
+
/**
* #GNUNET_OK if the transmission succeeded,
* #GNUNET_SYSERR if it failed (i.e. network disconnect);
@@ -229,11 +249,13 @@ struct SendOkMessage
uint16_t bytes_msg GNUNET_PACKED;
/**
- * Size of message sent over wire
- * Includes plugin and protocol specific overhead
+ * Size of message sent over wire.
+ * Includes plugin and protocol specific overheads.
*/
uint32_t bytes_physical GNUNET_PACKED;
+#endif
+
/**
* Which peer can send more now?
*/
@@ -242,6 +264,32 @@ struct SendOkMessage
/**
+ * Message used to notify the transport API that it can
+ * send another message to the transport service.
+ * (Used to implement flow control.)
+ */
+struct RecvOkMessage
+{
+
+ /**
+ * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Number of messages by which to increase the window, greater or
+ * equal to one.
+ */
+ uint32_t increase_window_delta GNUNET_PACKED;
+
+ /**
+ * Which peer can CORE handle more from now?
+ */
+ struct GNUNET_PeerIdentity peer;
+};
+
+
+/**
* Message used to notify the transport service about a message
* to be transmitted to another peer. The actual message follows.
*/
@@ -258,10 +306,14 @@ struct OutboundMessage
*/
uint32_t reserved GNUNET_PACKED;
+#if ! (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \
+ defined(GNUNET_TRANSPORT_CORE_VERSION))
+
/**
* Allowed delay.
*/
struct GNUNET_TIME_RelativeNBO timeout;
+#endif
/**
* Which peer should receive the message?
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c
index f00d00a44..a3c49e94f 100644
--- a/src/transport/transport_api2_core.c
+++ b/src/transport/transport_api2_core.c
@@ -32,13 +32,23 @@
#include "gnunet_transport_core_service.h"
#include "transport.h"
-#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__)
+#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
/**
* How large to start with for the hashmap of neighbours.
*/
#define STARTING_NEIGHBOURS_SIZE 16
+/**
+ * Window size. How many messages to the same target do we pass
+ * to TRANSPORT without a SEND_OK in between? Small values limit
+ * thoughput, large values will increase latency.
+ *
+ * FIXME-OPTIMIZE: find out what good values are experimentally,
+ * maybe set adaptively (i.e. to observed available bandwidth).
+ */
+#define SEND_WINDOW_SIZE 4
+
/**
* Entry in hash table of all of our current (connected) neighbours.
@@ -72,46 +82,27 @@ struct Neighbour
void *handlers_cls;
/**
- * Entry in our readyness heap (which is sorted by @e next_ready
- * value). NULL if there is no pending transmission request for
- * this neighbour or if we're waiting for @e is_ready to become
- * true AFTER the @e out_tracker suggested that this peer's quota
- * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
- * we should immediately go back into the heap).
+ * How many messages can we still send to this peer before we should
+ * throttle?
*/
- struct GNUNET_CONTAINER_HeapNode *hn;
+ unsigned int ready_window;
/**
- * Task to trigger MQ when we have enough bandwidth for the
- * next transmission.
+ * Used to indicate our status if @e env is non-NULL. Set to
+ * #GNUNET_YES if we did pass a message to the MQ and are waiting
+ * for the call to #notify_send_done(). Set to #GNUNET_NO if the @e
+ * ready_window is 0 and @e env is waiting for a
+ * #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK?
*/
- struct GNUNET_SCHEDULER_Task *timeout_task;
-
- /**
- * Outbound bandwidh tracker.
- */
- struct GNUNET_BANDWIDTH_Tracker out_tracker;
-
- /**
- * Sending consumed more bytes on wire than payload was announced
- * This overhead is added to the delay of next sending operation
- */
- unsigned long long traffic_overhead;
-
- /**
- * Is this peer currently ready to receive a message?
- */
- int is_ready;
+ int16_t awaiting_done;
/**
* Size of the message in @e env.
*/
uint16_t env_size;
-
};
-
/**
* Handle for the transport service (includes all of the
* state for the transport service).
@@ -141,11 +132,6 @@ struct GNUNET_TRANSPORT_CoreHandle
GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
/**
- * function to call on excess bandwidth events
- */
- GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
-
- /**
* My client connection to the transport service.
*/
struct GNUNET_MQ_Handle *mq;
@@ -181,7 +167,6 @@ struct GNUNET_TRANSPORT_CoreHandle
* (if #GNUNET_NO, then @e self is all zeros!).
*/
int check_self;
-
};
@@ -206,31 +191,7 @@ static struct Neighbour *
neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
const struct GNUNET_PeerIdentity *peer)
{
- return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
- peer);
-}
-
-
-/**
- * Function called by the bandwidth tracker if we have excess
- * bandwidth.
- *
- * @param cls the `struct Neighbour` that has excess bandwidth
- */
-static void
-notify_excess_cb (void *cls)
-{
- struct Neighbour *n = cls;
- struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Notifying CORE that more bandwidth is available for %s\n",
- GNUNET_i2s (&n->id));
-
- if (NULL != h->neb_cb)
- h->neb_cb (h->cls,
- &n->id,
- n->handlers_cls);
+ return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
}
@@ -245,9 +206,7 @@ notify_excess_cb (void *cls)
* #GNUNET_NO if not.
*/
static int
-neighbour_delete (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
{
struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
struct Neighbour *n = value;
@@ -255,16 +214,8 @@ neighbour_delete (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Dropping entry for neighbour `%s'.\n",
GNUNET_i2s (key));
- GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
if (NULL != handle->nd_cb)
- handle->nd_cb (handle->cls,
- &n->id,
- n->handlers_cls);
- if (NULL != n->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (n->timeout_task);
- n->timeout_task = NULL;
- }
+ handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
if (NULL != n->env)
{
GNUNET_MQ_send_cancel (n->env);
@@ -272,10 +223,9 @@ neighbour_delete (void *cls,
}
GNUNET_MQ_destroy (n->mq);
GNUNET_assert (NULL == n->mq);
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
- key,
- n));
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
GNUNET_free (n);
return GNUNET_YES;
}
@@ -291,8 +241,7 @@ neighbour_delete (void *cls,
* @param error error code
*/
static void
-mq_error_handler (void *cls,
- enum GNUNET_MQ_Error error)
+mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
@@ -306,57 +255,42 @@ mq_error_handler (void *cls,
* A message from the handler's message queue to a neighbour was
* transmitted. Now trigger (possibly delayed) notification of the
* neighbour's message queue that we are done and thus ready for
- * the next message.
+ * the next message. Note that the MQ being ready is independent
+ * of the send window, as we may queue many messages and simply
+ * not pass them to TRANSPORT if the send window is insufficient.
*
* @param cls the `struct Neighbour` where the message was sent
*/
static void
-notify_send_done_fin (void *cls)
+notify_send_done (void *cls)
{
struct Neighbour *n = cls;
- n->timeout_task = NULL;
- n->is_ready = GNUNET_YES;
+ n->awaiting_done = GNUNET_NO;
+ n->env = NULL;
GNUNET_MQ_impl_send_continue (n->mq);
}
/**
- * A message from the handler's message queue to a neighbour was
- * transmitted. Now trigger (possibly delayed) notification of the
- * neighbour's message queue that we are done and thus ready for
- * the next message.
+ * We have an envelope waiting for transmission at @a n, and
+ * our transmission window is positive. Perform the transmission.
*
- * @param cls the `struct Neighbour` where the message was sent
+ * @param n neighbour to perform transmission for
*/
static void
-notify_send_done (void *cls)
+do_send (struct Neighbour *n)
{
- struct Neighbour *n = cls;
- struct GNUNET_TIME_Relative delay;
-
- n->timeout_task = NULL;
- if (NULL != n->env)
- {
- GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
- n->env_size + n->traffic_overhead);
- n->env = NULL;
- n->traffic_overhead = 0;
- }
- delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
- 128);
- if (0 == delay.rel_value_us)
- {
- n->is_ready = GNUNET_YES;
- GNUNET_MQ_impl_send_continue (n->mq);
- return;
- }
- GNUNET_MQ_impl_send_in_flight (n->mq);
- /* cannot send even a small message without violating
- quota, wait a before allowing MQ to send next message */
- n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- &notify_send_done_fin,
- n);
+ GNUNET_assert (0 < n->ready_window);
+ GNUNET_assert (NULL != n->env);
+ n->ready_window--;
+ n->awaiting_done = GNUNET_YES;
+ GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
+ GNUNET_MQ_send (n->h->mq, n->env);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Passed message of type %u for neighbour `%s' to TRANSPORT.\n",
+ ntohs (GNUNET_MQ_env_get_msg (n->env)->type),
+ GNUNET_i2s (&n->id));
}
@@ -376,11 +310,9 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
void *impl_state)
{
struct Neighbour *n = impl_state;
- struct GNUNET_TRANSPORT_CoreHandle *h = n->h;
struct OutboundMessage *obm;
uint16_t msize;
- GNUNET_assert (GNUNET_YES == n->is_ready);
msize = ntohs (msg->size);
if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm))
{
@@ -388,25 +320,24 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
GNUNET_MQ_impl_send_continue (mq);
return;
}
- GNUNET_assert (NULL == n->env);
- n->env = GNUNET_MQ_msg_nested_mh (obm,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
- msg);
- obm->reserved = htonl (0);
- obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
- obm->peer = n->id;
- GNUNET_assert (NULL == n->timeout_task);
- n->is_ready = GNUNET_NO;
- n->env_size = ntohs (msg->size);
- GNUNET_MQ_notify_sent (n->env,
- &notify_send_done,
- n);
- GNUNET_MQ_send (h->mq,
- n->env);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queued message of type %u for neighbour `%s'.\n",
+ "CORE requested transmission of message of type %u to neighbour `%s'.\n",
ntohs (msg->type),
GNUNET_i2s (&n->id));
+
+ GNUNET_assert (NULL == n->env);
+ n->env =
+ GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
+ n->env_size = ntohs (msg->size);
+ obm->reserved = htonl (0);
+ obm->peer = n->id;
+ if (0 == n->ready_window)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Flow control delays transmission to CORE until we see SEND_OK.\n");
+ return; /* can't send yet, need to wait for SEND_OK */
+ }
+ do_send (n);
}
@@ -418,8 +349,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
* @param impl_state state of the implementation
*/
static void
-mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
+mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct Neighbour *n = impl_state;
@@ -436,19 +366,22 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
* @param impl_state state specific to the implementation
*/
static void
-mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
+mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct Neighbour *n = impl_state;
- GNUNET_assert (GNUNET_NO == n->is_ready);
- if (NULL != n->env)
+ n->ready_window++;
+ if (GNUNET_YES == n->awaiting_done)
{
GNUNET_MQ_send_cancel (n->env);
n->env = NULL;
+ n->awaiting_done = GNUNET_NO;
+ }
+ else
+ {
+ GNUNET_assert (0 == n->ready_window);
+ n->env = NULL;
}
-
- n->is_ready = GNUNET_YES;
}
@@ -461,8 +394,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
* @param error error code
*/
static void
-peer_mq_error_handler (void *cls,
- enum GNUNET_MQ_Error error)
+peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
{
/* struct Neighbour *n = cls; */
@@ -471,47 +403,21 @@ peer_mq_error_handler (void *cls,
/**
- * The outbound quota has changed in a way that may require
- * us to reset the timeout. Update the timeout.
- *
- * @param cls the `struct Neighbour` for which the timeout changed
- */
-static void
-outbound_bw_tracker_update (void *cls)
-{
- struct Neighbour *n = cls;
- struct GNUNET_TIME_Relative delay;
-
- if (NULL == n->timeout_task)
- return;
- delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
- 128);
- GNUNET_SCHEDULER_cancel (n->timeout_task);
- n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- &notify_send_done,
- n);
-}
-
-
-/**
* Function we use for handling incoming connect messages.
*
* @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
* @param cim message received
*/
static void
-handle_connect (void *cls,
- const struct ConnectInfoMessage *cim)
+handle_connect (void *cls, const struct ConnectInfoMessage *cim)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving CONNECT message for `%s' with quota %u\n",
- GNUNET_i2s (&cim->id),
- ntohl (cim->quota_out.value__));
- n = neighbour_find (h,
- &cim->id);
+ "Receiving CONNECT message for `%s'\n",
+ GNUNET_i2s (&cim->id));
+ n = neighbour_find (h, &cim->id);
if (NULL != n)
{
GNUNET_break (0);
@@ -521,23 +427,14 @@ handle_connect (void *cls,
n = GNUNET_new (struct Neighbour);
n->id = cim->id;
n->h = h;
- n->is_ready = GNUNET_YES;
- n->traffic_overhead = 0;
- GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
- &outbound_bw_tracker_update,
- n,
- GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
- MAX_BANDWIDTH_CARRY_S,
- &notify_excess_cb,
- n);
+ n->ready_window = SEND_WINDOW_SIZE;
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (h->neighbours,
- &n->id,
- n,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_multipeermap_put (
+ h->neighbours,
+ &n->id,
+ n,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- cim->quota_out);
n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
&mq_destroy_impl,
&mq_cancel_impl,
@@ -547,11 +444,8 @@ handle_connect (void *cls,
n);
if (NULL != h->nc_cb)
{
- n->handlers_cls = h->nc_cb (h->cls,
- &n->id,
- n->mq);
- GNUNET_MQ_set_handlers_closure (n->mq,
- n->handlers_cls);
+ n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
+ GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
}
}
@@ -563,8 +457,7 @@ handle_connect (void *cls,
* @param dim message received
*/
static void
-handle_disconnect (void *cls,
- const struct DisconnectInfoMessage *dim)
+handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
@@ -573,18 +466,14 @@ handle_disconnect (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Receiving DISCONNECT message for `%s'.\n",
GNUNET_i2s (&dim->peer));
- n = neighbour_find (h,
- &dim->peer);
+ n = neighbour_find (h, &dim->peer);
if (NULL == n)
{
GNUNET_break (0);
disconnect_and_schedule_reconnect (h);
return;
}
- GNUNET_assert (GNUNET_YES ==
- neighbour_delete (h,
- &dim->peer,
- n));
+ GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
}
@@ -595,24 +484,15 @@ handle_disconnect (void *cls,
* @param okm message received
*/
static void
-handle_send_ok (void *cls,
- const struct SendOkMessage *okm)
+handle_send_ok (void *cls, const struct SendOkMessage *okm)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
- uint16_t bytes_msg;
- uint32_t bytes_physical;
- bytes_msg = ntohs (okm->bytes_msg);
- bytes_physical = ntohl (okm->bytes_physical);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving SEND_OK message, transmission to %s %s.\n",
- GNUNET_i2s (&okm->peer),
- (GNUNET_OK == ntohs (okm->success))
- ? "succeeded"
- : "failed");
- n = neighbour_find (h,
- &okm->peer);
+ "Receiving SEND_OK message for transmission to %s\n",
+ GNUNET_i2s (&okm->peer));
+ n = neighbour_find (h, &okm->peer);
if (NULL == n)
{
/* We should never get a 'SEND_OK' for a peer that we are not
@@ -621,14 +501,9 @@ handle_send_ok (void *cls,
disconnect_and_schedule_reconnect (h);
return;
}
- if (bytes_physical > bytes_msg)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Overhead for %u byte message was %u\n",
- (unsigned int) bytes_msg,
- (unsigned int) (bytes_physical - bytes_msg));
- n->traffic_overhead += bytes_physical - bytes_msg;
- }
+ n->ready_window++;
+ if ((NULL != n->env) && (1 == n->ready_window))
+ do_send (n);
}
@@ -639,8 +514,7 @@ handle_send_ok (void *cls,
* @param im message received
*/
static int
-check_recv (void *cls,
- const struct InboundMessage *im)
+check_recv (void *cls, const struct InboundMessage *im)
{
const struct GNUNET_MessageHeader *imm;
uint16_t size;
@@ -668,12 +542,11 @@ check_recv (void *cls,
* @param im message received
*/
static void
-handle_recv (void *cls,
- const struct InboundMessage *im)
+handle_recv (void *cls, const struct InboundMessage *im)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
- const struct GNUNET_MessageHeader *imm
- = (const struct GNUNET_MessageHeader *) &im[1];
+ const struct GNUNET_MessageHeader *imm =
+ (const struct GNUNET_MessageHeader *) &im[1];
struct Neighbour *n;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -681,46 +554,14 @@ handle_recv (void *cls,
(unsigned int) ntohs (imm->type),
(unsigned int) ntohs (imm->size),
GNUNET_i2s (&im->peer));
- n = neighbour_find (h,
- &im->peer);
- if (NULL == n)
- {
- GNUNET_break (0);
- disconnect_and_schedule_reconnect (h);
- return;
- }
- GNUNET_MQ_inject_message (n->mq,
- imm);
-}
-
-
-/**
- * Function we use for handling incoming set quota messages.
- *
- * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *`
- * @param msg message received
- */
-static void
-handle_set_quota (void *cls,
- const struct QuotaSetMessage *qm)
-{
- struct GNUNET_TRANSPORT_CoreHandle *h = cls;
- struct Neighbour *n;
-
- n = neighbour_find (h,
- &qm->peer);
+ n = neighbour_find (h, &im->peer);
if (NULL == n)
{
GNUNET_break (0);
disconnect_and_schedule_reconnect (h);
return;
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Receiving SET_QUOTA message for `%s' with quota %u\n",
- GNUNET_i2s (&qm->peer),
- (unsigned int) ntohl (qm->quota.value__));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- qm->quota);
+ GNUNET_MQ_inject_message (n->mq, imm);
}
@@ -733,46 +574,36 @@ static void
reconnect (void *cls)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_fixed_size (connect,
- GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
- struct ConnectInfoMessage,
- h),
- GNUNET_MQ_hd_fixed_size (disconnect,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
- struct DisconnectInfoMessage,
- h),
- GNUNET_MQ_hd_fixed_size (send_ok,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
- struct SendOkMessage,
- h),
- GNUNET_MQ_hd_var_size (recv,
- GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
- struct InboundMessage,
- h),
- GNUNET_MQ_hd_fixed_size (set_quota,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
- struct QuotaSetMessage,
- h),
- GNUNET_MQ_handler_end ()
- };
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_fixed_size (connect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
+ struct ConnectInfoMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (disconnect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
+ struct DisconnectInfoMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (send_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
+ struct SendOkMessage,
+ h),
+ GNUNET_MQ_hd_var_size (recv,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
+ struct InboundMessage,
+ h),
+ GNUNET_MQ_handler_end ()};
struct GNUNET_MQ_Envelope *env;
struct StartMessage *s;
uint32_t options;
h->reconnect_task = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to transport service.\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
GNUNET_assert (NULL == h->mq);
- h->mq = GNUNET_CLIENT_connect (h->cfg,
- "transport",
- handlers,
- &mq_error_handler,
- h);
+ h->mq =
+ GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
if (NULL == h->mq)
return;
- env = GNUNET_MQ_msg (s,
- GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+ env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
options = 0;
if (h->check_self)
options |= 1;
@@ -780,8 +611,7 @@ reconnect (void *cls)
options |= 2;
s->options = htonl (options);
s->self = h->self;
- GNUNET_MQ_send (h->mq,
- env);
+ GNUNET_MQ_send (h->mq, env);
}
@@ -793,9 +623,7 @@ reconnect (void *cls)
static void
disconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
{
- GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
- &neighbour_delete,
- h);
+ GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
if (NULL != h->mq)
{
GNUNET_MQ_destroy (h->mq);
@@ -817,12 +645,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
disconnect (h);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling task to reconnect to transport service in %s.\n",
- GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
- GNUNET_YES));
+ GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
h->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
- &reconnect,
- h);
+ GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
}
@@ -840,8 +665,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
{
struct Neighbour *n;
- n = neighbour_find (handle,
- peer);
+ n = neighbour_find (handle, peer);
if (NULL == n)
return NULL;
return n->mq;
@@ -849,6 +673,45 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
/**
+ * Notification from the CORE service to the TRANSPORT service
+ * that the CORE service has finished processing a message from
+ * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect())
+ * and that it is thus now OK for TRANSPORT to send more messages
+ * for @a pid.
+ *
+ * Used to provide flow control, this is our equivalent to
+ * #GNUNET_SERVICE_client_continue() of an ordinary service.
+ *
+ * Note that due to the use of a window, TRANSPORT may send multiple
+ * messages destined for the same peer even without an intermediate
+ * call to this function. However, CORE must still call this function
+ * once per message received, as otherwise eventually the window will
+ * be full and TRANSPORT will stop providing messages to CORE for @a
+ * pid.
+ *
+ * @param ch core handle
+ * @param pid which peer was the message from that was fully processed by CORE
+ */
+void
+GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch,
+ const struct GNUNET_PeerIdentity *pid)
+{
+ struct GNUNET_MQ_Envelope *env;
+ struct RecvOkMessage *rok;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Message for %s finished CORE processing, sending RECV_OK.\n",
+ GNUNET_i2s (pid));
+ if (NULL == ch->mq)
+ return;
+ env = GNUNET_MQ_msg (rok, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK);
+ rok->increase_window_delta = htonl (1);
+ rok->peer = *pid;
+ GNUNET_MQ_send (ch->mq, env);
+}
+
+
+/**
* Connect to the transport service. Note that the connection may
* complete (or fail) asynchronously.
*
@@ -859,17 +722,15 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
* @param rec receive function to call
* @param nc function to call on connect events
* @param nd function to call on disconnect events
- * @param neb function to call if we have excess bandwidth to a peer
* @return NULL on error
*/
struct GNUNET_TRANSPORT_CoreHandle *
GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const struct GNUNET_PeerIdentity *self,
- const struct GNUNET_MQ_MessageHandler *handlers,
- void *cls,
- GNUNET_TRANSPORT_NotifyConnect nc,
- GNUNET_TRANSPORT_NotifyDisconnect nd,
- GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
+ const struct GNUNET_PeerIdentity *self,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ void *cls,
+ GNUNET_TRANSPORT_NotifyConnect nc,
+ GNUNET_TRANSPORT_NotifyDisconnect nd)
{
struct GNUNET_TRANSPORT_CoreHandle *h;
unsigned int i;
@@ -884,19 +745,17 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
h->cls = cls;
h->nc_cb = nc;
h->nd_cb = nd;
- h->neb_cb = neb;
h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
if (NULL != handlers)
{
- for (i=0;NULL != handlers[i].cb; i++) ;
- h->handlers = GNUNET_new_array (i + 1,
- struct GNUNET_MQ_MessageHandler);
+ for (i = 0; NULL != handlers[i].cb; i++)
+ ;
+ h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
GNUNET_memcpy (h->handlers,
- handlers,
- i * sizeof (struct GNUNET_MQ_MessageHandler));
+ handlers,
+ i * sizeof (struct GNUNET_MQ_MessageHandler));
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to transport service\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
reconnect (h);
if (NULL == h->mq)
{
@@ -905,8 +764,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
return NULL;
}
h->neighbours =
- GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
- GNUNET_YES);
+ GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
return h;
}
@@ -914,13 +772,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
/**
* Disconnect from the transport service.
*
- * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect()
+ * @param handle handle to the service as returned from
+ * #GNUNET_TRANSPORT_core_connect()
*/
void
GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transport disconnect called!\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
/* this disconnects all neighbours... */
disconnect (handle);
/* and now we stop trying to connect again... */
diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c
index e86499173..a163d7ccf 100644
--- a/src/transport/transport_api_core.c
+++ b/src/transport/transport_api_core.c
@@ -29,11 +29,10 @@
#include "gnunet_arm_service.h"
#include "gnunet_hello_lib.h"
#include "gnunet_protocols.h"
-#include "gnunet_transport_core_service.h"
#include "gnunet_transport_service.h"
#include "transport.h"
-#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__)
+#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__)
/**
* If we could not send any payload to a peer for this amount of
@@ -113,11 +112,9 @@ struct Neighbour
* Size of the message in @e env.
*/
uint16_t env_size;
-
};
-
/**
* Handle for the transport service (includes all of the
* state for the transport service).
@@ -187,7 +184,6 @@ struct GNUNET_TRANSPORT_CoreHandle
* (if #GNUNET_NO, then @e self is all zeros!).
*/
int check_self;
-
};
@@ -212,8 +208,7 @@ static struct Neighbour *
neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h,
const struct GNUNET_PeerIdentity *peer)
{
- return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
- peer);
+ return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer);
}
@@ -234,9 +229,7 @@ notify_excess_cb (void *cls)
GNUNET_i2s (&n->id));
if (NULL != h->neb_cb)
- h->neb_cb (h->cls,
- &n->id,
- n->handlers_cls);
+ h->neb_cb (h->cls, &n->id, n->handlers_cls);
}
@@ -251,9 +244,7 @@ notify_excess_cb (void *cls)
* #GNUNET_NO if not.
*/
static int
-neighbour_delete (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
{
struct GNUNET_TRANSPORT_CoreHandle *handle = cls;
struct Neighbour *n = value;
@@ -263,9 +254,7 @@ neighbour_delete (void *cls,
GNUNET_i2s (key));
GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
if (NULL != handle->nd_cb)
- handle->nd_cb (handle->cls,
- &n->id,
- n->handlers_cls);
+ handle->nd_cb (handle->cls, &n->id, n->handlers_cls);
if (NULL != n->timeout_task)
{
GNUNET_SCHEDULER_cancel (n->timeout_task);
@@ -278,10 +267,9 @@ neighbour_delete (void *cls,
}
GNUNET_MQ_destroy (n->mq);
GNUNET_assert (NULL == n->mq);
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
- key,
- n));
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n));
GNUNET_free (n);
return GNUNET_YES;
}
@@ -297,8 +285,7 @@ neighbour_delete (void *cls,
* @param error error code
*/
static void
-mq_error_handler (void *cls,
- enum GNUNET_MQ_Error error)
+mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
@@ -317,14 +304,12 @@ mq_error_handler (void *cls,
* @return #GNUNET_OK if message is well-formed
*/
static int
-check_hello (void *cls,
- const struct GNUNET_MessageHeader *msg)
+check_hello (void *cls, const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_PeerIdentity me;
if (GNUNET_OK !=
- GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
- &me))
+ GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me))
{
GNUNET_break (0);
return GNUNET_SYSERR;
@@ -340,8 +325,7 @@ check_hello (void *cls,
* @param msg message received
*/
static void
-handle_hello (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_hello (void *cls, const struct GNUNET_MessageHeader *msg)
{
/* we do not care => FIXME: signal in options to NEVER send HELLOs! */
}
@@ -388,8 +372,7 @@ notify_send_done (void *cls)
n->env = NULL;
n->traffic_overhead = 0;
}
- delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
- 128);
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
if (0 == delay.rel_value_us)
{
n->is_ready = GNUNET_YES;
@@ -399,9 +382,8 @@ notify_send_done (void *cls)
GNUNET_MQ_impl_send_in_flight (n->mq);
/* cannot send even a small message without violating
quota, wait a before allowing MQ to send next message */
- n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- &notify_send_done_fin,
- n);
+ n->timeout_task =
+ GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done_fin, n);
}
@@ -434,20 +416,17 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
return;
}
GNUNET_assert (NULL == n->env);
- n->env = GNUNET_MQ_msg_nested_mh (obm,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
- msg);
+ n->env =
+ GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg);
obm->reserved = htonl (0);
- obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
+ obm->timeout = GNUNET_TIME_relative_hton (
+ GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */
obm->peer = n->id;
GNUNET_assert (NULL == n->timeout_task);
n->is_ready = GNUNET_NO;
n->env_size = ntohs (msg->size);
- GNUNET_MQ_notify_sent (n->env,
- &notify_send_done,
- n);
- GNUNET_MQ_send (h->mq,
- n->env);
+ GNUNET_MQ_notify_sent (n->env, &notify_send_done, n);
+ GNUNET_MQ_send (h->mq, n->env);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Queued message of type %u for neighbour `%s'.\n",
ntohs (msg->type),
@@ -463,8 +442,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq,
* @param impl_state state of the implementation
*/
static void
-mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
+mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct Neighbour *n = impl_state;
@@ -481,8 +459,7 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
* @param impl_state state specific to the implementation
*/
static void
-mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
+mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
{
struct Neighbour *n = impl_state;
@@ -506,8 +483,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
* @param error error code
*/
static void
-peer_mq_error_handler (void *cls,
- enum GNUNET_MQ_Error error)
+peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error)
{
/* struct Neighbour *n = cls; */
@@ -529,12 +505,9 @@ outbound_bw_tracker_update (void *cls)
if (NULL == n->timeout_task)
return;
- delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
- 128);
+ delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128);
GNUNET_SCHEDULER_cancel (n->timeout_task);
- n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- &notify_send_done,
- n);
+ n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, &notify_send_done, n);
}
@@ -545,8 +518,7 @@ outbound_bw_tracker_update (void *cls)
* @param cim message received
*/
static void
-handle_connect (void *cls,
- const struct ConnectInfoMessage *cim)
+handle_connect (void *cls, const struct ConnectInfoMessage *cim)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
@@ -555,8 +527,7 @@ handle_connect (void *cls,
"Receiving CONNECT message for `%s' with quota %u\n",
GNUNET_i2s (&cim->id),
ntohl (cim->quota_out.value__));
- n = neighbour_find (h,
- &cim->id);
+ n = neighbour_find (h, &cim->id);
if (NULL != n)
{
GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */
@@ -576,13 +547,13 @@ handle_connect (void *cls,
&notify_excess_cb,
n);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (h->neighbours,
- &n->id,
- n,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_multipeermap_put (
+ h->neighbours,
+ &n->id,
+ n,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- cim->quota_out);
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out);
n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl,
&mq_destroy_impl,
&mq_cancel_impl,
@@ -592,11 +563,8 @@ handle_connect (void *cls,
n);
if (NULL != h->nc_cb)
{
- n->handlers_cls = h->nc_cb (h->cls,
- &n->id,
- n->mq);
- GNUNET_MQ_set_handlers_closure (n->mq,
- n->handlers_cls);
+ n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq);
+ GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls);
}
}
@@ -608,8 +576,7 @@ handle_connect (void *cls,
* @param dim message received
*/
static void
-handle_disconnect (void *cls,
- const struct DisconnectInfoMessage *dim)
+handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
@@ -625,10 +592,7 @@ handle_disconnect (void *cls,
disconnect_and_schedule_reconnect (h);
return;
}
- GNUNET_assert (GNUNET_YES ==
- neighbour_delete (h,
- &dim->peer,
- n));
+ GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n));
}
@@ -639,8 +603,7 @@ handle_disconnect (void *cls,
* @param okm message received
*/
static void
-handle_send_ok (void *cls,
- const struct SendOkMessage *okm)
+handle_send_ok (void *cls, const struct SendOkMessage *okm)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
@@ -653,8 +616,7 @@ handle_send_ok (void *cls,
"Receiving SEND_OK message, transmission to %s %s.\n",
GNUNET_i2s (&okm->peer),
ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
- n = neighbour_find (h,
- &okm->peer);
+ n = neighbour_find (h, &okm->peer);
if (NULL == n)
{
/* We should never get a 'SEND_OK' for a peer that we are not
@@ -681,8 +643,7 @@ handle_send_ok (void *cls,
* @param im message received
*/
static int
-check_recv (void *cls,
- const struct InboundMessage *im)
+check_recv (void *cls, const struct InboundMessage *im)
{
const struct GNUNET_MessageHeader *imm;
uint16_t size;
@@ -710,12 +671,11 @@ check_recv (void *cls,
* @param im message received
*/
static void
-handle_recv (void *cls,
- const struct InboundMessage *im)
+handle_recv (void *cls, const struct InboundMessage *im)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
- const struct GNUNET_MessageHeader *imm
- = (const struct GNUNET_MessageHeader *) &im[1];
+ const struct GNUNET_MessageHeader *imm =
+ (const struct GNUNET_MessageHeader *) &im[1];
struct Neighbour *n;
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -730,8 +690,7 @@ handle_recv (void *cls,
disconnect_and_schedule_reconnect (h);
return;
}
- GNUNET_MQ_inject_message (n->mq,
- imm);
+ GNUNET_MQ_inject_message (n->mq, imm);
}
@@ -742,8 +701,7 @@ handle_recv (void *cls,
* @param msg message received
*/
static void
-handle_set_quota (void *cls,
- const struct QuotaSetMessage *qm)
+handle_set_quota (void *cls, const struct QuotaSetMessage *qm)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
struct Neighbour *n;
@@ -752,16 +710,15 @@ handle_set_quota (void *cls,
"Receiving SET_QUOTA message for `%s' with quota %u\n",
GNUNET_i2s (&qm->peer),
ntohl (qm->quota.value__));
- n = neighbour_find (h,
- &qm->peer);
+ n = neighbour_find (h, &qm->peer);
if (NULL == n)
{
- GNUNET_break (0); /* FIXME: julius reports this assertion fails sometimes? */
+ GNUNET_break (
+ 0); /* FIXME: julius reports this assertion fails sometimes? */
disconnect_and_schedule_reconnect (h);
return;
}
- GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
- qm->quota);
+ GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota);
}
@@ -774,50 +731,44 @@ static void
reconnect (void *cls)
{
struct GNUNET_TRANSPORT_CoreHandle *h = cls;
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_var_size (hello,
- GNUNET_MESSAGE_TYPE_HELLO,
- struct GNUNET_MessageHeader,
- h),
- GNUNET_MQ_hd_fixed_size (connect,
- GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
- struct ConnectInfoMessage,
- h),
- GNUNET_MQ_hd_fixed_size (disconnect,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
- struct DisconnectInfoMessage,
- h),
- GNUNET_MQ_hd_fixed_size (send_ok,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
- struct SendOkMessage,
- h),
- GNUNET_MQ_hd_var_size (recv,
- GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
- struct InboundMessage,
- h),
- GNUNET_MQ_hd_fixed_size (set_quota,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
- struct QuotaSetMessage,
- h),
- GNUNET_MQ_handler_end ()
- };
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_var_size (hello,
+ GNUNET_MESSAGE_TYPE_HELLO,
+ struct GNUNET_MessageHeader,
+ h),
+ GNUNET_MQ_hd_fixed_size (connect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT,
+ struct ConnectInfoMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (disconnect,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT,
+ struct DisconnectInfoMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (send_ok,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK,
+ struct SendOkMessage,
+ h),
+ GNUNET_MQ_hd_var_size (recv,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_RECV,
+ struct InboundMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (set_quota,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA,
+ struct QuotaSetMessage,
+ h),
+ GNUNET_MQ_handler_end ()};
struct GNUNET_MQ_Envelope *env;
struct StartMessage *s;
uint32_t options;
h->reconnect_task = NULL;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to transport service.\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
GNUNET_assert (NULL == h->mq);
- h->mq = GNUNET_CLIENT_connect (h->cfg,
- "transport",
- handlers,
- &mq_error_handler,
- h);
+ h->mq =
+ GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h);
if (NULL == h->mq)
return;
- env = GNUNET_MQ_msg (s,
- GNUNET_MESSAGE_TYPE_TRANSPORT_START);
+ env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START);
options = 0;
if (h->check_self)
options |= 1;
@@ -825,8 +776,7 @@ reconnect (void *cls)
options |= 2;
s->options = htonl (options);
s->self = h->self;
- GNUNET_MQ_send (h->mq,
- env);
+ GNUNET_MQ_send (h->mq, env);
}
@@ -841,9 +791,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
{
GNUNET_assert (NULL == h->reconnect_task);
/* Forget about all neighbours that we used to be connected to */
- GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
- &neighbour_delete,
- h);
+ GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h);
if (NULL != h->mq)
{
GNUNET_MQ_destroy (h->mq);
@@ -851,12 +799,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h)
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Scheduling task to reconnect to transport service in %s.\n",
- GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
- GNUNET_YES));
+ GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES));
h->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
- &reconnect,
- h);
+ GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
}
@@ -874,8 +819,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
{
struct Neighbour *n;
- n = neighbour_find (handle,
- peer);
+ n = neighbour_find (handle, peer);
if (NULL == n)
return NULL;
return n->mq;
@@ -898,12 +842,12 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle,
*/
struct GNUNET_TRANSPORT_CoreHandle *
GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const struct GNUNET_PeerIdentity *self,
- const struct GNUNET_MQ_MessageHandler *handlers,
- void *cls,
- GNUNET_TRANSPORT_NotifyConnect nc,
- GNUNET_TRANSPORT_NotifyDisconnect nd,
- GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
+ const struct GNUNET_PeerIdentity *self,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ void *cls,
+ GNUNET_TRANSPORT_NotifyConnect nc,
+ GNUNET_TRANSPORT_NotifyDisconnect nd,
+ GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
{
struct GNUNET_TRANSPORT_CoreHandle *h;
unsigned int i;
@@ -922,15 +866,14 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
h->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
if (NULL != handlers)
{
- for (i=0;NULL != handlers[i].cb; i++) ;
- h->handlers = GNUNET_new_array (i + 1,
- struct GNUNET_MQ_MessageHandler);
+ for (i = 0; NULL != handlers[i].cb; i++)
+ ;
+ h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler);
GNUNET_memcpy (h->handlers,
- handlers,
- i * sizeof (struct GNUNET_MQ_MessageHandler));
+ handlers,
+ i * sizeof (struct GNUNET_MQ_MessageHandler));
}
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to transport service\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n");
reconnect (h);
if (NULL == h->mq)
{
@@ -939,8 +882,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
return NULL;
}
h->neighbours =
- GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
- GNUNET_YES);
+ GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES);
return h;
}
@@ -948,13 +890,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
/**
* Disconnect from the transport service.
*
- * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect()
+ * @param handle handle to the service as returned from
+ * #GNUNET_TRANSPORT_core_connect()
*/
void
GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Transport disconnect called!\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
/* this disconnects all neighbours... */
if (NULL == handle->reconnect_task)
disconnect_and_schedule_reconnect (handle);