From a7ccf828ae4f7e306ffe3e7efebc0e678615f6c5 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 17 Jul 2019 10:50:45 +0200 Subject: remove duplication MQ options, make conversation build --- src/cadet/gnunet-service-cadet_core.c | 501 +++++++++++++--------------------- 1 file changed, 195 insertions(+), 306 deletions(-) (limited to 'src/cadet/gnunet-service-cadet_core.c') diff --git a/src/cadet/gnunet-service-cadet_core.c b/src/cadet/gnunet-service-cadet_core.c index 220a2b3cd..ec70a968b 100644 --- a/src/cadet/gnunet-service-cadet_core.c +++ b/src/cadet/gnunet-service-cadet_core.c @@ -11,7 +11,7 @@ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - + You should have received a copy of the GNU Affero General Public License along with this program. If not, see . @@ -39,7 +39,7 @@ #include "gnunet_statistics_service.h" #include "cadet_protocol.h" -#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__) +#define LOG(level, ...) GNUNET_log_from (level, "cadet-cor", __VA_ARGS__) /** * Information we keep per direction for a route. @@ -138,7 +138,6 @@ struct RouteDirection * Is @e mqm currently ready for transmission? */ int is_ready; - }; @@ -177,8 +176,6 @@ struct CadetRoute * Position of this route in the #route_heap. */ struct GNUNET_CONTAINER_HeapNode *hn; - - }; @@ -258,24 +255,17 @@ lower_rung (struct RouteDirection *dir) struct Rung *rung = dir->rung; struct Rung *prev; - GNUNET_CONTAINER_DLL_remove (rung->rd_head, - rung->rd_tail, - dir); + GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir); prev = rung->prev; GNUNET_assert (NULL != prev); if (prev->rung_off != rung->rung_off - 1) { prev = GNUNET_new (struct Rung); prev->rung_off = rung->rung_off - 1; - GNUNET_CONTAINER_DLL_insert_after (rung_head, - rung_tail, - rung->prev, - prev); + GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung->prev, prev); } GNUNET_assert (NULL != prev); - GNUNET_CONTAINER_DLL_insert (prev->rd_head, - prev->rd_tail, - dir); + GNUNET_CONTAINER_DLL_insert (prev->rd_head, prev->rd_tail, dir); dir->rung = prev; } @@ -288,19 +278,13 @@ lower_rung (struct RouteDirection *dir) * @param env envelope to discard */ static void -discard_buffer (struct RouteDirection *dir, - struct GNUNET_MQ_Envelope *env) +discard_buffer (struct RouteDirection *dir, struct GNUNET_MQ_Envelope *env) { - GNUNET_MQ_dll_remove (&dir->env_head, - &dir->env_tail, - env); + GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env); cur_buffers--; GNUNET_MQ_discard (env); lower_rung (dir); - GNUNET_STATISTICS_set (stats, - "# buffer use", - cur_buffers, - GNUNET_NO); + GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO); } @@ -322,12 +306,9 @@ discard_all_from_rung_tail () "# messages dropped due to full buffer", 1, GNUNET_NO); - discard_buffer (dir, - dir->env_head); + discard_buffer (dir, dir->env_head); } - GNUNET_CONTAINER_DLL_remove (rung_head, - rung_tail, - tail); + GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, tail); GNUNET_free (tail); } @@ -345,7 +326,7 @@ static void route_message (struct CadetPeer *prev, const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, const struct GNUNET_MessageHeader *msg, - const enum GNUNET_MQ_PriorityPreferences priority) + const enum GNUNET_MQ_PriorityPreferences priority) { struct CadetRoute *route; struct RouteDirection *dir; @@ -371,17 +352,14 @@ route_message (struct CadetPeer *prev, /* No need to respond to these! */ return; } - env = GNUNET_MQ_msg (bm, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); + env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); bm->cid = *cid; bm->peer1 = my_full_id; - GCP_send_ooo (prev, - env); + GCP_send_ooo (prev, env); return; } route->last_use = GNUNET_TIME_absolute_get (); - GNUNET_CONTAINER_heap_update_cost (route->hn, - route->last_use.abs_value_us); + GNUNET_CONTAINER_heap_update_cost (route->hn, route->last_use.abs_value_us); dir = (prev == route->prev.hop) ? &route->next : &route->prev; if (GNUNET_YES == dir->is_ready) { @@ -392,27 +370,24 @@ route_message (struct CadetPeer *prev, GNUNET_i2s (GCP_get_id (dir->hop)), GNUNET_sh2s (&cid->connection_of_tunnel)); dir->is_ready = GNUNET_NO; - GCP_send (dir->mqm, - GNUNET_MQ_msg_copy (msg)); + GCP_send (dir->mqm, GNUNET_MQ_msg_copy (msg)); return; } - /* Check if buffering is disallowed, and if so, make sure we only queue - one message per direction. */ - if ( (0 != (priority & GNUNET_MQ_PREF_NO_BUFFER)) && - (NULL != dir->env_head) ) - discard_buffer (dir, - dir->env_head); + /* Check if low latency is required and if the previous message was + unreliable; if so, make sure we only queue one message per + direction (no buffering). */ + if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) && + (NULL != dir->env_head) && + (0 == + (GNUNET_MQ_env_get_options (dir->env_head) & GNUNET_MQ_PREF_UNRELIABLE))) + discard_buffer (dir, dir->env_head); /* Check for duplicates */ - for (const struct GNUNET_MQ_Envelope *env = dir->env_head; - NULL != env; + for (const struct GNUNET_MQ_Envelope *env = dir->env_head; NULL != env; env = GNUNET_MQ_env_next (env)) { const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (env); - if ( (hdr->size == msg->size) && - (0 == memcmp (hdr, - msg, - ntohs (msg->size))) ) + if ((hdr->size == msg->size) && (0 == memcmp (hdr, msg, ntohs (msg->size)))) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Received duplicate of message already in buffer, dropping\n"); @@ -443,31 +418,22 @@ route_message (struct CadetPeer *prev, "# messages dropped due to full buffer", 1, GNUNET_NO); - discard_buffer (dir, - dir->env_head); + discard_buffer (dir, dir->env_head); rung = dir->rung; } } /* remove 'dir' from current rung */ - GNUNET_CONTAINER_DLL_remove (rung->rd_head, - rung->rd_tail, - dir); + GNUNET_CONTAINER_DLL_remove (rung->rd_head, rung->rd_tail, dir); /* make 'nxt' point to the next higher rung, create if necessary */ nxt = rung->next; - if ( (NULL == nxt) || - (rung->rung_off + 1 != nxt->rung_off) ) + if ((NULL == nxt) || (rung->rung_off + 1 != nxt->rung_off)) { nxt = GNUNET_new (struct Rung); nxt->rung_off = rung->rung_off + 1; - GNUNET_CONTAINER_DLL_insert_after (rung_head, - rung_tail, - rung, - nxt); + GNUNET_CONTAINER_DLL_insert_after (rung_head, rung_tail, rung, nxt); } /* insert 'dir' into next higher rung */ - GNUNET_CONTAINER_DLL_insert (nxt->rd_head, - nxt->rd_tail, - dir); + GNUNET_CONTAINER_DLL_insert (nxt->rd_head, nxt->rd_tail, dir); dir->rung = nxt; /* add message into 'dir' buffer */ @@ -478,21 +444,21 @@ route_message (struct CadetPeer *prev, GNUNET_i2s (GCP_get_id (dir->hop)), GNUNET_sh2s (&cid->connection_of_tunnel)); env = GNUNET_MQ_msg_copy (msg); - GNUNET_MQ_dll_insert_tail (&dir->env_head, - &dir->env_tail, - env); + GNUNET_MQ_env_set_options (env, priority); + if ((0 != (priority & GNUNET_MQ_PREF_LOW_LATENCY)) && + (0 != (priority & GNUNET_MQ_PREF_OUT_OF_ORDER)) && + (NULL != dir->env_head) && + (0 == (GNUNET_MQ_env_get_options (dir->env_head) & + GNUNET_MQ_PREF_LOW_LATENCY))) + GNUNET_MQ_dll_insert_head (&dir->env_head, &dir->env_tail, env); + else + GNUNET_MQ_dll_insert_tail (&dir->env_head, &dir->env_tail, env); cur_buffers++; - GNUNET_STATISTICS_set (stats, - "# buffer use", - cur_buffers, - GNUNET_NO); + GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO); /* Clean up 'rung' if now empty (and not head) */ - if ( (NULL == rung->rd_head) && - (rung != rung_head) ) + if ((NULL == rung->rd_head) && (rung != rung_head)) { - GNUNET_CONTAINER_DLL_remove (rung_head, - rung_tail, - rung); + GNUNET_CONTAINER_DLL_remove (rung_head, rung_tail, rung); GNUNET_free (rung); } } @@ -537,18 +503,14 @@ destroy_direction (struct RouteDirection *dir) "# messages dropped due to route destruction", 1, GNUNET_NO); - discard_buffer (dir, - env); + discard_buffer (dir, env); } if (NULL != dir->mqm) { - GCP_request_mq_cancel (dir->mqm, - NULL); + GCP_request_mq_cancel (dir->mqm, NULL); dir->mqm = NULL; } - GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, - rung_head->rd_tail, - dir); + GNUNET_CONTAINER_DLL_remove (rung_head->rd_head, rung_head->rd_tail, dir); } @@ -562,15 +524,15 @@ destroy_route (struct CadetRoute *route) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying route from %s to %s of connection %s\n", - GNUNET_i2s (GCP_get_id (route->prev.hop)), + GNUNET_i2s (GCP_get_id (route->prev.hop)), GNUNET_i2s2 (GCP_get_id (route->next.hop)), GNUNET_sh2s (&route->cid.connection_of_tunnel)); - GNUNET_assert (route == - GNUNET_CONTAINER_heap_remove_node (route->hn)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multishortmap_remove (routes, - &route->cid.connection_of_tunnel, - route)); + GNUNET_assert (route == GNUNET_CONTAINER_heap_remove_node (route->hn)); + GNUNET_assert ( + GNUNET_YES == + GNUNET_CONTAINER_multishortmap_remove (routes, + &route->cid.connection_of_tunnel, + route)); GNUNET_STATISTICS_set (stats, "# routes", GNUNET_CONTAINER_multishortmap_size (routes), @@ -607,15 +569,13 @@ send_broken (struct RouteDirection *target, GNUNET_i2s2 (peer2), GNUNET_sh2s (&cid->connection_of_tunnel)); - env = GNUNET_MQ_msg (bm, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); + env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); bm->cid = *cid; if (NULL != peer1) bm->peer1 = *peer1; if (NULL != peer2) bm->peer2 = *peer2; - GCP_request_mq_cancel (target->mqm, - env); + GCP_request_mq_cancel (target->mqm, env); target->mqm = NULL; } @@ -635,33 +595,22 @@ timeout_cb (void *cls) struct GNUNET_TIME_Absolute exp; timeout_task = NULL; - linger = GNUNET_TIME_relative_multiply (keepalive_period, - 3); + linger = GNUNET_TIME_relative_multiply (keepalive_period, 3); while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap))) { - exp = GNUNET_TIME_absolute_add (r->last_use, - linger); + exp = GNUNET_TIME_absolute_add (r->last_use, linger); if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us) { /* Route not yet timed out, wait until it does. */ - timeout_task = GNUNET_SCHEDULER_add_at (exp, - &timeout_cb, - NULL); + timeout_task = GNUNET_SCHEDULER_add_at (exp, &timeout_cb, NULL); return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Sending BROKEN due to timeout (%s was last use, %s linger)\n", - GNUNET_STRINGS_absolute_time_to_string (r->last_use), - GNUNET_STRINGS_relative_time_to_string (linger, - GNUNET_YES)); - send_broken (&r->prev, - &r->cid, - NULL, - NULL); - send_broken (&r->next, - &r->cid, - NULL, - NULL); + "Sending BROKEN due to timeout (%s was last use, %s linger)\n", + GNUNET_STRINGS_absolute_time_to_string (r->last_use), + GNUNET_STRINGS_relative_time_to_string (linger, GNUNET_YES)); + send_broken (&r->prev, &r->cid, NULL, NULL); + send_broken (&r->next, &r->cid, NULL, NULL); destroy_route (r); } /* No more routes left, so no need for a #timeout_task */ @@ -681,8 +630,7 @@ timeout_cb (void *cls) * and the last envelope was discarded */ static void -dir_ready_cb (void *cls, - int ready) +dir_ready_cb (void *cls, int ready) { struct RouteDirection *dir = cls; struct CadetRoute *route = dir->my_route; @@ -695,28 +643,18 @@ dir_ready_cb (void *cls, dir->is_ready = GNUNET_YES; if (NULL != (env = dir->env_head)) { - GNUNET_MQ_dll_remove (&dir->env_head, - &dir->env_tail, - env); + GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env); cur_buffers--; - GNUNET_STATISTICS_set (stats, - "# buffer use", - cur_buffers, - GNUNET_NO); + GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO); lower_rung (dir); dir->is_ready = GNUNET_NO; - GCP_send (dir->mqm, - env); + GCP_send (dir->mqm, env); } return; } odir = (dir == &route->next) ? &route->prev : &route->next; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending BROKEN due to MQ going down\n"); - send_broken (&route->next, - &route->cid, - GCP_get_id (odir->hop), - &my_full_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending BROKEN due to MQ going down\n"); + send_broken (&route->next, &route->cid, GCP_get_id (odir->hop), &my_full_id); destroy_route (route); } @@ -735,12 +673,8 @@ dir_init (struct RouteDirection *dir, { dir->hop = hop; dir->my_route = route; - dir->mqm = GCP_request_mq (hop, - &dir_ready_cb, - dir); - GNUNET_CONTAINER_DLL_insert (rung_head->rd_head, - rung_head->rd_tail, - dir); + dir->mqm = GCP_request_mq (hop, &dir_ready_cb, dir); + GNUNET_CONTAINER_DLL_insert (rung_head->rd_head, rung_head->rd_tail, dir); dir->rung = rung_head; GNUNET_assert (GNUNET_YES == dir->is_ready); } @@ -757,21 +691,20 @@ dir_init (struct RouteDirection *dir, * or NULL. */ static void -send_broken_without_mqm (struct CadetPeer *target, - const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, - const struct GNUNET_PeerIdentity *failure_at) +send_broken_without_mqm ( + struct CadetPeer *target, + const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid, + const struct GNUNET_PeerIdentity *failure_at) { struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_ConnectionBrokenMessage *bm; - env = GNUNET_MQ_msg (bm, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); + env = GNUNET_MQ_msg (bm, GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN); bm->cid = *cid; bm->peer1 = my_full_id; if (NULL != failure_at) bm->peer2 = *failure_at; - GCP_send_ooo (target, - env); + GCP_send_ooo (target, env); } @@ -782,12 +715,14 @@ send_broken_without_mqm (struct CadetPeer *target, * @param msg Message itself. */ static void -handle_connection_create (void *cls, - const struct GNUNET_CADET_ConnectionCreateMessage *msg) +handle_connection_create ( + void *cls, + const struct GNUNET_CADET_ConnectionCreateMessage *msg) { struct CadetPeer *sender = cls; struct CadetPeer *next; - const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1]; + const struct GNUNET_PeerIdentity *pids = + (const struct GNUNET_PeerIdentity *) &msg[1]; struct CadetRoute *route; uint16_t size = ntohs (msg->header.size) - sizeof (*msg); unsigned int path_length; @@ -810,20 +745,19 @@ handle_connection_create (void *cls, { struct GNUNET_CONTAINER_MultiPeerMap *map; - map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, - GNUNET_YES); + map = GNUNET_CONTAINER_multipeermap_create (path_length * 2, GNUNET_YES); GNUNET_assert (NULL != map); - for (unsigned int i=0;icid))) + if (NULL != (route = get_route (&msg->cid))) { /* Duplicate CREATE, pass it on, previous one might have been lost! */ @@ -867,7 +798,8 @@ handle_connection_create (void *cls, route_message (sender, &msg->cid, &msg->header, - GNUNET_MQ_PRIO_CRITICAL_CONTROL); + GNUNET_MQ_PRIO_CRITICAL_CONTROL | + GNUNET_MQ_PREF_LOW_LATENCY); return; } if (off == path_length - 1) @@ -887,17 +819,14 @@ handle_connection_create (void *cls, return; } - origin = GCP_get (&pids[0], - GNUNET_YES); + origin = GCP_get (&pids[0], GNUNET_YES); LOG (GNUNET_ERROR_TYPE_DEBUG, "I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n", GCP_2s (origin), GNUNET_sh2s (&msg->cid.connection_of_tunnel)); - path = GCPP_get_path_from_route (path_length - 1, - pids); + path = GCPP_get_path_from_route (path_length - 1, pids); if (GNUNET_OK != - GCT_add_inbound_connection (GCP_get_tunnel (origin, - GNUNET_YES), + GCT_add_inbound_connection (GCP_get_tunnel (origin, GNUNET_YES), &msg->cid, path)) { @@ -908,18 +837,14 @@ handle_connection_create (void *cls, GCP_2s (sender), GNUNET_sh2s (&msg->cid.connection_of_tunnel), GCPP_2s (path)); - send_broken_without_mqm (sender, - &msg->cid, - NULL); + send_broken_without_mqm (sender, &msg->cid, NULL); return; } return; } /* We are merely a hop on the way, check if we can support the route */ - next = GCP_get (&pids[off + 1], - GNUNET_NO); - if ( (NULL == next) || - (GNUNET_NO == GCP_has_core_connection (next)) ) + next = GCP_get (&pids[off + 1], GNUNET_NO); + if ((NULL == next) || (GNUNET_NO == GCP_has_core_connection (next))) { /* unworkable, send back BROKEN notification */ LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -928,9 +853,7 @@ handle_connection_create (void *cls, GNUNET_sh2s (&msg->cid.connection_of_tunnel), GNUNET_i2s (&pids[off + 1]), off + 1); - send_broken_without_mqm (sender, - &msg->cid, - &pids[off + 1]); + send_broken_without_mqm (sender, &msg->cid, &pids[off + 1]); return; } if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes)) @@ -939,9 +862,7 @@ handle_connection_create (void *cls, "Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n", GCP_2s (sender), GNUNET_sh2s (&msg->cid.connection_of_tunnel)); - send_broken_without_mqm (sender, - &msg->cid, - &pids[off - 1]); + send_broken_without_mqm (sender, &msg->cid, &pids[off - 1]); return; } @@ -955,17 +876,14 @@ handle_connection_create (void *cls, route = GNUNET_new (struct CadetRoute); route->cid = msg->cid; route->last_use = GNUNET_TIME_absolute_get (); - dir_init (&route->prev, - route, - sender); - dir_init (&route->next, - route, - next); + dir_init (&route->prev, route, sender); + dir_init (&route->next, route, next); GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multishortmap_put (routes, - &route->cid.connection_of_tunnel, - route, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_multishortmap_put ( + routes, + &route->cid.connection_of_tunnel, + route, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); GNUNET_STATISTICS_set (stats, "# routes", GNUNET_CONTAINER_multishortmap_size (routes), @@ -974,15 +892,16 @@ handle_connection_create (void *cls, route, route->last_use.abs_value_us); if (NULL == timeout_task) - timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period, - 3), - &timeout_cb, - NULL); + timeout_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period, + 3), + &timeout_cb, + NULL); /* also pass CREATE message along to next hop */ route_message (sender, &msg->cid, &msg->header, - GNUNET_MQ_PRIO_CRITICAL_CONTROL); + GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY); } @@ -993,8 +912,9 @@ handle_connection_create (void *cls, * @param msg Message itself. */ static void -handle_connection_create_ack (void *cls, - const struct GNUNET_CADET_ConnectionCreateAckMessage *msg) +handle_connection_create_ack ( + void *cls, + const struct GNUNET_CADET_ConnectionCreateAckMessage *msg) { struct CadetPeer *peer = cls; struct CadetConnection *cc; @@ -1005,12 +925,9 @@ handle_connection_create_ack (void *cls, { /* verify ACK came from the right direction */ unsigned int len; - struct CadetPeerPath *path = GCC_get_path (cc, - &len); + struct CadetPeerPath *path = GCC_get_path (cc, &len); - if (peer != - GCPP_get_peer_at_offset (path, - 0)) + if (peer != GCPP_get_peer_at_offset (path, 0)) { /* received ACK from unexpected direction, ignore! */ GNUNET_break_op (0); @@ -1027,7 +944,7 @@ handle_connection_create_ack (void *cls, route_message (peer, &msg->cid, &msg->header, - GNUNET_MQ_PRIO_CRITICAL_CONTROL); + GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY); } @@ -1039,8 +956,9 @@ handle_connection_create_ack (void *cls, * @deprecated duplicate logic with #handle_destroy(); dedup! */ static void -handle_connection_broken (void *cls, - const struct GNUNET_CADET_ConnectionBrokenMessage *msg) +handle_connection_broken ( + void *cls, + const struct GNUNET_CADET_ConnectionBrokenMessage *msg) { struct CadetPeer *peer = cls; struct CadetConnection *cc; @@ -1052,12 +970,9 @@ handle_connection_broken (void *cls, { /* verify message came from the right direction */ unsigned int len; - struct CadetPeerPath *path = GCC_get_path (cc, - &len); + struct CadetPeerPath *path = GCC_get_path (cc, &len); - if (peer != - GCPP_get_peer_at_offset (path, - 0)) + if (peer != GCPP_get_peer_at_offset (path, 0)) { /* received message from unexpected direction, ignore! */ GNUNET_break_op (0); @@ -1076,7 +991,7 @@ handle_connection_broken (void *cls, route_message (peer, &msg->cid, &msg->header, - GNUNET_MQ_PREF_NO_BUFFER); + GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL); route = get_route (&msg->cid); if (NULL != route) destroy_route (route); @@ -1091,8 +1006,9 @@ handle_connection_broken (void *cls, * @param msg Message itself. */ static void -handle_connection_destroy (void *cls, - const struct GNUNET_CADET_ConnectionDestroyMessage *msg) +handle_connection_destroy ( + void *cls, + const struct GNUNET_CADET_ConnectionDestroyMessage *msg) { struct CadetPeer *peer = cls; struct CadetConnection *cc; @@ -1104,12 +1020,9 @@ handle_connection_destroy (void *cls, { /* verify message came from the right direction */ unsigned int len; - struct CadetPeerPath *path = GCC_get_path (cc, - &len); + struct CadetPeerPath *path = GCC_get_path (cc, &len); - if (peer != - GCPP_get_peer_at_offset (path, - 0)) + if (peer != GCPP_get_peer_at_offset (path, 0)) { /* received message from unexpected direction, ignore! */ GNUNET_break_op (0); @@ -1130,7 +1043,7 @@ handle_connection_destroy (void *cls, route_message (peer, &msg->cid, &msg->header, - GNUNET_MQ_PREF_NO_BUFFER); + GNUNET_MQ_PREF_LOW_LATENCY | GNUNET_MQ_PRIO_CRITICAL_CONTROL); route = get_route (&msg->cid); if (NULL != route) destroy_route (route); @@ -1162,19 +1075,15 @@ handle_tunnel_kx (void *cls, { /* verify message came from the right direction */ unsigned int len; - struct CadetPeerPath *path = GCC_get_path (cc, - &len); + struct CadetPeerPath *path = GCC_get_path (cc, &len); - if (peer != - GCPP_get_peer_at_offset (path, - 0)) + if (peer != GCPP_get_peer_at_offset (path, 0)) { /* received message from unexpected direction, ignore! */ GNUNET_break_op (0); return; } - GCC_handle_kx (cc, - msg); + GCC_handle_kx (cc, msg); return; } @@ -1182,7 +1091,7 @@ handle_tunnel_kx (void *cls, route_message (peer, &msg->cid, &msg->header, - GNUNET_MQ_PRIO_CRITICAL_CONTROL); + GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY); } @@ -1193,8 +1102,9 @@ handle_tunnel_kx (void *cls, * @param msg Message itself. */ static void -handle_tunnel_kx_auth (void *cls, - const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg) +handle_tunnel_kx_auth ( + void *cls, + const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg) { struct CadetPeer *peer = cls; struct CadetConnection *cc; @@ -1205,19 +1115,15 @@ handle_tunnel_kx_auth (void *cls, { /* verify message came from the right direction */ unsigned int len; - struct CadetPeerPath *path = GCC_get_path (cc, - &len); + struct CadetPeerPath *path = GCC_get_path (cc, &len); - if (peer != - GCPP_get_peer_at_offset (path, - 0)) + if (peer != GCPP_get_peer_at_offset (path, 0)) { /* received message from unexpected direction, ignore! */ GNUNET_break_op (0); return; } - GCC_handle_kx_auth (cc, - msg); + GCC_handle_kx_auth (cc, msg); return; } @@ -1225,7 +1131,7 @@ handle_tunnel_kx_auth (void *cls, route_message (peer, &msg->kx.cid, &msg->kx.header, - GNUNET_MQ_PRIO_CRITICAL_CONTROL); + GNUNET_MQ_PRIO_CRITICAL_CONTROL | GNUNET_MQ_PREF_LOW_LATENCY); } @@ -1264,26 +1170,19 @@ handle_tunnel_encrypted (void *cls, { /* verify message came from the right direction */ unsigned int len; - struct CadetPeerPath *path = GCC_get_path (cc, - &len); + struct CadetPeerPath *path = GCC_get_path (cc, &len); - if (peer != - GCPP_get_peer_at_offset (path, - 0)) + if (peer != GCPP_get_peer_at_offset (path, 0)) { /* received message from unexpected direction, ignore! */ GNUNET_break_op (0); return; } - GCC_handle_encrypted (cc, - msg); + GCC_handle_encrypted (cc, msg); return; } /* We're just an intermediary peer, route the message along its path */ - route_message (peer, - &msg->cid, - &msg->header, - GNUNET_MQ_PRIO_CRITICAL_CONTROL); + route_message (peer, &msg->cid, &msg->header, GNUNET_MQ_PRIO_BEST_EFFORT); } @@ -1300,17 +1199,14 @@ handle_tunnel_encrypted (void *cls, * @param my_identity ID of this peer, NULL if we failed */ static void -core_init_cb (void *cls, - const struct GNUNET_PeerIdentity *my_identity) +core_init_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) { if (NULL == my_identity) { GNUNET_break (0); return; } - GNUNET_break (0 == - GNUNET_memcmp (my_identity, - &my_full_id)); + GNUNET_break (0 == GNUNET_memcmp (my_identity, &my_full_id)); } @@ -1330,10 +1226,8 @@ core_connect_cb (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "CORE connection to peer %s was established.\n", GNUNET_i2s (peer)); - cp = GCP_get (peer, - GNUNET_YES); - GCP_set_mq (cp, - mq); + cp = GCP_get (peer, GNUNET_YES); + GCP_set_mq (cp, mq); return cp; } @@ -1354,8 +1248,7 @@ core_disconnect_cb (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "CORE connection to peer %s went down.\n", GNUNET_i2s (peer)); - GCP_set_mq (cp, - NULL); + GCP_set_mq (cp, NULL); } @@ -1367,52 +1260,48 @@ core_disconnect_cb (void *cls, void GCO_init (const struct GNUNET_CONFIGURATION_Handle *c) { - struct GNUNET_MQ_MessageHandler handlers[] = { - GNUNET_MQ_hd_var_size (connection_create, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, - struct GNUNET_CADET_ConnectionCreateMessage, - NULL), - GNUNET_MQ_hd_fixed_size (connection_create_ack, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, - struct GNUNET_CADET_ConnectionCreateAckMessage, - NULL), - GNUNET_MQ_hd_fixed_size (connection_broken, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, - struct GNUNET_CADET_ConnectionBrokenMessage, - NULL), - GNUNET_MQ_hd_fixed_size (connection_destroy, - GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, - struct GNUNET_CADET_ConnectionDestroyMessage, - NULL), - GNUNET_MQ_hd_fixed_size (tunnel_kx, - GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX, - struct GNUNET_CADET_TunnelKeyExchangeMessage, - NULL), - GNUNET_MQ_hd_fixed_size (tunnel_kx_auth, - GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH, - struct GNUNET_CADET_TunnelKeyExchangeAuthMessage, - NULL), - GNUNET_MQ_hd_var_size (tunnel_encrypted, - GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED, - struct GNUNET_CADET_TunnelEncryptedMessage, - NULL), - GNUNET_MQ_handler_end () - }; - - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (c, - "CADET", - "MAX_ROUTES", - &max_routes)) - max_routes = 5000; - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_number (c, - "CADET", - "MAX_MSGS_QUEUE", - &max_buffers)) - max_buffers = 10000; - routes = GNUNET_CONTAINER_multishortmap_create (1024, - GNUNET_NO); + struct GNUNET_MQ_MessageHandler handlers[] = + {GNUNET_MQ_hd_var_size (connection_create, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE, + struct GNUNET_CADET_ConnectionCreateMessage, + NULL), + GNUNET_MQ_hd_fixed_size (connection_create_ack, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK, + struct GNUNET_CADET_ConnectionCreateAckMessage, + NULL), + GNUNET_MQ_hd_fixed_size (connection_broken, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN, + struct GNUNET_CADET_ConnectionBrokenMessage, + NULL), + GNUNET_MQ_hd_fixed_size (connection_destroy, + GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY, + struct GNUNET_CADET_ConnectionDestroyMessage, + NULL), + GNUNET_MQ_hd_fixed_size (tunnel_kx, + GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX, + struct GNUNET_CADET_TunnelKeyExchangeMessage, + NULL), + GNUNET_MQ_hd_fixed_size (tunnel_kx_auth, + GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH, + struct GNUNET_CADET_TunnelKeyExchangeAuthMessage, + NULL), + GNUNET_MQ_hd_var_size (tunnel_encrypted, + GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED, + struct GNUNET_CADET_TunnelEncryptedMessage, + NULL), + GNUNET_MQ_handler_end ()}; + + if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, + "CADET", + "MAX_ROUTES", + &max_routes)) + max_routes = 5000; + if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (c, + "CADET", + "MAX_MSGS_QUEUE", + &max_buffers)) + max_buffers = 10000; + routes = GNUNET_CONTAINER_multishortmap_create (1024, GNUNET_NO); route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); core = GNUNET_CORE_connect (c, NULL, -- cgit v1.2.3