summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSchanzenbach, Martin <mschanzenbach@posteo.de>2019-04-03 21:16:47 +0200
committerSchanzenbach, Martin <mschanzenbach@posteo.de>2019-04-03 21:16:47 +0200
commitf13792325fc3f7e49ec2b0880eb4f1aa978e00d7 (patch)
tree1451ec07f6181707b224a165fb5171a02c85d93b /src
parentcd3db61291fd47b5acde1ae3add16c7ccfc2f996 (diff)
parent186199e3b42e2d9ead8072b605b06b9e76619084 (diff)
merge
Diffstat (limited to 'src')
-rw-r--r--src/transport/gnunet-service-tng.c537
1 files changed, 276 insertions, 261 deletions
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 7d7d04375..b64bfb182 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -42,10 +42,10 @@
* effective flow control (for uni-directional transports!)
* #4 UDP broadcasting logic must be extended to use the new API
* #5 only validated addresses go to ATS for scheduling; that
- * also ensures we know the RTT
+ * also ensures we know the RTT
* #6 to ensure flow control and RTT are OK, we always do the
* 'validation', even if address comes from PEERSTORE
- * #7
+ * #7
* - ACK handling / retransmission
* - address verification
* - track RTT, distance, loss, etc.
@@ -1497,7 +1497,7 @@ static struct Neighbour *
lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
{
return GNUNET_CONTAINER_multipeermap_get (neighbours,
- pid);
+ pid);
}
@@ -1561,9 +1561,9 @@ free_distance_vector_hop (struct DistanceVectorHop *dvh)
if (NULL == dv->dv_head)
{
GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (dv_routes,
- &dv->target,
- dv));
+ GNUNET_CONTAINER_multipeermap_remove (dv_routes,
+ &dv->target,
+ dv));
if (NULL != dv->timeout_task)
GNUNET_SCHEDULER_cancel (dv->timeout_task);
GNUNET_free (dv);
@@ -1602,18 +1602,18 @@ free_dv_route (struct DistanceVector *dv)
*/
static void
notify_monitor (struct TransportClient *tc,
- const struct GNUNET_PeerIdentity *peer,
- const char *address,
- enum GNUNET_NetworkType nt,
- const struct MonitorEvent *me)
+ const struct GNUNET_PeerIdentity *peer,
+ const char *address,
+ enum GNUNET_NetworkType nt,
+ const struct MonitorEvent *me)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_TRANSPORT_MonitorData *md;
size_t addr_len = strlen (address) + 1;
env = GNUNET_MQ_msg_extra (md,
- addr_len,
- GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
+ addr_len,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_DATA);
md->nt = htonl ((uint32_t) nt);
md->peer = *peer;
md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
@@ -1624,10 +1624,10 @@ notify_monitor (struct TransportClient *tc,
md->num_msg_pending = htonl (me->num_msg_pending);
md->num_bytes_pending = htonl (me->num_bytes_pending);
memcpy (&md[1],
- address,
- addr_len);
+ address,
+ addr_len);
GNUNET_MQ_send (tc->mq,
- env);
+ env);
}
@@ -1642,9 +1642,9 @@ notify_monitor (struct TransportClient *tc,
*/
static void
notify_monitors (const struct GNUNET_PeerIdentity *peer,
- const char *address,
- enum GNUNET_NetworkType nt,
- const struct MonitorEvent *me)
+ const char *address,
+ enum GNUNET_NetworkType nt,
+ const struct MonitorEvent *me)
{
static struct GNUNET_PeerIdentity zero;
@@ -1657,17 +1657,17 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
if (tc->details.monitor.one_shot)
continue;
if ( (0 != memcmp (&tc->details.monitor.peer,
- &zero,
- sizeof (zero))) &&
- (0 != memcmp (&tc->details.monitor.peer,
- peer,
- sizeof (*peer))) )
+ &zero,
+ sizeof (zero))) &&
+ (0 != memcmp (&tc->details.monitor.peer,
+ peer,
+ sizeof (*peer))) )
continue;
notify_monitor (tc,
- peer,
- address,
- nt,
- me);
+ peer,
+ address,
+ nt,
+ me);
}
}
@@ -1683,8 +1683,8 @@ notify_monitors (const struct GNUNET_PeerIdentity *peer,
*/
static void *
client_connect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- struct GNUNET_MQ_Handle *mq)
+ struct GNUNET_SERVICE_Client *client,
+ struct GNUNET_MQ_Handle *mq)
{
struct TransportClient *tc;
@@ -1712,11 +1712,11 @@ free_reassembly_context (struct ReassemblyContext *rc)
struct Neighbour *n = rc->neighbour;
GNUNET_assert (rc ==
- GNUNET_CONTAINER_heap_remove_node (rc->hn));
+ GNUNET_CONTAINER_heap_remove_node (rc->hn));
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
- &rc->msg_uuid,
- rc));
+ GNUNET_CONTAINER_multishortmap_remove (n->reassembly_map,
+ &rc->msg_uuid,
+ rc));
GNUNET_free (rc);
}
@@ -1742,8 +1742,8 @@ reassembly_cleanup_task (void *cls)
}
GNUNET_assert (NULL == n->reassembly_timeout_task);
n->reassembly_timeout_task = GNUNET_SCHEDULER_add_at (rc->reassembly_timeout,
- &reassembly_cleanup_task,
- n);
+ &reassembly_cleanup_task,
+ n);
return;
}
}
@@ -1783,16 +1783,16 @@ free_neighbour (struct Neighbour *neighbour)
GNUNET_assert (NULL == neighbour->session_head);
GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multipeermap_remove (neighbours,
- &neighbour->pid,
- neighbour));
+ GNUNET_CONTAINER_multipeermap_remove (neighbours,
+ &neighbour->pid,
+ neighbour));
if (NULL != neighbour->timeout_task)
GNUNET_SCHEDULER_cancel (neighbour->timeout_task);
if (NULL != neighbour->reassembly_map)
{
GNUNET_CONTAINER_multishortmap_iterate (neighbour->reassembly_map,
- &free_reassembly_cb,
- NULL);
+ &free_reassembly_cb,
+ NULL);
GNUNET_CONTAINER_multishortmap_destroy (neighbour->reassembly_map);
neighbour->reassembly_map = NULL;
GNUNET_CONTAINER_heap_destroy (neighbour->reassembly_heap);
@@ -1815,15 +1815,15 @@ free_neighbour (struct Neighbour *neighbour)
*/
static void
core_send_connect_info (struct TransportClient *tc,
- const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+ const struct GNUNET_PeerIdentity *pid,
+ struct GNUNET_BANDWIDTH_Value32NBO quota_out)
{
struct GNUNET_MQ_Envelope *env;
struct ConnectInfoMessage *cim;
GNUNET_assert (CT_CORE == tc->type);
env = GNUNET_MQ_msg (cim,
- GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
+ GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT);
cim->quota_out = quota_out;
cim->id = *pid;
GNUNET_MQ_send (tc->mq,
@@ -1839,7 +1839,7 @@ core_send_connect_info (struct TransportClient *tc,
*/
static void
cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
- struct GNUNET_BANDWIDTH_Value32NBO quota_out)
+ struct GNUNET_BANDWIDTH_Value32NBO quota_out)
{
for (struct TransportClient *tc = clients_head;
NULL != tc;
@@ -1848,8 +1848,8 @@ cores_send_connect_info (const struct GNUNET_PeerIdentity *pid,
if (CT_CORE != tc->type)
continue;
core_send_connect_info (tc,
- pid,
- quota_out);
+ pid,
+ quota_out);
}
}
@@ -1872,10 +1872,10 @@ cores_send_disconnect_info (const struct GNUNET_PeerIdentity *pid)
if (CT_CORE != tc->type)
continue;
env = GNUNET_MQ_msg (dim,
- GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
+ GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT);
dim->peer = *pid;
GNUNET_MQ_send (tc->mq,
- env);
+ env);
}
}
@@ -1910,20 +1910,21 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
unsigned int wsize;
GNUNET_assert (NULL != pm);
- if (queue->tc->details.communicator.total_queue_length >= COMMUNICATOR_TOTAL_QUEUE_LIMIT)
+ if (queue->tc->details.communicator.total_queue_length >=
+ COMMUNICATOR_TOTAL_QUEUE_LIMIT)
{
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to communicator queue limit",
- 1,
- GNUNET_NO);
+ "# Transmission throttled due to communicator queue limit",
+ 1,
+ GNUNET_NO);
return;
}
if (queue->queue_length >= SESSION_QUEUE_LIMIT)
{
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to session queue limit",
- 1,
- GNUNET_NO);
+ "# Transmission throttled due to session queue limit",
+ 1,
+ GNUNET_NO);
return;
}
@@ -1931,27 +1932,28 @@ schedule_transmit_on_queue (struct GNUNET_ATS_Session *queue)
? pm->bytes_msg /* FIXME: add overheads? */
: queue->mtu;
out_delay = GNUNET_BANDWIDTH_tracker_get_delay (&queue->tracker_out,
- wsize);
+ wsize);
out_delay = GNUNET_TIME_relative_max (GNUNET_TIME_absolute_get_remaining (pm->next_attempt),
- out_delay);
+ out_delay);
if (0 == out_delay.rel_value_us)
return; /* we should run immediately! */
/* queue has changed since we were scheduled, reschedule again */
- queue->transmit_task = GNUNET_SCHEDULER_add_delayed (out_delay,
- &transmit_on_queue,
- queue);
+ queue->transmit_task
+ = GNUNET_SCHEDULER_add_delayed (out_delay,
+ &transmit_on_queue,
+ queue);
if (out_delay.rel_value_us > DELAY_WARN_THRESHOLD.rel_value_us)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Next transmission on queue `%s' in %s (high delay)\n",
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay,
- GNUNET_YES));
+ "Next transmission on queue `%s' in %s (high delay)\n",
+ queue->address,
+ GNUNET_STRINGS_relative_time_to_string (out_delay,
+ GNUNET_YES));
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Next transmission on queue `%s' in %s\n",
- queue->address,
- GNUNET_STRINGS_relative_time_to_string (out_delay,
- GNUNET_YES));
+ "Next transmission on queue `%s' in %s\n",
+ queue->address,
+ GNUNET_STRINGS_relative_time_to_string (out_delay,
+ GNUNET_YES));
}
@@ -1978,19 +1980,19 @@ free_session (struct GNUNET_ATS_Session *session)
session->transmit_task = NULL;
}
GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->session_head,
- neighbour->session_tail,
- session);
+ neighbour->session_head,
+ neighbour->session_tail,
+ session);
GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.communicator.session_head,
- tc->details.communicator.session_tail,
- session);
+ tc->details.communicator.session_head,
+ tc->details.communicator.session_tail,
+ session);
maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >= tc->details.communicator.total_queue_length);
while (NULL != (qe = session->queue_head))
{
GNUNET_CONTAINER_DLL_remove (session->queue_head,
- session->queue_tail,
- qe);
+ session->queue_tail,
+ qe);
session->queue_length--;
tc->details.communicator.total_queue_length--;
GNUNET_free (qe);
@@ -2001,18 +2003,18 @@ free_session (struct GNUNET_ATS_Session *session)
{
/* Communicator dropped below threshold, resume all queues */
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to communicator queue limit",
- -1,
- GNUNET_NO);
+ "# Transmission throttled due to communicator queue limit",
+ -1,
+ GNUNET_NO);
for (struct GNUNET_ATS_Session *s = tc->details.communicator.session_head;
- NULL != s;
- s = s->next_client)
+ NULL != s;
+ s = s->next_client)
schedule_transmit_on_queue (s);
}
notify_monitors (&neighbour->pid,
- session->address,
- session->nt,
- &me);
+ session->address,
+ session->nt,
+ &me);
GNUNET_ATS_session_del (session->sr);
GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_in);
GNUNET_BANDWIDTH_tracker_notification_stop (&session->tracker_out);
@@ -2036,8 +2038,8 @@ free_address_list_entry (struct AddressListEntry *ale)
struct TransportClient *tc = ale->tc;
GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
- tc->details.communicator.addr_tail,
- ale);
+ tc->details.communicator.addr_tail,
+ ale);
if (NULL != ale->sc)
{
GNUNET_PEERSTORE_store_cancel (ale->sc);
@@ -2062,8 +2064,8 @@ free_address_list_entry (struct AddressListEntry *ale)
*/
static void
client_disconnect_cb (void *cls,
- struct GNUNET_SERVICE_Client *client,
- void *app_ctx)
+ struct GNUNET_SERVICE_Client *client,
+ void *app_ctx)
{
struct TransportClient *tc = app_ctx;
@@ -2083,11 +2085,11 @@ client_disconnect_cb (void *cls,
while (NULL != (pm = tc->details.core.pending_msg_head))
{
- GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
- pm->client = NULL;
+ GNUNET_CONTAINER_MDLL_remove (client,
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
+ pm->client = NULL;
}
}
break;
@@ -2121,15 +2123,15 @@ client_disconnect_cb (void *cls,
*/
static int
notify_client_connect_info (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
struct TransportClient *tc = cls;
struct Neighbour *neighbour = value;
core_send_connect_info (tc,
- pid,
- neighbour->quota_out);
+ pid,
+ neighbour->quota_out);
return GNUNET_OK;
}
@@ -2144,7 +2146,7 @@ notify_client_connect_info (void *cls,
*/
static void
handle_client_start (void *cls,
- const struct StartMessage *start)
+ const struct StartMessage *start)
{
struct TransportClient *tc = cls;
uint32_t options;
@@ -2169,8 +2171,8 @@ handle_client_start (void *cls,
}
tc->type = CT_CORE;
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
- &notify_client_connect_info,
- tc);
+ &notify_client_connect_info,
+ tc);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -2183,7 +2185,7 @@ handle_client_start (void *cls,
*/
static int
check_client_send (void *cls,
- const struct OutboundMessage *obm)
+ const struct OutboundMessage *obm)
{
struct TransportClient *tc = cls;
uint16_t size;
@@ -2248,14 +2250,14 @@ free_pending_message (struct PendingMessage *pm)
if (NULL != tc)
{
GNUNET_CONTAINER_MDLL_remove (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
}
GNUNET_CONTAINER_MDLL_remove (neighbour,
- target->pending_msg_head,
- target->pending_msg_tail,
- pm);
+ target->pending_msg_head,
+ target->pending_msg_tail,
+ pm);
free_fragment_tree (pm);
GNUNET_free_non_null (pm->bpm);
GNUNET_free (pm);
@@ -2276,8 +2278,8 @@ free_pending_message (struct PendingMessage *pm)
*/
static void
client_send_response (struct PendingMessage *pm,
- int success,
- uint32_t bytes_physical)
+ int success,
+ uint32_t bytes_physical)
{
struct TransportClient *tc = pm->client;
struct Neighbour *target = pm->target;
@@ -2287,7 +2289,7 @@ client_send_response (struct PendingMessage *pm,
if (NULL != tc)
{
env = GNUNET_MQ_msg (som,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
som->success = htonl ((uint32_t) success);
som->bytes_msg = htons (pm->bytes_msg);
som->bytes_physical = htonl (bytes_physical);
@@ -2324,22 +2326,22 @@ check_queue_timeouts (void *cls)
if (pos->timeout.abs_value_us <= now.abs_value_us)
{
GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (timeout before confirmation)",
- 1,
- GNUNET_NO);
+ "# messages dropped (timeout before confirmation)",
+ 1,
+ GNUNET_NO);
client_send_response (pm,
GNUNET_NO,
0);
continue;
}
earliest_timeout = GNUNET_TIME_absolute_min (earliest_timeout,
- pos->timeout);
+ pos->timeout);
}
n->earliest_timeout = earliest_timeout;
if (NULL != n->pending_msg_head)
n->timeout_task = GNUNET_SCHEDULER_add_at (earliest_timeout,
- &check_queue_timeouts,
- n);
+ &check_queue_timeouts,
+ n);
}
@@ -2351,13 +2353,14 @@ check_queue_timeouts (void *cls)
*/
static void
handle_client_send (void *cls,
- const struct OutboundMessage *obm)
+ const struct OutboundMessage *obm)
{
struct TransportClient *tc = cls;
struct PendingMessage *pm;
const struct GNUNET_MessageHeader *obmm;
struct Neighbour *target;
uint32_t bytes_msg;
+ int was_empty;
GNUNET_assert (CT_CORE == tc->type);
obmm = (const struct GNUNET_MessageHeader *) &obm[1];
@@ -2373,36 +2376,37 @@ handle_client_send (void *cls,
struct SendOkMessage *som;
env = GNUNET_MQ_msg (som,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
som->success = htonl (GNUNET_SYSERR);
som->bytes_msg = htonl (bytes_msg);
som->bytes_physical = htonl (0);
som->peer = obm->peer;
GNUNET_MQ_send (tc->mq,
- env);
+ env);
GNUNET_SERVICE_client_continue (tc->client);
GNUNET_STATISTICS_update (GST_stats,
- "# messages dropped (neighbour unknown)",
- 1,
- GNUNET_NO);
+ "# messages dropped (neighbour unknown)",
+ 1,
+ GNUNET_NO);
return;
}
+ was_empty = (NULL == target->pending_msg_head);
pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
pm->client = tc;
pm->target = target;
pm->bytes_msg = bytes_msg;
pm->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (obm->timeout));
memcpy (&pm[1],
- &obm[1],
- bytes_msg);
+ &obm[1],
+ bytes_msg);
GNUNET_CONTAINER_MDLL_insert (neighbour,
- target->pending_msg_head,
- target->pending_msg_tail,
- pm);
+ target->pending_msg_head,
+ target->pending_msg_tail,
+ pm);
GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.core.pending_msg_head,
- tc->details.core.pending_msg_tail,
- pm);
+ tc->details.core.pending_msg_head,
+ tc->details.core.pending_msg_tail,
+ pm);
if (target->earliest_timeout.abs_value_us > pm->timeout.abs_value_us)
{
target->earliest_timeout.abs_value_us = pm->timeout.abs_value_us;
@@ -2410,8 +2414,19 @@ handle_client_send (void *cls,
GNUNET_SCHEDULER_cancel (target->timeout_task);
target->timeout_task
= GNUNET_SCHEDULER_add_at (target->earliest_timeout,
- &check_queue_timeouts,
- target);
+ &check_queue_timeouts,
+ target);
+ }
+ if (! was_empty)
+ return; /* all queues must already be busy */
+ for (struct GNUNET_ATS_Session *queue = target->session_head;
+ NULL != queue;
+ queue = queue->next_neighbour)
+ {
+ /* try transmission on any queue that is idle */
+ if (NULL == queue->transmit_task)
+ queue->transmit_task = GNUNET_SCHEDULER_add_now (&transmit_on_queue,
+ queue);
}
}
@@ -3835,9 +3850,9 @@ transmit_on_queue (void *cls)
respect that even if MTU is 0 for
this queue */) )
s = fragment_message (s,
- (0 == queue->mtu)
- ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
- : queue->mtu);
+ (0 == queue->mtu)
+ ? UINT16_MAX - sizeof (struct GNUNET_TRANSPORT_SendMessageTo)
+ : queue->mtu);
if (NULL == s)
{
/* Fragmentation failed, try next message... */
@@ -3868,13 +3883,13 @@ transmit_on_queue (void *cls)
smt->mid = qe->mid;
smt->receiver = n->pid;
memcpy (&smt[1],
- &s[1],
- s->bytes_msg);
+ &s[1],
+ s->bytes_msg);
GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
queue->queue_length++;
queue->tc->details.communicator.total_queue_length++;
GNUNET_MQ_send (queue->tc->mq,
- env);
+ env);
// FIXME: do something similar to the logic below
// in defragmentation / reliability ACK handling!
@@ -3886,8 +3901,8 @@ transmit_on_queue (void *cls)
{
/* Full message sent, and over reliabile channel */
client_send_response (pm,
- GNUNET_YES,
- pm->bytes_msg);
+ GNUNET_YES,
+ pm->bytes_msg);
}
else if ( (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) &&
(PMT_FRAGMENT_BOX == s->pmt) )
@@ -3898,9 +3913,9 @@ transmit_on_queue (void *cls)
free_fragment_tree (s);
pos = s->frag_parent;
GNUNET_CONTAINER_MDLL_remove (frag,
- pos->head_frag,
- pos->tail_frag,
- s);
+ pos->head_frag,
+ pos->tail_frag,
+ s);
GNUNET_free (s);
/* check if subtree is done */
while ( (NULL == pos->head_frag) &&
@@ -3910,9 +3925,9 @@ transmit_on_queue (void *cls)
s = pos;
pos = s->frag_parent;
GNUNET_CONTAINER_MDLL_remove (frag,
- pos->head_frag,
- pos->tail_frag,
- s);
+ pos->head_frag,
+ pos->tail_frag,
+ s);
GNUNET_free (s);
}
@@ -3920,8 +3935,8 @@ transmit_on_queue (void *cls)
if ( (NULL == pm->head_frag) &&
(pm->frag_off == pm->bytes_msg) )
client_send_response (pm,
- GNUNET_YES,
- pm->bytes_msg /* FIXME: calculate and add overheads! */);
+ GNUNET_YES,
+ pm->bytes_msg /* FIXME: calculate and add overheads! */);
}
else if (PMT_CORE != pm->pmt)
{
@@ -3941,25 +3956,25 @@ transmit_on_queue (void *cls)
message urgency and size when delaying ACKs, etc.) */
s->next_attempt = GNUNET_TIME_relative_to_absolute
(GNUNET_TIME_relative_multiply (queue->rtt,
- 4));
+ 4));
if (s == pm)
{
struct PendingMessage *pos;
/* re-insert sort in neighbour list */
GNUNET_CONTAINER_MDLL_remove (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pm);
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pm);
pos = neighbour->pending_msg_tail;
while ( (NULL != pos) &&
(pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
- pos = pos->prev_neighbour;
+ pos = pos->prev_neighbour;
GNUNET_CONTAINER_MDLL_insert_after (neighbour,
- neighbour->pending_msg_head,
- neighbour->pending_msg_tail,
- pos,
- pm);
+ neighbour->pending_msg_head,
+ neighbour->pending_msg_tail,
+ pos,
+ pm);
}
else
{
@@ -3968,18 +3983,18 @@ transmit_on_queue (void *cls)
struct PendingMessage *pos;
GNUNET_CONTAINER_MDLL_remove (frag,
- fp->head_frag,
- fp->tail_frag,
- s);
+ fp->head_frag,
+ fp->tail_frag,
+ s);
pos = fp->tail_frag;
while ( (NULL != pos) &&
(s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
- pos = pos->prev_frag;
+ pos = pos->prev_frag;
GNUNET_CONTAINER_MDLL_insert_after (frag,
- fp->head_frag,
- fp->tail_frag,
- pos,
- s);
+ fp->head_frag,
+ fp->tail_frag,
+ pos,
+ s);
}
}
@@ -4028,9 +4043,9 @@ tracker_excess_out_cb (void *cls)
from here via a message instead! */
/* TODO: maybe inform ATS at this point? */
GNUNET_STATISTICS_update (GST_stats,
- "# Excess outbound bandwidth reported",
- 1,
- GNUNET_NO);
+ "# Excess outbound bandwidth reported",
+ 1,
+ GNUNET_NO);
}
@@ -4046,9 +4061,9 @@ tracker_excess_in_cb (void *cls)
{
/* TODO: maybe inform ATS at this point? */
GNUNET_STATISTICS_update (GST_stats,
- "# Excess inbound bandwidth reported",
- 1,
- GNUNET_NO);
+ "# Excess inbound bandwidth reported",
+ 1,
+ GNUNET_NO);
}
@@ -4083,12 +4098,12 @@ handle_add_queue_message (void *cls,
neighbour->earliest_timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
neighbour->pid = aqm->receiver;
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (neighbours,
- &neighbour->pid,
- neighbour,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_multipeermap_put (neighbours,
+ &neighbour->pid,
+ neighbour,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
cores_send_connect_info (&neighbour->pid,
- GNUNET_BANDWIDTH_ZERO);
+ GNUNET_BANDWIDTH_ZERO);
}
addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
addr = (const char *) &aqm[1];
@@ -4117,8 +4132,8 @@ handle_add_queue_message (void *cls,
&tracker_excess_out_cb,
queue);
memcpy (&queue[1],
- addr,
- addr_len);
+ addr,
+ addr_len);
/* notify ATS about new queue */
{
struct GNUNET_ATS_Properties prop = {
@@ -4129,10 +4144,10 @@ handle_add_queue_message (void *cls,
};
queue->sr = GNUNET_ATS_session_add (ats,
- &neighbour->pid,
- queue->address,
- queue,
- &prop);
+ &neighbour->pid,
+ queue->address,
+ queue,
+ &prop);
if (NULL == queue->sr)
{
/* This can only happen if the 'address' was way too long for ATS
@@ -4159,18 +4174,18 @@ handle_add_queue_message (void *cls,
};
notify_monitors (&neighbour->pid,
- queue->address,
- queue->nt,
- &me);
+ queue->address,
+ queue->nt,
+ &me);
}
GNUNET_CONTAINER_MDLL_insert (neighbour,
- neighbour->session_head,
- neighbour->session_tail,
- queue);
+ neighbour->session_head,
+ neighbour->session_tail,
+ queue);
GNUNET_CONTAINER_MDLL_insert (client,
- tc->details.communicator.session_head,
- tc->details.communicator.session_tail,
- queue);
+ tc->details.communicator.session_head,
+ tc->details.communicator.session_tail,
+ queue);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -4273,21 +4288,21 @@ handle_send_message_ack (void *cls,
{
/* Communicator dropped below threshold, resume all queues */
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to communicator queue limit",
- -1,
- GNUNET_NO);
+ "# Transmission throttled due to communicator queue limit",
+ -1,
+ GNUNET_NO);
for (struct GNUNET_ATS_Session *session = tc->details.communicator.session_head;
- NULL != session;
- session = session->next_client)
+ NULL != session;
+ session = session->next_client)
schedule_transmit_on_queue (session);
}
else if (SESSION_QUEUE_LIMIT - 1 == queue->session->queue_length)
{
/* queue dropped below threshold; only resume this one queue */
GNUNET_STATISTICS_update (GST_stats,
- "# Transmission throttled due to session queue limit",
- -1,
- GNUNET_NO);
+ "# Transmission throttled due to session queue limit",
+ -1,
+ GNUNET_NO);
schedule_transmit_on_queue (queue->session);
}
@@ -4361,8 +4376,8 @@ handle_monitor_start (void *cls,
tc->details.monitor.peer = start->peer;
tc->details.monitor.one_shot = ntohl (start->one_shot);
GNUNET_CONTAINER_multipeermap_iterate (neighbours,
- &notify_client_queues,
- tc);
+ &notify_client_queues,
+ tc);
GNUNET_SERVICE_client_mark_monitor (tc->client);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -4414,8 +4429,8 @@ lookup_communicator (const char *prefix)
return tc;
}
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
- prefix);
+ "ATS suggested use of communicator for `%s', but we do not have such a communicator!\n",
+ prefix);
return NULL;
}
@@ -4451,21 +4466,21 @@ ats_suggestion_cb (void *cls,
if (NULL == tc)
{
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions ignored due to missing communicator",
- 1,
- GNUNET_NO);
+ "# ATS suggestions ignored due to missing communicator",
+ 1,
+ GNUNET_NO);
return;
}
/* forward suggestion for queue creation to communicator */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request #%u for `%s' communicator to create queue to `%s'\n",
- (unsigned int) idgen,
- prefix,
- address);
+ "Request #%u for `%s' communicator to create queue to `%s'\n",
+ (unsigned int) idgen,
+ prefix,
+ address);
alen = strlen (address) + 1;
env = GNUNET_MQ_msg_extra (cqm,
- alen,
- GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
+ alen,
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE);
cqm->request_id = htonl (idgen++);
cqm->receiver = *pid;
memcpy (&cqm[1],
@@ -4485,7 +4500,7 @@ ats_suggestion_cb (void *cls,
*/
static void
handle_queue_create_ok (void *cls,
- const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
+ const struct GNUNET_TRANSPORT_CreateQueueResponse *cqr)
{
struct TransportClient *tc = cls;
@@ -4496,12 +4511,12 @@ handle_queue_create_ok (void *cls,
return;
}
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions succeeded at communicator",
- 1,
- GNUNET_NO);
+ "# ATS suggestions succeeded at communicator",
+ 1,
+ GNUNET_NO);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request #%u for communicator to create queue succeeded\n",
- (unsigned int) ntohs (cqr->request_id));
+ "Request #%u for communicator to create queue succeeded\n",
+ (unsigned int) ntohs (cqr->request_id));
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -4527,12 +4542,12 @@ handle_queue_create_fail (void *cls,
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request #%u for communicator to create queue failed\n",
- (unsigned int) ntohs (cqr->request_id));
+ "Request #%u for communicator to create queue failed\n",
+ (unsigned int) ntohs (cqr->request_id));
GNUNET_STATISTICS_update (GST_stats,
- "# ATS suggestions failed in queue creation at communicator",
- 1,
- GNUNET_NO);
+ "# ATS suggestions failed in queue creation at communicator",
+ 1,
+ GNUNET_NO);
GNUNET_SERVICE_client_continue (tc->client);
}
@@ -4601,8 +4616,8 @@ handle_address_consider_verify (void *cls,
*/
static int
free_neighbour_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
struct Neighbour *neighbour = value;
@@ -4625,8 +4640,8 @@ free_neighbour_cb (void *cls,
*/
static int
free_dv_routes_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
struct DistanceVector *dv = value;
@@ -4648,8 +4663,8 @@ free_dv_routes_cb (void *cls,
*/
static int
free_ephemeral_cb (void *cls,
- const struct GNUNET_PeerIdentity *pid,
- void *value)
+ const struct GNUNET_PeerIdentity *pid,
+ void *value)
{
struct EphemeralCacheEntry *ece = value;
@@ -4734,9 +4749,9 @@ run (void *cls,
/* setup globals */
GST_cfg = c;
neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
- GNUNET_YES);
+ GNUNET_YES);
dv_routes = GNUNET_CONTAINER_multipeermap_create (1024,
- GNUNET_YES);
+ GNUNET_YES);
ephemeral_map = GNUNET_CONTAINER_multipeermap_create (32,
GNUNET_YES);
ephemeral_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
@@ -4790,50 +4805,50 @@ GNUNET_SERVICE_MAIN
NULL,
/* communication with core */
GNUNET_MQ_hd_fixed_size (client_start,
- GNUNET_MESSAGE_TYPE_TRANSPORT_START,
- struct StartMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_START,
+ struct StartMessage,
+ NULL),
GNUNET_MQ_hd_var_size (client_send,
- GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
- struct OutboundMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
+ struct OutboundMessage,
+ NULL),
/* communication with communicators */
GNUNET_MQ_hd_var_size (communicator_available,
- GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
- struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_NEW_COMMUNICATOR,
+ struct GNUNET_TRANSPORT_CommunicatorAvailableMessage,
+ NULL),
GNUNET_MQ_hd_var_size (communicator_backchannel,
- GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
- struct GNUNET_TRANSPORT_CommunicatorBackchannel,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_COMMUNICATOR_BACKCHANNEL,
+ struct GNUNET_TRANSPORT_CommunicatorBackchannel,
+ NULL),
GNUNET_MQ_hd_var_size (add_address,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
- struct GNUNET_TRANSPORT_AddAddressMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS,
+ struct GNUNET_TRANSPORT_AddAddressMessage,
+ NULL),
GNUNET_MQ_hd_fixed_size (del_address,
GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS,
struct GNUNET_TRANSPORT_DelAddressMessage,
NULL),
GNUNET_MQ_hd_var_size (incoming_msg,
- GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
- struct GNUNET_TRANSPORT_IncomingMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG,
+ struct GNUNET_TRANSPORT_IncomingMessage,
+ NULL),
GNUNET_MQ_hd_fixed_size (queue_create_ok,
- GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
- struct GNUNET_TRANSPORT_CreateQueueResponse,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_OK,
+ struct GNUNET_TRANSPORT_CreateQueueResponse,
+ NULL),
GNUNET_MQ_hd_fixed_size (queue_create_fail,
- GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
- struct GNUNET_TRANSPORT_CreateQueueResponse,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE_FAIL,
+ struct GNUNET_TRANSPORT_CreateQueueResponse,
+ NULL),
GNUNET_MQ_hd_var_size (add_queue_message,
- GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
- struct GNUNET_TRANSPORT_AddQueueMessage,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP,
+ struct GNUNET_TRANSPORT_AddQueueMessage,
+ NULL),
GNUNET_MQ_hd_var_size (address_consider_verify,
- GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
- struct GNUNET_TRANSPORT_AddressToVerify,
- NULL),
+ GNUNET_MESSAGE_TYPE_TRANSPORT_ADDRESS_CONSIDER_VERIFY,
+ struct GNUNET_TRANSPORT_AddressToVerify,
+ NULL),
GNUNET_MQ_hd_fixed_size (del_queue_message,
GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN,
struct GNUNET_TRANSPORT_DelQueueMessage,