From 9e012a7a5c3991d224018b2d390b09d8e32c57ae Mon Sep 17 00:00:00 2001 From: t3sserakt Date: Tue, 8 Sep 2020 13:33:25 +0200 Subject: - fixed socket clean up; added sync between start of service and communicator --- src/transport/gnunet-communicator-tcp.c | 137 ++++++++++++++------ src/transport/test_communicator_basic.c | 24 +++- .../test_communicator_unix_basic_peer1.conf | 2 + .../test_communicator_unix_basic_peer2.conf | 2 + src/transport/transport-testing2.c | 144 +-------------------- src/transport/transport-testing2.h | 144 ++++++++++++++++++++- src/util/mq.c | 6 +- 7 files changed, 269 insertions(+), 190 deletions(-) diff --git a/src/transport/gnunet-communicator-tcp.c b/src/transport/gnunet-communicator-tcp.c index 4caef909c..59f42496a 100644 --- a/src/transport/gnunet-communicator-tcp.c +++ b/src/transport/gnunet-communicator-tcp.c @@ -393,11 +393,6 @@ struct Queue */ struct GNUNET_PeerIdentity target; - /** - * ID of listen task - */ - struct GNUNET_SCHEDULER_Task *listen_task; - /** * Listen socket. */ @@ -637,11 +632,6 @@ struct ProtoQueue */ struct ProtoQueue *prev; - /** - * ID of listen task - */ - struct GNUNET_SCHEDULER_Task *listen_task; - /** * Listen socket. */ @@ -760,6 +750,11 @@ static struct GNUNET_TRANSPORT_CommunicatorHandle *ch; */ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map; +/** + * ListenTasks (map from socket to `struct ListenTask`) + */ +static struct GNUNET_CONTAINER_MultiHashMap *lt_map; + /** * Our public key. */ @@ -815,6 +810,16 @@ struct Addresses *addrs_head; */ struct Addresses *addrs_tail; +/** + * Head of DLL with ListenTasks. + */ +struct ListenTask *lts_head; + +/** + * Head of DLL with ListenTask. + */ +struct ListenTask *lts_tail; + /** * Number of addresses in the DLL for register at NAT service. */ @@ -850,7 +855,6 @@ unsigned int bind_port; static void listen_cb (void *cls); - /** * Functions with this signature are called whenever we need * to close a queue due to a disconnect or failure to @@ -861,10 +865,14 @@ listen_cb (void *cls); static void queue_destroy (struct Queue *queue) { - struct ListenTask *lt; - lt = GNUNET_new (struct ListenTask); - lt->listen_sock = queue->listen_sock; - lt->listen_task = queue->listen_task; + struct ListenTask *lt = NULL; + struct GNUNET_HashCode h_sock; + + GNUNET_CRYPTO_hash (queue->listen_sock, + sizeof(queue->listen_sock), + &h_sock); + + lt = GNUNET_CONTAINER_multihashmap_get (lt_map, &h_sock); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting queue for peer `%s'\n", @@ -934,7 +942,7 @@ queue_destroy (struct Queue *queue) else GNUNET_free (queue); - if ((NULL != lt->listen_sock) && (NULL == lt->listen_task)) + if ((! shutdown_running) && (NULL == lt->listen_task)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "add read net listen\n"); @@ -1982,7 +1990,8 @@ tcp_address_to_sockaddr (const char *bindto, socklen_t *sock_len) // colon = strrchr (cp, ':'); port = extract_port (bindto); in = tcp_address_to_sockaddr_numeric_v6 (sock_len, v6, port); - }else{ + } + else{ GNUNET_assert (0); } @@ -2530,11 +2539,6 @@ decrypt_and_check_tc (struct Queue *queue, static void free_proto_queue (struct ProtoQueue *pq) { - if (NULL != pq->listen_task) - { - GNUNET_SCHEDULER_cancel (pq->listen_task); - pq->listen_task = NULL; - } if (NULL != pq->listen_sock) { GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (pq->listen_sock)); @@ -2653,7 +2657,6 @@ proto_read_kx (void *cls) queue->address = pq->address; /* steals reference */ queue->address_len = pq->address_len; queue->target = tc.sender; - queue->listen_task = pq->listen_task; queue->listen_sock = pq->listen_sock; queue->sock = pq->sock; @@ -2696,6 +2699,9 @@ listen_cb (void *cls) struct ProtoQueue *pq; struct ListenTask *lt; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "listen_cb\n"); + lt = cls; lt->listen_task = NULL; @@ -2908,6 +2914,36 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer, const char *address) return GNUNET_OK; } +/** + * Iterator over all ListenTasks to clean up. + * + * @param cls NULL + * @param key unused + * @param value the ListenTask to cancel. + * @return #GNUNET_OK to continue to iterate + */ +static int +get_lt_delete_it (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ListenTask *lt = value; + + (void) cls; + (void) key; + if (NULL != lt->listen_task) + { + GNUNET_SCHEDULER_cancel (lt->listen_task); + lt->listen_task = NULL; + } + if (NULL != lt->listen_sock) + { + GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (lt->listen_sock)); + lt->listen_sock = NULL; + } + return GNUNET_OK; +} + /** * Iterator over all message queues to clean up. * @@ -2925,16 +2961,10 @@ get_queue_delete_it (void *cls, (void) cls; (void) target; - if (NULL != queue->listen_task) - { - GNUNET_SCHEDULER_cancel (queue->listen_task); - queue->listen_task = NULL; - } queue_destroy (queue); return GNUNET_OK; } - /** * Shutdown the UNIX communicator. * @@ -2943,7 +2973,6 @@ get_queue_delete_it (void *cls, static void do_shutdown (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutdown %s!\n", shutdown_running ? "running" : "not running"); @@ -2960,6 +2989,7 @@ do_shutdown (void *cls) GNUNET_NAT_unregister (nat); nat = NULL; } + GNUNET_CONTAINER_multihashmap_iterate (lt_map, &get_lt_delete_it, NULL); GNUNET_CONTAINER_multipeermap_iterate (queue_map, &get_queue_delete_it, NULL); GNUNET_CONTAINER_multipeermap_destroy (queue_map); GNUNET_TRANSPORT_communicator_address_remove_all (ch); @@ -2993,6 +3023,8 @@ do_shutdown (void *cls) GNUNET_RESOLVER_request_cancel (resolve_request_handle); resolve_request_handle = NULL; } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Shutdown done!\n"); } @@ -3043,10 +3075,10 @@ nat_address_cb (void *cls, char *my_addr; struct GNUNET_TRANSPORT_AddressIdentifier *ai; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "nat address cb %s %s\n", - add_remove ? "add" : "remove", - GNUNET_a2s (addr, addrlen)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "nat address cb %s %s\n", + add_remove ? "add" : "remove", + GNUNET_a2s (addr, addrlen)); if (GNUNET_YES == add_remove) { @@ -3098,7 +3130,7 @@ add_addr (struct sockaddr *in, socklen_t in_len) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "add address %s\n", GNUNET_a2s (saddrs->addr, saddrs->addr_len)); - + addrs_lens++; } @@ -3117,6 +3149,7 @@ init_socket (struct sockaddr *addr, socklen_t sto_len; struct GNUNET_NETWORK_Handle *listen_sock; struct ListenTask *lt; + struct GNUNET_HashCode h_sock; if (NULL == addr) { @@ -3168,13 +3201,12 @@ init_socket (struct sockaddr *addr, sto_len = in_len; } - //addr = (struct sockaddr *) &in_sto; + // addr = (struct sockaddr *) &in_sto; in_len = sto_len; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Bound to `%s'\n", GNUNET_a2s ((const struct sockaddr *) &in_sto, sto_len)); stats = GNUNET_STATISTICS_create ("C-TCP", cfg); - GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); if (NULL == is) is = GNUNET_NT_scanner_init (); @@ -3203,6 +3235,27 @@ init_socket (struct sockaddr *addr, &listen_cb, lt); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "creating hash\n"); + GNUNET_CRYPTO_hash (lt->listen_sock, + sizeof(lt->listen_sock), + &h_sock); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "creating map\n"); + if (NULL == lt_map) + lt_map = GNUNET_CONTAINER_multihashmap_create (2, GNUNET_NO); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "creating map entry\n"); + GNUNET_CONTAINER_multihashmap_put (lt_map, + &h_sock, + lt, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "map entry created\n"); + if (NULL == queue_map) queue_map = GNUNET_CONTAINER_multipeermap_create (10, GNUNET_NO); @@ -3335,12 +3388,14 @@ init_socket_resolv (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Address is NULL. This might be an error or the resolver finished resolving.\n"); - if (NULL == addrs_head){ + if (NULL == addrs_head) + { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Resolver finished resolving, but we do not listen to an address!.\n"); + "Resolver finished resolving, but we do not listen to an address!.\n"); return; } nat_register (); + } } @@ -3405,6 +3460,8 @@ run (void *cls, return; } + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); + if (1 == sscanf (bindto, "%u%1s", &bind_port, dummy)) { po = tcp_address_to_sockaddr_port_only (bindto, &bind_port); @@ -3444,6 +3501,7 @@ run (void *cls, init_socket (in, in_len); nat_register (); GNUNET_free (bindto); + return; } @@ -3454,6 +3512,7 @@ run (void *cls, init_socket (in, in_len); nat_register (); GNUNET_free (bindto); + return; } diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c index e2d2eb73c..aa02bda93 100644 --- a/src/transport/test_communicator_basic.c +++ b/src/transport/test_communicator_basic.c @@ -124,6 +124,25 @@ communicator_available_cb (void *cls, address_prefix); } +static void +open_queue (void *cls) +{ + char *address = cls; + + if (NULL != tc_hs[PEER_A]->c_mq) + { + queue_est = GNUNET_YES; + GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (tc_hs[PEER_A], + &peer_id[PEER_B], + address); + } + else + { + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &open_queue, + address); + } +} static void add_address_cb (void *cls, @@ -144,10 +163,7 @@ add_address_cb (void *cls, if ((0 == strcmp ((char*) cls, cfg_peers_name[PEER_B])) && (GNUNET_NO == queue_est)) { - queue_est = GNUNET_YES; - GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue (tc_hs[PEER_A], - &peer_id[PEER_B], - address); + open_queue (address); } } diff --git a/src/transport/test_communicator_unix_basic_peer1.conf b/src/transport/test_communicator_unix_basic_peer1.conf index 71283e381..8e9700108 100644 --- a/src/transport/test_communicator_unix_basic_peer1.conf +++ b/src/transport/test_communicator_unix_basic_peer1.conf @@ -28,6 +28,8 @@ PORT = 62089 UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-resolver_test_1.sock [communicator-unix] +#PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args +#PREFIX = valgrind --leak-check=full --track-origins=yes UNIXPATH = $GNUNET_RUNTIME_DIR/communicator-unix-1.sock [communicator-tcp] diff --git a/src/transport/test_communicator_unix_basic_peer2.conf b/src/transport/test_communicator_unix_basic_peer2.conf index ac95845b2..c12cc9111 100644 --- a/src/transport/test_communicator_unix_basic_peer2.conf +++ b/src/transport/test_communicator_unix_basic_peer2.conf @@ -28,6 +28,8 @@ PORT = 62090 UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-resolver_test_2.sock [communicator-unix] +#PREFIX = xterm -geometry 100x85 -T peer2 -e gdb --args +#PREFIX = valgrind --leak-check=full --track-origins=yes UNIXPATH = $GNUNET_RUNTIME_DIR/communicator-unix-2.sock [communicator-tcp] diff --git a/src/transport/transport-testing2.c b/src/transport/transport-testing2.c index e194b0159..98cfd5e29 100644 --- a/src/transport/transport-testing2.c +++ b/src/transport/transport-testing2.c @@ -58,145 +58,6 @@ struct MyClient }; -/** - * @brief Handle to a transport communicator - */ -struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle -{ - /** - * Clients - */ - struct MyClient *client_head; - struct MyClient *client_tail; - - /** - * @brief Handle to the client - */ - struct GNUNET_MQ_Handle *c_mq; - - /** - * @brief Handle to the configuration - */ - struct GNUNET_CONFIGURATION_Handle *cfg; - - /** - * @brief File name of configuration file - */ - char *cfg_filename; - - struct GNUNET_PeerIdentity peer_id; - - /** - * @brief Handle to the transport service - */ - struct GNUNET_SERVICE_Handle *tsh; - - /** - * @brief Task that will be run on shutdown to stop and clean transport - * service - */ - struct GNUNET_SCHEDULER_Task *ts_shutdown_task; - - - /** - * @brief Process of the communicator - */ - struct GNUNET_OS_Process *c_proc; - - /** - * NAT process - */ - struct GNUNET_OS_Process *nat_proc; - - /** - * resolver service process - */ - struct GNUNET_OS_Process *resolver_proc; - - /** - * peerstore service process - */ - struct GNUNET_OS_Process *ps_proc; - - /** - * @brief Task that will be run on shutdown to stop and clean communicator - */ - struct GNUNET_SCHEDULER_Task *c_shutdown_task; - - /** - * @brief Characteristics of the communicator - */ - enum GNUNET_TRANSPORT_CommunicatorCharacteristics c_characteristics; - - /** - * @brief Specifies supported addresses - */ - char *c_addr_prefix; - - /** - * @brief Specifies supported addresses - */ - char *c_address; - - /** - * @brief Head of the DLL of queues associated with this communicator - */ - struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue_head; - - /** - * @brief Tail of the DLL of queues associated with this communicator - */ - struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue_tail; - - /* Callbacks + Closures */ - /** - * @brief Callback called when a new communicator connects - */ - GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback - communicator_available_cb; - - /** - * @brief Callback called when a new communicator connects - */ - GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb; - - /** - * @brief Callback called when a new communicator connects - */ - GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb; - - /** - * @brief Callback called when a new communicator connects - */ - GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb; - - /** - * @brief Callback called when a new communicator connects - */ - GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_msg_cb; - - /** - * @brief Backchannel callback - */ - GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb; - - /** - * Our service handle - */ - struct GNUNET_SERVICE_Handle *sh; - - /** - * @brief Closure to the callback - */ - void *cb_cls; - - /** - * Backchannel supported - */ - int bc_enabled; -}; - - /** * @brief Queue of a communicator and some context */ @@ -707,6 +568,9 @@ shutdown_service (void *cls) { struct GNUNET_SERVICE_Handle *h = cls; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Shutting down service!\n"); + GNUNET_SERVICE_stop (h); } @@ -1202,6 +1066,8 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_open_queue ( memcpy (&msg[1], address, alen); if (NULL != tc_h->c_mq) { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending queue create immediately\n"); GNUNET_MQ_send (tc_h->c_mq, env); } else diff --git a/src/transport/transport-testing2.h b/src/transport/transport-testing2.h index b77125e82..04f75fc88 100644 --- a/src/transport/transport-testing2.h +++ b/src/transport/transport-testing2.h @@ -29,13 +29,6 @@ #include "gnunet_ats_transport_service.h" #include "transport.h" - -/** - * @brief Handle to a transport communicator - */ -struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle; - - /** * @brief Queue of a communicator and some context */ @@ -151,6 +144,143 @@ typedef void const char*payload, size_t payload_len); +/** + * @brief Handle to a transport communicator + */ +struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle +{ + /** + * Clients + */ + struct MyClient *client_head; + struct MyClient *client_tail; + + /** + * @brief Handle to the client + */ + struct GNUNET_MQ_Handle *c_mq; + + /** + * @brief Handle to the configuration + */ + struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * @brief File name of configuration file + */ + char *cfg_filename; + + struct GNUNET_PeerIdentity peer_id; + + /** + * @brief Handle to the transport service + */ + struct GNUNET_SERVICE_Handle *tsh; + + /** + * @brief Task that will be run on shutdown to stop and clean transport + * service + */ + struct GNUNET_SCHEDULER_Task *ts_shutdown_task; + + + /** + * @brief Process of the communicator + */ + struct GNUNET_OS_Process *c_proc; + + /** + * NAT process + */ + struct GNUNET_OS_Process *nat_proc; + + /** + * resolver service process + */ + struct GNUNET_OS_Process *resolver_proc; + + /** + * peerstore service process + */ + struct GNUNET_OS_Process *ps_proc; + + /** + * @brief Task that will be run on shutdown to stop and clean communicator + */ + struct GNUNET_SCHEDULER_Task *c_shutdown_task; + + /** + * @brief Characteristics of the communicator + */ + enum GNUNET_TRANSPORT_CommunicatorCharacteristics c_characteristics; + + /** + * @brief Specifies supported addresses + */ + char *c_addr_prefix; + + /** + * @brief Specifies supported addresses + */ + char *c_address; + + /** + * @brief Head of the DLL of queues associated with this communicator + */ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue_head; + + /** + * @brief Tail of the DLL of queues associated with this communicator + */ + struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *queue_tail; + + /* Callbacks + Closures */ + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_CommunicatorAvailableCallback + communicator_available_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_AddAddressCallback add_address_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_QueueCreateReplyCallback queue_create_reply_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_AddQueueCallback add_queue_cb; + + /** + * @brief Callback called when a new communicator connects + */ + GNUNET_TRANSPORT_TESTING_IncomingMessageCallback incoming_msg_cb; + + /** + * @brief Backchannel callback + */ + GNUNET_TRANSPORT_TESTING_BackchannelCallback bc_cb; + + /** + * Our service handle + */ + struct GNUNET_SERVICE_Handle *sh; + + /** + * @brief Closure to the callback + */ + void *cb_cls; + + /** + * Backchannel supported + */ + int bc_enabled; +}; /** * @brief Start communicator part of transport service and communicator diff --git a/src/util/mq.c b/src/util/mq.c index 302b310de..29ead02a4 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -273,7 +273,7 @@ GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers, break; } } -done: + done: if (GNUNET_NO == handled) { LOG (GNUNET_ERROR_TYPE_INFO, @@ -355,6 +355,10 @@ void GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) { + if (NULL == mq) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "mq is NUll when sending message of type %u\n", + (unsigned int) ntohs (ev->mh->type)); GNUNET_assert (NULL != mq); GNUNET_assert (NULL == ev->parent_queue); -- cgit v1.2.3