From b7585254768daa9b64fc33fb2562293c6fe3fc16 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 22 Feb 2015 22:10:15 +0000 Subject: simplify logic, do not report monitoring events for sessions in destroy, indentation, doxygen -- may help/fix #3591 --- src/transport/gnunet-service-transport_plugins.c | 3 +- .../gnunet-service-transport_validation.c | 32 +- src/transport/gnunet-transport.c | 135 ++++++--- src/transport/plugin_transport_tcp.c | 3 - src/transport/plugin_transport_udp.c | 331 +++++++++++++-------- src/transport/transport_api_monitor_plugins.c | 23 +- 6 files changed, 312 insertions(+), 215 deletions(-) diff --git a/src/transport/gnunet-service-transport_plugins.c b/src/transport/gnunet-service-transport_plugins.c index cbfa50afd..5cdc55518 100644 --- a/src/transport/gnunet-service-transport_plugins.c +++ b/src/transport/gnunet-service-transport_plugins.c @@ -440,7 +440,8 @@ GST_plugins_monitor_subscribe (GNUNET_TRANSPORT_SessionInfoCallback cb, GNUNET_break (0); else pos->api->setup_monitor (pos->api->cls, - cb, cb_cls); + cb, + cb_cls); } diff --git a/src/transport/gnunet-service-transport_validation.c b/src/transport/gnunet-service-transport_validation.c index de6edc90a..ee8cd9308 100644 --- a/src/transport/gnunet-service-transport_validation.c +++ b/src/transport/gnunet-service-transport_validation.c @@ -1524,9 +1524,13 @@ GST_validation_handle_pong (const struct GNUNET_PeerIdentity *sender, /* build HELLO to store in PEERINFO */ ve->copied = GNUNET_NO; hello = GNUNET_HELLO_create (&ve->address->peer.public_key, - &add_valid_peer_address, ve, + &add_valid_peer_address, + ve, GNUNET_NO); - GNUNET_PEERINFO_add_peer (GST_peerinfo, hello, NULL, NULL); + GNUNET_PEERINFO_add_peer (GST_peerinfo, + hello, + NULL, + NULL); GNUNET_free (hello); return GNUNET_OK; } @@ -1565,26 +1569,10 @@ GST_validation_handle_hello (const struct GNUNET_MessageHeader *hello) /* got our own HELLO, how boring */ return GNUNET_OK; } - if (GNUNET_NO == - GNUNET_CONTAINER_multipeermap_contains (validation_map, - &pid)) - { - /* Add peer identity without addresses to peerinfo service */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding HELLO without addresses for peer `%s'\n", - GNUNET_i2s (&pid)); - h = GNUNET_HELLO_create (&pid.public_key, NULL, NULL, friend); - GNUNET_PEERINFO_add_peer (GST_peerinfo, h, NULL, NULL); - - GNUNET_free (h); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Validation received HELLO message for peer `%s' with size %u, checking for new addresses\n", - GNUNET_i2s (&pid), - ntohs (hello->size)); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Validation received HELLO message for peer `%s' with size %u, checking for new addresses\n", + GNUNET_i2s (&pid), + ntohs (hello->size)); GNUNET_assert (NULL == GNUNET_HELLO_iterate_addresses (hm, GNUNET_NO, diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c index bf586b7f0..f4895e154 100644 --- a/src/transport/gnunet-transport.c +++ b/src/transport/gnunet-transport.c @@ -143,11 +143,6 @@ struct PeerResolutionContext */ struct PeerResolutionContext *prev; - /** - * The peer id - */ - struct GNUNET_PeerIdentity id; - /** * address to resolve */ @@ -516,6 +511,7 @@ shutdown_task (void *cls, struct GNUNET_TIME_Relative duration; struct ValidationResolutionContext *cur; struct ValidationResolutionContext *next; + struct PeerResolutionContext *rc; end = NULL; if (NULL != op_timeout) @@ -550,12 +546,23 @@ shutdown_task (void *cls, next = cur->next; GNUNET_TRANSPORT_address_to_string_cancel (cur->asc); - GNUNET_CONTAINER_DLL_remove (vc_head, vc_tail, cur); + GNUNET_CONTAINER_DLL_remove (vc_head, + vc_tail, + cur); GNUNET_free (cur->transport); GNUNET_HELLO_address_free (cur->addrcp); GNUNET_free (cur); } - + while (NULL != (rc = rc_head)) + { + GNUNET_CONTAINER_DLL_remove (rc_head, + rc_tail, + rc); + GNUNET_TRANSPORT_address_to_string_cancel (rc->asc); + GNUNET_free (rc->transport); + GNUNET_free (rc->addrcp); + GNUNET_free (rc); + } if (NULL != th) { GNUNET_TRANSPORT_notify_transmit_ready_cancel (th); @@ -649,7 +656,7 @@ operation_timeout (void *cls, static void -run_nat_test (); +run_nat_test (void); /** @@ -720,6 +727,7 @@ display_test_result (struct TestContext *tc, run_nat_test (); } + /** * Function called by NAT to report the outcome of the nat-test. * Clean up and update GUI. @@ -733,7 +741,8 @@ result_callback (void *cls, { struct TestContext *tc = cls; - display_test_result (tc, result); + display_test_result (tc, + result); } @@ -965,11 +974,12 @@ run_nat_test () (uint16_t) head->adv_port); head->tst = GNUNET_NAT_test_start (cfg, - (0 == strcasecmp (head->name, "udp")) ? GNUNET_NO : GNUNET_YES, - (uint16_t) head->bnd_port, - (uint16_t) head->adv_port, - TIMEOUT, - &result_callback, head); + (0 == strcasecmp (head->name, "udp")) + ? GNUNET_NO : GNUNET_YES, + (uint16_t) head->bnd_port, + (uint16_t) head->adv_port, + TIMEOUT, + &result_callback, head); } @@ -1270,9 +1280,17 @@ notify_receive (void *cls, } +/** + * Convert address to a printable format. + * + * @param address the address + * @param numeric #GNUNET_YES to convert to numeric format, #GNUNET_NO + * to try to use reverse DNS + * @param state state the peer is in + * @param state_timeout when will the peer's state expire + */ static void -resolve_peer_address (const struct GNUNET_PeerIdentity *id, - const struct GNUNET_HELLO_Address *address, +resolve_peer_address (const struct GNUNET_HELLO_Address *address, int numeric, enum GNUNET_TRANSPORT_PeerState state, struct GNUNET_TIME_Absolute state_timeout); @@ -1339,10 +1357,10 @@ process_peer_string (void *cls, { FPRINTF (stderr, "Failed to convert address for peer `%s' plugin `%s' length %u to string \n", - GNUNET_i2s (&rc->id), + GNUNET_i2s (&rc->addrcp->peer), rc->addrcp->transport_name, (unsigned int) rc->addrcp->address_length); - print_info (&rc->id, + print_info (&rc->addrcp->peer, rc->transport, NULL, rc->state, @@ -1352,7 +1370,7 @@ process_peer_string (void *cls, } if (GNUNET_OK == res) { - print_info (&rc->id, + print_info (&rc->addrcp->peer, rc->transport, address, rc->state, @@ -1365,6 +1383,7 @@ process_peer_string (void *cls, } /* NULL == address, last call, we are done */ + rc->asc = NULL; GNUNET_assert (address_resolutions > 0); address_resolutions--; if (GNUNET_NO == rc->printed) @@ -1375,15 +1394,14 @@ process_peer_string (void *cls, (note: this should not be needed, as transport should fallback to numeric conversion if DNS takes too long) */ - resolve_peer_address (&rc->id, - rc->addrcp, + resolve_peer_address (rc->addrcp, GNUNET_YES, rc->state, rc->state_timeout); } else { - print_info (&rc->id, + print_info (&rc->addrcp->peer, rc->transport, NULL, rc->state, @@ -1407,14 +1425,24 @@ process_peer_string (void *cls, op_timeout = NULL; } ret = 0; - end = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); + end = GNUNET_SCHEDULER_add_now (&shutdown_task, + NULL); } } +/** + * Convert address to a printable format and print it + * together with the given state data. + * + * @param address the address + * @param numeric #GNUNET_YES to convert to numeric format, #GNUNET_NO + * to try to use reverse DNS + * @param state state the peer is in + * @param state_timeout when will the peer's state expire + */ static void -resolve_peer_address (const struct GNUNET_PeerIdentity *id, - const struct GNUNET_HELLO_Address *address, +resolve_peer_address (const struct GNUNET_HELLO_Address *address, int numeric, enum GNUNET_TRANSPORT_PeerState state, struct GNUNET_TIME_Absolute state_timeout) @@ -1422,12 +1450,11 @@ resolve_peer_address (const struct GNUNET_PeerIdentity *id, struct PeerResolutionContext *rc; rc = GNUNET_new (struct PeerResolutionContext); - GNUNET_assert(NULL != rc); - GNUNET_CONTAINER_DLL_insert(rc_head, rc_tail, rc); + GNUNET_CONTAINER_DLL_insert (rc_head, + rc_tail, + rc); address_resolutions++; - - rc->id = *id; - rc->transport = GNUNET_strdup(address->transport_name); + rc->transport = GNUNET_strdup (address->transport_name); rc->addrcp = GNUNET_HELLO_address_copy (address); rc->printed = GNUNET_NO; rc->state = state; @@ -1485,9 +1512,16 @@ process_peer_iteration_cb (void *cls, address->transport_name); if (NULL != address) - resolve_peer_address (peer, address, numeric, state, state_timeout); + resolve_peer_address (address, + numeric, + state, + state_timeout); else - print_info (peer, NULL, NULL, state, state_timeout); + print_info (peer, + NULL, + NULL, + state, + state_timeout); } @@ -1653,13 +1687,12 @@ plugin_monitoring_cb (void *cls, /** * Function called with information about a peers * - * @param cls closure + * @param cls closure, NULL * @param peer identity of the peer, NULL for final callback when operation done * @param address binary address used to communicate with this peer, * NULL on disconnect or when done * @param state current state this peer is in * @param state_timeout time out for the current state - * */ static void process_peer_monitoring_cb (void *cls, @@ -1684,22 +1717,28 @@ process_peer_monitoring_cb (void *cls, &operation_timeout, NULL); - if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, peer))) + if (NULL == (m = GNUNET_CONTAINER_multipeermap_get (monitored_peers, + peer))) { m = GNUNET_new (struct MonitoredPeer); - GNUNET_CONTAINER_multipeermap_put (monitored_peers, peer, - m, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + GNUNET_CONTAINER_multipeermap_put (monitored_peers, + peer, + m, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } else { if ( (m->state == state) && - (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && - ((NULL == address) && (NULL == m->address))) + (m->state_timeout.abs_value_us == state_timeout.abs_value_us) && + (NULL == address) && + (NULL == m->address) ) { return; /* No real change */ } - if ( (m->state == state) && ((NULL != address) && (NULL != m->address)) && - (0 == GNUNET_HELLO_address_cmp(m->address, address))) + if ( (m->state == state) && + (NULL != address) && + (NULL != m->address) && + (0 == GNUNET_HELLO_address_cmp(m->address, address)) ) return; /* No real change */ } @@ -1714,8 +1753,7 @@ process_peer_monitoring_cb (void *cls, m->state_timeout = state_timeout; if (NULL != address) - resolve_peer_address (peer, - m->address, + resolve_peer_address (m->address, numeric, m->state, m->state_timeout); @@ -1970,11 +2008,14 @@ testservice_task (void *cls, } else if (monitor_connections) /* -m: List information about peers continuously */ { - monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); + monitored_peers = GNUNET_CONTAINER_multipeermap_create (10, + GNUNET_NO); address_resolution_in_progress = GNUNET_YES; - pic = GNUNET_TRANSPORT_monitor_peers (cfg, (NULL == cpid) ? NULL : &pid, - GNUNET_NO, TIMEOUT, - &process_peer_monitoring_cb, (void *) cfg); + pic = GNUNET_TRANSPORT_monitor_peers (cfg, + (NULL == cpid) ? NULL : &pid, + GNUNET_NO, + TIMEOUT, + &process_peer_monitoring_cb, NULL); } else if (monitor_plugins) /* -P: List information about plugins continuously */ { diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 34d3881f6..874811a7b 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -2385,9 +2385,6 @@ handle_tcp_welcome (void *cls, session->address, session, session->scope); - notify_session_monitor (plugin, - session, - GNUNET_TRANSPORT_SS_INIT); } else { diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 477efc0a1..38210f322 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -508,7 +508,9 @@ notify_session_monitor (struct Plugin *plugin, struct GNUNET_TRANSPORT_SessionInfo info; if (NULL == plugin->sic) - return; + return; + if (GNUNET_YES == session->in_destroy) + return; /* already destroyed, just RC>0 left-over actions */ memset (&info, 0, sizeof (info)); info.state = state; info.is_inbound = GNUNET_SYSERR; /* hard to say */ @@ -935,7 +937,8 @@ call_continuation (struct UDP_MessageWrapper *udpw, LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling continuation for %u byte message to `%s' with result %s\n", - udpw->payload_size, GNUNET_i2s (&udpw->session->target), + udpw->payload_size, + GNUNET_i2s (&udpw->session->target), (GNUNET_OK == result) ? "OK" : "SYSERR"); if (udpw->msg_size >= udpw->payload_size) @@ -952,63 +955,97 @@ call_continuation (struct UDP_MessageWrapper *udpw, if (NULL != udpw->cont) { /* Transport continuation */ - udpw->cont (udpw->cont_cls, &udpw->session->target, result, - udpw->payload_size, udpw->msg_size); + udpw->cont (udpw->cont_cls, + &udpw->session->target, + result, + udpw->payload_size, + udpw->msg_size); } GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, messages, sent, success", 1, GNUNET_NO); + "# UDP, unfragmented msgs, messages, sent, success", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes payload, sent, success", - udpw->payload_size, GNUNET_NO); + "# UDP, unfragmented msgs, bytes payload, sent, success", + udpw->payload_size, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes overhead, sent, success", overhead, - GNUNET_NO); + "# UDP, unfragmented msgs, bytes overhead, sent, success", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO); + "# UDP, total, bytes overhead, sent", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO); + "# UDP, total, bytes payload, sent", + udpw->payload_size, + GNUNET_NO); break; case UMT_MSG_FRAGMENTED_COMPLETE: GNUNET_assert(NULL != udpw->frag_ctx); if (udpw->frag_ctx->cont != NULL ) - udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, - GNUNET_OK, udpw->frag_ctx->payload_size, - udpw->frag_ctx->on_wire_size); + udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, + &udpw->session->target, + GNUNET_OK, + udpw->frag_ctx->payload_size, + udpw->frag_ctx->on_wire_size); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, sent, success", 1, GNUNET_NO); + "# UDP, fragmented msgs, messages, sent, success", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, success", - udpw->payload_size, GNUNET_NO); + "# UDP, fragmented msgs, bytes payload, sent, success", + udpw->payload_size, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes overhead, sent, success", overhead, - GNUNET_NO); + "# UDP, fragmented msgs, bytes overhead, sent, success", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO); + "# UDP, total, bytes overhead, sent", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes payload, sent", udpw->payload_size, GNUNET_NO); + "# UDP, total, bytes payload, sent", + udpw->payload_size, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO); + "# UDP, fragmented msgs, messages, pending", + -1, + GNUNET_NO); break; case UMT_MSG_FRAGMENTED: /* Fragmented message: enqueue next fragment */ if (NULL != udpw->cont) - udpw->cont (udpw->cont_cls, &udpw->session->target, result, - udpw->payload_size, udpw->msg_size); + udpw->cont (udpw->cont_cls, + &udpw->session->target, + result, + udpw->payload_size, + udpw->msg_size); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments, sent, success", 1, GNUNET_NO); + "# UDP, fragmented msgs, fragments, sent, success", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments bytes, sent, success", - udpw->msg_size, GNUNET_NO); + "# UDP, fragmented msgs, fragments bytes, sent, success", + udpw->msg_size, + GNUNET_NO); break; case UMT_MSG_ACK: /* No continuation */ GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, ACK msgs, messages, sent, success", 1, GNUNET_NO); + "# UDP, ACK msgs, messages, sent, success", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, ACK msgs, bytes overhead, sent, success", overhead, - GNUNET_NO); + "# UDP, ACK msgs, bytes overhead, sent, success", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes overhead, sent", overhead, GNUNET_NO); + "# UDP, total, bytes overhead, sent", + overhead, + GNUNET_NO); break; default: GNUNET_break(0); @@ -1021,50 +1058,71 @@ call_continuation (struct UDP_MessageWrapper *udpw, case UMT_MSG_UNFRAGMENTED: /* Unfragmented message: failed to send */ if (NULL != udpw->cont) - udpw->cont (udpw->cont_cls, &udpw->session->target, result, - udpw->payload_size, overhead); + udpw->cont (udpw->cont_cls, + &udpw->session->target, + result, + udpw->payload_size, + overhead); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, messages, sent, failure", 1, GNUNET_NO); + "# UDP, unfragmented msgs, messages, sent, failure", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes payload, sent, failure", - udpw->payload_size, GNUNET_NO); + "# UDP, unfragmented msgs, bytes payload, sent, failure", + udpw->payload_size, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, unfragmented msgs, bytes overhead, sent, failure", overhead, - GNUNET_NO); + "# UDP, unfragmented msgs, bytes overhead, sent, failure", + overhead, + GNUNET_NO); break; case UMT_MSG_FRAGMENTED_COMPLETE: - GNUNET_assert(NULL != udpw->frag_ctx); - if (udpw->frag_ctx->cont != NULL ) - udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, &udpw->session->target, - GNUNET_SYSERR, udpw->frag_ctx->payload_size, - udpw->frag_ctx->on_wire_size); + GNUNET_assert (NULL != udpw->frag_ctx); + if (udpw->frag_ctx->cont != NULL) + udpw->frag_ctx->cont (udpw->frag_ctx->cont_cls, + &udpw->session->target, + GNUNET_SYSERR, + udpw->frag_ctx->payload_size, + udpw->frag_ctx->on_wire_size); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, sent, failure", 1, GNUNET_NO); + "# UDP, fragmented msgs, messages, sent, failure", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, failure", - udpw->payload_size, GNUNET_NO); + "# UDP, fragmented msgs, bytes payload, sent, failure", + udpw->payload_size, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, failure", overhead, - GNUNET_NO); + "# UDP, fragmented msgs, bytes payload, sent, failure", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, bytes payload, sent, failure", overhead, - GNUNET_NO); + "# UDP, fragmented msgs, bytes payload, sent, failure", + overhead, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, messages, pending", -1, GNUNET_NO); + "# UDP, fragmented msgs, messages, pending", + -1, + GNUNET_NO); break; case UMT_MSG_FRAGMENTED: - GNUNET_assert(NULL != udpw->frag_ctx); + GNUNET_assert (NULL != udpw->frag_ctx); /* Fragmented message: failed to send */ GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments, sent, failure", 1, GNUNET_NO); + "# UDP, fragmented msgs, fragments, sent, failure", + 1, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, fragmented msgs, fragments bytes, sent, failure", - udpw->msg_size, GNUNET_NO); + "# UDP, fragmented msgs, fragments bytes, sent, failure", + udpw->msg_size, + GNUNET_NO); break; case UMT_MSG_ACK: /* ACK message: failed to send */ GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, ACK msgs, messages, sent, failure", 1, GNUNET_NO); + "# UDP, ACK msgs, messages, sent, failure", + 1, + GNUNET_NO); break; default: GNUNET_break(0); @@ -1224,7 +1282,14 @@ dequeue (struct Plugin *plugin, /** - * FIXME. + * We have completed our (attempt) to transmit a message + * that had to be fragmented -- either because we got an + * ACK saying that all fragments were received, or because + * of timeout / disconnect. Clean up our state. + * + * @param fc fragmentation context to clean up + * @param result #GNUNET_OK if we succeeded (got ACK), + * #GNUNET_SYSERR if the transmission failed */ static void fragmented_message_done (struct UDP_FragmentationContext *fc, @@ -1258,11 +1323,13 @@ fragmented_message_done (struct UDP_FragmentationContext *fc, while (NULL != udpw) { tmp = udpw->next; - if ((udpw->frag_ctx != NULL )&& (udpw->frag_ctx == s->frag_ctx)){ - dequeue (plugin, udpw); - call_continuation (udpw, GNUNET_SYSERR); - GNUNET_free (udpw); - } + if ( (udpw->frag_ctx != NULL) && + (udpw->frag_ctx == s->frag_ctx) ) + { + dequeue (plugin, udpw); + call_continuation (udpw, GNUNET_SYSERR); + GNUNET_free (udpw); + } udpw = tmp; } } @@ -1558,35 +1625,10 @@ reschedule_session_timeout (struct Session *s) } -/** - * FIXME. - */ -static struct Session * -create_session (struct Plugin *plugin, - const struct GNUNET_HELLO_Address *address) -{ - struct Session *s; - - s = GNUNET_new (struct Session); - s->plugin = plugin; - s->address = GNUNET_HELLO_address_copy (address); - s->target = address->peer; - s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, - 250); - s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; - s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; - s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; - s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); - s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, - &session_timeout, s); - return s; -} - - /** * Function obtain the network type for a session * - * @param cls closure ('struct Plugin*') + * @param cls closure (`struct Plugin *`) * @param session the session * @return the network type */ @@ -1740,7 +1782,18 @@ udp_plugin_create_session (void *cls, struct Plugin *plugin = cls; struct Session *s; - s = create_session (plugin, address); + s = GNUNET_new (struct Session); + s->plugin = plugin; + s->address = GNUNET_HELLO_address_copy (address); + s->target = address->peer; + s->last_expected_ack_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + 250); + s->last_expected_msg_delay = GNUNET_TIME_UNIT_MILLISECONDS; + s->flow_delay_from_other_peer = GNUNET_TIME_UNIT_ZERO_ABS; + s->flow_delay_for_other_peer = GNUNET_TIME_UNIT_ZERO; + s->timeout = GNUNET_TIME_relative_to_absolute (UDP_SESSION_TIME_OUT); + s->timeout_task = GNUNET_SCHEDULER_add_delayed (UDP_SESSION_TIME_OUT, + &session_timeout, s); s->scope = network_type; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1768,7 +1821,7 @@ udp_plugin_create_session (void *cls, * notify the plugin that a session is still active and in use and * therefore the session timeout for this session has to be updated * - * @param cls closure + * @param cls closure with the `struct Plugin` * @param peer which peer was the session for * @param session which session is being updated */ @@ -1796,7 +1849,7 @@ udp_plugin_update_session_timeout (void *cls, * Creates a new outbound session the transport service will use to * send data to the peer. * - * @param cls the plugin + * @param cls the `struct Plugin *` * @param address the address * @return the session or NULL of max connections exceeded */ @@ -1812,16 +1865,20 @@ udp_plugin_get_session (void *cls, if (NULL == address) { - GNUNET_break(0); + GNUNET_break (0); return NULL; } if ( (address->address_length != sizeof(struct IPv4UdpAddress)) && (address->address_length != sizeof(struct IPv6UdpAddress)) ) + { + GNUNET_break_op (0); return NULL; + } if (NULL != (s = udp_plugin_lookup_session (cls, address))) return s; + /* need to create new session */ if (sizeof (struct IPv4UdpAddress) == address->address_length) { struct sockaddr_in v4; @@ -1838,7 +1895,7 @@ udp_plugin_get_session (void *cls, (const struct sockaddr *) &v4, sizeof (v4)); } - else if (sizeof (struct IPv6UdpAddress) == address->address_length) + if (sizeof (struct IPv6UdpAddress) == address->address_length) { struct sockaddr_in6 v6; @@ -1854,9 +1911,9 @@ udp_plugin_get_session (void *cls, (const struct sockaddr *) &v6, sizeof (v6)); } - - /* otherwise create new */ - return udp_plugin_create_session (cls, address, network_type); + return udp_plugin_create_session (cls, + address, + network_type); } @@ -2007,8 +2064,6 @@ udp_plugin_send (void *cls, struct UDP_MessageWrapper * udpw; struct UDPMessage *udp; char mbuf[udpmlen]; - GNUNET_assert(plugin != NULL); - GNUNET_assert(s != NULL); if ( (s->address->address_length == sizeof(struct IPv6UdpAddress)) && (plugin->sockv6 == NULL) ) @@ -2239,6 +2294,7 @@ process_udp_message (struct Plugin *plugin, struct Session *s; struct GNUNET_HELLO_Address *address; + GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != network_type); if (0 != ntohl (msg->reserved)) { GNUNET_break_op(0); @@ -2486,7 +2542,6 @@ read_process_ack (struct Plugin *plugin, udp_addr, udp_addr_len)); - /* Remove fragmented message after successful sending */ fragmented_message_done (s->frag_ctx, GNUNET_OK); @@ -2734,24 +2789,30 @@ udp_select_read (struct Plugin *plugin, /** - * FIXME. + * Removes messages from the transmission queue that have + * timed out, and then selects a message that should be + * transmitted next. + * + * @param plugin the UDP plugin + * @param sock which socket should we process the queue for (v4 or v6) + * @return message selected for transmission, or NULL for none */ static struct UDP_MessageWrapper * -remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, +remove_timeout_messages_and_select (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) { struct UDP_MessageWrapper *udpw = NULL; struct GNUNET_TIME_Relative remaining; struct Session *session; - struct Plugin *plugin; int removed; removed = GNUNET_NO; - udpw = head; + udpw = (sock == plugin->sockv4) + ? plugin->ipv4_queue_head + : plugin->ipv6_queue_head; while (NULL != udpw) { session = udpw->session; - plugin = session->plugin; /* Find messages with timeout */ remaining = GNUNET_TIME_absolute_get_remaining (udpw->timeout); if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us) @@ -2797,12 +2858,13 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, "# UDP, total, messages, sent, timeout", 1, GNUNET_NO); - call_continuation (udpw, GNUNET_SYSERR); + call_continuation (udpw, + GNUNET_SYSERR); LOG (GNUNET_ERROR_TYPE_DEBUG, "Fragment for message for peer `%s' with size %u timed out\n", GNUNET_i2s (&udpw->session->target), udpw->frag_ctx->payload_size); - + GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, fragmented msgs, messages, sent, timeout", 1, @@ -2812,7 +2874,8 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, udpw->frag_ctx->payload_size, GNUNET_NO); /* Remove fragmented message due to timeout */ - fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR); + fragmented_message_done (udpw->frag_ctx, + GNUNET_SYSERR); break; case UMT_MSG_ACK: GNUNET_STATISTICS_update (plugin->env->stats, @@ -2827,10 +2890,12 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, "ACK Message for peer `%s' with size %u timed out\n", GNUNET_i2s (&udpw->session->target), udpw->payload_size); - call_continuation (udpw, GNUNET_SYSERR); + call_continuation (udpw, + GNUNET_SYSERR); removed = GNUNET_YES; - dequeue (plugin, udpw); - GNUNET_free(udpw); + dequeue (plugin, + udpw); + GNUNET_free (udpw); break; default: break; @@ -2867,8 +2932,10 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, /* Message is delayed, try next */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' (%u bytes) is delayed for %s\n", - GNUNET_i2s (&udpw->session->target), udpw->payload_size, - GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES)); + GNUNET_i2s (&udpw->session->target), + udpw->payload_size, + GNUNET_STRINGS_relative_time_to_string (remaining, + GNUNET_YES)); udpw = udpw->next; } } @@ -2931,7 +2998,12 @@ analyze_send_error (struct Plugin *plugin, /** - * FIXME. + * It is time to try to transmit a UDP message. Select one + * and send. + * + * @param plugin the plugin + * @param sock which socket (v4/v6) to send on + * @return number of bytes transmitted, #GNUNET_SYSERR on failure */ static size_t udp_select_send (struct Plugin *plugin, @@ -2947,9 +3019,7 @@ udp_select_send (struct Plugin *plugin, struct UDP_MessageWrapper *udpw; /* Find message to send */ - udpw = remove_timeout_messages_and_select ((sock == plugin->sockv4) - ? plugin->ipv4_queue_head - : plugin->ipv6_queue_head, + udpw = remove_timeout_messages_and_select (plugin, sock); if (NULL == udpw) return 0; /* No message to send */ @@ -2963,7 +3033,9 @@ udp_select_send (struct Plugin *plugin, a4.sin_len = sizeof (a4); #endif a4.sin_port = u4->u4_port; - memcpy (&a4.sin_addr, &u4->ipv4_addr, sizeof(struct in_addr)); + memcpy (&a4.sin_addr, + &u4->ipv4_addr, + sizeof(struct in_addr)); a = (struct sockaddr *) &a4; slen = sizeof (a4); } @@ -2982,15 +3054,16 @@ udp_select_send (struct Plugin *plugin, } else { - call_continuation (udpw, GNUNET_OK); - dequeue (plugin, udpw); + call_continuation (udpw, + GNUNET_OK); + dequeue (plugin, + udpw); notify_session_monitor (plugin, udpw->session, GNUNET_TRANSPORT_SS_UPDATE); GNUNET_free (udpw); return GNUNET_SYSERR; } - sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, @@ -2999,12 +3072,20 @@ udp_select_send (struct Plugin *plugin, if (GNUNET_SYSERR == sent) { /* Failure */ - analyze_send_error (plugin, a, slen, errno); - call_continuation (udpw, GNUNET_SYSERR); + analyze_send_error (plugin, + a, + slen, + errno); + call_continuation (udpw, + GNUNET_SYSERR); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, bytes, sent, failure", sent, GNUNET_NO); + "# UDP, total, bytes, sent, failure", + sent, + GNUNET_NO); GNUNET_STATISTICS_update (plugin->env->stats, - "# UDP, total, messages, sent, failure", 1, GNUNET_NO); + "# UDP, total, messages, sent, failure", + 1, + GNUNET_NO); } else { @@ -3032,7 +3113,7 @@ udp_select_send (struct Plugin *plugin, notify_session_monitor (plugin, udpw->session, GNUNET_TRANSPORT_SS_UPDATE); - GNUNET_free(udpw); + GNUNET_free (udpw); return sent; } diff --git a/src/transport/transport_api_monitor_plugins.c b/src/transport/transport_api_monitor_plugins.c index 7c11194f2..f313bdfe9 100644 --- a/src/transport/transport_api_monitor_plugins.c +++ b/src/transport/transport_api_monitor_plugins.c @@ -178,21 +178,6 @@ free_entry (void *cls, } -/** - * We got disconnected, remove all existing entries from - * the map and notify client. - * - * @param pm montitor that got disconnected - */ -static void -clear_map (struct GNUNET_TRANSPORT_PluginMonitor *pm) -{ - GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, - &free_entry, - pm); -} - - /** * Cut the existing connection and reconnect. * @@ -203,7 +188,9 @@ reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm) { GNUNET_CLIENT_disconnect (pm->client); pm->client = NULL; - clear_map (pm); + GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, + &free_entry, + pm); pm->backoff = GNUNET_TIME_STD_BACKOFF (pm->backoff); pm->reconnect_task = GNUNET_SCHEDULER_add_delayed (pm->backoff, &do_plugin_connect, @@ -447,7 +434,9 @@ GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor * GNUNET_SCHEDULER_cancel (pm->reconnect_task); pm->reconnect_task = NULL; } - clear_map (pm); + GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions, + &free_entry, + pm); GNUNET_CONTAINER_multihashmap32_destroy (pm->sessions); GNUNET_free (pm); } -- cgit v1.2.3