From 3f945e6798d8d736ceb104b59ea1269a7abdfe8a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 28 Apr 2019 19:32:10 +0200 Subject: towards flow control in TNG --- src/transport/gnunet-service-tng.c | 575 ++++++++++++++---------------- src/transport/gnunet-transport-profiler.c | 232 +++++------- src/transport/gnunet-transport.c | 571 +++++++++++++---------------- src/transport/transport-testing.h | 201 ++++++----- src/transport/transport.h | 56 ++- src/transport/transport_api2_core.c | 506 ++++++++++---------------- src/transport/transport_api_core.c | 260 ++++++-------- 7 files changed, 1055 insertions(+), 1346 deletions(-) (limited to 'src/transport') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index c2922dd7e..825d45522 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -24,6 +24,11 @@ * * TODO: * Implement next: + * - complete flow control push back from CORE via TRANSPORT to communicators: + * + resume communicators in handle_client_recv_ok (see FIXME) + * + count transmissions to CORE and suspend communicators if window is full + * - check flow control push back from TRANSPROT to CORE: + * + check when to send ACKs * - change transport-core API to provide proper flow control in both * directions, allow multiple messages per peer simultaneously (tag * confirmations with unique message ID), and replace quota-out with @@ -113,6 +118,16 @@ */ #define MAX_DV_DISCOVERY_SELECTION 16 +/** + * Window size. How many messages to the same target do we pass + * to CORE without a RECV_OK in between? Small values limit + * thoughput, large values will increase latency. + * + * FIXME-OPTIMIZE: find out what good values are experimentally, + * maybe set adaptively (i.e. to observed available bandwidth). + */ +#define RECV_WINDOW_SIZE 4 + /** * Minimum number of hops we should forward DV learn messages * even if they are NOT useful for us in hope of looping @@ -1100,6 +1115,48 @@ struct PendingMessage; */ struct DistanceVectorHop; +/** + * A virtual link is another reachable peer that is known to CORE. It + * can be either a `struct Neighbour` with at least one confirmed + * `struct Queue`, or a `struct DistanceVector` with at least one + * confirmed `struct DistanceVectorHop`. With a virtual link we track + * data that is per neighbour that is not specific to how the + * connectivity is established. + */ +struct VirtualLink +{ + /** + * Identity of the peer at the other end of the link. + */ + struct GNUNET_PeerIdentity target; + + /** + * Task scheduled to possibly notfiy core that this peer is no + * longer counting as confirmed. Runs the #core_visibility_check(), + * which checks that some DV-path or a queue exists that is still + * considered confirmed. + */ + struct GNUNET_SCHEDULER_Task *visibility_task; + + /** + * Neighbour used by this virtual link, NULL if @e dv is used. + */ + struct Neighbour *n; + + /** + * Distance vector used by this virtual link, NULL if @e n is used. + */ + struct DistanceVector *dv; + + /** + * How many more messages can we send to core before we exhaust + * the receive window of CORE for this peer? If this hits zero, + * we must tell communicators to stop providing us more messages + * for this peer. + */ + unsigned int core_recv_window; +}; + /** * Data structure kept when we are waiting for an acknowledgement. @@ -1316,31 +1373,10 @@ struct DistanceVector struct GNUNET_SCHEDULER_Task *timeout_task; /** - * Task scheduled to possibly notfiy core that this queue is no longer - * counting as confirmed. Runs the #core_queue_visibility_check(). - */ - struct GNUNET_SCHEDULER_Task *visibility_task; - - /** - * Quota at which CORE is allowed to transmit to this peer - * (note that the value CORE should actually be told is this - * value plus the respective value in `struct Neighbour`). - * Should match the sum of the quotas of all of the paths. - * - * FIXME: not yet set, tricky to get right given multiple paths, - * many of which may be inactive! (=> Idea: measure???) - * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero? - * (=> Current thought: clean would be zero!) - */ - struct GNUNET_BANDWIDTH_Value32NBO quota_out; - - /** - * Is one of the DV paths in this struct 'confirmed' and thus - * the cause for CORE to see this peer as connected? (Note that - * the same may apply to a `struct Neighbour` at the same time.) + * Do we have a confirmed working queue and are thus visible to + * CORE? If so, this is the virtual link, otherwise NULL. */ - int core_visible; + struct VirtualLink *link; }; @@ -1450,12 +1486,6 @@ struct Queue */ struct GNUNET_SCHEDULER_Task *transmit_task; - /** - * Task scheduled to possibly notfiy core that this queue is no longer - * counting as confirmed. Runs the #core_queue_visibility_check(). - */ - struct GNUNET_SCHEDULER_Task *visibility_task; - /** * How long do *we* consider this @e address to be valid? In the past or * zero if we have not yet validated it. Can be updated based on @@ -1642,11 +1672,6 @@ struct Neighbour */ struct Queue *queue_tail; - /** - * Task run to cleanup pending messages that have exceeded their timeout. - */ - struct GNUNET_SCHEDULER_Task *timeout_task; - /** * Handle for an operation to fetch @e last_dv_learn_monotime information from * the PEERSTORE, or NULL. @@ -1660,18 +1685,10 @@ struct Neighbour struct GNUNET_PEERSTORE_StoreContext *sc; /** - * Quota at which CORE is allowed to transmit to this peer - * (note that the value CORE should actually be told is this - * value plus the respective value in `struct DistanceVector`). - * Should match the sum of the quotas of all of the queues. - * - * FIXME: not yet set, tricky to get right given multiple queues! - * (=> Idea: measure???) - * FIXME: how do we set this value initially when we tell CORE? - * Options: start at a minimum value or at literally zero? - * (=> Current thought: clean would be zero!) + * Do we have a confirmed working queue and are thus visible to + * CORE? If so, this is the virtual link, otherwise NULL. */ - struct GNUNET_BANDWIDTH_Value32NBO quota_out; + struct VirtualLink *link; /** * Latest DVLearn monotonic time seen from this peer. Initialized only @@ -1679,17 +1696,6 @@ struct Neighbour */ struct GNUNET_TIME_Absolute last_dv_learn_monotime; - /** - * What is the earliest timeout of any message in @e pending_msg_tail? - */ - struct GNUNET_TIME_Absolute earliest_timeout; - - /** - * Do we have a confirmed working queue and are thus visible to - * CORE? - */ - int core_visible; - /** * Do we have the lastest value for @e last_dv_learn_monotime from * PEERSTORE yet, or are we still waiting for a reply of PEERSTORE? @@ -2416,6 +2422,12 @@ static struct GNUNET_CONTAINER_MultiPeerMap *dv_routes; */ static struct GNUNET_CONTAINER_MultiPeerMap *validation_map; +/** + * Map from PIDs to `struct VirtualLink` entries describing + * links CORE knows to exist. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *links; + /** * Map from challenges to `struct LearnLaunchEntry` values. */ @@ -2563,6 +2575,26 @@ free_ephemeral (struct EphemeralCacheEntry *ece) } +/** + * Free virtual link. + * + * @param vl link data to free + */ +static void +free_virtual_link (struct VirtualLink *vl) +{ + GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl); + if (NULL != vl->visibility_task) + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + vl->visibility_task = NULL; + } + GNUNET_break (NULL == vl->n); + GNUNET_break (NULL == vl->dv); + GNUNET_free (vl); +} + + /** * Free validation state. * @@ -2684,8 +2716,6 @@ free_dv_route (struct DistanceVector *dv) GNUNET_assert ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv)); - if (NULL != dv->visibility_task) - GNUNET_SCHEDULER_cancel (dv->visibility_task); if (NULL != dv->timeout_task) GNUNET_SCHEDULER_cancel (dv->timeout_task); GNUNET_free (dv); @@ -2873,8 +2903,6 @@ free_neighbour (struct Neighbour *neighbour) GNUNET_CONTAINER_multipeermap_remove (neighbours, &neighbour->pid, neighbour)); - if (NULL != neighbour->timeout_task) - GNUNET_SCHEDULER_cancel (neighbour->timeout_task); if (NULL != neighbour->reassembly_map) { GNUNET_CONTAINER_multihashmap32_iterate (neighbour->reassembly_map, @@ -2917,19 +2945,16 @@ free_neighbour (struct Neighbour *neighbour) * * @param tc client to inform (must be CORE client) * @param pid peer the connection is for - * @param quota_out current quota for the peer */ static void core_send_connect_info (struct TransportClient *tc, - const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) + const struct GNUNET_PeerIdentity *pid) { struct GNUNET_MQ_Envelope *env; struct ConnectInfoMessage *cim; GNUNET_assert (CT_CORE == tc->type); env = GNUNET_MQ_msg (cim, GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); - cim->quota_out = quota_out; cim->id = *pid; GNUNET_MQ_send (tc->mq, env); } @@ -2939,11 +2964,9 @@ core_send_connect_info (struct TransportClient *tc, * Send message to CORE clients that we gained a connection * * @param pid peer the queue was for - * @param quota_out current quota for the peer */ static void -cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) +cores_send_connect_info (const struct GNUNET_PeerIdentity *pid) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Informing CORE clients about connection to %s\n", @@ -2952,7 +2975,7 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, { if (CT_CORE != tc->type) continue; - core_send_connect_info (tc, pid, quota_out); + core_send_connect_info (tc, pid); } } @@ -3059,13 +3082,43 @@ schedule_transmit_on_queue (struct Queue *queue, int inside_job) /** - * Check whether the CORE visibility of @a n changed. If so, - * check whether we need to notify CORE. + * Task run to check whether the hops of the @a cls still + * are validated, or if we need to core about disconnection. * - * @param n neighbour to perform the check for + * @param cls a `struct VirtualLink` */ static void -update_neighbour_core_visibility (struct Neighbour *n); +check_link_down (void *cls) +{ + struct VirtualLink *vl = cls; + struct DistanceVector *dv = vl->dv; + struct Neighbour *n = vl->n; + struct GNUNET_TIME_Absolute dvh_timeout; + struct GNUNET_TIME_Absolute q_timeout; + + vl->visibility_task = NULL; + dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS; + for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; + pos = pos->next_dv) + dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until); + if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + vl->dv = NULL; + q_timeout = GNUNET_TIME_UNIT_ZERO_ABS; + for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) + q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until); + if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us) + vl->n = NULL; + if ((NULL == vl->n) && (NULL == vl->dv)) + { + cores_send_disconnect_info (&dv->target); + free_virtual_link (vl); + return; + } + vl->visibility_task = + GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout), + &check_link_down, + vl); +} /** @@ -3083,17 +3136,13 @@ free_queue (struct Queue *queue) struct QueueEntry *qe; int maxxed; struct PendingAcknowledgement *pa; + struct VirtualLink *vl; if (NULL != queue->transmit_task) { GNUNET_SCHEDULER_cancel (queue->transmit_task); queue->transmit_task = NULL; } - if (NULL != queue->visibility_task) - { - GNUNET_SCHEDULER_cancel (queue->visibility_task); - queue->visibility_task = NULL; - } while (NULL != (pa = queue->pa_head)) { GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa); @@ -3139,9 +3188,12 @@ free_queue (struct Queue *queue) notify_monitors (&neighbour->pid, queue->address, queue->nt, &me); GNUNET_free (queue); - update_neighbour_core_visibility (neighbour); - cores_send_disconnect_info (&neighbour->pid); - + vl = GNUNET_CONTAINER_multipeermap_get (links, &neighbour->pid); + if ((NULL != vl) && (neighbour == vl->n)) + { + GNUNET_SCHEDULER_cancel (vl->visibility_task); + check_link_down (vl); + } if (NULL == neighbour->queue_head) { free_neighbour (neighbour); @@ -3281,12 +3333,12 @@ notify_client_connect_info (void *cls, void *value) { struct TransportClient *tc = cls; - struct Neighbour *neighbour = value; + (void) value; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Telling new CORE client about existing connection to %s\n", GNUNET_i2s (pid)); - core_send_connect_info (tc, pid, neighbour->quota_out); + core_send_connect_info (tc, pid); return GNUNET_OK; } @@ -3469,9 +3521,6 @@ client_send_response (struct PendingMessage *pm, if (NULL != tc) { env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - som->success = htonl ((uint32_t) success); - som->bytes_msg = htons (pm->bytes_msg); - som->bytes_physical = htonl (bytes_physical); som->peer = target->pid; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Confirming %s transmission of %u/%u bytes to %s\n", @@ -3485,45 +3534,6 @@ client_send_response (struct PendingMessage *pm, } -/** - * Checks the message queue for a neighbour for messages that have timed - * out and purges them. - * - * @param cls a `struct Neighbour` - */ -static void -check_queue_timeouts (void *cls) -{ - struct Neighbour *n = cls; - struct PendingMessage *pm; - struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute earliest_timeout; - - n->timeout_task = NULL; - earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; - now = GNUNET_TIME_absolute_get (); - for (struct PendingMessage *pos = n->pending_msg_head; NULL != pos; pos = pm) - { - pm = pos->next_neighbour; - if (pos->timeout.abs_value_us <= now.abs_value_us) - { - GNUNET_STATISTICS_update (GST_stats, - "# messages dropped (timeout before confirmation)", - 1, - GNUNET_NO); - client_send_response (pm, GNUNET_NO, 0); - continue; - } - earliest_timeout = - GNUNET_TIME_absolute_min (earliest_timeout, pos->timeout); - } - n->earliest_timeout = earliest_timeout; - if (NULL != n->pending_msg_head) - n->timeout_task = - GNUNET_SCHEDULER_add_at (earliest_timeout, &check_queue_timeouts, n); -} - - /** * Create a DV Box message. * @@ -3689,30 +3699,18 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) const void *payload; size_t payload_size; struct TransportDVBoxMessage *dvb; + struct VirtualLink *vl; GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; bytes_msg = ntohs (obmm->size); - target = lookup_neighbour (&obm->peer); - if (NULL == target) - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); - else - dv = NULL; - if ((NULL == target) && ((NULL == dv) || (GNUNET_NO == dv->core_visible))) + vl = GNUNET_CONTAINER_multipeermap_get (links, &obm->peer); + if (NULL == vl) { /* Failure: don't have this peer as a neighbour (anymore). Might have gone down asynchronously, so this is NOT a protocol violation by CORE. Still count the event, as this should be rare. */ - struct GNUNET_MQ_Envelope *env; - struct SendOkMessage *som; - - env = GNUNET_MQ_msg (som, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); - som->success = htonl (GNUNET_SYSERR); - som->bytes_msg = htonl (bytes_msg); - som->bytes_physical = htonl (0); - som->peer = obm->peer; - GNUNET_MQ_send (tc->mq, env); GNUNET_SERVICE_client_continue (tc->client); GNUNET_STATISTICS_update (GST_stats, "# messages dropped (neighbour unknown)", @@ -3720,6 +3718,12 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) GNUNET_NO); return; } + target = lookup_neighbour (&obm->peer); + if (NULL == target) + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &obm->peer); + else + dv = NULL; + GNUNET_assert ((NULL != target) || (NULL != dv)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending %u bytes to %s using %s\n", bytes_msg, @@ -3756,8 +3760,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) pm->client = tc; pm->target = target; pm->bytes_msg = payload_size; - pm->timeout = - GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); memcpy (&pm[1], payload, payload_size); GNUNET_free_non_null (dvb); dvb = NULL; @@ -3777,15 +3779,6 @@ handle_client_send (void *cls, const struct OutboundMessage *obm) tc->details.core.pending_msg_head, tc->details.core.pending_msg_tail, pm); - if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) - { - target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; - if (NULL != target->timeout_task) - GNUNET_SCHEDULER_cancel (target->timeout_task); - target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout, - &check_queue_timeouts, - target); - } if (! was_empty) return; /* all queues must already be busy */ for (struct Queue *queue = target->queue_head; NULL != queue; @@ -3833,6 +3826,47 @@ check_communicator_available ( } +/** + * Client confirms that it is done handling message(s) to a particular + * peer. We may now provide more messages to CORE for this peer. + * + * Notifies the respective queues that more messages can now be received. + * + * @param cls the client + * @param rom the message that was sent + */ +static void +handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom) +{ + struct TransportClient *tc = cls; + struct VirtualLink *vl; + uint32_t delta; + + if (CT_CORE != tc->type) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (tc->client); + return; + } + vl = GNUNET_CONTAINER_multipeermap_get (links, &rom->peer); + if (NULL == vl) + { + GNUNET_STATISTICS_update (GST_stats, + "# RECV_OK dropped: virtual link unknown", + 1, + GNUNET_NO); + GNUNET_SERVICE_client_continue (tc->client); + return; + } + delta = ntohl (rom->increase_window_delta); + vl->core_recv_window += delta; + if (delta == vl->core_recv_window) + { + // FIXME: resume communicators! + } +} + + /** * Communicator started. Process the request. * @@ -4090,20 +4124,18 @@ route_via_neighbour (const struct Neighbour *n, for (struct Queue *pos = n->queue_head; NULL != pos; pos = pos->next_neighbour) { - /* Count the queue with the visibility task in all cases, as - otherwise we may end up with no queues just because the - time for the visibility task just expired but the scheduler - just ran this task first */ if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || - (pos->validated_until.abs_value_us > now.abs_value_us) || - (NULL != pos->visibility_task)) + (pos->validated_until.abs_value_us > now.abs_value_us)) candidates++; } if (0 == candidates) { - /* Given that we above check for pos->visibility task, - this should be strictly impossible. */ - GNUNET_break (0); + /* This can happen rarely if the last confirmed queue timed + out just as we were beginning to process this message. */ + GNUNET_STATISTICS_update (GST_stats, + "# route selection failed (all no valid queue)", + 1, + GNUNET_NO); return; } sel1 = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, candidates); @@ -4115,12 +4147,8 @@ route_via_neighbour (const struct Neighbour *n, for (struct Queue *pos = n->queue_head; NULL != pos; pos = pos->next_neighbour) { - /* Count the queue with the visibility task in all cases, as - otherwise we may end up with no queues just because the - time for the visibility task just expired but the scheduler - just ran this task first */ - if ((pos->validated_until.abs_value_us > now.abs_value_us) || - (NULL != pos->visibility_task)) + if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) || + (pos->validated_until.abs_value_us > now.abs_value_us)) { if ((sel1 == candidates) || (sel2 == candidates)) queue_send_msg (pos, NULL, hdr, ntohs (hdr->size)); @@ -4197,21 +4225,21 @@ route_message (const struct GNUNET_PeerIdentity *target, struct GNUNET_MessageHeader *hdr, enum RouteMessageOptions options) { + struct VirtualLink *vl; struct Neighbour *n; struct DistanceVector *dv; - n = lookup_neighbour (target); - dv = (0 != (options & RMO_DV_ALLOWED)) - ? GNUNET_CONTAINER_multipeermap_get (dv_routes, target) - : NULL; + vl = GNUNET_CONTAINER_multipeermap_get (links, target); + n = vl->n; + dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL; if (0 == (options & RMO_UNCONFIRMED_ALLOWED)) { /* if confirmed is required, and we do not have anything confirmed, drop respective options */ - if ((NULL != n) && (GNUNET_NO == n->core_visible)) - n = NULL; - if ((NULL != dv) && (GNUNET_NO == dv->core_visible)) - dv = NULL; + if (NULL == n) + n = lookup_neighbour (target); + if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED))) + dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target); } if ((NULL == n) && (NULL == dv)) { @@ -5758,40 +5786,6 @@ path_cleanup_cb (void *cls) GNUNET_SCHEDULER_add_at (pos->timeout, &path_cleanup_cb, dv); } -/** - * Task run to check whether the hops of the @a cls still - * are validated, or if we need to core about disconnection. - * - * @param cls a `struct DistanceVector` (with core_visible set!) - */ -static void -check_dv_path_down (void *cls) -{ - struct DistanceVector *dv = cls; - struct Neighbour *n; - - dv->visibility_task = NULL; - GNUNET_assert (GNUNET_YES == dv->core_visible); - for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos; - pos = pos->next_dv) - { - if (0 < - GNUNET_TIME_absolute_get_remaining (pos->path_valid_until).rel_value_us) - { - dv->visibility_task = GNUNET_SCHEDULER_add_at (pos->path_valid_until, - &check_dv_path_down, - dv); - return; - } - } - /* all paths invalid, make dv core-invisible */ - dv->core_visible = GNUNET_NO; - n = lookup_neighbour (&dv->target); - if ((NULL != n) && (GNUNET_YES == n->core_visible)) - return; /* no need to tell core, connection still up! */ - cores_send_disconnect_info (&dv->target); -} - /** * The @a hop is a validated path to the respective target @@ -5804,22 +5798,30 @@ static void activate_core_visible_dv_path (struct DistanceVectorHop *hop) { struct DistanceVector *dv = hop->dv; - struct Neighbour *n; - - GNUNET_assert (GNUNET_NO == dv->core_visible); - GNUNET_assert (NULL == dv->visibility_task); + struct VirtualLink *vl; - dv->core_visible = GNUNET_YES; - dv->visibility_task = - GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_dv_path_down, dv); - n = lookup_neighbour (&dv->target); - if ((NULL != n) && (GNUNET_YES == n->core_visible)) - return; /* no need to tell core, connection already up! */ - cores_send_connect_info (&dv->target, - (NULL != n) - ? GNUNET_BANDWIDTH_value_sum (n->quota_out, - dv->quota_out) - : dv->quota_out); + vl = GNUNET_CONTAINER_multipeermap_get (links, &dv->target); + if (NULL != vl) + { + /* Link was already up, remember dv is also now available and we are done */ + vl->dv = dv; + return; + } + vl = GNUNET_new (struct VirtualLink); + vl->target = dv->target; + vl->dv = dv; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + /* We lacked a confirmed connection to the target + before, so tell CORE about it (finally!) */ + cores_send_connect_info (&dv->target); } @@ -5934,9 +5936,8 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until); GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos); GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos); - if ((GNUNET_NO == dv->core_visible) && - (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until) - .rel_value_us)) + if (0 < + GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) activate_core_visible_dv_path (pos); if (last_timeout.rel_value_us < GNUNET_TIME_relative_subtract (DV_PATH_VALIDITY_TIMEOUT, @@ -5976,8 +5977,7 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path, next_hop->dv_head, next_hop->dv_tail, hop); - if ((GNUNET_NO == dv->core_visible) && - (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)) + if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us) activate_core_visible_dv_path (hop); return GNUNET_YES; } @@ -6942,75 +6942,6 @@ find_queue (const struct GNUNET_PeerIdentity *pid, const char *address) } -/** - * Task run periodically to check whether the validity of the given queue has - * run its course. If so, finds either another queue to take over, or clears - * the neighbour's `core_visible` flag. In the latter case, gives DV routes a - * chance to take over, and if that fails, notifies CORE about the disconnect. - * - * @param cls a `struct Queue` - */ -static void -core_queue_visibility_check (void *cls) -{ - struct Queue *q = cls; - - q->visibility_task = NULL; - if (0 != GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) - { - q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - return; - } - update_neighbour_core_visibility (q->neighbour); -} - - -/** - * Check whether the CORE visibility of @a n should change. Finds either a - * queue to preserve the visibility, or clears the neighbour's `core_visible` - * flag. In the latter case, gives DV routes a chance to take over, and if - * that fails, notifies CORE about the disconnect. If so, check whether we - * need to notify CORE. - * - * @param n neighbour to perform the check for - */ -static void -update_neighbour_core_visibility (struct Neighbour *n) -{ - struct DistanceVector *dv; - - GNUNET_assert (GNUNET_YES == n->core_visible); - /* Check if _any_ queue of this neighbour is still valid, if so, schedule - the #core_queue_visibility_check() task for that queue */ - for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour) - { - if (0 != - GNUNET_TIME_absolute_get_remaining (q->validated_until).rel_value_us) - { - /* found a valid queue, use this one */ - q->visibility_task = - GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - return; - } - } - n->core_visible = GNUNET_NO; - - /* Check if _any_ DV route to this neighbour is currently - valid, if so, do NOT tell core about the loss of direct - connectivity (DV still counts!) */ - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); - if (GNUNET_YES == dv->core_visible) - return; - /* Nothing works anymore, need to tell CORE about the loss of - connectivity! */ - cores_send_disconnect_info (&n->pid); -} - - /** * Communicator gave us a transport address validation response. Process the * request. @@ -7030,8 +6961,8 @@ handle_validation_response ( .vs = NULL}; struct GNUNET_TIME_Absolute origin_time; struct Queue *q; - struct DistanceVector *dv; struct Neighbour *n; + struct VirtualLink *vl; /* check this is one of our challenges */ (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map, @@ -7129,24 +7060,28 @@ handle_validation_response ( q->validated_until = vs->validated_until; q->pd.aged_rtt = vs->validation_rtt; n = q->neighbour; - if (GNUNET_NO != n->core_visible) - return; /* nothing changed, we are done here */ - n->core_visible = GNUNET_YES; - q->visibility_task = GNUNET_SCHEDULER_add_at (q->validated_until, - &core_queue_visibility_check, - q); - /* Check if _any_ DV route to this neighbour is - currently valid, if so, do NOT tell core anything! */ - dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &n->pid); - if ((NULL != dv) && (GNUNET_YES == dv->core_visible)) - return; /* nothing changed, done */ - /* We lacked a confirmed connection to the neighbour + vl = GNUNET_CONTAINER_multipeermap_get (links, &vs->pid); + if (NULL != vl) + { + /* Link was already up, remember n is also now available and we are done */ + vl->n = n; + return; + } + vl = GNUNET_new (struct VirtualLink); + vl->target = n->pid; + vl->n = n; + vl->core_recv_window = RECV_WINDOW_SIZE; + vl->visibility_task = + GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put ( + links, + &vl->target, + vl, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + /* We lacked a confirmed connection to the target before, so tell CORE about it (finally!) */ - cores_send_connect_info (&n->pid, - (NULL != dv) - ? GNUNET_BANDWIDTH_value_sum (dv->quota_out, - n->quota_out) - : n->quota_out); + cores_send_connect_info (&n->pid); } @@ -8256,7 +8191,6 @@ handle_add_queue_message (void *cls, if (NULL == neighbour) { neighbour = GNUNET_new (struct Neighbour); - neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; neighbour->pid = aqm->receiver; GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_put ( @@ -8872,8 +8806,12 @@ do_shutdown (void *cls) NULL); GNUNET_CONTAINER_multishortmap_destroy (pending_acks); pending_acks = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (neighbours)); GNUNET_CONTAINER_multipeermap_destroy (neighbours); neighbours = NULL; + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (links)); + GNUNET_CONTAINER_multipeermap_destroy (links); + links = NULL; GNUNET_CONTAINER_multipeermap_iterate (backtalkers, &free_backtalker_cb, NULL); @@ -8926,6 +8864,7 @@ run (void *cls, pending_acks = GNUNET_CONTAINER_multishortmap_create (32768, GNUNET_YES); ack_cummulators = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); neighbours = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); + links = GNUNET_CONTAINER_multipeermap_create (512, GNUNET_YES); dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, GNUNET_YES); ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); ephemeral_heap = @@ -8995,6 +8934,10 @@ GNUNET_SERVICE_MAIN ( GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, struct OutboundMessage, NULL), + GNUNET_MQ_hd_fixed_size (client_recv_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK, + struct RecvOkMessage, + NULL), /* communication with communicators */ GNUNET_MQ_hd_var_size (communicator_available, GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, diff --git a/src/transport/gnunet-transport-profiler.c b/src/transport/gnunet-transport-profiler.c index 9160a78b2..89f5b4108 100644 --- a/src/transport/gnunet-transport-profiler.c +++ b/src/transport/gnunet-transport-profiler.c @@ -32,7 +32,6 @@ #include "gnunet_protocols.h" #include "gnunet_ats_service.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" struct Iteration @@ -54,7 +53,8 @@ struct Iteration /** * Timeout for a connections */ -#define CONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define CONNECT_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** * Benchmarking block size in bye @@ -214,15 +214,16 @@ shutdown_task (void *cls) { inext = icur->next; icur->rate = ((benchmark_count * benchmark_size) / 1024) / - ((float) icur->dur.rel_value_us / (1000 * 1000)); + ((float) icur->dur.rel_value_us / (1000 * 1000)); if (verbosity > 0) - FPRINTF (stdout, _("%llu B in %llu ms == %.2f KB/s!\n"), - ((long long unsigned int) benchmark_count * benchmark_size), - ((long long unsigned int) icur->dur.rel_value_us / 1000), - (float) icur->rate); + FPRINTF (stdout, + _ ("%llu B in %llu ms == %.2f KB/s!\n"), + ((long long unsigned int) benchmark_count * benchmark_size), + ((long long unsigned int) icur->dur.rel_value_us / 1000), + (float) icur->rate); avg_duration += icur->dur.rel_value_us / (1000); - avg_rate += icur->rate; + avg_rate += icur->rate; iterations++; } if (0 == iterations) @@ -238,19 +239,17 @@ shutdown_task (void *cls) while (NULL != (icur = inext)) { inext = icur->next; - stddev_rate += ((icur->rate-avg_rate) * - (icur->rate-avg_rate)); + stddev_rate += ((icur->rate - avg_rate) * (icur->rate - avg_rate)); stddev_duration += (((icur->dur.rel_value_us / 1000) - avg_duration) * - ((icur->dur.rel_value_us / 1000) - avg_duration)); - + ((icur->dur.rel_value_us / 1000) - avg_duration)); } /* Calculate standard deviation rate */ stddev_rate = stddev_rate / iterations; - stddev_rate = sqrtf(stddev_rate); + stddev_rate = sqrtf (stddev_rate); /* Calculate standard deviation duration */ stddev_duration = stddev_duration / iterations; - stddev_duration = sqrtf(stddev_duration); + stddev_duration = sqrtf (stddev_duration); /* Output */ FPRINTF (stdout, @@ -266,9 +265,7 @@ shutdown_task (void *cls) while (NULL != (icur = inext)) { inext = icur->next; - GNUNET_CONTAINER_DLL_remove (ihead, - itail, - icur); + GNUNET_CONTAINER_DLL_remove (ihead, itail, icur); FPRINTF (stdout, ";%llu;%.2f", @@ -316,27 +313,19 @@ send_msg (void *cls) if (NULL == mq) return; - env = GNUNET_MQ_msg_extra (m, - benchmark_size, - GNUNET_MESSAGE_TYPE_DUMMY); - memset (&m[1], - 52, - benchmark_size - sizeof(struct GNUNET_MessageHeader)); - + env = GNUNET_MQ_msg_extra (m, benchmark_size, GNUNET_MESSAGE_TYPE_DUMMY); + memset (&m[1], 52, benchmark_size - sizeof (struct GNUNET_MessageHeader)); + if (itail->msgs_sent < benchmark_count) { - GNUNET_MQ_notify_sent (env, - &send_msg, - NULL); + GNUNET_MQ_notify_sent (env, &send_msg, NULL); } else { iteration_done (); } - GNUNET_MQ_send (mq, - env); - if ( (verbosity > 0) && - (0 == itail->msgs_sent % 10) ) + GNUNET_MQ_send (mq, env); + if ((verbosity > 0) && (0 == itail->msgs_sent % 10)) FPRINTF (stdout, "."); } @@ -351,15 +340,14 @@ iteration_start () return; benchmark_running = GNUNET_YES; icur = GNUNET_new (struct Iteration); - GNUNET_CONTAINER_DLL_insert_tail (ihead, - itail, - icur); - icur->start = GNUNET_TIME_absolute_get(); + GNUNET_CONTAINER_DLL_insert_tail (ihead, itail, icur); + icur->start = GNUNET_TIME_absolute_get (); if (verbosity > 0) - FPRINTF (stdout, - "\nStarting benchmark, starting to send %u messages in %u byte blocks\n", - benchmark_count, - benchmark_size); + FPRINTF ( + stdout, + "\nStarting benchmark, starting to send %u messages in %u byte blocks\n", + benchmark_count, + benchmark_size); send_msg (NULL); } @@ -393,22 +381,16 @@ iteration_done () static void * notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *m) + struct GNUNET_MQ_Handle *m) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) { - FPRINTF (stdout, - "Connected to different peer `%s'\n", - GNUNET_i2s (&pid)); + FPRINTF (stdout, "Connected to different peer `%s'\n", GNUNET_i2s (&pid)); return NULL; } if (verbosity > 0) - FPRINTF (stdout, - "Successfully connected to `%s'\n", - GNUNET_i2s (&pid)); + FPRINTF (stdout, "Successfully connected to `%s'\n", GNUNET_i2s (&pid)); mq = m; iteration_start (); return NULL; @@ -426,18 +408,16 @@ notify_connect (void *cls, static void notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *internal_cls) + void *internal_cls) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) return; mq = NULL; if (GNUNET_YES == benchmark_running) { FPRINTF (stdout, "Disconnected from peer `%s' while benchmarking\n", - GNUNET_i2s (&pid)); + GNUNET_i2s (&pid)); return; } } @@ -451,8 +431,7 @@ notify_disconnect (void *cls, * @return #GNUNET_OK */ static int -check_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +check_dummy (void *cls, const struct GNUNET_MessageHeader *message) { return GNUNET_OK; /* all messages are fine */ } @@ -465,30 +444,24 @@ check_dummy (void *cls, * @param message the message */ static void -handle_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +handle_dummy (void *cls, const struct GNUNET_MessageHeader *message) { if (! benchmark_receive) return; if (verbosity > 0) FPRINTF (stdout, - "Received %u bytes\n", - (unsigned int) ntohs (message->size)); + "Received %u bytes\n", + (unsigned int) ntohs (message->size)); } static int -blacklist_cb (void *cls, - const struct GNUNET_PeerIdentity *peer) +blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *peer) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) { if (verbosity > 0) - FPRINTF (stdout, - "Denying connection to `%s'\n", - GNUNET_i2s (peer)); + FPRINTF (stdout, "Denying connection to `%s'\n", GNUNET_i2s (peer)); return GNUNET_SYSERR; } return GNUNET_OK; @@ -509,38 +482,32 @@ run (void *cls, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *mycfg) { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (dummy, - GNUNET_MESSAGE_TYPE_DUMMY, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_handler_end () - }; - + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (dummy, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end ()}; + cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg; ret = 1; if (GNUNET_MAX_MESSAGE_SIZE <= benchmark_size) { - FPRINTF (stderr, - "Message size too big!\n"); + FPRINTF (stderr, "Message size too big!\n"); return; } if (NULL == cpid) { - FPRINTF (stderr, - "No peer identity given\n"); + FPRINTF (stderr, "No peer identity given\n"); return; } - if (GNUNET_OK != - GNUNET_CRYPTO_eddsa_public_key_from_string (cpid, - strlen (cpid), - &pid.public_key)) + if (GNUNET_OK != GNUNET_CRYPTO_eddsa_public_key_from_string (cpid, + strlen (cpid), + &pid.public_key)) { - FPRINTF (stderr, - "Failed to parse peer identity `%s'\n", - cpid); + FPRINTF (stderr, "Failed to parse peer identity `%s'\n", cpid); return; } if (1 == benchmark_send) @@ -548,7 +515,8 @@ run (void *cls, if (verbosity > 0) FPRINTF (stderr, "Trying to send %u messages with size %u to peer `%s'\n", - benchmark_count, benchmark_size, + benchmark_count, + benchmark_size, GNUNET_i2s (&pid)); } else if (1 == benchmark_receive) @@ -559,50 +527,42 @@ run (void *cls, } else { - FPRINTF (stderr, - "No operation given\n"); + FPRINTF (stderr, "No operation given\n"); return; } ats = GNUNET_ATS_connectivity_init (cfg); if (NULL == ats) { - FPRINTF (stderr, - "Failed to connect to ATS service\n"); + FPRINTF (stderr, "Failed to connect to ATS service\n"); ret = 1; return; } handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - handlers, - NULL, - ¬ify_connect, - ¬ify_disconnect, - NULL); + NULL, + handlers, + NULL, + ¬ify_connect, + ¬ify_disconnect, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "Failed to connect to transport service\n"); + FPRINTF (stderr, "Failed to connect to transport service\n"); GNUNET_ATS_connectivity_done (ats); ats = NULL; ret = 1; return; } - bl_handle = GNUNET_TRANSPORT_blacklist (cfg, - &blacklist_cb, - NULL); - ats_sh = GNUNET_ATS_connectivity_suggest (ats, - &pid, - 1); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - NULL); + bl_handle = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL); + ats_sh = GNUNET_ATS_connectivity_suggest (ats, &pid, 1); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } int -main (int argc, char * const *argv) +main (int argc, char *const *argv) { int res; benchmark_count = DEFAULT_MESSAGE_COUNT; @@ -613,46 +573,48 @@ main (int argc, char * const *argv) struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_flag ('s', - "send", - gettext_noop ("send data to peer"), - &benchmark_send), + "send", + gettext_noop ("send data to peer"), + &benchmark_send), GNUNET_GETOPT_option_flag ('r', - "receive", - gettext_noop ("receive data from peer"), - &benchmark_receive), + "receive", + gettext_noop ("receive data from peer"), + &benchmark_receive), GNUNET_GETOPT_option_uint ('i', - "iterations", - NULL, - gettext_noop ("iterations"), - &benchmark_iterations), + "iterations", + NULL, + gettext_noop ("iterations"), + &benchmark_iterations), GNUNET_GETOPT_option_uint ('n', - "number", - NULL, - gettext_noop ("number of messages to send"), - &benchmark_count), + "number", + NULL, + gettext_noop ("number of messages to send"), + &benchmark_count), GNUNET_GETOPT_option_uint ('m', - "messagesize", - NULL, - gettext_noop ("message size to use"), - &benchmark_size), + "messagesize", + NULL, + gettext_noop ("message size to use"), + &benchmark_size), GNUNET_GETOPT_option_string ('p', "peer", "PEER", gettext_noop ("peer identity"), &cpid), GNUNET_GETOPT_option_verbose (&verbosity), - GNUNET_GETOPT_OPTION_END - }; + GNUNET_GETOPT_OPTION_END}; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return 2; - res = GNUNET_PROGRAM_run (argc, argv, - "gnunet-transport", - gettext_noop ("Direct access to transport service."), - options, - &run, NULL); - GNUNET_free((void *) argv); + res = + GNUNET_PROGRAM_run (argc, + argv, + "gnunet-transport", + gettext_noop ("Direct access to transport service."), + options, + &run, + NULL); + GNUNET_free ((void *) argv); if (GNUNET_OK == res) return ret; return 1; diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c index c3c1afc38..36c8fc451 100644 --- a/src/transport/gnunet-transport.c +++ b/src/transport/gnunet-transport.c @@ -29,12 +29,12 @@ #include "gnunet_resolver_service.h" #include "gnunet_protocols.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" /** * Timeout for a name resolution */ -#define RESOLUTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define RESOLUTION_TIMEOUT \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) /** * Timeout for an operation @@ -332,16 +332,13 @@ static struct PeerResolutionContext *rc_tail; * @return #GNUNET_OK (continue to iterate) */ static int -destroy_it (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +destroy_it (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { struct MonitoredPeer *m = value; - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_remove (monitored_peers, - key, - value)); + GNUNET_assert ( + GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (monitored_peers, key, value)); GNUNET_free_non_null (m->address); GNUNET_free (value); return GNUNET_OK; @@ -384,18 +381,14 @@ shutdown_task (void *cls) next = cur->next; GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); - GNUNET_CONTAINER_DLL_remove (vc_head, - vc_tail, - cur); + GNUNET_CONTAINER_DLL_remove (vc_head, vc_tail, cur); GNUNET_free (cur->transport); GNUNET_HELLO_address_free (cur->addrcp); GNUNET_free (cur); } while (NULL != (rc = rc_head)) { - GNUNET_CONTAINER_DLL_remove (rc_head, - rc_tail, - rc); + GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc); GNUNET_TRANSPORT_address_to_string_cancel (rc->asc); GNUNET_free (rc->transport); GNUNET_free (rc->addrcp); @@ -410,35 +403,30 @@ shutdown_task (void *cls) { duration = GNUNET_TIME_absolute_get_duration (start_time); FPRINTF (stdout, - _("Transmitted %llu bytes/s (%llu bytes in %s)\n"), + _ ("Transmitted %llu bytes/s (%llu bytes in %s)\n"), 1000LL * 1000LL * traffic_sent / (1 + duration.rel_value_us), traffic_sent, - GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES)); } if (benchmark_receive) { duration = GNUNET_TIME_absolute_get_duration (start_time); FPRINTF (stdout, - _("Received %llu bytes/s (%llu bytes in %s)\n"), + _ ("Received %llu bytes/s (%llu bytes in %s)\n"), 1000LL * 1000LL * traffic_received / (1 + duration.rel_value_us), traffic_received, - GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES)); } if (NULL != monitored_peers) { - GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, - &destroy_it, - NULL); + GNUNET_CONTAINER_multipeermap_iterate (monitored_peers, &destroy_it, NULL); GNUNET_CONTAINER_multipeermap_destroy (monitored_peers); monitored_peers = NULL; } if (NULL != monitored_plugins) { - GNUNET_break (0 == - GNUNET_CONTAINER_multipeermap_size (monitored_plugins)); + GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (monitored_plugins)); GNUNET_CONTAINER_multipeermap_destroy (monitored_plugins); monitored_plugins = NULL; } @@ -463,9 +451,7 @@ operation_timeout (void *cls) op_timeout = NULL; if ((benchmark_send) || (benchmark_receive)) { - FPRINTF (stdout, - _("Failed to connect to `%s'\n"), - GNUNET_i2s_full (&pid)); + FPRINTF (stdout, _ ("Failed to connect to `%s'\n"), GNUNET_i2s_full (&pid)); GNUNET_SCHEDULER_shutdown (); ret = 1; return; @@ -477,21 +463,18 @@ operation_timeout (void *cls) { next = cur->next; FPRINTF (stdout, - _("Failed to resolve address for peer `%s'\n"), + _ ("Failed to resolve address for peer `%s'\n"), GNUNET_i2s (&cur->addrcp->peer)); - GNUNET_CONTAINER_DLL_remove(rc_head, - rc_tail, - cur); + GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, cur); GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); GNUNET_free (cur->transport); GNUNET_free (cur->addrcp); GNUNET_free (cur); - } FPRINTF (stdout, "%s", - _("Failed to list connections, timeout occurred\n")); + _ ("Failed to list connections, timeout occurred\n")); GNUNET_SCHEDULER_shutdown (); ret = 1; return; @@ -512,22 +495,15 @@ do_send (void *cls) struct GNUNET_MessageHeader *m; struct GNUNET_MQ_Envelope *env; - env = GNUNET_MQ_msg_extra (m, - BLOCKSIZE * 1024, - GNUNET_MESSAGE_TYPE_DUMMY); - memset (&m[1], - 52, - BLOCKSIZE * 1024 - sizeof(struct GNUNET_MessageHeader)); + env = GNUNET_MQ_msg_extra (m, BLOCKSIZE * 1024, GNUNET_MESSAGE_TYPE_DUMMY); + memset (&m[1], 52, BLOCKSIZE * 1024 - sizeof (struct GNUNET_MessageHeader)); traffic_sent += BLOCKSIZE * 1024; - GNUNET_MQ_notify_sent (env, - &do_send, - mq); + GNUNET_MQ_notify_sent (env, &do_send, mq); if (verbosity > 0) FPRINTF (stdout, - _("Transmitting %u bytes\n"), - (unsigned int) BLOCKSIZE * 1024); - GNUNET_MQ_send (mq, - env); + _ ("Transmitting %u bytes\n"), + (unsigned int) BLOCKSIZE * 1024); + GNUNET_MQ_send (mq, env); } @@ -542,11 +518,9 @@ do_send (void *cls) static void * notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_MQ_Handle *mq) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) return NULL; ret = 0; if (! benchmark_send) @@ -557,10 +531,12 @@ notify_connect (void *cls, op_timeout = NULL; } if (verbosity > 0) - FPRINTF (stdout, - _("Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"), - GNUNET_i2s (peer), - BLOCKSIZE); + FPRINTF ( + stdout, + _ ( + "Successfully connected to `%s', starting to send benchmark data in %u Kb blocks\n"), + GNUNET_i2s (peer), + BLOCKSIZE); start_time = GNUNET_TIME_absolute_get (); do_send (mq); return mq; @@ -578,19 +554,17 @@ notify_connect (void *cls, static void notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *internal_cls) + void *internal_cls) { - if (0 != memcmp (&pid, - peer, - sizeof(struct GNUNET_PeerIdentity))) + if (0 != memcmp (&pid, peer, sizeof (struct GNUNET_PeerIdentity))) return; if (NULL == internal_cls) return; /* not about target peer */ if (! benchmark_send) return; /* not transmitting */ FPRINTF (stdout, - _("Disconnected from peer `%s' while benchmarking\n"), - GNUNET_i2s (&pid)); + _ ("Disconnected from peer `%s' while benchmarking\n"), + GNUNET_i2s (&pid)); } @@ -606,16 +580,16 @@ notify_disconnect (void *cls, static void * monitor_notify_connect (void *cls, const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_MQ_Handle *mq) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now); monitor_connect_counter++; FPRINTF (stdout, - _("%24s: %-17s %4s (%u connections in total)\n"), + _ ("%24s: %-17s %4s (%u connections in total)\n"), now_str, - _("Connected to"), + _ ("Connected to"), GNUNET_i2s (peer), monitor_connect_counter); return NULL; @@ -633,18 +607,18 @@ monitor_notify_connect (void *cls, static void monitor_notify_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer, - void *internal_cls) + void *internal_cls) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); const char *now_str = GNUNET_STRINGS_absolute_time_to_string (now); - GNUNET_assert(monitor_connect_counter > 0); + GNUNET_assert (monitor_connect_counter > 0); monitor_connect_counter--; FPRINTF (stdout, - _("%24s: %-17s %4s (%u connections in total)\n"), + _ ("%24s: %-17s %4s (%u connections in total)\n"), now_str, - _("Disconnected from"), + _ ("Disconnected from"), GNUNET_i2s (peer), monitor_connect_counter); } @@ -658,8 +632,7 @@ monitor_notify_disconnect (void *cls, * @return #GNUNET_OK */ static int -check_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +check_dummy (void *cls, const struct GNUNET_MessageHeader *message) { return GNUNET_OK; /* all messages are fine */ } @@ -672,15 +645,14 @@ check_dummy (void *cls, * @param message the message */ static void -handle_dummy (void *cls, - const struct GNUNET_MessageHeader *message) +handle_dummy (void *cls, const struct GNUNET_MessageHeader *message) { if (! benchmark_receive) return; if (verbosity > 0) FPRINTF (stdout, - _("Received %u bytes\n"), - (unsigned int) ntohs (message->size)); + _ ("Received %u bytes\n"), + (unsigned int) ntohs (message->size)); if (0 == traffic_received) start_time = GNUNET_TIME_absolute_get (); traffic_received += ntohs (message->size); @@ -711,24 +683,23 @@ print_info (const struct GNUNET_PeerIdentity *id, struct GNUNET_TIME_Absolute state_timeout) { - if ( ((GNUNET_YES == iterate_connections) && - (GNUNET_YES == iterate_all)) || - (GNUNET_YES == monitor_connections)) + if (((GNUNET_YES == iterate_connections) && (GNUNET_YES == iterate_all)) || + (GNUNET_YES == monitor_connections)) { FPRINTF (stdout, - _("Peer `%s': %s %s in state `%s' until %s\n"), + _ ("Peer `%s': %s %s in state `%s' until %s\n"), GNUNET_i2s (id), (NULL == transport) ? "" : transport, (NULL == transport) ? "" : addr, GNUNET_TRANSPORT_ps2s (state), GNUNET_STRINGS_absolute_time_to_string (state_timeout)); } - else if ( (GNUNET_YES == iterate_connections) && - (GNUNET_TRANSPORT_is_connected(state)) ) + else if ((GNUNET_YES == iterate_connections) && + (GNUNET_TRANSPORT_is_connected (state))) { /* Only connected peers, skip state */ FPRINTF (stdout, - _("Peer `%s': %s %s\n"), + _ ("Peer `%s': %s %s\n"), GNUNET_i2s (id), transport, addr); @@ -753,9 +724,7 @@ print_info (const struct GNUNET_PeerIdentity *id, * if #GNUNET_SYSERR: communication error (IPC error) */ static void -process_peer_string (void *cls, - const char *address, - int res) +process_peer_string (void *cls, const char *address, int res) { struct PeerResolutionContext *rc = cls; @@ -763,11 +732,12 @@ process_peer_string (void *cls, { if (GNUNET_SYSERR == res) { - FPRINTF (stderr, - "Failed to convert address for peer `%s' plugin `%s' length %u to string \n", - GNUNET_i2s (&rc->addrcp->peer), - rc->addrcp->transport_name, - (unsigned int) rc->addrcp->address_length); + FPRINTF ( + stderr, + "Failed to convert address for peer `%s' plugin `%s' length %u to string \n", + GNUNET_i2s (&rc->addrcp->peer), + rc->addrcp->transport_name, + (unsigned int) rc->addrcp->address_length); print_info (&rc->addrcp->peer, rc->transport, NULL, @@ -818,9 +788,7 @@ process_peer_string (void *cls, } GNUNET_free (rc->transport); GNUNET_free (rc->addrcp); - GNUNET_CONTAINER_DLL_remove (rc_head, - rc_tail, - rc); + GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc); GNUNET_free (rc); if ((0 == address_resolutions) && (iterate_connections)) { @@ -854,9 +822,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address, struct PeerResolutionContext *rc; rc = GNUNET_new (struct PeerResolutionContext); - GNUNET_CONTAINER_DLL_insert (rc_head, - rc_tail, - rc); + GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc); address_resolutions++; rc->transport = GNUNET_strdup (address->transport_name); rc->addrcp = GNUNET_HELLO_address_copy (address); @@ -869,7 +835,7 @@ resolve_peer_address (const struct GNUNET_HELLO_Address *address, numeric, RESOLUTION_TIMEOUT, &process_peer_string, - rc); + rc); } @@ -897,15 +863,14 @@ process_peer_iteration_cb (void *cls, return; } - if ( (GNUNET_NO == iterate_all) && - (GNUNET_NO == GNUNET_TRANSPORT_is_connected(state))) - return; /* Display only connected peers */ + if ((GNUNET_NO == iterate_all) && + (GNUNET_NO == GNUNET_TRANSPORT_is_connected (state))) + return; /* Display only connected peers */ if (NULL != op_timeout) GNUNET_SCHEDULER_cancel (op_timeout); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received address for peer `%s': %s\n", @@ -913,16 +878,9 @@ process_peer_iteration_cb (void *cls, address ? address->transport_name : ""); if (NULL != address) - resolve_peer_address (address, - numeric, - state, - state_timeout); + resolve_peer_address (address, numeric, state, state_timeout); else - print_info (peer, - NULL, - NULL, - state, - state_timeout); + print_info (peer, NULL, NULL, state, state_timeout); } @@ -958,7 +916,7 @@ struct PluginMonitorAddress */ static void print_plugin_event_info (struct PluginMonitorAddress *addr, - const struct GNUNET_TRANSPORT_SessionInfo *info) + const struct GNUNET_TRANSPORT_SessionInfo *info) { const char *state; @@ -987,20 +945,22 @@ print_plugin_event_info (struct PluginMonitorAddress *addr, "%s: state %s timeout in %s @ %s%s\n", GNUNET_i2s (&info->address->peer), state, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (info->session_timeout), - GNUNET_YES), - addr->str, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_remaining (info->session_timeout), + GNUNET_YES), + addr->str, (info->is_inbound == GNUNET_YES) ? " (INBOUND)" : ""); fprintf (stdout, "%s: queue has %3u messages and %6u bytes\n", GNUNET_i2s (&info->address->peer), info->num_msg_pending, info->num_bytes_pending); - if (0 != GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us) + if (0 != + GNUNET_TIME_absolute_get_remaining (info->receive_delay).rel_value_us) fprintf (stdout, - "%s: receiving blocked until %s\n", - GNUNET_i2s (&info->address->peer), - GNUNET_STRINGS_absolute_time_to_string (info->receive_delay)); + "%s: receiving blocked until %s\n", + GNUNET_i2s (&info->address->peer), + GNUNET_STRINGS_absolute_time_to_string (info->receive_delay)); } @@ -1021,9 +981,7 @@ print_plugin_event_info (struct PluginMonitorAddress *addr, * if #GNUNET_SYSERR: communication error (IPC error) */ static void -address_cb (void *cls, - const char *address, - int res) +address_cb (void *cls, const char *address, int res) { struct PluginMonitorAddress *addr = cls; @@ -1035,8 +993,7 @@ address_cb (void *cls, if (NULL != addr->str) return; addr->str = GNUNET_strdup (address); - print_plugin_event_info (addr, - &addr->si); + print_plugin_event_info (addr, &addr->si); } @@ -1065,8 +1022,7 @@ plugin_monitoring_cb (void *cls, { struct PluginMonitorAddress *addr; - if ( (NULL == info) && - (NULL == session) ) + if ((NULL == info) && (NULL == session)) return; /* in sync with transport service */ addr = *session_ctx; if (NULL == info) @@ -1084,26 +1040,25 @@ plugin_monitoring_cb (void *cls, } return; /* shutdown */ } - if (0 != memcmp (&info->address->peer, - &pid, - sizeof (struct GNUNET_PeerIdentity))) + if (0 != + memcmp (&info->address->peer, &pid, sizeof (struct GNUNET_PeerIdentity))) return; /* filtered */ if (NULL == addr) { addr = GNUNET_new (struct PluginMonitorAddress); - addr->asc = GNUNET_TRANSPORT_address_to_string (cfg, - info->address, - numeric, - GNUNET_TIME_UNIT_FOREVER_REL, - &address_cb, - addr); + addr->asc = + GNUNET_TRANSPORT_address_to_string (cfg, + info->address, + numeric, + GNUNET_TIME_UNIT_FOREVER_REL, + &address_cb, + addr); *session_ctx = addr; } if (NULL == addr->str) addr->si = *info; else - print_plugin_event_info (addr, - info); + print_plugin_event_info (addr, info); if (GNUNET_TRANSPORT_SS_DONE == info->state) { if (NULL != addr->asc) @@ -1141,38 +1096,35 @@ process_peer_monitoring_cb (void *cls, { FPRINTF (stdout, "%s", - _("Monitor disconnected from transport service. Reconnecting.\n")); + _ ( + "Monitor disconnected from transport service. Reconnecting.\n")); return; } if (NULL != op_timeout) GNUNET_SCHEDULER_cancel (op_timeout); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); - if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, - peer))) + if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, peer))) { m = GNUNET_new (struct MonitoredPeer); - GNUNET_CONTAINER_multipeermap_put (monitored_peers, - peer, - m, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + GNUNET_CONTAINER_multipeermap_put ( + monitored_peers, + peer, + m, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } else { - if ( (m->state == state) && - (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && - (NULL == address) && - (NULL == m->address) ) + if ((m->state == state) && + (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && + (NULL == address) && (NULL == m->address)) { return; /* No real change */ } - if ( (m->state == state) && - (NULL != address) && - (NULL != m->address) && - (0 == GNUNET_HELLO_address_cmp(m->address, address)) ) + if ((m->state == state) && (NULL != address) && (NULL != m->address) && + (0 == GNUNET_HELLO_address_cmp (m->address, address))) return; /* No real change */ } @@ -1187,16 +1139,9 @@ process_peer_monitoring_cb (void *cls, m->state_timeout = state_timeout; if (NULL != address) - resolve_peer_address (m->address, - numeric, - m->state, - m->state_timeout); + resolve_peer_address (m->address, numeric, m->state, m->state_timeout); else - print_info (peer, - NULL, - NULL, - m->state, - m->state_timeout); + print_info (peer, NULL, NULL, m->state, m->state_timeout); } @@ -1210,12 +1155,9 @@ process_peer_monitoring_cb (void *cls, * @return #GNUNET_OK if the connection is allowed, #GNUNET_SYSERR if not */ static int -blacklist_cb (void *cls, - const struct GNUNET_PeerIdentity *cpid) +blacklist_cb (void *cls, const struct GNUNET_PeerIdentity *cpid) { - if (0 == memcmp (cpid, - &pid, - sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (cpid, &pid, sizeof (struct GNUNET_PeerIdentity))) return GNUNET_SYSERR; return GNUNET_OK; } @@ -1231,7 +1173,7 @@ blacklist_cb (void *cls, */ static void run (void *cls, - char * const *args, + char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *mycfg) { @@ -1241,127 +1183,119 @@ run (void *cls, cfg = (struct GNUNET_CONFIGURATION_Handle *) mycfg; - counter = benchmark_send + benchmark_receive + iterate_connections - + monitor_connections + monitor_connects + do_disconnect + - monitor_plugins; + counter = benchmark_send + benchmark_receive + iterate_connections + + monitor_connections + monitor_connects + do_disconnect + + monitor_plugins; if (1 < counter) { - FPRINTF (stderr, - _("Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"), - "disconnect", - "benchmark send", - "benchmark receive", - "information", - "monitor", - "events", - "plugins"); + FPRINTF ( + stderr, + _ ( + "Multiple operations given. Please choose only one operation: %s, %s, %s, %s, %s, %s %s\n"), + "disconnect", + "benchmark send", + "benchmark receive", + "information", + "monitor", + "events", + "plugins"); return; } if (0 == counter) { - FPRINTF (stderr, - _("No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"), - "disconnect", - "benchmark send", - "benchmark receive", - "information", - "monitor", - "events", - "plugins"); + FPRINTF ( + stderr, + _ ( + "No operation given. Please choose one operation: %s, %s, %s, %s, %s, %s, %s\n"), + "disconnect", + "benchmark send", + "benchmark receive", + "information", + "monitor", + "events", + "plugins"); return; } if (do_disconnect) /* -D: Disconnect from peer */ { - if (0 == memcmp (&zero_pid, - &pid, - sizeof (pid))) + if (0 == memcmp (&zero_pid, &pid, sizeof (pid))) { FPRINTF (stderr, - _("Option `%s' makes no sense without option `%s'.\n"), - "-D", "-p"); + _ ("Option `%s' makes no sense without option `%s'.\n"), + "-D", + "-p"); ret = 1; return; } - blacklist = GNUNET_TRANSPORT_blacklist (cfg, - &blacklist_cb, - NULL); + blacklist = GNUNET_TRANSPORT_blacklist (cfg, &blacklist_cb, NULL); if (NULL == blacklist) { FPRINTF (stderr, "%s", - _("Failed to connect to transport service for disconnection\n")); + _ ( + "Failed to connect to transport service for disconnection\n")); ret = 1; return; } FPRINTF (stdout, "%s", - _("Blacklisting request in place, stop with CTRL-C\n")); + _ ("Blacklisting request in place, stop with CTRL-C\n")); } else if (benchmark_send) /* -s: Benchmark sending */ { - if (0 == memcmp (&zero_pid, - &pid, - sizeof (pid))) + if (0 == memcmp (&zero_pid, &pid, sizeof (pid))) { FPRINTF (stderr, - _("Option `%s' makes no sense without option `%s'.\n"), - "-s", "-p"); + _ ("Option `%s' makes no sense without option `%s'.\n"), + "-s", + "-p"); ret = 1; return; } handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - NULL, - NULL, - ¬ify_connect, - ¬ify_disconnect, - NULL); + NULL, + NULL, + NULL, + ¬ify_connect, + ¬ify_disconnect, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "%s", - _("Failed to connect to transport service\n")); + FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n")); ret = 1; return; } start_time = GNUNET_TIME_absolute_get (); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); } else if (benchmark_receive) /* -b: Benchmark receiving */ { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (dummy, - GNUNET_MESSAGE_TYPE_DUMMY, - struct GNUNET_MessageHeader, - NULL), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (dummy, + GNUNET_MESSAGE_TYPE_DUMMY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end ()}; handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - handlers, - NULL, - NULL, - NULL, - NULL); + NULL, + handlers, + NULL, + NULL, + NULL, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "%s", - _("Failed to connect to transport service\n")); + FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n")); ret = 1; return; } if (verbosity > 0) - FPRINTF (stdout, - "%s", - _("Starting to receive benchmark data\n")); + FPRINTF (stdout, "%s", _ ("Starting to receive benchmark data\n")); start_time = GNUNET_TIME_absolute_get (); - } else if (iterate_connections) /* -i: List information about peers once */ { @@ -1370,42 +1304,38 @@ run (void *cls, GNUNET_YES, &process_peer_iteration_cb, (void *) cfg); - op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, - &operation_timeout, - NULL); + op_timeout = + GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); } - else if (monitor_connections) /* -m: List information about peers continuously */ + else if (monitor_connections) /* -m: List information about peers continuously + */ { - monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, - GNUNET_NO); + monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); pic = GNUNET_TRANSPORT_monitor_peers (cfg, - &pid, + &pid, GNUNET_NO, &process_peer_monitoring_cb, NULL); } - else if (monitor_plugins) /* -P: List information about plugins continuously */ + else if (monitor_plugins) /* -P: List information about plugins continuously + */ { monitored_plugins = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); - pm = GNUNET_TRANSPORT_monitor_plugins (cfg, - &plugin_monitoring_cb, - NULL); + pm = GNUNET_TRANSPORT_monitor_plugins (cfg, &plugin_monitoring_cb, NULL); } else if (monitor_connects) /* -e : Monitor (dis)connect events continuously */ { monitor_connect_counter = 0; handle = GNUNET_TRANSPORT_core_connect (cfg, - NULL, - NULL, - NULL, - &monitor_notify_connect, - &monitor_notify_disconnect, - NULL); + NULL, + NULL, + NULL, + &monitor_notify_connect, + &monitor_notify_disconnect, + NULL); if (NULL == handle) { - FPRINTF (stderr, - "%s", - _("Failed to connect to transport service\n")); + FPRINTF (stderr, "%s", _ ("Failed to connect to transport service\n")); ret = 1; return; } @@ -1413,75 +1343,86 @@ run (void *cls, } else { - GNUNET_break(0); + GNUNET_break (0); return; } - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } int -main (int argc, - char * const *argv) +main (int argc, char *const *argv) { int res; - struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_option_flag ('a', - "all", - gettext_noop ("print information for all peers (instead of only connected peers)"), - &iterate_all), - GNUNET_GETOPT_option_flag ('b', - "benchmark", - gettext_noop ("measure how fast we are receiving data from all peers (until CTRL-C)"), - &benchmark_receive), - GNUNET_GETOPT_option_flag ('D', - "disconnect", - gettext_noop ("disconnect from a peer"), - &do_disconnect), - GNUNET_GETOPT_option_flag ('i', - "information", - gettext_noop ("provide information about all current connections (once)"), - &iterate_connections), - GNUNET_GETOPT_option_flag ('m', - "monitor", - gettext_noop ("provide information about all current connections (continuously)"), - &monitor_connections), - GNUNET_GETOPT_option_flag ('e', - "events", - gettext_noop ("provide information about all connects and disconnect events (continuously)"), - &monitor_connects), - GNUNET_GETOPT_option_flag ('n', - "numeric", - gettext_noop ("do not resolve hostnames"), - &numeric), - GNUNET_GETOPT_option_base32_auto ('p', - "peer", - "PEER", - gettext_noop ("peer identity"), - &pid), - GNUNET_GETOPT_option_flag ('P', - "plugins", - gettext_noop ("monitor plugin sessions"), - &monitor_plugins), - GNUNET_GETOPT_option_flag ('s', - "send", - gettext_noop - ("send data for benchmarking to the other peer (until CTRL-C)"), - &benchmark_send), - GNUNET_GETOPT_option_verbose (&verbosity), - GNUNET_GETOPT_OPTION_END - }; + struct GNUNET_GETOPT_CommandLineOption options[] = + {GNUNET_GETOPT_option_flag ( + 'a', + "all", + gettext_noop ( + "print information for all peers (instead of only connected peers)"), + &iterate_all), + GNUNET_GETOPT_option_flag ( + 'b', + "benchmark", + gettext_noop ( + "measure how fast we are receiving data from all peers (until CTRL-C)"), + &benchmark_receive), + GNUNET_GETOPT_option_flag ('D', + "disconnect", + gettext_noop ("disconnect from a peer"), + &do_disconnect), + GNUNET_GETOPT_option_flag ( + 'i', + "information", + gettext_noop ( + "provide information about all current connections (once)"), + &iterate_connections), + GNUNET_GETOPT_option_flag ( + 'm', + "monitor", + gettext_noop ( + "provide information about all current connections (continuously)"), + &monitor_connections), + GNUNET_GETOPT_option_flag ( + 'e', + "events", + gettext_noop ( + "provide information about all connects and disconnect events (continuously)"), + &monitor_connects), + GNUNET_GETOPT_option_flag ('n', + "numeric", + gettext_noop ("do not resolve hostnames"), + &numeric), + GNUNET_GETOPT_option_base32_auto ('p', + "peer", + "PEER", + gettext_noop ("peer identity"), + &pid), + GNUNET_GETOPT_option_flag ('P', + "plugins", + gettext_noop ("monitor plugin sessions"), + &monitor_plugins), + GNUNET_GETOPT_option_flag ( + 's', + "send", + gettext_noop ( + "send data for benchmarking to the other peer (until CTRL-C)"), + &benchmark_send), + GNUNET_GETOPT_option_verbose (&verbosity), + GNUNET_GETOPT_OPTION_END}; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return 2; - res = GNUNET_PROGRAM_run (argc, argv, - "gnunet-transport", - gettext_noop ("Direct access to transport service."), - options, - &run, NULL); + res = + GNUNET_PROGRAM_run (argc, + argv, + "gnunet-transport", + gettext_noop ("Direct access to transport service."), + options, + &run, + NULL); GNUNET_free ((void *) argv); if (GNUNET_OK == res) return ret; diff --git a/src/transport/transport-testing.h b/src/transport/transport-testing.h index 4629d6125..83bbf277b 100644 --- a/src/transport/transport-testing.h +++ b/src/transport/transport-testing.h @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -30,7 +30,6 @@ #include "gnunet_util_lib.h" #include "gnunet_hello_lib.h" #include "gnunet_transport_service.h" -#include "gnunet_transport_core_service.h" #include "gnunet_transport_hello_service.h" #include "gnunet_transport_manipulation_service.h" #include "gnunet_testing_lib.h" @@ -143,7 +142,7 @@ struct GNUNET_TRANSPORT_TESTING_PeerContext * Closure for @e start_cb. */ void *start_cb_cls; - + /** * An unique number to identify the peer */ @@ -207,12 +206,12 @@ struct GNUNET_TRANSPORT_TESTING_ConnectRequest */ struct GNUNET_MQ_Handle *mq; - /** + /** * Set if peer1 says the connection is up to peer2. */ int p1_c; - /** + /** * Set if peer2 says the connection is up to peer1. */ int p2_c; @@ -289,15 +288,16 @@ GNUNET_TRANSPORT_TESTING_done (struct GNUNET_TRANSPORT_TESTING_Handle *tth); * @return the peer context */ struct GNUNET_TRANSPORT_TESTING_PeerContext * -GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth, - const char *cfgname, - int peer_id, - const struct GNUNET_MQ_MessageHandler *handlers, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - void *cb_cls, - GNUNET_SCHEDULER_TaskCallback start_cb, - void *start_cb_cls); +GNUNET_TRANSPORT_TESTING_start_peer ( + struct GNUNET_TRANSPORT_TESTING_Handle *tth, + const char *cfgname, + int peer_id, + const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd, + void *cb_cls, + GNUNET_SCHEDULER_TaskCallback start_cb, + void *start_cb_cls); /** @@ -306,7 +306,8 @@ GNUNET_TRANSPORT_TESTING_start_peer (struct GNUNET_TRANSPORT_TESTING_Handle *tth * @param p the peer */ void -GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *pc); +GNUNET_TRANSPORT_TESTING_stop_peer ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *pc); /** @@ -318,10 +319,10 @@ GNUNET_TRANSPORT_TESTING_stop_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext * @return #GNUNET_OK in success otherwise #GNUNET_SYSERR */ int -GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerContext *p, - GNUNET_SCHEDULER_TaskCallback restart_cb, - void *restart_cb_cls); - +GNUNET_TRANSPORT_TESTING_restart_peer ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *p, + GNUNET_SCHEDULER_TaskCallback restart_cb, + void *restart_cb_cls); /** @@ -331,15 +332,17 @@ GNUNET_TRANSPORT_TESTING_restart_peer (struct GNUNET_TRANSPORT_TESTING_PeerConte * * @param p1 peer 1 * @param p2 peer 2 - * @param cb the callback to call when both peers notified that they are connected + * @param cb the callback to call when both peers notified that they are + * connected * @param cls callback cls * @return a connect request handle */ struct GNUNET_TRANSPORT_TESTING_ConnectRequest * -GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, - struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, - GNUNET_SCHEDULER_TaskCallback cb, - void *cls); +GNUNET_TRANSPORT_TESTING_connect_peers ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, + struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, + GNUNET_SCHEDULER_TaskCallback cb, + void *cls); /** @@ -350,7 +353,8 @@ GNUNET_TRANSPORT_TESTING_connect_peers (struct GNUNET_TRANSPORT_TESTING_PeerCont * @param cc a connect request handle */ void -GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); +GNUNET_TRANSPORT_TESTING_connect_peers_cancel ( + struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); /** @@ -359,9 +363,9 @@ GNUNET_TRANSPORT_TESTING_connect_peers_cancel (struct GNUNET_TRANSPORT_TESTING_C * @param cls closure * @param cc request matching the query */ -typedef void -(*GNUNET_TRANSPORT_TESTING_ConnectContextCallback)(void *cls, - struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); +typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContextCallback) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_ConnectRequest *cc); /** @@ -369,14 +373,15 @@ typedef void * * @param p1 first peer * @param p2 second peer - * @param cb function to call + * @param cb function to call * @param cb_cls closure for @a cb */ void -GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, - struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, - GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb, - void *cb_cls); +GNUNET_TRANSPORT_TESTING_find_connecting_context ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *p1, + struct GNUNET_TRANSPORT_TESTING_PeerContext *p2, + GNUNET_TRANSPORT_TESTING_ConnectContextCallback cb, + void *cb_cls); /* ********************** high-level process functions *************** */ @@ -390,10 +395,10 @@ GNUNET_TRANSPORT_TESTING_find_connecting_context (struct GNUNET_TRANSPORT_TESTIN * @param num_peers size of the @a p array * @param p the peers that were launched */ -typedef void -(*GNUNET_TRANSPORT_TESTING_ConnectContinuation)(void *cls, - unsigned int num_peers, - struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]); +typedef void (*GNUNET_TRANSPORT_TESTING_ConnectContinuation) ( + void *cls, + unsigned int num_peers, + struct GNUNET_TRANSPORT_TESTING_PeerContext *p[]); /** @@ -423,7 +428,6 @@ struct GNUNET_TRANSPORT_TESTING_TestMessage GNUNET_NETWORK_STRUCT_END - /** * Function called by the transport for each received message. * @@ -432,11 +436,11 @@ GNUNET_NETWORK_STRUCT_END * @param sender sender of the message * @param message the message */ -typedef void -(*GNUNET_TRANSPORT_TESTING_ReceiveCallback) (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_TRANSPORT_TESTING_TestMessage *message); +typedef void (*GNUNET_TRANSPORT_TESTING_ReceiveCallback) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_TRANSPORT_TESTING_TestMessage *message); /** @@ -447,10 +451,10 @@ typedef void * @param me peer experiencing the event * @param other peer that connected to @a me */ -typedef void -(*GNUNET_TRANSPORT_TESTING_NotifyConnect) (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); +typedef void (*GNUNET_TRANSPORT_TESTING_NotifyConnect) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /** @@ -461,10 +465,10 @@ typedef void * @param me peer experiencing the event * @param other peer that disconnected from @a me */ -typedef void -(*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); +typedef void (*GNUNET_TRANSPORT_TESTING_NotifyDisconnect) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /** @@ -593,7 +597,7 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext * message. */ uint32_t send_num_gen; - + /* ******* internal state, clients should not mess with this **** */ /** @@ -625,7 +629,6 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext * Array with @e num_peers entries. */ struct GNUNET_TRANSPORT_TESTING_InternalPeerContext *ip; - }; @@ -637,8 +640,9 @@ struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext * @return NULL if @a peer was not found */ struct GNUNET_TRANSPORT_TESTING_PeerContext * -GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc, - const struct GNUNET_PeerIdentity *peer); +GNUNET_TRANSPORT_TESTING_find_peer ( + struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext *ccc, + const struct GNUNET_PeerIdentity *peer); /** @@ -648,7 +652,8 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck * abort the test, and a shutdown handler to clean up properly * on exit. * - * @param cls closure of type `struct GNUNET_TRANSPORT_TESTING_ConnectCheckContext` + * @param cls closure of type `struct + * GNUNET_TRANSPORT_TESTING_ConnectCheckContext` * @param tth_ initialized testing handle * @param test_plugin_ name of the plugin * @param test_name_ name of the test @@ -657,12 +662,13 @@ GNUNET_TRANSPORT_TESTING_find_peer (struct GNUNET_TRANSPORT_TESTING_ConnectCheck * @return #GNUNET_SYSERR on error */ int -GNUNET_TRANSPORT_TESTING_connect_check (void *cls, - struct GNUNET_TRANSPORT_TESTING_Handle *tth_, - const char *test_plugin_, - const char *test_name_, - unsigned int num_peers, - char *cfg_files[]); +GNUNET_TRANSPORT_TESTING_connect_check ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_Handle *tth_, + const char *test_plugin_, + const char *test_name_, + unsigned int num_peers, + char *cfg_files[]); /** @@ -677,13 +683,13 @@ GNUNET_TRANSPORT_TESTING_connect_check (void *cls, * @param cfg_files array of names of configuration files for the peers * @return #GNUNET_SYSERR on error */ -typedef int -(*GNUNET_TRANSPORT_TESTING_CheckCallback)(void *cls, - struct GNUNET_TRANSPORT_TESTING_Handle *tth_, - const char *test_plugin_, - const char *test_name_, - unsigned int num_peers, - char *cfg_files[]); +typedef int (*GNUNET_TRANSPORT_TESTING_CheckCallback) ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_Handle *tth_, + const char *test_plugin_, + const char *test_name_, + unsigned int num_peers, + char *cfg_files[]); /** @@ -712,8 +718,12 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0, * @param check_cls closure for @a check * @return #GNUNET_OK on success */ -#define GNUNET_TRANSPORT_TESTING_main(num_peers,check,check_cls) \ - GNUNET_TRANSPORT_TESTING_main_ (argv[0], __FILE__, num_peers, check, check_cls) +#define GNUNET_TRANSPORT_TESTING_main(num_peers, check, check_cls) \ + GNUNET_TRANSPORT_TESTING_main_ (argv[0], \ + __FILE__, \ + num_peers, \ + check, \ + check_cls) /* ***************** Convenience functions for sending ********* */ @@ -725,7 +735,8 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0, * @param sender the sending peer * @param receiver the receiving peer * @param mtype message type to use - * @param msize size of the message, at least `sizeof (struct GNUNET_TRANSPORT_TESTING_TestMessage)` + * @param msize size of the message, at least `sizeof (struct + * GNUNET_TRANSPORT_TESTING_TestMessage)` * @param num unique message number * @param cont continuation to call after transmission * @param cont_cls closure for @a cont @@ -734,13 +745,14 @@ GNUNET_TRANSPORT_TESTING_main_ (const char *argv0, * #GNUNET_SYSERR if @a msize is illegal */ int -GNUNET_TRANSPORT_TESTING_send (struct GNUNET_TRANSPORT_TESTING_PeerContext *sender, - struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, - uint16_t mtype, - uint16_t msize, - uint32_t num, - GNUNET_SCHEDULER_TaskCallback cont, - void *cont_cls); +GNUNET_TRANSPORT_TESTING_send ( + struct GNUNET_TRANSPORT_TESTING_PeerContext *sender, + struct GNUNET_TRANSPORT_TESTING_PeerContext *receiver, + uint16_t mtype, + uint16_t msize, + uint32_t num, + GNUNET_SCHEDULER_TaskCallback cont, + void *cont_cls); /** @@ -771,14 +783,14 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure * the message size, can be NULL in which case the message * size is the default. */ - size_t (*get_size_cb)(unsigned int n); - + size_t (*get_size_cb) (unsigned int n); + /** * Number of messages to be transmitted in a loop. * Use zero for "forever" (until external shutdown). */ unsigned int num_messages; - + /** * Function to call after all transmissions, can be NULL. */ @@ -788,12 +800,11 @@ struct GNUNET_TRANSPORT_TESTING_SendClosure * Closure for @e cont. */ void *cont_cls; - }; /** - * Task that sends a minimalistic test message from the + * Task that sends a minimalistic test message from the * first peer to the second peer. * * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure` @@ -804,14 +815,14 @@ void GNUNET_TRANSPORT_TESTING_simple_send (void *cls); /** - * Size of a message sent with + * Size of a message sent with * #GNUNET_TRANSPORT_TESTING_large_send(). Big enough * to usually force defragmentation. */ #define GNUNET_TRANSPORT_TESTING_LARGE_MESSAGE_SIZE 2600 /** - * Task that sends a large test message from the + * Task that sends a large test message from the * first peer to the second peer. * * @param cls the `struct GNUNET_TRANSPORT_TESTING_SendClosure` @@ -833,9 +844,10 @@ GNUNET_TRANSPORT_TESTING_large_send (void *cls); * @param other peer that connected. */ void -GNUNET_TRANSPORT_TESTING_log_connect (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); +GNUNET_TRANSPORT_TESTING_log_connect ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /** @@ -846,10 +858,10 @@ GNUNET_TRANSPORT_TESTING_log_connect (void *cls, * @param other peer that disconnected. */ void -GNUNET_TRANSPORT_TESTING_log_disconnect (void *cls, - struct GNUNET_TRANSPORT_TESTING_PeerContext *me, - const struct GNUNET_PeerIdentity *other); - +GNUNET_TRANSPORT_TESTING_log_disconnect ( + void *cls, + struct GNUNET_TRANSPORT_TESTING_PeerContext *me, + const struct GNUNET_PeerIdentity *other); /* ********************** low-level filename functions *************** */ @@ -875,8 +887,7 @@ GNUNET_TRANSPORT_TESTING_get_test_name (const char *file); * @return configuration name to use */ char * -GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, - int count); +GNUNET_TRANSPORT_TESTING_get_config_name (const char *file, int count); /** diff --git a/src/transport/transport.h b/src/transport/transport.h index d2a3a262b..ed89940cc 100644 --- a/src/transport/transport.h +++ b/src/transport/transport.h @@ -123,10 +123,21 @@ struct ConnectInfoMessage */ struct GNUNET_MessageHeader header; +#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \ + defined(GNUNET_TRANSPORT_CORE_VERSION)) + + /** + * Always zero, for alignment. + */ + uint32_t reserved GNUNET_PACKED; + +#else + /** * Current outbound quota for this peer */ struct GNUNET_BANDWIDTH_Value32NBO quota_out; +#endif /** * Identity of the new neighbour. @@ -163,6 +174,8 @@ struct DisconnectInfoMessage * Message used to set a particular bandwidth quota. Sent TO the * service to set an incoming quota, sent FROM the service to update * an outgoing quota. + * + * NOTE: no longer used in TNG! */ struct QuotaSetMessage { @@ -215,6 +228,13 @@ struct SendOkMessage */ struct GNUNET_MessageHeader header; +#if (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \ + defined(GNUNET_TRANSPORT_CORE_VERSION)) + + uint32_t reserved GNUNET_PACKED; + +#else + /** * #GNUNET_OK if the transmission succeeded, * #GNUNET_SYSERR if it failed (i.e. network disconnect); @@ -229,11 +249,13 @@ struct SendOkMessage uint16_t bytes_msg GNUNET_PACKED; /** - * Size of message sent over wire - * Includes plugin and protocol specific overhead + * Size of message sent over wire. + * Includes plugin and protocol specific overheads. */ uint32_t bytes_physical GNUNET_PACKED; +#endif + /** * Which peer can send more now? */ @@ -241,6 +263,32 @@ struct SendOkMessage }; +/** + * Message used to notify the transport API that it can + * send another message to the transport service. + * (Used to implement flow control.) + */ +struct RecvOkMessage +{ + + /** + * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK + */ + struct GNUNET_MessageHeader header; + + /** + * Number of messages by which to increase the window, greater or + * equal to one. + */ + uint32_t increase_window_delta GNUNET_PACKED; + + /** + * Which peer can CORE handle more from now? + */ + struct GNUNET_PeerIdentity peer; +}; + + /** * Message used to notify the transport service about a message * to be transmitted to another peer. The actual message follows. @@ -258,10 +306,14 @@ struct OutboundMessage */ uint32_t reserved GNUNET_PACKED; +#if ! (defined(GNUNET_TRANSPORT_COMMUNICATION_VERSION) || \ + defined(GNUNET_TRANSPORT_CORE_VERSION)) + /** * Allowed delay. */ struct GNUNET_TIME_RelativeNBO timeout; +#endif /** * Which peer should receive the message? diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c index f00d00a44..a3c49e94f 100644 --- a/src/transport/transport_api2_core.c +++ b/src/transport/transport_api2_core.c @@ -32,13 +32,23 @@ #include "gnunet_transport_core_service.h" #include "transport.h" -#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) +#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__) /** * How large to start with for the hashmap of neighbours. */ #define STARTING_NEIGHBOURS_SIZE 16 +/** + * Window size. How many messages to the same target do we pass + * to TRANSPORT without a SEND_OK in between? Small values limit + * thoughput, large values will increase latency. + * + * FIXME-OPTIMIZE: find out what good values are experimentally, + * maybe set adaptively (i.e. to observed available bandwidth). + */ +#define SEND_WINDOW_SIZE 4 + /** * Entry in hash table of all of our current (connected) neighbours. @@ -72,46 +82,27 @@ struct Neighbour void *handlers_cls; /** - * Entry in our readyness heap (which is sorted by @e next_ready - * value). NULL if there is no pending transmission request for - * this neighbour or if we're waiting for @e is_ready to become - * true AFTER the @e out_tracker suggested that this peer's quota - * has been satisfied (so once @e is_ready goes to #GNUNET_YES, - * we should immediately go back into the heap). + * How many messages can we still send to this peer before we should + * throttle? */ - struct GNUNET_CONTAINER_HeapNode *hn; + unsigned int ready_window; /** - * Task to trigger MQ when we have enough bandwidth for the - * next transmission. + * Used to indicate our status if @e env is non-NULL. Set to + * #GNUNET_YES if we did pass a message to the MQ and are waiting + * for the call to #notify_send_done(). Set to #GNUNET_NO if the @e + * ready_window is 0 and @e env is waiting for a + * #GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK? */ - struct GNUNET_SCHEDULER_Task *timeout_task; - - /** - * Outbound bandwidh tracker. - */ - struct GNUNET_BANDWIDTH_Tracker out_tracker; - - /** - * Sending consumed more bytes on wire than payload was announced - * This overhead is added to the delay of next sending operation - */ - unsigned long long traffic_overhead; - - /** - * Is this peer currently ready to receive a message? - */ - int is_ready; + int16_t awaiting_done; /** * Size of the message in @e env. */ uint16_t env_size; - }; - /** * Handle for the transport service (includes all of the * state for the transport service). @@ -140,11 +131,6 @@ struct GNUNET_TRANSPORT_CoreHandle */ GNUNET_TRANSPORT_NotifyDisconnect nd_cb; - /** - * function to call on excess bandwidth events - */ - GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb; - /** * My client connection to the transport service. */ @@ -181,7 +167,6 @@ struct GNUNET_TRANSPORT_CoreHandle * (if #GNUNET_NO, then @e self is all zeros!). */ int check_self; - }; @@ -206,31 +191,7 @@ static struct Neighbour * neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_get (h->neighbours, - peer); -} - - -/** - * Function called by the bandwidth tracker if we have excess - * bandwidth. - * - * @param cls the `struct Neighbour` that has excess bandwidth - */ -static void -notify_excess_cb (void *cls) -{ - struct Neighbour *n = cls; - struct GNUNET_TRANSPORT_CoreHandle *h = n->h; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Notifying CORE that more bandwidth is available for %s\n", - GNUNET_i2s (&n->id)); - - if (NULL != h->neb_cb) - h->neb_cb (h->cls, - &n->id, - n->handlers_cls); + return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer); } @@ -245,9 +206,7 @@ notify_excess_cb (void *cls) * #GNUNET_NO if not. */ static int -neighbour_delete (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { struct GNUNET_TRANSPORT_CoreHandle *handle = cls; struct Neighbour *n = value; @@ -255,16 +214,8 @@ neighbour_delete (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping entry for neighbour `%s'.\n", GNUNET_i2s (key)); - GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); if (NULL != handle->nd_cb) - handle->nd_cb (handle->cls, - &n->id, - n->handlers_cls); - if (NULL != n->timeout_task) - { - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = NULL; - } + handle->nd_cb (handle->cls, &n->id, n->handlers_cls); if (NULL != n->env) { GNUNET_MQ_send_cancel (n->env); @@ -272,10 +223,9 @@ neighbour_delete (void *cls, } GNUNET_MQ_destroy (n->mq); GNUNET_assert (NULL == n->mq); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, - key, - n)); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n)); GNUNET_free (n); return GNUNET_YES; } @@ -291,8 +241,7 @@ neighbour_delete (void *cls, * @param error error code */ static void -mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; @@ -306,57 +255,42 @@ mq_error_handler (void *cls, * A message from the handler's message queue to a neighbour was * transmitted. Now trigger (possibly delayed) notification of the * neighbour's message queue that we are done and thus ready for - * the next message. + * the next message. Note that the MQ being ready is independent + * of the send window, as we may queue many messages and simply + * not pass them to TRANSPORT if the send window is insufficient. * * @param cls the `struct Neighbour` where the message was sent */ static void -notify_send_done_fin (void *cls) +notify_send_done (void *cls) { struct Neighbour *n = cls; - n->timeout_task = NULL; - n->is_ready = GNUNET_YES; + n->awaiting_done = GNUNET_NO; + n->env = NULL; GNUNET_MQ_impl_send_continue (n->mq); } /** - * A message from the handler's message queue to a neighbour was - * transmitted. Now trigger (possibly delayed) notification of the - * neighbour's message queue that we are done and thus ready for - * the next message. + * We have an envelope waiting for transmission at @a n, and + * our transmission window is positive. Perform the transmission. * - * @param cls the `struct Neighbour` where the message was sent + * @param n neighbour to perform transmission for */ static void -notify_send_done (void *cls) +do_send (struct Neighbour *n) { - struct Neighbour *n = cls; - struct GNUNET_TIME_Relative delay; - - n->timeout_task = NULL; - if (NULL != n->env) - { - GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, - n->env_size + n->traffic_overhead); - n->env = NULL; - n->traffic_overhead = 0; - } - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); - if (0 == delay.rel_value_us) - { - n->is_ready = GNUNET_YES; - GNUNET_MQ_impl_send_continue (n->mq); - return; - } - GNUNET_MQ_impl_send_in_flight (n->mq); - /* cannot send even a small message without violating - quota, wait a before allowing MQ to send next message */ - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done_fin, - n); + GNUNET_assert (0 < n->ready_window); + GNUNET_assert (NULL != n->env); + n->ready_window--; + n->awaiting_done = GNUNET_YES; + GNUNET_MQ_notify_sent (n->env, ¬ify_send_done, n); + GNUNET_MQ_send (n->h->mq, n->env); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Passed message of type %u for neighbour `%s' to TRANSPORT.\n", + ntohs (GNUNET_MQ_env_get_msg (n->env)->type), + GNUNET_i2s (&n->id)); } @@ -376,11 +310,9 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; - struct GNUNET_TRANSPORT_CoreHandle *h = n->h; struct OutboundMessage *obm; uint16_t msize; - GNUNET_assert (GNUNET_YES == n->is_ready); msize = ntohs (msg->size); if (msize >= GNUNET_MAX_MESSAGE_SIZE - sizeof (*obm)) { @@ -388,25 +320,24 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, GNUNET_MQ_impl_send_continue (mq); return; } - GNUNET_assert (NULL == n->env); - n->env = GNUNET_MQ_msg_nested_mh (obm, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, - msg); - obm->reserved = htonl (0); - obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ - obm->peer = n->id; - GNUNET_assert (NULL == n->timeout_task); - n->is_ready = GNUNET_NO; - n->env_size = ntohs (msg->size); - GNUNET_MQ_notify_sent (n->env, - ¬ify_send_done, - n); - GNUNET_MQ_send (h->mq, - n->env); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queued message of type %u for neighbour `%s'.\n", + "CORE requested transmission of message of type %u to neighbour `%s'.\n", ntohs (msg->type), GNUNET_i2s (&n->id)); + + GNUNET_assert (NULL == n->env); + n->env = + GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg); + n->env_size = ntohs (msg->size); + obm->reserved = htonl (0); + obm->peer = n->id; + if (0 == n->ready_window) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Flow control delays transmission to CORE until we see SEND_OK.\n"); + return; /* can't send yet, need to wait for SEND_OK */ + } + do_send (n); } @@ -418,8 +349,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state of the implementation */ static void -mq_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; @@ -436,19 +366,22 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state specific to the implementation */ static void -mq_cancel_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; - GNUNET_assert (GNUNET_NO == n->is_ready); - if (NULL != n->env) + n->ready_window++; + if (GNUNET_YES == n->awaiting_done) { GNUNET_MQ_send_cancel (n->env); n->env = NULL; + n->awaiting_done = GNUNET_NO; + } + else + { + GNUNET_assert (0 == n->ready_window); + n->env = NULL; } - - n->is_ready = GNUNET_YES; } @@ -461,8 +394,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq, * @param error error code */ static void -peer_mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { /* struct Neighbour *n = cls; */ @@ -470,29 +402,6 @@ peer_mq_error_handler (void *cls, } -/** - * The outbound quota has changed in a way that may require - * us to reset the timeout. Update the timeout. - * - * @param cls the `struct Neighbour` for which the timeout changed - */ -static void -outbound_bw_tracker_update (void *cls) -{ - struct Neighbour *n = cls; - struct GNUNET_TIME_Relative delay; - - if (NULL == n->timeout_task) - return; - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done, - n); -} - - /** * Function we use for handling incoming connect messages. * @@ -500,18 +409,15 @@ outbound_bw_tracker_update (void *cls) * @param cim message received */ static void -handle_connect (void *cls, - const struct ConnectInfoMessage *cim) +handle_connect (void *cls, const struct ConnectInfoMessage *cim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving CONNECT message for `%s' with quota %u\n", - GNUNET_i2s (&cim->id), - ntohl (cim->quota_out.value__)); - n = neighbour_find (h, - &cim->id); + "Receiving CONNECT message for `%s'\n", + GNUNET_i2s (&cim->id)); + n = neighbour_find (h, &cim->id); if (NULL != n) { GNUNET_break (0); @@ -521,23 +427,14 @@ handle_connect (void *cls, n = GNUNET_new (struct Neighbour); n->id = cim->id; n->h = h; - n->is_ready = GNUNET_YES; - n->traffic_overhead = 0; - GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker, - &outbound_bw_tracker_update, - n, - GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, - MAX_BANDWIDTH_CARRY_S, - ¬ify_excess_cb, - n); + n->ready_window = SEND_WINDOW_SIZE; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (h->neighbours, - &n->id, - n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multipeermap_put ( + h->neighbours, + &n->id, + n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - cim->quota_out); n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, &mq_destroy_impl, &mq_cancel_impl, @@ -547,11 +444,8 @@ handle_connect (void *cls, n); if (NULL != h->nc_cb) { - n->handlers_cls = h->nc_cb (h->cls, - &n->id, - n->mq); - GNUNET_MQ_set_handlers_closure (n->mq, - n->handlers_cls); + n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq); + GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls); } } @@ -563,8 +457,7 @@ handle_connect (void *cls, * @param dim message received */ static void -handle_disconnect (void *cls, - const struct DisconnectInfoMessage *dim) +handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -573,18 +466,14 @@ handle_disconnect (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving DISCONNECT message for `%s'.\n", GNUNET_i2s (&dim->peer)); - n = neighbour_find (h, - &dim->peer); + n = neighbour_find (h, &dim->peer); if (NULL == n) { GNUNET_break (0); disconnect_and_schedule_reconnect (h); return; } - GNUNET_assert (GNUNET_YES == - neighbour_delete (h, - &dim->peer, - n)); + GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n)); } @@ -595,24 +484,15 @@ handle_disconnect (void *cls, * @param okm message received */ static void -handle_send_ok (void *cls, - const struct SendOkMessage *okm) +handle_send_ok (void *cls, const struct SendOkMessage *okm) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; - uint16_t bytes_msg; - uint32_t bytes_physical; - bytes_msg = ntohs (okm->bytes_msg); - bytes_physical = ntohl (okm->bytes_physical); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SEND_OK message, transmission to %s %s.\n", - GNUNET_i2s (&okm->peer), - (GNUNET_OK == ntohs (okm->success)) - ? "succeeded" - : "failed"); - n = neighbour_find (h, - &okm->peer); + "Receiving SEND_OK message for transmission to %s\n", + GNUNET_i2s (&okm->peer)); + n = neighbour_find (h, &okm->peer); if (NULL == n) { /* We should never get a 'SEND_OK' for a peer that we are not @@ -621,14 +501,9 @@ handle_send_ok (void *cls, disconnect_and_schedule_reconnect (h); return; } - if (bytes_physical > bytes_msg) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Overhead for %u byte message was %u\n", - (unsigned int) bytes_msg, - (unsigned int) (bytes_physical - bytes_msg)); - n->traffic_overhead += bytes_physical - bytes_msg; - } + n->ready_window++; + if ((NULL != n->env) && (1 == n->ready_window)) + do_send (n); } @@ -639,8 +514,7 @@ handle_send_ok (void *cls, * @param im message received */ static int -check_recv (void *cls, - const struct InboundMessage *im) +check_recv (void *cls, const struct InboundMessage *im) { const struct GNUNET_MessageHeader *imm; uint16_t size; @@ -668,12 +542,11 @@ check_recv (void *cls, * @param im message received */ static void -handle_recv (void *cls, - const struct InboundMessage *im) +handle_recv (void *cls, const struct InboundMessage *im) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - const struct GNUNET_MessageHeader *imm - = (const struct GNUNET_MessageHeader *) &im[1]; + const struct GNUNET_MessageHeader *imm = + (const struct GNUNET_MessageHeader *) &im[1]; struct Neighbour *n; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -681,46 +554,14 @@ handle_recv (void *cls, (unsigned int) ntohs (imm->type), (unsigned int) ntohs (imm->size), GNUNET_i2s (&im->peer)); - n = neighbour_find (h, - &im->peer); - if (NULL == n) - { - GNUNET_break (0); - disconnect_and_schedule_reconnect (h); - return; - } - GNUNET_MQ_inject_message (n->mq, - imm); -} - - -/** - * Function we use for handling incoming set quota messages. - * - * @param cls closure, a `struct GNUNET_TRANSPORT_CoreHandle *` - * @param msg message received - */ -static void -handle_set_quota (void *cls, - const struct QuotaSetMessage *qm) -{ - struct GNUNET_TRANSPORT_CoreHandle *h = cls; - struct Neighbour *n; - - n = neighbour_find (h, - &qm->peer); + n = neighbour_find (h, &im->peer); if (NULL == n) { GNUNET_break (0); disconnect_and_schedule_reconnect (h); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Receiving SET_QUOTA message for `%s' with quota %u\n", - GNUNET_i2s (&qm->peer), - (unsigned int) ntohl (qm->quota.value__)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - qm->quota); + GNUNET_MQ_inject_message (n->mq, imm); } @@ -733,46 +574,36 @@ static void reconnect (void *cls) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_fixed_size (connect, - GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, - struct ConnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (disconnect, - GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, - struct DisconnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (send_ok, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, - struct SendOkMessage, - h), - GNUNET_MQ_hd_var_size (recv, - GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, - struct InboundMessage, - h), - GNUNET_MQ_hd_fixed_size (set_quota, - GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, - struct QuotaSetMessage, - h), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, + struct ConnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, + struct DisconnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (send_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, + struct SendOkMessage, + h), + GNUNET_MQ_hd_var_size (recv, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, + struct InboundMessage, + h), + GNUNET_MQ_handler_end ()}; struct GNUNET_MQ_Envelope *env; struct StartMessage *s; uint32_t options; h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service.\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); GNUNET_assert (NULL == h->mq); - h->mq = GNUNET_CLIENT_connect (h->cfg, - "transport", - handlers, - &mq_error_handler, - h); + h->mq = + GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h); if (NULL == h->mq) return; - env = GNUNET_MQ_msg (s, - GNUNET_MESSAGE_TYPE_TRANSPORT_START); + env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START); options = 0; if (h->check_self) options |= 1; @@ -780,8 +611,7 @@ reconnect (void *cls) options |= 2; s->options = htonl (options); s->self = h->self; - GNUNET_MQ_send (h->mq, - env); + GNUNET_MQ_send (h->mq, env); } @@ -793,9 +623,7 @@ reconnect (void *cls) static void disconnect (struct GNUNET_TRANSPORT_CoreHandle *h) { - GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, - &neighbour_delete, - h); + GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h); if (NULL != h->mq) { GNUNET_MQ_destroy (h->mq); @@ -817,12 +645,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) disconnect (h); LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling task to reconnect to transport service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, - &reconnect, - h); + GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); } @@ -840,14 +665,52 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, { struct Neighbour *n; - n = neighbour_find (handle, - peer); + n = neighbour_find (handle, peer); if (NULL == n) return NULL; return n->mq; } +/** + * Notification from the CORE service to the TRANSPORT service + * that the CORE service has finished processing a message from + * TRANSPORT (via the @code{handlers} of #GNUNET_TRANSPORT_core_connect()) + * and that it is thus now OK for TRANSPORT to send more messages + * for @a pid. + * + * Used to provide flow control, this is our equivalent to + * #GNUNET_SERVICE_client_continue() of an ordinary service. + * + * Note that due to the use of a window, TRANSPORT may send multiple + * messages destined for the same peer even without an intermediate + * call to this function. However, CORE must still call this function + * once per message received, as otherwise eventually the window will + * be full and TRANSPORT will stop providing messages to CORE for @a + * pid. + * + * @param ch core handle + * @param pid which peer was the message from that was fully processed by CORE + */ +void +GNUNET_TRANSPORT_core_receive_continue (struct GNUNET_TRANSPORT_CoreHandle *ch, + const struct GNUNET_PeerIdentity *pid) +{ + struct GNUNET_MQ_Envelope *env; + struct RecvOkMessage *rok; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Message for %s finished CORE processing, sending RECV_OK.\n", + GNUNET_i2s (pid)); + if (NULL == ch->mq) + return; + env = GNUNET_MQ_msg (rok, GNUNET_MESSAGE_TYPE_TRANSPORT_RECV_OK); + rok->increase_window_delta = htonl (1); + rok->peer = *pid; + GNUNET_MQ_send (ch->mq, env); +} + + /** * Connect to the transport service. Note that the connection may * complete (or fail) asynchronously. @@ -859,17 +722,15 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, * @param rec receive function to call * @param nc function to call on connect events * @param nd function to call on disconnect events - * @param neb function to call if we have excess bandwidth to a peer * @return NULL on error */ struct GNUNET_TRANSPORT_CoreHandle * GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_PeerIdentity *self, - const struct GNUNET_MQ_MessageHandler *handlers, - void *cls, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - GNUNET_TRANSPORT_NotifyExcessBandwidth neb) + const struct GNUNET_PeerIdentity *self, + const struct GNUNET_MQ_MessageHandler *handlers, + void *cls, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd) { struct GNUNET_TRANSPORT_CoreHandle *h; unsigned int i; @@ -884,19 +745,17 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h->cls = cls; h->nc_cb = nc; h->nd_cb = nd; - h->neb_cb = neb; h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; if (NULL != handlers) { - for (i=0;NULL != handlers[i].cb; i++) ; - h->handlers = GNUNET_new_array (i + 1, - struct GNUNET_MQ_MessageHandler); + for (i = 0; NULL != handlers[i].cb; i++) + ; + h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler); GNUNET_memcpy (h->handlers, - handlers, - i * sizeof (struct GNUNET_MQ_MessageHandler)); + handlers, + i * sizeof (struct GNUNET_MQ_MessageHandler)); } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n"); reconnect (h); if (NULL == h->mq) { @@ -905,8 +764,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, return NULL; } h->neighbours = - GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, - GNUNET_YES); + GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES); return h; } @@ -914,13 +772,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Disconnect from the transport service. * - * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() + * @param handle handle to the service as returned from + * #GNUNET_TRANSPORT_core_connect() */ void GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transport disconnect called!\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); /* this disconnects all neighbours... */ disconnect (handle); /* and now we stop trying to connect again... */ diff --git a/src/transport/transport_api_core.c b/src/transport/transport_api_core.c index e86499173..a163d7ccf 100644 --- a/src/transport/transport_api_core.c +++ b/src/transport/transport_api_core.c @@ -29,11 +29,10 @@ #include "gnunet_arm_service.h" #include "gnunet_hello_lib.h" #include "gnunet_protocols.h" -#include "gnunet_transport_core_service.h" #include "gnunet_transport_service.h" #include "transport.h" -#define LOG(kind,...) GNUNET_log_from (kind, "transport-api-core",__VA_ARGS__) +#define LOG(kind, ...) GNUNET_log_from (kind, "transport-api-core", __VA_ARGS__) /** * If we could not send any payload to a peer for this amount of @@ -113,11 +112,9 @@ struct Neighbour * Size of the message in @e env. */ uint16_t env_size; - }; - /** * Handle for the transport service (includes all of the * state for the transport service). @@ -187,7 +184,6 @@ struct GNUNET_TRANSPORT_CoreHandle * (if #GNUNET_NO, then @e self is all zeros!). */ int check_self; - }; @@ -212,8 +208,7 @@ static struct Neighbour * neighbour_find (struct GNUNET_TRANSPORT_CoreHandle *h, const struct GNUNET_PeerIdentity *peer) { - return GNUNET_CONTAINER_multipeermap_get (h->neighbours, - peer); + return GNUNET_CONTAINER_multipeermap_get (h->neighbours, peer); } @@ -234,9 +229,7 @@ notify_excess_cb (void *cls) GNUNET_i2s (&n->id)); if (NULL != h->neb_cb) - h->neb_cb (h->cls, - &n->id, - n->handlers_cls); + h->neb_cb (h->cls, &n->id, n->handlers_cls); } @@ -251,9 +244,7 @@ notify_excess_cb (void *cls) * #GNUNET_NO if not. */ static int -neighbour_delete (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +neighbour_delete (void *cls, const struct GNUNET_PeerIdentity *key, void *value) { struct GNUNET_TRANSPORT_CoreHandle *handle = cls; struct Neighbour *n = value; @@ -263,9 +254,7 @@ neighbour_delete (void *cls, GNUNET_i2s (key)); GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker); if (NULL != handle->nd_cb) - handle->nd_cb (handle->cls, - &n->id, - n->handlers_cls); + handle->nd_cb (handle->cls, &n->id, n->handlers_cls); if (NULL != n->timeout_task) { GNUNET_SCHEDULER_cancel (n->timeout_task); @@ -278,10 +267,9 @@ neighbour_delete (void *cls, } GNUNET_MQ_destroy (n->mq); GNUNET_assert (NULL == n->mq); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, - key, - n)); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multipeermap_remove (handle->neighbours, key, n)); GNUNET_free (n); return GNUNET_YES; } @@ -297,8 +285,7 @@ neighbour_delete (void *cls, * @param error error code */ static void -mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; @@ -317,14 +304,12 @@ mq_error_handler (void *cls, * @return #GNUNET_OK if message is well-formed */ static int -check_hello (void *cls, - const struct GNUNET_MessageHeader *msg) +check_hello (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_PeerIdentity me; if (GNUNET_OK != - GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, - &me)) + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg, &me)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -340,8 +325,7 @@ check_hello (void *cls, * @param msg message received */ static void -handle_hello (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_hello (void *cls, const struct GNUNET_MessageHeader *msg) { /* we do not care => FIXME: signal in options to NEVER send HELLOs! */ } @@ -388,8 +372,7 @@ notify_send_done (void *cls) n->env = NULL; n->traffic_overhead = 0; } - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); + delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128); if (0 == delay.rel_value_us) { n->is_ready = GNUNET_YES; @@ -399,9 +382,8 @@ notify_send_done (void *cls) GNUNET_MQ_impl_send_in_flight (n->mq); /* cannot send even a small message without violating quota, wait a before allowing MQ to send next message */ - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done_fin, - n); + n->timeout_task = + GNUNET_SCHEDULER_add_delayed (delay, ¬ify_send_done_fin, n); } @@ -434,20 +416,17 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, return; } GNUNET_assert (NULL == n->env); - n->env = GNUNET_MQ_msg_nested_mh (obm, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, - msg); + n->env = + GNUNET_MQ_msg_nested_mh (obm, GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, msg); obm->reserved = htonl (0); - obm->timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ + obm->timeout = GNUNET_TIME_relative_hton ( + GNUNET_TIME_UNIT_MINUTES); /* FIXME: to be removed */ obm->peer = n->id; GNUNET_assert (NULL == n->timeout_task); n->is_ready = GNUNET_NO; n->env_size = ntohs (msg->size); - GNUNET_MQ_notify_sent (n->env, - ¬ify_send_done, - n); - GNUNET_MQ_send (h->mq, - n->env); + GNUNET_MQ_notify_sent (n->env, ¬ify_send_done, n); + GNUNET_MQ_send (h->mq, n->env); LOG (GNUNET_ERROR_TYPE_DEBUG, "Queued message of type %u for neighbour `%s'.\n", ntohs (msg->type), @@ -463,8 +442,7 @@ mq_send_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state of the implementation */ static void -mq_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; @@ -481,8 +459,7 @@ mq_destroy_impl (struct GNUNET_MQ_Handle *mq, * @param impl_state state specific to the implementation */ static void -mq_cancel_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) +mq_cancel_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { struct Neighbour *n = impl_state; @@ -506,8 +483,7 @@ mq_cancel_impl (struct GNUNET_MQ_Handle *mq, * @param error error code */ static void -peer_mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) +peer_mq_error_handler (void *cls, enum GNUNET_MQ_Error error) { /* struct Neighbour *n = cls; */ @@ -529,12 +505,9 @@ outbound_bw_tracker_update (void *cls) if (NULL == n->timeout_task) return; - delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, - 128); + delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, 128); GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, - ¬ify_send_done, - n); + n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay, ¬ify_send_done, n); } @@ -545,8 +518,7 @@ outbound_bw_tracker_update (void *cls) * @param cim message received */ static void -handle_connect (void *cls, - const struct ConnectInfoMessage *cim) +handle_connect (void *cls, const struct ConnectInfoMessage *cim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -555,8 +527,7 @@ handle_connect (void *cls, "Receiving CONNECT message for `%s' with quota %u\n", GNUNET_i2s (&cim->id), ntohl (cim->quota_out.value__)); - n = neighbour_find (h, - &cim->id); + n = neighbour_find (h, &cim->id); if (NULL != n) { GNUNET_break (0); /* FIXME: this assertion seems to fail sometimes!? */ @@ -576,13 +547,13 @@ handle_connect (void *cls, ¬ify_excess_cb, n); GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (h->neighbours, - &n->id, - n, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multipeermap_put ( + h->neighbours, + &n->id, + n, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - cim->quota_out); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, cim->quota_out); n->mq = GNUNET_MQ_queue_for_callbacks (&mq_send_impl, &mq_destroy_impl, &mq_cancel_impl, @@ -592,11 +563,8 @@ handle_connect (void *cls, n); if (NULL != h->nc_cb) { - n->handlers_cls = h->nc_cb (h->cls, - &n->id, - n->mq); - GNUNET_MQ_set_handlers_closure (n->mq, - n->handlers_cls); + n->handlers_cls = h->nc_cb (h->cls, &n->id, n->mq); + GNUNET_MQ_set_handlers_closure (n->mq, n->handlers_cls); } } @@ -608,8 +576,7 @@ handle_connect (void *cls, * @param dim message received */ static void -handle_disconnect (void *cls, - const struct DisconnectInfoMessage *dim) +handle_disconnect (void *cls, const struct DisconnectInfoMessage *dim) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -625,10 +592,7 @@ handle_disconnect (void *cls, disconnect_and_schedule_reconnect (h); return; } - GNUNET_assert (GNUNET_YES == - neighbour_delete (h, - &dim->peer, - n)); + GNUNET_assert (GNUNET_YES == neighbour_delete (h, &dim->peer, n)); } @@ -639,8 +603,7 @@ handle_disconnect (void *cls, * @param okm message received */ static void -handle_send_ok (void *cls, - const struct SendOkMessage *okm) +handle_send_ok (void *cls, const struct SendOkMessage *okm) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -653,8 +616,7 @@ handle_send_ok (void *cls, "Receiving SEND_OK message, transmission to %s %s.\n", GNUNET_i2s (&okm->peer), ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed"); - n = neighbour_find (h, - &okm->peer); + n = neighbour_find (h, &okm->peer); if (NULL == n) { /* We should never get a 'SEND_OK' for a peer that we are not @@ -681,8 +643,7 @@ handle_send_ok (void *cls, * @param im message received */ static int -check_recv (void *cls, - const struct InboundMessage *im) +check_recv (void *cls, const struct InboundMessage *im) { const struct GNUNET_MessageHeader *imm; uint16_t size; @@ -710,12 +671,11 @@ check_recv (void *cls, * @param im message received */ static void -handle_recv (void *cls, - const struct InboundMessage *im) +handle_recv (void *cls, const struct InboundMessage *im) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - const struct GNUNET_MessageHeader *imm - = (const struct GNUNET_MessageHeader *) &im[1]; + const struct GNUNET_MessageHeader *imm = + (const struct GNUNET_MessageHeader *) &im[1]; struct Neighbour *n; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -730,8 +690,7 @@ handle_recv (void *cls, disconnect_and_schedule_reconnect (h); return; } - GNUNET_MQ_inject_message (n->mq, - imm); + GNUNET_MQ_inject_message (n->mq, imm); } @@ -742,8 +701,7 @@ handle_recv (void *cls, * @param msg message received */ static void -handle_set_quota (void *cls, - const struct QuotaSetMessage *qm) +handle_set_quota (void *cls, const struct QuotaSetMessage *qm) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; struct Neighbour *n; @@ -752,16 +710,15 @@ handle_set_quota (void *cls, "Receiving SET_QUOTA message for `%s' with quota %u\n", GNUNET_i2s (&qm->peer), ntohl (qm->quota.value__)); - n = neighbour_find (h, - &qm->peer); + n = neighbour_find (h, &qm->peer); if (NULL == n) { - GNUNET_break (0); /* FIXME: julius reports this assertion fails sometimes? */ + GNUNET_break ( + 0); /* FIXME: julius reports this assertion fails sometimes? */ disconnect_and_schedule_reconnect (h); return; } - GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, - qm->quota); + GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker, qm->quota); } @@ -774,50 +731,44 @@ static void reconnect (void *cls) { struct GNUNET_TRANSPORT_CoreHandle *h = cls; - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (hello, - GNUNET_MESSAGE_TYPE_HELLO, - struct GNUNET_MessageHeader, - h), - GNUNET_MQ_hd_fixed_size (connect, - GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, - struct ConnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (disconnect, - GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, - struct DisconnectInfoMessage, - h), - GNUNET_MQ_hd_fixed_size (send_ok, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, - struct SendOkMessage, - h), - GNUNET_MQ_hd_var_size (recv, - GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, - struct InboundMessage, - h), - GNUNET_MQ_hd_fixed_size (set_quota, - GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, - struct QuotaSetMessage, - h), - GNUNET_MQ_handler_end () - }; + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (hello, + GNUNET_MESSAGE_TYPE_HELLO, + struct GNUNET_MessageHeader, + h), + GNUNET_MQ_hd_fixed_size (connect, + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT, + struct ConnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (disconnect, + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT, + struct DisconnectInfoMessage, + h), + GNUNET_MQ_hd_fixed_size (send_ok, + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK, + struct SendOkMessage, + h), + GNUNET_MQ_hd_var_size (recv, + GNUNET_MESSAGE_TYPE_TRANSPORT_RECV, + struct InboundMessage, + h), + GNUNET_MQ_hd_fixed_size (set_quota, + GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA, + struct QuotaSetMessage, + h), + GNUNET_MQ_handler_end ()}; struct GNUNET_MQ_Envelope *env; struct StartMessage *s; uint32_t options; h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service.\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n"); GNUNET_assert (NULL == h->mq); - h->mq = GNUNET_CLIENT_connect (h->cfg, - "transport", - handlers, - &mq_error_handler, - h); + h->mq = + GNUNET_CLIENT_connect (h->cfg, "transport", handlers, &mq_error_handler, h); if (NULL == h->mq) return; - env = GNUNET_MQ_msg (s, - GNUNET_MESSAGE_TYPE_TRANSPORT_START); + env = GNUNET_MQ_msg (s, GNUNET_MESSAGE_TYPE_TRANSPORT_START); options = 0; if (h->check_self) options |= 1; @@ -825,8 +776,7 @@ reconnect (void *cls) options |= 2; s->options = htonl (options); s->self = h->self; - GNUNET_MQ_send (h->mq, - env); + GNUNET_MQ_send (h->mq, env); } @@ -841,9 +791,7 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) { GNUNET_assert (NULL == h->reconnect_task); /* Forget about all neighbours that we used to be connected to */ - GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, - &neighbour_delete, - h); + GNUNET_CONTAINER_multipeermap_iterate (h->neighbours, &neighbour_delete, h); if (NULL != h->mq) { GNUNET_MQ_destroy (h->mq); @@ -851,12 +799,9 @@ disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_CoreHandle *h) } LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling task to reconnect to transport service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, - GNUNET_YES)); + GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, GNUNET_YES)); h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, - &reconnect, - h); + GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h); h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); } @@ -874,8 +819,7 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, { struct Neighbour *n; - n = neighbour_find (handle, - peer); + n = neighbour_find (handle, peer); if (NULL == n) return NULL; return n->mq; @@ -898,12 +842,12 @@ GNUNET_TRANSPORT_core_get_mq (struct GNUNET_TRANSPORT_CoreHandle *handle, */ struct GNUNET_TRANSPORT_CoreHandle * GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_PeerIdentity *self, - const struct GNUNET_MQ_MessageHandler *handlers, - void *cls, - GNUNET_TRANSPORT_NotifyConnect nc, - GNUNET_TRANSPORT_NotifyDisconnect nd, - GNUNET_TRANSPORT_NotifyExcessBandwidth neb) + const struct GNUNET_PeerIdentity *self, + const struct GNUNET_MQ_MessageHandler *handlers, + void *cls, + GNUNET_TRANSPORT_NotifyConnect nc, + GNUNET_TRANSPORT_NotifyDisconnect nd, + GNUNET_TRANSPORT_NotifyExcessBandwidth neb) { struct GNUNET_TRANSPORT_CoreHandle *h; unsigned int i; @@ -922,15 +866,14 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; if (NULL != handlers) { - for (i=0;NULL != handlers[i].cb; i++) ; - h->handlers = GNUNET_new_array (i + 1, - struct GNUNET_MQ_MessageHandler); + for (i = 0; NULL != handlers[i].cb; i++) + ; + h->handlers = GNUNET_new_array (i + 1, struct GNUNET_MQ_MessageHandler); GNUNET_memcpy (h->handlers, - handlers, - i * sizeof (struct GNUNET_MQ_MessageHandler)); + handlers, + i * sizeof (struct GNUNET_MQ_MessageHandler)); } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to transport service\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service\n"); reconnect (h); if (NULL == h->mq) { @@ -939,8 +882,7 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, return NULL; } h->neighbours = - GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, - GNUNET_YES); + GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE, GNUNET_YES); return h; } @@ -948,13 +890,13 @@ GNUNET_TRANSPORT_core_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Disconnect from the transport service. * - * @param handle handle to the service as returned from #GNUNET_TRANSPORT_core_connect() + * @param handle handle to the service as returned from + * #GNUNET_TRANSPORT_core_connect() */ void GNUNET_TRANSPORT_core_disconnect (struct GNUNET_TRANSPORT_CoreHandle *handle) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transport disconnect called!\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n"); /* this disconnects all neighbours... */ if (NULL == handle->reconnect_task) disconnect_and_schedule_reconnect (handle); -- cgit v1.2.3