From 186199e3b42e2d9ead8072b605b06b9e76619084 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 3 Apr 2019 20:39:57 +0200 Subject: resume transmission on idle queues upon incoming message (TNG) --- src/transport/gnunet-service-tng.c | 537 +++++++++++++++++++------------------ 1 file changed, 276 insertions(+), 261 deletions(-) (limited to 'src') diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c index 7d7d04375..b64bfb182 100644 --- a/src/transport/gnunet-service-tng.c +++ b/src/transport/gnunet-service-tng.c @@ -42,10 +42,10 @@ * effective flow control (for uni-directional transports!) * #4 UDP broadcasting logic must be extended to use the new API * #5 only validated addresses go to ATS for scheduling; that - * also ensures we know the RTT + * also ensures we know the RTT * #6 to ensure flow control and RTT are OK, we always do the * 'validation', even if address comes from PEERSTORE - * #7 + * #7 * - ACK handling / retransmission * - address verification * - track RTT, distance, loss, etc. @@ -1497,7 +1497,7 @@ static struct Neighbour * lookup_neighbour (const struct GNUNET_PeerIdentity *pid) { return GNUNET_CONTAINER_multipeermap_get (neighbours, - pid); + pid); } @@ -1561,9 +1561,9 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh) if (NULL == dv->dv_head) { GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (dv_routes, - &dv->target, - dv)); + GNUNET_CONTAINER_multipeermap_remove (dv_routes, + &dv->target, + dv)); if (NULL != dv->timeout_task) GNUNET_SCHEDULER_cancel (dv->timeout_task); GNUNET_free (dv); @@ -1602,18 +1602,18 @@ free_dv_route (struct DistanceVector *dv) */ static void notify_monitor (struct TransportClient *tc, - const struct GNUNET_PeerIdentity *peer, - const char *address, - enum GNUNET_NetworkType nt, - const struct MonitorEvent *me) + const struct GNUNET_PeerIdentity *peer, + const char *address, + enum GNUNET_NetworkType nt, + const struct MonitorEvent *me) { struct GNUNET_MQ_Envelope *env; struct GNUNET_TRANSPORT_MonitorData *md; size_t addr_len = strlen (address) + 1; env = GNUNET_MQ_msg_extra (md, - addr_len, - GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA); + addr_len, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA); md->nt = htonl ((uint32_t) nt); md->peer = *peer; md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation); @@ -1624,10 +1624,10 @@ notify_monitor (struct TransportClient *tc, md->num_msg_pending = htonl (me->num_msg_pending); md->num_bytes_pending = htonl (me->num_bytes_pending); memcpy (&md[1], - address, - addr_len); + address, + addr_len); GNUNET_MQ_send (tc->mq, - env); + env); } @@ -1642,9 +1642,9 @@ notify_monitor (struct TransportClient *tc, */ static void notify_monitors (const struct GNUNET_PeerIdentity *peer, - const char *address, - enum GNUNET_NetworkType nt, - const struct MonitorEvent *me) + const char *address, + enum GNUNET_NetworkType nt, + const struct MonitorEvent *me) { static struct GNUNET_PeerIdentity zero; @@ -1657,17 +1657,17 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer, if (tc->details.monitor.one_shot) continue; if ( (0 != memcmp (&tc->details.monitor.peer, - &zero, - sizeof (zero))) && - (0 != memcmp (&tc->details.monitor.peer, - peer, - sizeof (*peer))) ) + &zero, + sizeof (zero))) && + (0 != memcmp (&tc->details.monitor.peer, + peer, + sizeof (*peer))) ) continue; notify_monitor (tc, - peer, - address, - nt, - me); + peer, + address, + nt, + me); } } @@ -1683,8 +1683,8 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer, */ static void * client_connect_cb (void *cls, - struct GNUNET_SERVICE_Client *client, - struct GNUNET_MQ_Handle *mq) + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) { struct TransportClient *tc; @@ -1712,11 +1712,11 @@ free_reassembly_context (struct ReassemblyContext *rc) struct Neighbour *n = rc->neighbour; GNUNET_assert (rc == - GNUNET_CONTAINER_heap_remove_node (rc->hn)); + GNUNET_CONTAINER_heap_remove_node (rc->hn)); GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map, - &rc->msg_uuid, - rc)); + GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map, + &rc->msg_uuid, + rc)); GNUNET_free (rc); } @@ -1742,8 +1742,8 @@ reassembly_cleanup_task (void *cls) } GNUNET_assert (NULL == n->reassembly_timeout_task); n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout, - &reassembly_cleanup_task, - n); + &reassembly_cleanup_task, + n); return; } } @@ -1783,16 +1783,16 @@ free_neighbour (struct Neighbour *neighbour) GNUNET_assert (NULL == neighbour->session_head); GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (neighbours, - &neighbour->pid, - neighbour)); + GNUNET_CONTAINER_multipeermap_remove (neighbours, + &neighbour->pid, + neighbour)); if (NULL != neighbour->timeout_task) GNUNET_SCHEDULER_cancel (neighbour->timeout_task); if (NULL != neighbour->reassembly_map) { GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map, - &free_reassembly_cb, - NULL); + &free_reassembly_cb, + NULL); GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map); neighbour->reassembly_map = NULL; GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap); @@ -1815,15 +1815,15 @@ free_neighbour (struct Neighbour *neighbour) */ static void core_send_connect_info (struct TransportClient *tc, - const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) + const struct GNUNET_PeerIdentity *pid, + struct GNUNET_BANDWIDTH_Value32NBO quota_out) { struct GNUNET_MQ_Envelope *env; struct ConnectInfoMessage *cim; GNUNET_assert (CT_CORE == tc->type); env = GNUNET_MQ_msg (cim, - GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); + GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT); cim->quota_out = quota_out; cim->id = *pid; GNUNET_MQ_send (tc->mq, @@ -1839,7 +1839,7 @@ core_send_connect_info (struct TransportClient *tc, */ static void cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, - struct GNUNET_BANDWIDTH_Value32NBO quota_out) + struct GNUNET_BANDWIDTH_Value32NBO quota_out) { for (struct TransportClient *tc = clients_head; NULL != tc; @@ -1848,8 +1848,8 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid, if (CT_CORE != tc->type) continue; core_send_connect_info (tc, - pid, - quota_out); + pid, + quota_out); } } @@ -1872,10 +1872,10 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid) if (CT_CORE != tc->type) continue; env = GNUNET_MQ_msg (dim, - GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); + GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT); dim->peer = *pid; GNUNET_MQ_send (tc->mq, - env); + env); } } @@ -1910,20 +1910,21 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) unsigned int wsize; GNUNET_assert (NULL != pm); - if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT) + if (queue->tc->details.communicator.total_queue_length >= + COMMUNICATOR_TOTAL_QUEUE_LIMIT) { GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to communicator queue limit", - 1, - GNUNET_NO); + "# Transmission throttled due to communicator queue limit", + 1, + GNUNET_NO); return; } if (queue->queue_length >= SESSION_QUEUE_LIMIT) { GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to session queue limit", - 1, - GNUNET_NO); + "# Transmission throttled due to session queue limit", + 1, + GNUNET_NO); return; } @@ -1931,27 +1932,28 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue) ? pm->bytes_msg /* FIXME: add overheads? */ : queue->mtu; out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out, - wsize); + wsize); out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt), - out_delay); + out_delay); if (0 == out_delay.rel_value_us) return; /* we should run immediately! */ /* queue has changed since we were scheduled, reschedule again */ - queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay, - &transmit_on_queue, - queue); + queue->transmit_task + = GNUNET_SCHEDULER_add_delayed (out_delay, + &transmit_on_queue, + queue); if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Next transmission on queue `%s' in %s (high delay)\n", - queue->address, - GNUNET_STRINGS_relative_time_to_string (out_delay, - GNUNET_YES)); + "Next transmission on queue `%s' in %s (high delay)\n", + queue->address, + GNUNET_STRINGS_relative_time_to_string (out_delay, + GNUNET_YES)); else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Next transmission on queue `%s' in %s\n", - queue->address, - GNUNET_STRINGS_relative_time_to_string (out_delay, - GNUNET_YES)); + "Next transmission on queue `%s' in %s\n", + queue->address, + GNUNET_STRINGS_relative_time_to_string (out_delay, + GNUNET_YES)); } @@ -1978,19 +1980,19 @@ free_session (struct GNUNET_ATS_Session *session) session->transmit_task = NULL; } GNUNET_CONTAINER_MDLL_remove (neighbour, - neighbour->session_head, - neighbour->session_tail, - session); + neighbour->session_head, + neighbour->session_tail, + session); GNUNET_CONTAINER_MDLL_remove (client, - tc->details.communicator.session_head, - tc->details.communicator.session_tail, - session); + tc->details.communicator.session_head, + tc->details.communicator.session_tail, + session); maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length); while (NULL != (qe = session->queue_head)) { GNUNET_CONTAINER_DLL_remove (session->queue_head, - session->queue_tail, - qe); + session->queue_tail, + qe); session->queue_length--; tc->details.communicator.total_queue_length--; GNUNET_free (qe); @@ -2001,18 +2003,18 @@ free_session (struct GNUNET_ATS_Session *session) { /* Communicator dropped below threshold, resume all queues */ GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to communicator queue limit", - -1, - GNUNET_NO); + "# Transmission throttled due to communicator queue limit", + -1, + GNUNET_NO); for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head; - NULL != s; - s = s->next_client) + NULL != s; + s = s->next_client) schedule_transmit_on_queue (s); } notify_monitors (&neighbour->pid, - session->address, - session->nt, - &me); + session->address, + session->nt, + &me); GNUNET_ATS_session_del (session->sr); GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in); GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out); @@ -2036,8 +2038,8 @@ free_address_list_entry (struct AddressListEntry *ale) struct TransportClient *tc = ale->tc; GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head, - tc->details.communicator.addr_tail, - ale); + tc->details.communicator.addr_tail, + ale); if (NULL != ale->sc) { GNUNET_PEERSTORE_store_cancel (ale->sc); @@ -2062,8 +2064,8 @@ free_address_list_entry (struct AddressListEntry *ale) */ static void client_disconnect_cb (void *cls, - struct GNUNET_SERVICE_Client *client, - void *app_ctx) + struct GNUNET_SERVICE_Client *client, + void *app_ctx) { struct TransportClient *tc = app_ctx; @@ -2083,11 +2085,11 @@ client_disconnect_cb (void *cls, while (NULL != (pm = tc->details.core.pending_msg_head)) { - GNUNET_CONTAINER_MDLL_remove (client, - tc->details.core.pending_msg_head, - tc->details.core.pending_msg_tail, - pm); - pm->client = NULL; + GNUNET_CONTAINER_MDLL_remove (client, + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); + pm->client = NULL; } } break; @@ -2121,15 +2123,15 @@ client_disconnect_cb (void *cls, */ static int notify_client_connect_info (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) + const struct GNUNET_PeerIdentity *pid, + void *value) { struct TransportClient *tc = cls; struct Neighbour *neighbour = value; core_send_connect_info (tc, - pid, - neighbour->quota_out); + pid, + neighbour->quota_out); return GNUNET_OK; } @@ -2144,7 +2146,7 @@ notify_client_connect_info (void *cls, */ static void handle_client_start (void *cls, - const struct StartMessage *start) + const struct StartMessage *start) { struct TransportClient *tc = cls; uint32_t options; @@ -2169,8 +2171,8 @@ handle_client_start (void *cls, } tc->type = CT_CORE; GNUNET_CONTAINER_multipeermap_iterate (neighbours, - ¬ify_client_connect_info, - tc); + ¬ify_client_connect_info, + tc); GNUNET_SERVICE_client_continue (tc->client); } @@ -2183,7 +2185,7 @@ handle_client_start (void *cls, */ static int check_client_send (void *cls, - const struct OutboundMessage *obm) + const struct OutboundMessage *obm) { struct TransportClient *tc = cls; uint16_t size; @@ -2248,14 +2250,14 @@ free_pending_message (struct PendingMessage *pm) if (NULL != tc) { GNUNET_CONTAINER_MDLL_remove (client, - tc->details.core.pending_msg_head, - tc->details.core.pending_msg_tail, - pm); + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); } GNUNET_CONTAINER_MDLL_remove (neighbour, - target->pending_msg_head, - target->pending_msg_tail, - pm); + target->pending_msg_head, + target->pending_msg_tail, + pm); free_fragment_tree (pm); GNUNET_free_non_null (pm->bpm); GNUNET_free (pm); @@ -2276,8 +2278,8 @@ free_pending_message (struct PendingMessage *pm) */ static void client_send_response (struct PendingMessage *pm, - int success, - uint32_t bytes_physical) + int success, + uint32_t bytes_physical) { struct TransportClient *tc = pm->client; struct Neighbour *target = pm->target; @@ -2287,7 +2289,7 @@ client_send_response (struct PendingMessage *pm, if (NULL != tc) { env = GNUNET_MQ_msg (som, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); som->success = htonl ((uint32_t) success); som->bytes_msg = htons (pm->bytes_msg); som->bytes_physical = htonl (bytes_physical); @@ -2324,22 +2326,22 @@ check_queue_timeouts (void *cls) if (pos->timeout.abs_value_us <= now.abs_value_us) { GNUNET_STATISTICS_update (GST_stats, - "# messages dropped (timeout before confirmation)", - 1, - GNUNET_NO); + "# messages dropped (timeout before confirmation)", + 1, + GNUNET_NO); client_send_response (pm, GNUNET_NO, 0); continue; } earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout, - pos->timeout); + pos->timeout); } n->earliest_timeout = earliest_timeout; if (NULL != n->pending_msg_head) n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout, - &check_queue_timeouts, - n); + &check_queue_timeouts, + n); } @@ -2351,13 +2353,14 @@ check_queue_timeouts (void *cls) */ static void handle_client_send (void *cls, - const struct OutboundMessage *obm) + const struct OutboundMessage *obm) { struct TransportClient *tc = cls; struct PendingMessage *pm; const struct GNUNET_MessageHeader *obmm; struct Neighbour *target; uint32_t bytes_msg; + int was_empty; GNUNET_assert (CT_CORE == tc->type); obmm = (const struct GNUNET_MessageHeader *) &obm[1]; @@ -2373,36 +2376,37 @@ handle_client_send (void *cls, struct SendOkMessage *som; env = GNUNET_MQ_msg (som, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK); som->success = htonl (GNUNET_SYSERR); som->bytes_msg = htonl (bytes_msg); som->bytes_physical = htonl (0); som->peer = obm->peer; GNUNET_MQ_send (tc->mq, - env); + env); GNUNET_SERVICE_client_continue (tc->client); GNUNET_STATISTICS_update (GST_stats, - "# messages dropped (neighbour unknown)", - 1, - GNUNET_NO); + "# messages dropped (neighbour unknown)", + 1, + GNUNET_NO); return; } + was_empty = (NULL == target->pending_msg_head); pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg); pm->client = tc; pm->target = target; pm->bytes_msg = bytes_msg; pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout)); memcpy (&pm[1], - &obm[1], - bytes_msg); + &obm[1], + bytes_msg); GNUNET_CONTAINER_MDLL_insert (neighbour, - target->pending_msg_head, - target->pending_msg_tail, - pm); + target->pending_msg_head, + target->pending_msg_tail, + pm); GNUNET_CONTAINER_MDLL_insert (client, - tc->details.core.pending_msg_head, - tc->details.core.pending_msg_tail, - pm); + tc->details.core.pending_msg_head, + tc->details.core.pending_msg_tail, + pm); if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us) { target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us; @@ -2410,8 +2414,19 @@ handle_client_send (void *cls, GNUNET_SCHEDULER_cancel (target->timeout_task); target->timeout_task = GNUNET_SCHEDULER_add_at (target->earliest_timeout, - &check_queue_timeouts, - target); + &check_queue_timeouts, + target); + } + if (! was_empty) + return; /* all queues must already be busy */ + for (struct GNUNET_ATS_Session *queue = target->session_head; + NULL != queue; + queue = queue->next_neighbour) + { + /* try transmission on any queue that is idle */ + if (NULL == queue->transmit_task) + queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue, + queue); } } @@ -3835,9 +3850,9 @@ transmit_on_queue (void *cls) respect that even if MTU is 0 for this queue */) ) s = fragment_message (s, - (0 == queue->mtu) - ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo) - : queue->mtu); + (0 == queue->mtu) + ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo) + : queue->mtu); if (NULL == s) { /* Fragmentation failed, try next message... */ @@ -3868,13 +3883,13 @@ transmit_on_queue (void *cls) smt->mid = qe->mid; smt->receiver = n->pid; memcpy (&smt[1], - &s[1], - s->bytes_msg); + &s[1], + s->bytes_msg); GNUNET_assert (CT_COMMUNICATOR == queue->tc->type); queue->queue_length++; queue->tc->details.communicator.total_queue_length++; GNUNET_MQ_send (queue->tc->mq, - env); + env); // FIXME: do something similar to the logic below // in defragmentation / reliability ACK handling! @@ -3886,8 +3901,8 @@ transmit_on_queue (void *cls) { /* Full message sent, and over reliabile channel */ client_send_response (pm, - GNUNET_YES, - pm->bytes_msg); + GNUNET_YES, + pm->bytes_msg); } else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) && (PMT_FRAGMENT_BOX == s->pmt) ) @@ -3898,9 +3913,9 @@ transmit_on_queue (void *cls) free_fragment_tree (s); pos = s->frag_parent; GNUNET_CONTAINER_MDLL_remove (frag, - pos->head_frag, - pos->tail_frag, - s); + pos->head_frag, + pos->tail_frag, + s); GNUNET_free (s); /* check if subtree is done */ while ( (NULL == pos->head_frag) && @@ -3910,9 +3925,9 @@ transmit_on_queue (void *cls) s = pos; pos = s->frag_parent; GNUNET_CONTAINER_MDLL_remove (frag, - pos->head_frag, - pos->tail_frag, - s); + pos->head_frag, + pos->tail_frag, + s); GNUNET_free (s); } @@ -3920,8 +3935,8 @@ transmit_on_queue (void *cls) if ( (NULL == pm->head_frag) && (pm->frag_off == pm->bytes_msg) ) client_send_response (pm, - GNUNET_YES, - pm->bytes_msg /* FIXME: calculate and add overheads! */); + GNUNET_YES, + pm->bytes_msg /* FIXME: calculate and add overheads! */); } else if (PMT_CORE != pm->pmt) { @@ -3941,25 +3956,25 @@ transmit_on_queue (void *cls) message urgency and size when delaying ACKs, etc.) */ s->next_attempt = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (queue->rtt, - 4)); + 4)); if (s == pm) { struct PendingMessage *pos; /* re-insert sort in neighbour list */ GNUNET_CONTAINER_MDLL_remove (neighbour, - neighbour->pending_msg_head, - neighbour->pending_msg_tail, - pm); + neighbour->pending_msg_head, + neighbour->pending_msg_tail, + pm); pos = neighbour->pending_msg_tail; while ( (NULL != pos) && (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) ) - pos = pos->prev_neighbour; + pos = pos->prev_neighbour; GNUNET_CONTAINER_MDLL_insert_after (neighbour, - neighbour->pending_msg_head, - neighbour->pending_msg_tail, - pos, - pm); + neighbour->pending_msg_head, + neighbour->pending_msg_tail, + pos, + pm); } else { @@ -3968,18 +3983,18 @@ transmit_on_queue (void *cls) struct PendingMessage *pos; GNUNET_CONTAINER_MDLL_remove (frag, - fp->head_frag, - fp->tail_frag, - s); + fp->head_frag, + fp->tail_frag, + s); pos = fp->tail_frag; while ( (NULL != pos) && (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) ) - pos = pos->prev_frag; + pos = pos->prev_frag; GNUNET_CONTAINER_MDLL_insert_after (frag, - fp->head_frag, - fp->tail_frag, - pos, - s); + fp->head_frag, + fp->tail_frag, + pos, + s); } } @@ -4028,9 +4043,9 @@ tracker_excess_out_cb (void *cls) from here via a message instead! */ /* TODO: maybe inform ATS at this point? */ GNUNET_STATISTICS_update (GST_stats, - "# Excess outbound bandwidth reported", - 1, - GNUNET_NO); + "# Excess outbound bandwidth reported", + 1, + GNUNET_NO); } @@ -4046,9 +4061,9 @@ tracker_excess_in_cb (void *cls) { /* TODO: maybe inform ATS at this point? */ GNUNET_STATISTICS_update (GST_stats, - "# Excess inbound bandwidth reported", - 1, - GNUNET_NO); + "# Excess inbound bandwidth reported", + 1, + GNUNET_NO); } @@ -4083,12 +4098,12 @@ handle_add_queue_message (void *cls, neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS; neighbour->pid = aqm->receiver; GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_put (neighbours, - &neighbour->pid, - neighbour, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multipeermap_put (neighbours, + &neighbour->pid, + neighbour, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); cores_send_connect_info (&neighbour->pid, - GNUNET_BANDWIDTH_ZERO); + GNUNET_BANDWIDTH_ZERO); } addr_len = ntohs (aqm->header.size) - sizeof (*aqm); addr = (const char *) &aqm[1]; @@ -4117,8 +4132,8 @@ handle_add_queue_message (void *cls, &tracker_excess_out_cb, queue); memcpy (&queue[1], - addr, - addr_len); + addr, + addr_len); /* notify ATS about new queue */ { struct GNUNET_ATS_Properties prop = { @@ -4129,10 +4144,10 @@ handle_add_queue_message (void *cls, }; queue->sr = GNUNET_ATS_session_add (ats, - &neighbour->pid, - queue->address, - queue, - &prop); + &neighbour->pid, + queue->address, + queue, + &prop); if (NULL == queue->sr) { /* This can only happen if the 'address' was way too long for ATS @@ -4159,18 +4174,18 @@ handle_add_queue_message (void *cls, }; notify_monitors (&neighbour->pid, - queue->address, - queue->nt, - &me); + queue->address, + queue->nt, + &me); } GNUNET_CONTAINER_MDLL_insert (neighbour, - neighbour->session_head, - neighbour->session_tail, - queue); + neighbour->session_head, + neighbour->session_tail, + queue); GNUNET_CONTAINER_MDLL_insert (client, - tc->details.communicator.session_head, - tc->details.communicator.session_tail, - queue); + tc->details.communicator.session_head, + tc->details.communicator.session_tail, + queue); GNUNET_SERVICE_client_continue (tc->client); } @@ -4273,21 +4288,21 @@ handle_send_message_ack (void *cls, { /* Communicator dropped below threshold, resume all queues */ GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to communicator queue limit", - -1, - GNUNET_NO); + "# Transmission throttled due to communicator queue limit", + -1, + GNUNET_NO); for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head; - NULL != session; - session = session->next_client) + NULL != session; + session = session->next_client) schedule_transmit_on_queue (session); } else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length) { /* queue dropped below threshold; only resume this one queue */ GNUNET_STATISTICS_update (GST_stats, - "# Transmission throttled due to session queue limit", - -1, - GNUNET_NO); + "# Transmission throttled due to session queue limit", + -1, + GNUNET_NO); schedule_transmit_on_queue (queue->session); } @@ -4361,8 +4376,8 @@ handle_monitor_start (void *cls, tc->details.monitor.peer = start->peer; tc->details.monitor.one_shot = ntohl (start->one_shot); GNUNET_CONTAINER_multipeermap_iterate (neighbours, - ¬ify_client_queues, - tc); + ¬ify_client_queues, + tc); GNUNET_SERVICE_client_mark_monitor (tc->client); GNUNET_SERVICE_client_continue (tc->client); } @@ -4414,8 +4429,8 @@ lookup_communicator (const char *prefix) return tc; } GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n", - prefix); + "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n", + prefix); return NULL; } @@ -4451,21 +4466,21 @@ ats_suggestion_cb (void *cls, if (NULL == tc) { GNUNET_STATISTICS_update (GST_stats, - "# ATS suggestions ignored due to missing communicator", - 1, - GNUNET_NO); + "# ATS suggestions ignored due to missing communicator", + 1, + GNUNET_NO); return; } /* forward suggestion for queue creation to communicator */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Request #%u for `%s' communicator to create queue to `%s'\n", - (unsigned int) idgen, - prefix, - address); + "Request #%u for `%s' communicator to create queue to `%s'\n", + (unsigned int) idgen, + prefix, + address); alen = strlen (address) + 1; env = GNUNET_MQ_msg_extra (cqm, - alen, - GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); + alen, + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE); cqm->request_id = htonl (idgen++); cqm->receiver = *pid; memcpy (&cqm[1], @@ -4485,7 +4500,7 @@ ats_suggestion_cb (void *cls, */ static void handle_queue_create_ok (void *cls, - const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) + const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr) { struct TransportClient *tc = cls; @@ -4496,12 +4511,12 @@ handle_queue_create_ok (void *cls, return; } GNUNET_STATISTICS_update (GST_stats, - "# ATS suggestions succeeded at communicator", - 1, - GNUNET_NO); + "# ATS suggestions succeeded at communicator", + 1, + GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Request #%u for communicator to create queue succeeded\n", - (unsigned int) ntohs (cqr->request_id)); + "Request #%u for communicator to create queue succeeded\n", + (unsigned int) ntohs (cqr->request_id)); GNUNET_SERVICE_client_continue (tc->client); } @@ -4527,12 +4542,12 @@ handle_queue_create_fail (void *cls, return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Request #%u for communicator to create queue failed\n", - (unsigned int) ntohs (cqr->request_id)); + "Request #%u for communicator to create queue failed\n", + (unsigned int) ntohs (cqr->request_id)); GNUNET_STATISTICS_update (GST_stats, - "# ATS suggestions failed in queue creation at communicator", - 1, - GNUNET_NO); + "# ATS suggestions failed in queue creation at communicator", + 1, + GNUNET_NO); GNUNET_SERVICE_client_continue (tc->client); } @@ -4601,8 +4616,8 @@ handle_address_consider_verify (void *cls, */ static int free_neighbour_cb (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) + const struct GNUNET_PeerIdentity *pid, + void *value) { struct Neighbour *neighbour = value; @@ -4625,8 +4640,8 @@ free_neighbour_cb (void *cls, */ static int free_dv_routes_cb (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) + const struct GNUNET_PeerIdentity *pid, + void *value) { struct DistanceVector *dv = value; @@ -4648,8 +4663,8 @@ free_dv_routes_cb (void *cls, */ static int free_ephemeral_cb (void *cls, - const struct GNUNET_PeerIdentity *pid, - void *value) + const struct GNUNET_PeerIdentity *pid, + void *value) { struct EphemeralCacheEntry *ece = value; @@ -4734,9 +4749,9 @@ run (void *cls, /* setup globals */ GST_cfg = c; neighbours = GNUNET_CONTAINER_multipeermap_create (1024, - GNUNET_YES); + GNUNET_YES); dv_routes = GNUNET_CONTAINER_multipeermap_create (1024, - GNUNET_YES); + GNUNET_YES); ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32, GNUNET_YES); ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); @@ -4790,50 +4805,50 @@ GNUNET_SERVICE_MAIN NULL, /* communication with core */ GNUNET_MQ_hd_fixed_size (client_start, - GNUNET_MESSAGE_TYPE_TRANSPORT_START, - struct StartMessage, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_START, + struct StartMessage, + NULL), GNUNET_MQ_hd_var_size (client_send, - GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, - struct OutboundMessage, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_SEND, + struct OutboundMessage, + NULL), /* communication with communicators */ GNUNET_MQ_hd_var_size (communicator_available, - GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, - struct GNUNET_TRANSPORT_CommunicatorAvailableMessage, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR, + struct GNUNET_TRANSPORT_CommunicatorAvailableMessage, + NULL), GNUNET_MQ_hd_var_size (communicator_backchannel, - GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL, - struct GNUNET_TRANSPORT_CommunicatorBackchannel, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL, + struct GNUNET_TRANSPORT_CommunicatorBackchannel, + NULL), GNUNET_MQ_hd_var_size (add_address, - GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS, - struct GNUNET_TRANSPORT_AddAddressMessage, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS, + struct GNUNET_TRANSPORT_AddAddressMessage, + NULL), GNUNET_MQ_hd_fixed_size (del_address, GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS, struct GNUNET_TRANSPORT_DelAddressMessage, NULL), GNUNET_MQ_hd_var_size (incoming_msg, - GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, - struct GNUNET_TRANSPORT_IncomingMessage, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG, + struct GNUNET_TRANSPORT_IncomingMessage, + NULL), GNUNET_MQ_hd_fixed_size (queue_create_ok, - GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, - struct GNUNET_TRANSPORT_CreateQueueResponse, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK, + struct GNUNET_TRANSPORT_CreateQueueResponse, + NULL), GNUNET_MQ_hd_fixed_size (queue_create_fail, - GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, - struct GNUNET_TRANSPORT_CreateQueueResponse, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL, + struct GNUNET_TRANSPORT_CreateQueueResponse, + NULL), GNUNET_MQ_hd_var_size (add_queue_message, - GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, - struct GNUNET_TRANSPORT_AddQueueMessage, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP, + struct GNUNET_TRANSPORT_AddQueueMessage, + NULL), GNUNET_MQ_hd_var_size (address_consider_verify, - GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY, - struct GNUNET_TRANSPORT_AddressToVerify, - NULL), + GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY, + struct GNUNET_TRANSPORT_AddressToVerify, + NULL), GNUNET_MQ_hd_fixed_size (del_queue_message, GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN, struct GNUNET_TRANSPORT_DelQueueMessage, -- cgit v1.2.3