summaryrefslogtreecommitdiff
path: root/src/cadet/gnunet-service-cadet_core.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cadet/gnunet-service-cadet_core.c')
-rw-r--r--src/cadet/gnunet-service-cadet_core.c501
1 files changed, 195 insertions, 306 deletions
diff --git a/src/cadet/gnunet-service-cadet_core.c b/src/cadet/gnunet-service-cadet_core.c
index 220a2b3cd..ec70a968b 100644
--- a/src/cadet/gnunet-service-cadet_core.c
+++ b/src/cadet/gnunet-service-cadet_core.c
@@ -11,7 +11,7 @@
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
@@ -39,7 +39,7 @@
#include "gnunet_statistics_service.h"
#include "cadet_protocol.h"
-#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
+#define LOG(level, ...) GNUNET_log_from (level, "cadet-cor", __VA_ARGS__)
/**
* Information we keep per direction for a route.
@@ -138,7 +138,6 @@ struct RouteDirection
* Is @e mqm currently ready for transmission?
*/
int is_ready;
-
};
@@ -177,8 +176,6 @@ struct CadetRoute
* Position of this route in the #route_heap.
*/
struct GNUNET_CONTAINER_HeapNode *hn;
-
-
};
@@ -258,24 +255,17 @@ lower_rung (struct RouteDirection *dir)
struct Rung *rung = dir->rung;
struct Rung *prev;
- GNUNET_CONTAINER_DLL_remove (rung->rd_head,
- rung->rd_tail,
- dir);
+ GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
prev = rung->prev;
GNUNET_assert (NULL != prev);
if (prev->rung_off != rung->rung_off - 1)
{
prev = GNUNET_new (struct Rung);
prev->rung_off = rung->rung_off - 1;
- GNUNET_CONTAINER_DLL_insert_after (rung_head,
- rung_tail,
- rung->prev,
- prev);
+ GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung->prev, prev);
}
GNUNET_assert (NULL != prev);
- GNUNET_CONTAINER_DLL_insert (prev->rd_head,
- prev->rd_tail,
- dir);
+ GNUNET_CONTAINER_DLL_insert (prev->rd_head, prev->rd_tail, dir);
dir->rung = prev;
}
@@ -288,19 +278,13 @@ lower_rung (struct RouteDirection *dir)
* @param env envelope to discard
*/
static void
-discard_buffer (struct RouteDirection *dir,
- struct GNUNET_MQ_Envelope *env)
+discard_buffer (struct RouteDirection *dir, struct GNUNET_MQ_Envelope *env)
{
- GNUNET_MQ_dll_remove (&dir->env_head,
- &dir->env_tail,
- env);
+ GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
cur_buffers--;
GNUNET_MQ_discard (env);
lower_rung (dir);
- GNUNET_STATISTICS_set (stats,
- "# buffer use",
- cur_buffers,
- GNUNET_NO);
+ GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
}
@@ -322,12 +306,9 @@ discard_all_from_rung_tail ()
"# messages dropped due to full buffer",
1,
GNUNET_NO);
- discard_buffer (dir,
- dir->env_head);
+ discard_buffer (dir, dir->env_head);
}
- GNUNET_CONTAINER_DLL_remove (rung_head,
- rung_tail,
- tail);
+ GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, tail);
GNUNET_free (tail);
}
@@ -345,7 +326,7 @@ static void
route_message (struct CadetPeer *prev,
const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
const struct GNUNET_MessageHeader *msg,
- const enum GNUNET_MQ_PriorityPreferences priority)
+ const enum GNUNET_MQ_PriorityPreferences priority)
{
struct CadetRoute *route;
struct RouteDirection *dir;
@@ -371,17 +352,14 @@ route_message (struct CadetPeer *prev,
/* No need to respond to these! */
return;
}
- env = GNUNET_MQ_msg (bm,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
bm->cid = *cid;
bm->peer1 = my_full_id;
- GCP_send_ooo (prev,
- env);
+ GCP_send_ooo (prev, env);
return;
}
route->last_use = GNUNET_TIME_absolute_get ();
- GNUNET_CONTAINER_heap_update_cost (route->hn,
- route->last_use.abs_value_us);
+ GNUNET_CONTAINER_heap_update_cost (route->hn, route->last_use.abs_value_us);
dir = (prev == route->prev.hop) ? &route->next : &route->prev;
if (GNUNET_YES == dir->is_ready)
{
@@ -392,27 +370,24 @@ route_message (struct CadetPeer *prev,
GNUNET_i2s (GCP_get_id (dir->hop)),
GNUNET_sh2s (&cid->connection_of_tunnel));
dir->is_ready = GNUNET_NO;
- GCP_send (dir->mqm,
- GNUNET_MQ_msg_copy (msg));
+ GCP_send (dir->mqm, GNUNET_MQ_msg_copy (msg));
return;
}
- /* Check if buffering is disallowed, and if so, make sure we only queue
- one message per direction. */
- if ( (0 != (priority & GNUNET_MQ_PREF_NO_BUFFER)) &&
- (NULL != dir->env_head) )
- discard_buffer (dir,
- dir->env_head);
+ /* Check if low latency is required and if the previous message was
+ unreliable; if so, make sure we only queue one message per
+ direction (no buffering). */
+ if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+ (NULL != dir->env_head) &&
+ (0 ==
+ (GNUNET_MQ_env_get_options (dir->env_head) & GNUNET_MQ_PREF_UNRELIABLE)))
+ discard_buffer (dir, dir->env_head);
/* Check for duplicates */
- for (const struct GNUNET_MQ_Envelope *env = dir->env_head;
- NULL != env;
+ for (const struct GNUNET_MQ_Envelope *env = dir->env_head; NULL != env;
env = GNUNET_MQ_env_next (env))
{
const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (env);
- if ( (hdr->size == msg->size) &&
- (0 == memcmp (hdr,
- msg,
- ntohs (msg->size))) )
+ if ((hdr->size == msg->size) && (0 == memcmp (hdr, msg, ntohs (msg->size))))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received duplicate of message already in buffer, dropping\n");
@@ -443,31 +418,22 @@ route_message (struct CadetPeer *prev,
"# messages dropped due to full buffer",
1,
GNUNET_NO);
- discard_buffer (dir,
- dir->env_head);
+ discard_buffer (dir, dir->env_head);
rung = dir->rung;
}
}
/* remove 'dir' from current rung */
- GNUNET_CONTAINER_DLL_remove (rung->rd_head,
- rung->rd_tail,
- dir);
+ GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir);
/* make 'nxt' point to the next higher rung, create if necessary */
nxt = rung->next;
- if ( (NULL == nxt) ||
- (rung->rung_off + 1 != nxt->rung_off) )
+ if ((NULL == nxt) || (rung->rung_off + 1 != nxt->rung_off))
{
nxt = GNUNET_new (struct Rung);
nxt->rung_off = rung->rung_off + 1;
- GNUNET_CONTAINER_DLL_insert_after (rung_head,
- rung_tail,
- rung,
- nxt);
+ GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung, nxt);
}
/* insert 'dir' into next higher rung */
- GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
- nxt->rd_tail,
- dir);
+ GNUNET_CONTAINER_DLL_insert (nxt->rd_head, nxt->rd_tail, dir);
dir->rung = nxt;
/* add message into 'dir' buffer */
@@ -478,21 +444,21 @@ route_message (struct CadetPeer *prev,
GNUNET_i2s (GCP_get_id (dir->hop)),
GNUNET_sh2s (&cid->connection_of_tunnel));
env = GNUNET_MQ_msg_copy (msg);
- GNUNET_MQ_dll_insert_tail (&dir->env_head,
- &dir->env_tail,
- env);
+ GNUNET_MQ_env_set_options (env, priority);
+ if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) &&
+ (0 != (priority & GNUNET_MQ_PREF_OUT_OF_ORDER)) &&
+ (NULL != dir->env_head) &&
+ (0 == (GNUNET_MQ_env_get_options (dir->env_head) &
+ GNUNET_MQ_PREF_LOW_LATENCY)))
+ GNUNET_MQ_dll_insert_head (&dir->env_head, &dir->env_tail, env);
+ else
+ GNUNET_MQ_dll_insert_tail (&dir->env_head, &dir->env_tail, env);
cur_buffers++;
- GNUNET_STATISTICS_set (stats,
- "# buffer use",
- cur_buffers,
- GNUNET_NO);
+ GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
/* Clean up 'rung' if now empty (and not head) */
- if ( (NULL == rung->rd_head) &&
- (rung != rung_head) )
+ if ((NULL == rung->rd_head) && (rung != rung_head))
{
- GNUNET_CONTAINER_DLL_remove (rung_head,
- rung_tail,
- rung);
+ GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, rung);
GNUNET_free (rung);
}
}
@@ -537,18 +503,14 @@ destroy_direction (struct RouteDirection *dir)
"# messages dropped due to route destruction",
1,
GNUNET_NO);
- discard_buffer (dir,
- env);
+ discard_buffer (dir, env);
}
if (NULL != dir->mqm)
{
- GCP_request_mq_cancel (dir->mqm,
- NULL);
+ GCP_request_mq_cancel (dir->mqm, NULL);
dir->mqm = NULL;
}
- GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
- rung_head->rd_tail,
- dir);
+ GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, rung_head->rd_tail, dir);
}
@@ -562,15 +524,15 @@ destroy_route (struct CadetRoute *route)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Destroying route from %s to %s of connection %s\n",
- GNUNET_i2s (GCP_get_id (route->prev.hop)),
+ GNUNET_i2s (GCP_get_id (route->prev.hop)),
GNUNET_i2s2 (GCP_get_id (route->next.hop)),
GNUNET_sh2s (&route->cid.connection_of_tunnel));
- GNUNET_assert (route ==
- GNUNET_CONTAINER_heap_remove_node (route->hn));
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multishortmap_remove (routes,
- &route->cid.connection_of_tunnel,
- route));
+ GNUNET_assert (route == GNUNET_CONTAINER_heap_remove_node (route->hn));
+ GNUNET_assert (
+ GNUNET_YES ==
+ GNUNET_CONTAINER_multishortmap_remove (routes,
+ &route->cid.connection_of_tunnel,
+ route));
GNUNET_STATISTICS_set (stats,
"# routes",
GNUNET_CONTAINER_multishortmap_size (routes),
@@ -607,15 +569,13 @@ send_broken (struct RouteDirection *target,
GNUNET_i2s2 (peer2),
GNUNET_sh2s (&cid->connection_of_tunnel));
- env = GNUNET_MQ_msg (bm,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
bm->cid = *cid;
if (NULL != peer1)
bm->peer1 = *peer1;
if (NULL != peer2)
bm->peer2 = *peer2;
- GCP_request_mq_cancel (target->mqm,
- env);
+ GCP_request_mq_cancel (target->mqm, env);
target->mqm = NULL;
}
@@ -635,33 +595,22 @@ timeout_cb (void *cls)
struct GNUNET_TIME_Absolute exp;
timeout_task = NULL;
- linger = GNUNET_TIME_relative_multiply (keepalive_period,
- 3);
+ linger = GNUNET_TIME_relative_multiply (keepalive_period, 3);
while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
{
- exp = GNUNET_TIME_absolute_add (r->last_use,
- linger);
+ exp = GNUNET_TIME_absolute_add (r->last_use, linger);
if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us)
{
/* Route not yet timed out, wait until it does. */
- timeout_task = GNUNET_SCHEDULER_add_at (exp,
- &timeout_cb,
- NULL);
+ timeout_task = GNUNET_SCHEDULER_add_at (exp, &timeout_cb, NULL);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Sending BROKEN due to timeout (%s was last use, %s linger)\n",
- GNUNET_STRINGS_absolute_time_to_string (r->last_use),
- GNUNET_STRINGS_relative_time_to_string (linger,
- GNUNET_YES));
- send_broken (&r->prev,
- &r->cid,
- NULL,
- NULL);
- send_broken (&r->next,
- &r->cid,
- NULL,
- NULL);
+ "Sending BROKEN due to timeout (%s was last use, %s linger)\n",
+ GNUNET_STRINGS_absolute_time_to_string (r->last_use),
+ GNUNET_STRINGS_relative_time_to_string (linger, GNUNET_YES));
+ send_broken (&r->prev, &r->cid, NULL, NULL);
+ send_broken (&r->next, &r->cid, NULL, NULL);
destroy_route (r);
}
/* No more routes left, so no need for a #timeout_task */
@@ -681,8 +630,7 @@ timeout_cb (void *cls)
* and the last envelope was discarded
*/
static void
-dir_ready_cb (void *cls,
- int ready)
+dir_ready_cb (void *cls, int ready)
{
struct RouteDirection *dir = cls;
struct CadetRoute *route = dir->my_route;
@@ -695,28 +643,18 @@ dir_ready_cb (void *cls,
dir->is_ready = GNUNET_YES;
if (NULL != (env = dir->env_head))
{
- GNUNET_MQ_dll_remove (&dir->env_head,
- &dir->env_tail,
- env);
+ GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
cur_buffers--;
- GNUNET_STATISTICS_set (stats,
- "# buffer use",
- cur_buffers,
- GNUNET_NO);
+ GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
lower_rung (dir);
dir->is_ready = GNUNET_NO;
- GCP_send (dir->mqm,
- env);
+ GCP_send (dir->mqm, env);
}
return;
}
odir = (dir == &route->next) ? &route->prev : &route->next;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending BROKEN due to MQ going down\n");
- send_broken (&route->next,
- &route->cid,
- GCP_get_id (odir->hop),
- &my_full_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending BROKEN due to MQ going down\n");
+ send_broken (&route->next, &route->cid, GCP_get_id (odir->hop), &my_full_id);
destroy_route (route);
}
@@ -735,12 +673,8 @@ dir_init (struct RouteDirection *dir,
{
dir->hop = hop;
dir->my_route = route;
- dir->mqm = GCP_request_mq (hop,
- &dir_ready_cb,
- dir);
- GNUNET_CONTAINER_DLL_insert (rung_head->rd_head,
- rung_head->rd_tail,
- dir);
+ dir->mqm = GCP_request_mq (hop, &dir_ready_cb, dir);
+ GNUNET_CONTAINER_DLL_insert (rung_head->rd_head, rung_head->rd_tail, dir);
dir->rung = rung_head;
GNUNET_assert (GNUNET_YES == dir->is_ready);
}
@@ -757,21 +691,20 @@ dir_init (struct RouteDirection *dir,
* or NULL.
*/
static void
-send_broken_without_mqm (struct CadetPeer *target,
- const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
- const struct GNUNET_PeerIdentity *failure_at)
+send_broken_without_mqm (
+ struct CadetPeer *target,
+ const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
+ const struct GNUNET_PeerIdentity *failure_at)
{
struct GNUNET_MQ_Envelope *env;
struct GNUNET_CADET_ConnectionBrokenMessage *bm;
- env = GNUNET_MQ_msg (bm,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
+ env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
bm->cid = *cid;
bm->peer1 = my_full_id;
if (NULL != failure_at)
bm->peer2 = *failure_at;
- GCP_send_ooo (target,
- env);
+ GCP_send_ooo (target, env);
}
@@ -782,12 +715,14 @@ send_broken_without_mqm (struct CadetPeer *target,
* @param msg Message itself.
*/
static void
-handle_connection_create (void *cls,
- const struct GNUNET_CADET_ConnectionCreateMessage *msg)
+handle_connection_create (
+ void *cls,
+ const struct GNUNET_CADET_ConnectionCreateMessage *msg)
{
struct CadetPeer *sender = cls;
struct CadetPeer *next;
- const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
+ const struct GNUNET_PeerIdentity *pids =
+ (const struct GNUNET_PeerIdentity *) &msg[1];
struct CadetRoute *route;
uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
unsigned int path_length;
@@ -810,20 +745,19 @@ handle_connection_create (void *cls,
{
struct GNUNET_CONTAINER_MultiPeerMap *map;
- map = GNUNET_CONTAINER_multipeermap_create (path_length * 2,
- GNUNET_YES);
+ map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, GNUNET_YES);
GNUNET_assert (NULL != map);
- for (unsigned int i=0;i<path_length;i++)
+ for (unsigned int i = 0; i < path_length; i++)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"CADET_CONNECTION_CREATE has peer %s at offset %u\n",
GNUNET_i2s (&pids[i]),
i);
- if (GNUNET_SYSERR ==
- GNUNET_CONTAINER_multipeermap_put (map,
- &pids[i],
- NULL,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+ if (GNUNET_SYSERR == GNUNET_CONTAINER_multipeermap_put (
+ map,
+ &pids[i],
+ NULL,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
{
/* bogus request */
GNUNET_CONTAINER_multipeermap_destroy (map);
@@ -836,9 +770,8 @@ handle_connection_create (void *cls,
GNUNET_CONTAINER_multipeermap_destroy (map);
}
/* Initiator is at offset 0, find us */
- for (off=1;off<path_length;off++)
- if (0 == GNUNET_memcmp (&my_full_id,
- &pids[off]))
+ for (off = 1; off < path_length; off++)
+ if (0 == GNUNET_memcmp (&my_full_id, &pids[off]))
break;
if (off == path_length)
{
@@ -848,16 +781,14 @@ handle_connection_create (void *cls,
return;
}
/* Check previous hop */
- if (sender != GCP_get (&pids[off - 1],
- GNUNET_NO))
+ if (sender != GCP_get (&pids[off - 1], GNUNET_NO))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Dropping CADET_CONNECTION_CREATE without sender at previous hop in the path\n");
GNUNET_break_op (0);
return;
}
- if (NULL !=
- (route = get_route (&msg->cid)))
+ if (NULL != (route = get_route (&msg->cid)))
{
/* Duplicate CREATE, pass it on, previous one might have been lost! */
@@ -867,7 +798,8 @@ handle_connection_create (void *cls,
route_message (sender,
&msg->cid,
&msg->header,
- GNUNET_MQ_PRIO_CRITICAL_CONTROL);
+ GNUNET_MQ_PRIO_CRITICAL_CONTROL |
+ GNUNET_MQ_PREF_LOW_LATENCY);
return;
}
if (off == path_length - 1)
@@ -887,17 +819,14 @@ handle_connection_create (void *cls,
return;
}
- origin = GCP_get (&pids[0],
- GNUNET_YES);
+ origin = GCP_get (&pids[0], GNUNET_YES);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
GCP_2s (origin),
GNUNET_sh2s (&msg->cid.connection_of_tunnel));
- path = GCPP_get_path_from_route (path_length - 1,
- pids);
+ path = GCPP_get_path_from_route (path_length - 1, pids);
if (GNUNET_OK !=
- GCT_add_inbound_connection (GCP_get_tunnel (origin,
- GNUNET_YES),
+ GCT_add_inbound_connection (GCP_get_tunnel (origin, GNUNET_YES),
&msg->cid,
path))
{
@@ -908,18 +837,14 @@ handle_connection_create (void *cls,
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel),
GCPP_2s (path));
- send_broken_without_mqm (sender,
- &msg->cid,
- NULL);
+ send_broken_without_mqm (sender, &msg->cid, NULL);
return;
}
return;
}
/* We are merely a hop on the way, check if we can support the route */
- next = GCP_get (&pids[off + 1],
- GNUNET_NO);
- if ( (NULL == next) ||
- (GNUNET_NO == GCP_has_core_connection (next)) )
+ next = GCP_get (&pids[off + 1], GNUNET_NO);
+ if ((NULL == next) || (GNUNET_NO == GCP_has_core_connection (next)))
{
/* unworkable, send back BROKEN notification */
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -928,9 +853,7 @@ handle_connection_create (void *cls,
GNUNET_sh2s (&msg->cid.connection_of_tunnel),
GNUNET_i2s (&pids[off + 1]),
off + 1);
- send_broken_without_mqm (sender,
- &msg->cid,
- &pids[off + 1]);
+ send_broken_without_mqm (sender, &msg->cid, &pids[off + 1]);
return;
}
if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
@@ -939,9 +862,7 @@ handle_connection_create (void *cls,
"Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel));
- send_broken_without_mqm (sender,
- &msg->cid,
- &pids[off - 1]);
+ send_broken_without_mqm (sender, &msg->cid, &pids[off - 1]);
return;
}
@@ -955,17 +876,14 @@ handle_connection_create (void *cls,
route = GNUNET_new (struct CadetRoute);
route->cid = msg->cid;
route->last_use = GNUNET_TIME_absolute_get ();
- dir_init (&route->prev,
- route,
- sender);
- dir_init (&route->next,
- route,
- next);
+ dir_init (&route->prev, route, sender);
+ dir_init (&route->next, route, next);
GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multishortmap_put (routes,
- &route->cid.connection_of_tunnel,
- route,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_CONTAINER_multishortmap_put (
+ routes,
+ &route->cid.connection_of_tunnel,
+ route,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_STATISTICS_set (stats,
"# routes",
GNUNET_CONTAINER_multishortmap_size (routes),
@@ -974,15 +892,16 @@ handle_connection_create (void *cls,
route,
route->last_use.abs_value_us);
if (NULL == timeout_task)
- timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
- 3),
- &timeout_cb,
- NULL);
+ timeout_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
+ 3),
+ &timeout_cb,
+ NULL);
/* also pass CREATE message along to next hop */
route_message (sender,
&msg->cid,
&msg->header,
- GNUNET_MQ_PRIO_CRITICAL_CONTROL);
+ GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
}
@@ -993,8 +912,9 @@ handle_connection_create (void *cls,
* @param msg Message itself.
*/
static void
-handle_connection_create_ack (void *cls,
- const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
+handle_connection_create_ack (
+ void *cls,
+ const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
{
struct CadetPeer *peer = cls;
struct CadetConnection *cc;
@@ -1005,12 +925,9 @@ handle_connection_create_ack (void *cls,
{
/* verify ACK came from the right direction */
unsigned int len;
- struct CadetPeerPath *path = GCC_get_path (cc,
- &len);
+ struct CadetPeerPath *path = GCC_get_path (cc, &len);
- if (peer !=
- GCPP_get_peer_at_offset (path,
- 0))
+ if (peer != GCPP_get_peer_at_offset (path, 0))
{
/* received ACK from unexpected direction, ignore! */
GNUNET_break_op (0);
@@ -1027,7 +944,7 @@ handle_connection_create_ack (void *cls,
route_message (peer,
&msg->cid,
&msg->header,
- GNUNET_MQ_PRIO_CRITICAL_CONTROL);
+ GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
}
@@ -1039,8 +956,9 @@ handle_connection_create_ack (void *cls,
* @deprecated duplicate logic with #handle_destroy(); dedup!
*/
static void
-handle_connection_broken (void *cls,
- const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
+handle_connection_broken (
+ void *cls,
+ const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
{
struct CadetPeer *peer = cls;
struct CadetConnection *cc;
@@ -1052,12 +970,9 @@ handle_connection_broken (void *cls,
{
/* verify message came from the right direction */
unsigned int len;
- struct CadetPeerPath *path = GCC_get_path (cc,
- &len);
+ struct CadetPeerPath *path = GCC_get_path (cc, &len);
- if (peer !=
- GCPP_get_peer_at_offset (path,
- 0))
+ if (peer != GCPP_get_peer_at_offset (path, 0))
{
/* received message from unexpected direction, ignore! */
GNUNET_break_op (0);
@@ -1076,7 +991,7 @@ handle_connection_broken (void *cls,
route_message (peer,
&msg->cid,
&msg->header,
- GNUNET_MQ_PREF_NO_BUFFER);
+ GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
route = get_route (&msg->cid);
if (NULL != route)
destroy_route (route);
@@ -1091,8 +1006,9 @@ handle_connection_broken (void *cls,
* @param msg Message itself.
*/
static void
-handle_connection_destroy (void *cls,
- const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
+handle_connection_destroy (
+ void *cls,
+ const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
{
struct CadetPeer *peer = cls;
struct CadetConnection *cc;
@@ -1104,12 +1020,9 @@ handle_connection_destroy (void *cls,
{
/* verify message came from the right direction */
unsigned int len;
- struct CadetPeerPath *path = GCC_get_path (cc,
- &len);
+ struct CadetPeerPath *path = GCC_get_path (cc, &len);
- if (peer !=
- GCPP_get_peer_at_offset (path,
- 0))
+ if (peer != GCPP_get_peer_at_offset (path, 0))
{
/* received message from unexpected direction, ignore! */
GNUNET_break_op (0);
@@ -1130,7 +1043,7 @@ handle_connection_destroy (void *cls,
route_message (peer,
&msg->cid,
&msg->header,
- GNUNET_MQ_PREF_NO_BUFFER);
+ GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL);
route = get_route (&msg->cid);
if (NULL != route)
destroy_route (route);
@@ -1162,19 +1075,15 @@ handle_tunnel_kx (void *cls,
{
/* verify message came from the right direction */
unsigned int len;
- struct CadetPeerPath *path = GCC_get_path (cc,
- &len);
+ struct CadetPeerPath *path = GCC_get_path (cc, &len);
- if (peer !=
- GCPP_get_peer_at_offset (path,
- 0))
+ if (peer != GCPP_get_peer_at_offset (path, 0))
{
/* received message from unexpected direction, ignore! */
GNUNET_break_op (0);
return;
}
- GCC_handle_kx (cc,
- msg);
+ GCC_handle_kx (cc, msg);
return;
}
@@ -1182,7 +1091,7 @@ handle_tunnel_kx (void *cls,
route_message (peer,
&msg->cid,
&msg->header,
- GNUNET_MQ_PRIO_CRITICAL_CONTROL);
+ GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
}
@@ -1193,8 +1102,9 @@ handle_tunnel_kx (void *cls,
* @param msg Message itself.
*/
static void
-handle_tunnel_kx_auth (void *cls,
- const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
+handle_tunnel_kx_auth (
+ void *cls,
+ const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
{
struct CadetPeer *peer = cls;
struct CadetConnection *cc;
@@ -1205,19 +1115,15 @@ handle_tunnel_kx_auth (void *cls,
{
/* verify message came from the right direction */
unsigned int len;
- struct CadetPeerPath *path = GCC_get_path (cc,
- &len);
+ struct CadetPeerPath *path = GCC_get_path (cc, &len);
- if (peer !=
- GCPP_get_peer_at_offset (path,
- 0))
+ if (peer != GCPP_get_peer_at_offset (path, 0))
{
/* received message from unexpected direction, ignore! */
GNUNET_break_op (0);
return;
}
- GCC_handle_kx_auth (cc,
- msg);
+ GCC_handle_kx_auth (cc, msg);
return;
}
@@ -1225,7 +1131,7 @@ handle_tunnel_kx_auth (void *cls,
route_message (peer,
&msg->kx.cid,
&msg->kx.header,
- GNUNET_MQ_PRIO_CRITICAL_CONTROL);
+ GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY);
}
@@ -1264,26 +1170,19 @@ handle_tunnel_encrypted (void *cls,
{
/* verify message came from the right direction */
unsigned int len;
- struct CadetPeerPath *path = GCC_get_path (cc,
- &len);
+ struct CadetPeerPath *path = GCC_get_path (cc, &len);
- if (peer !=
- GCPP_get_peer_at_offset (path,
- 0))
+ if (peer != GCPP_get_peer_at_offset (path, 0))
{
/* received message from unexpected direction, ignore! */
GNUNET_break_op (0);
return;
}
- GCC_handle_encrypted (cc,
- msg);
+ GCC_handle_encrypted (cc, msg);
return;
}
/* We're just an intermediary peer, route the message along its path */
- route_message (peer,
- &msg->cid,
- &msg->header,
- GNUNET_MQ_PRIO_CRITICAL_CONTROL);
+ route_message (peer, &msg->cid, &msg->header, GNUNET_MQ_PRIO_BEST_EFFORT);
}
@@ -1300,17 +1199,14 @@ handle_tunnel_encrypted (void *cls,
* @param my_identity ID of this peer, NULL if we failed
*/
static void
-core_init_cb (void *cls,
- const struct GNUNET_PeerIdentity *my_identity)
+core_init_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
{
if (NULL == my_identity)
{
GNUNET_break (0);
return;
}
- GNUNET_break (0 ==
- GNUNET_memcmp (my_identity,
- &my_full_id));
+ GNUNET_break (0 == GNUNET_memcmp (my_identity, &my_full_id));
}
@@ -1330,10 +1226,8 @@ core_connect_cb (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"CORE connection to peer %s was established.\n",
GNUNET_i2s (peer));
- cp = GCP_get (peer,
- GNUNET_YES);
- GCP_set_mq (cp,
- mq);
+ cp = GCP_get (peer, GNUNET_YES);
+ GCP_set_mq (cp, mq);
return cp;
}
@@ -1354,8 +1248,7 @@ core_disconnect_cb (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG,
"CORE connection to peer %s went down.\n",
GNUNET_i2s (peer));
- GCP_set_mq (cp,
- NULL);
+ GCP_set_mq (cp, NULL);
}
@@ -1367,52 +1260,48 @@ core_disconnect_cb (void *cls,
void
GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
{
- struct GNUNET_MQ_MessageHandler handlers[] = {
- GNUNET_MQ_hd_var_size (connection_create,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
- struct GNUNET_CADET_ConnectionCreateMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (connection_create_ack,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
- struct GNUNET_CADET_ConnectionCreateAckMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (connection_broken,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
- struct GNUNET_CADET_ConnectionBrokenMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (connection_destroy,
- GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
- struct GNUNET_CADET_ConnectionDestroyMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (tunnel_kx,
- GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
- struct GNUNET_CADET_TunnelKeyExchangeMessage,
- NULL),
- GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
- GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
- struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
- NULL),
- GNUNET_MQ_hd_var_size (tunnel_encrypted,
- GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
- struct GNUNET_CADET_TunnelEncryptedMessage,
- NULL),
- GNUNET_MQ_handler_end ()
- };
-
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (c,
- "CADET",
- "MAX_ROUTES",
- &max_routes))
- max_routes = 5000;
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (c,
- "CADET",
- "MAX_MSGS_QUEUE",
- &max_buffers))
- max_buffers = 10000;
- routes = GNUNET_CONTAINER_multishortmap_create (1024,
- GNUNET_NO);
+ struct GNUNET_MQ_MessageHandler handlers[] =
+ {GNUNET_MQ_hd_var_size (connection_create,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
+ struct GNUNET_CADET_ConnectionCreateMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (connection_create_ack,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
+ struct GNUNET_CADET_ConnectionCreateAckMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (connection_broken,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
+ struct GNUNET_CADET_ConnectionBrokenMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (connection_destroy,
+ GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
+ struct GNUNET_CADET_ConnectionDestroyMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (tunnel_kx,
+ GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
+ struct GNUNET_CADET_TunnelKeyExchangeMessage,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
+ GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
+ struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (tunnel_encrypted,
+ GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
+ struct GNUNET_CADET_TunnelEncryptedMessage,
+ NULL),
+ GNUNET_MQ_handler_end ()};
+
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
+ "CADET",
+ "MAX_ROUTES",
+ &max_routes))
+ max_routes = 5000;
+ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c,
+ "CADET",
+ "MAX_MSGS_QUEUE",
+ &max_buffers))
+ max_buffers = 10000;
+ routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO);
route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
core = GNUNET_CORE_connect (c,
NULL,