From 55c16b6ab6a83a594303efccff1e59a022470982 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 6 Jul 2016 21:57:30 +0000 Subject: converting GNUNET_TRANSPORT_monitor_peers implementation to MQ --- src/transport/gnunet-service-transport_clients.c | 2 +- src/transport/gnunet-transport.c | 24 +- src/transport/test_transport_api_monitor_peers.c | 12 +- src/transport/transport_api_monitor_peers.c | 350 +++++++++++------------ 4 files changed, 196 insertions(+), 192 deletions(-) (limited to 'src/transport') diff --git a/src/transport/gnunet-service-transport_clients.c b/src/transport/gnunet-service-transport_clients.c index 953ea54e5..b9bccc08b 100644 --- a/src/transport/gnunet-service-transport_clients.c +++ b/src/transport/gnunet-service-transport_clients.c @@ -1298,7 +1298,7 @@ clients_handle_monitor_peers (void *cls, GNUNET_SERVER_transmit_context_append_data (tc, NULL, 0, - GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_RESPONSE); + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_RESPONSE_END); } GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); diff --git a/src/transport/gnunet-transport.c b/src/transport/gnunet-transport.c index d7852893c..85f22a7f2 100644 --- a/src/transport/gnunet-transport.c +++ b/src/transport/gnunet-transport.c @@ -1875,8 +1875,11 @@ testservice_task (void *cls, } else if (iterate_connections) /* -i: List information about peers once */ { - pic = GNUNET_TRANSPORT_monitor_peers (cfg, (NULL == cpid) ? NULL : &pid, - GNUNET_YES, TIMEOUT, &process_peer_iteration_cb, (void *) cfg); + pic = GNUNET_TRANSPORT_monitor_peers (cfg, + (NULL == cpid) ? NULL : &pid, + GNUNET_YES, + &process_peer_iteration_cb, + (void *) cfg); op_timeout = GNUNET_SCHEDULER_add_delayed (OP_TIMEOUT, &operation_timeout, NULL); @@ -1888,8 +1891,8 @@ testservice_task (void *cls, pic = GNUNET_TRANSPORT_monitor_peers (cfg, (NULL == cpid) ? NULL : &pid, GNUNET_NO, - TIMEOUT, - &process_peer_monitoring_cb, NULL); + &process_peer_monitoring_cb, + NULL); } else if (monitor_plugins) /* -P: List information about plugins continuously */ { @@ -1933,7 +1936,7 @@ testservice_task (void *cls, GNUNET_break(0); return; } - + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } @@ -1959,12 +1962,17 @@ run (void *cls, do_test_configuration (cfg); return; } - GNUNET_CLIENT_service_test ("transport", cfg, GNUNET_TIME_UNIT_SECONDS, - &testservice_task, (void *) cfg); + GNUNET_CLIENT_service_test ("transport", + cfg, + GNUNET_TIME_UNIT_SECONDS, + &testservice_task, + (void *) cfg); } + int -main (int argc, char * const *argv) +main (int argc, + char * const *argv) { int res; static const struct GNUNET_GETOPT_CommandLineOption options[] = { diff --git a/src/transport/test_transport_api_monitor_peers.c b/src/transport/test_transport_api_monitor_peers.c index 549394944..90c96829d 100644 --- a/src/transport/test_transport_api_monitor_peers.c +++ b/src/transport/test_transport_api_monitor_peers.c @@ -450,14 +450,22 @@ run (void *cls, char *const *args, const char *cfgfile, ¬ify_receive, ¬ify_connect, ¬ify_disconnect, &start_cb, NULL); - pmc_p1 = GNUNET_TRANSPORT_monitor_peers (p1->cfg, NULL, GNUNET_NO, GNUNET_TIME_UNIT_FOREVER_REL, &monitor1_cb, NULL); + pmc_p1 = GNUNET_TRANSPORT_monitor_peers (p1->cfg, + NULL, + GNUNET_NO, + &monitor1_cb, + NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 started\n"); p2 = GNUNET_TRANSPORT_TESTING_start_peer (tth, cfg_file_p2, 2, ¬ify_receive, ¬ify_connect, ¬ify_disconnect, &start_cb, NULL); - pmc_p2 = GNUNET_TRANSPORT_monitor_peers (p2->cfg, NULL, GNUNET_NO, GNUNET_TIME_UNIT_FOREVER_REL, &monitor2_cb, NULL); + pmc_p2 = GNUNET_TRANSPORT_monitor_peers (p2->cfg, + NULL, + GNUNET_NO, + &monitor2_cb, + NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 started\n"); if ((p1 == NULL) || (p2 == NULL)) { diff --git a/src/transport/transport_api_monitor_peers.c b/src/transport/transport_api_monitor_peers.c index 5d19ad6d7..a5c70fcfa 100644 --- a/src/transport/transport_api_monitor_peers.c +++ b/src/transport/transport_api_monitor_peers.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2014 GNUnet e.V. + Copyright (C) 2009-2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -54,18 +54,13 @@ struct GNUNET_TRANSPORT_PeerMonitoringContext /** * Connection to the service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Configuration we use. */ const struct GNUNET_CONFIGURATION_Handle *cfg; - /** - * When should this operation time out? - */ - struct GNUNET_TIME_Absolute timeout; - /** * Backoff for reconnect. */ @@ -165,76 +160,103 @@ GNUNET_TRANSPORT_ps2s (enum GNUNET_TRANSPORT_PeerState state) /** - * Function called with responses from the service. + * Task run to re-establish the connection. * * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` - * @param msg NULL on timeout or error, otherwise presumably a - * message with the human-readable address */ static void -peer_response_processor (void *cls, - const struct GNUNET_MessageHeader *msg); +do_peer_connect (void *cls); /** - * Send our subscription request to the service. + * Cut the existing connection and reconnect. * * @param pal_ctx our context */ static void -send_peer_mon_request (struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx) +reconnect_peer_ctx (struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx) { - struct PeerMonitorMessage msg; - - msg.header.size = htons (sizeof (struct PeerMonitorMessage)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_REQUEST); - msg.one_shot = htonl (pal_ctx->one_shot); - msg.peer = pal_ctx->peer; - GNUNET_assert (GNUNET_OK == - GNUNET_CLIENT_transmit_and_get_response (pal_ctx->client, - &msg.header, - GNUNET_TIME_absolute_get_remaining (pal_ctx->timeout), - GNUNET_YES, - &peer_response_processor, - pal_ctx)); + GNUNET_assert (GNUNET_NO == pal_ctx->one_shot); + GNUNET_MQ_destroy (pal_ctx->mq); + pal_ctx->mq = NULL; + pal_ctx->cb (pal_ctx->cb_cls, + NULL, + NULL, + GNUNET_TRANSPORT_PS_NOT_CONNECTED, + GNUNET_TIME_UNIT_ZERO_ABS); + pal_ctx->backoff = GNUNET_TIME_STD_BACKOFF (pal_ctx->backoff); + pal_ctx->reconnect_task = GNUNET_SCHEDULER_add_delayed (pal_ctx->backoff, + &do_peer_connect, + pal_ctx); } /** - * Task run to re-establish the connection. + * Function called with responses from the service. * * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` + * @param msg message from service */ static void -do_peer_connect (void *cls) +handle_response_end (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx = cls; - pal_ctx->reconnect_task = NULL; - pal_ctx->client = GNUNET_CLIENT_connect ("transport", pal_ctx->cfg); - GNUNET_assert (NULL != pal_ctx->client); - send_peer_mon_request (pal_ctx); + if (pal_ctx->one_shot) + { + /* iteration finished */ + pal_ctx->cb (pal_ctx->cb_cls, + NULL, + NULL, + GNUNET_TRANSPORT_PS_NOT_CONNECTED, + GNUNET_TIME_UNIT_ZERO_ABS); + GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); + return; + } + /* not quite what we expected, reconnect */ + GNUNET_break (0); + reconnect_peer_ctx (pal_ctx); } /** - * Cut the existing connection and reconnect. + * Function called to check responses from the service. * - * @param pal_ctx our context + * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` + * @param pir_msg message with the human-readable address + * @return #GNUNET_OK if @a pir_msg is well-formed */ -static void -reconnect_peer_ctx (struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx) +static int +check_response (void *cls, + const struct PeerIterateResponseMessage *pir_msg) { - GNUNET_assert (GNUNET_NO == pal_ctx->one_shot); - GNUNET_CLIENT_disconnect (pal_ctx->client); - pal_ctx->client = NULL; - pal_ctx->cb (pal_ctx->cb_cls, NULL, NULL, - GNUNET_TRANSPORT_PS_NOT_CONNECTED, - GNUNET_TIME_UNIT_ZERO_ABS); - pal_ctx->backoff = GNUNET_TIME_STD_BACKOFF (pal_ctx->backoff); - pal_ctx->reconnect_task = GNUNET_SCHEDULER_add_delayed (pal_ctx->backoff, - &do_peer_connect, - pal_ctx); + uint16_t size = ntohs (pir_msg->header.size) - sizeof (*pir_msg); + size_t alen = ntohl (pir_msg->addrlen); + size_t tlen = ntohl (pir_msg->pluginlen); + const char *addr; + const char *transport_name; + + if (size != tlen + alen) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if ( (0 == tlen) && (0 == alen) ) + return GNUNET_OK; + if (0 == tlen) + { + GNUNET_break (0); /* This must not happen: address without plugin */ + return GNUNET_SYSERR; + } + addr = (const char *) &pir_msg[1]; + transport_name = &addr[alen]; + if (transport_name[tlen - 1] != '\0') + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } @@ -242,143 +264,115 @@ reconnect_peer_ctx (struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx) * Function called with responses from the service. * * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` - * @param msg NULL on timeout or error, otherwise presumably a - * message with the human-readable address + * @param msg message with the human-readable address */ static void -peer_response_processor (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_response (void *cls, + const struct PeerIterateResponseMessage *pir_msg) { struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx = cls; - struct PeerIterateResponseMessage *pir_msg; struct GNUNET_HELLO_Address *address; + size_t alen = ntohl (pir_msg->addrlen); + size_t tlen = ntohl (pir_msg->pluginlen); const char *addr; const char *transport_name; - uint16_t size; - size_t alen; - size_t tlen; - if (NULL == msg) - { - if (pal_ctx->one_shot) - { - /* Disconnect */ - pal_ctx->cb (pal_ctx->cb_cls, NULL, NULL, - GNUNET_TRANSPORT_PS_NOT_CONNECTED, GNUNET_TIME_UNIT_ZERO_ABS); - GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); - } - else - { - reconnect_peer_ctx (pal_ctx); - } - return; - } - size = ntohs (msg->size); - GNUNET_break (ntohs (msg->type) == - GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_RESPONSE); - if (size == sizeof (struct GNUNET_MessageHeader)) + if ( (0 == tlen) && + (0 == alen) ) { - /* Done! */ - if (pal_ctx->one_shot) - { - /* iteration finished */ - pal_ctx->cb (pal_ctx->cb_cls, NULL, NULL, - GNUNET_TRANSPORT_PS_NOT_CONNECTED, GNUNET_TIME_UNIT_ZERO_ABS); - GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); - } - else - { - reconnect_peer_ctx (pal_ctx); - } + /* No address available */ + pal_ctx->cb (pal_ctx->cb_cls, + &pir_msg->peer, + NULL, + ntohl(pir_msg->state), + GNUNET_TIME_absolute_ntoh (pir_msg->state_timeout)); return; } + addr = (const char *) &pir_msg[1]; + transport_name = &addr[alen]; + + /* notify client */ + address = GNUNET_HELLO_address_allocate (&pir_msg->peer, + transport_name, + addr, + alen, + ntohl (pir_msg->local_address_info)); + pal_ctx->cb (pal_ctx->cb_cls, + &pir_msg->peer, + address, + ntohl (pir_msg->state), + GNUNET_TIME_absolute_ntoh (pir_msg->state_timeout)); + GNUNET_HELLO_address_free (address); +} - if ((size < sizeof (struct PeerIterateResponseMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_RESPONSE)) - { - GNUNET_break (0); - if (pal_ctx->one_shot) - { - /* iteration finished (with error) */ - pal_ctx->cb (pal_ctx->cb_cls, NULL, NULL, - GNUNET_TRANSPORT_PS_NOT_CONNECTED, GNUNET_TIME_UNIT_ZERO_ABS); - GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); - } - else - { - reconnect_peer_ctx (pal_ctx); - } - return; - } - pir_msg = (struct PeerIterateResponseMessage *) msg; - tlen = ntohl (pir_msg->pluginlen); - alen = ntohl (pir_msg->addrlen); - if (size != sizeof (struct PeerIterateResponseMessage) + tlen + alen) +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_TRANSPORT_PeerMonitoringContext *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx = cls; + + if (pal_ctx->one_shot) { - GNUNET_break (0); - if (pal_ctx->one_shot) - { - pal_ctx->cb (pal_ctx->cb_cls, NULL, NULL, - GNUNET_TRANSPORT_PS_NOT_CONNECTED, GNUNET_TIME_UNIT_ZERO_ABS); - GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); - } - else - { - reconnect_peer_ctx (pal_ctx); - } + /* Disconnect */ + pal_ctx->cb (pal_ctx->cb_cls, + NULL, + NULL, + GNUNET_TRANSPORT_PS_NOT_CONNECTED, + GNUNET_TIME_UNIT_ZERO_ABS); + GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); return; } + reconnect_peer_ctx (pal_ctx); +} - if ( (0 == tlen) && (0 == alen) ) - { - /* No address available */ - pal_ctx->cb (pal_ctx->cb_cls, &pir_msg->peer, NULL, - ntohl(pir_msg->state), - GNUNET_TIME_absolute_ntoh (pir_msg->state_timeout)); - } - else - { - if (0 == tlen) - { - GNUNET_break (0); /* This must not happen: address without plugin */ - return; - } - addr = (const char *) &pir_msg[1]; - transport_name = &addr[alen]; - - if (transport_name[tlen - 1] != '\0') - { - /* Corrupt plugin name */ - GNUNET_break (0); - if (pal_ctx->one_shot) - { - pal_ctx->cb (pal_ctx->cb_cls, NULL, NULL, - GNUNET_TRANSPORT_PS_NOT_CONNECTED, GNUNET_TIME_UNIT_ZERO_ABS); - GNUNET_TRANSPORT_monitor_peers_cancel (pal_ctx); - } - else - { - reconnect_peer_ctx (pal_ctx); - } - return; - } - - /* notify client */ - address = GNUNET_HELLO_address_allocate (&pir_msg->peer, - transport_name, addr, alen, ntohl(pir_msg->local_address_info)); - pal_ctx->cb (pal_ctx->cb_cls, &pir_msg->peer, address, - ntohl(pir_msg->state), - GNUNET_TIME_absolute_ntoh (pir_msg->state_timeout)); - GNUNET_HELLO_address_free (address); - } +/** + * Task run to re-establish the connection. + * + * @param cls our `struct GNUNET_TRANSPORT_PeerMonitoringContext *` + */ +static void +do_peer_connect (void *cls) +{ + GNUNET_MQ_hd_var_size (response, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_RESPONSE, + struct PeerIterateResponseMessage); + GNUNET_MQ_hd_fixed_size (response_end, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_RESPONSE_END, + struct GNUNET_MessageHeader); + struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_response_handler (pal_ctx), + make_response_end_handler (pal_ctx), + GNUNET_MQ_handler_end () + }; + struct PeerMonitorMessage *msg; + struct GNUNET_MQ_Envelope *env; - /* expect more replies */ - GNUNET_CLIENT_receive (pal_ctx->client, &peer_response_processor, - pal_ctx, - GNUNET_TIME_absolute_get_remaining (pal_ctx->timeout)); + pal_ctx->reconnect_task = NULL; + pal_ctx->mq = GNUNET_CLIENT_connecT (pal_ctx->cfg, + "transport", + handlers, + &mq_error_handler, + pal_ctx); + if (NULL == pal_ctx->mq) + return; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PEER_REQUEST); + msg->one_shot = htonl (pal_ctx->one_shot); + msg->peer = pal_ctx->peer; + GNUNET_MQ_send (pal_ctx->mq, + env); } @@ -405,7 +399,6 @@ peer_response_processor (void *cls, * NULL for all peers * @param one_shot #GNUNET_YES to return the current state and then end (with NULL+NULL), * #GNUNET_NO to monitor peers continuously - * @param timeout how long is the lookup allowed to take at most * @param peer_callback function to call with the results * @param peer_callback_cls closure for @a peer_address_callback */ @@ -413,29 +406,24 @@ struct GNUNET_TRANSPORT_PeerMonitoringContext * GNUNET_TRANSPORT_monitor_peers (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_PeerIdentity *peer, int one_shot, - struct GNUNET_TIME_Relative timeout, GNUNET_TRANSPORT_PeerIterateCallback peer_callback, void *peer_callback_cls) { - struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx; - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_TRANSPORT_PeerMonitoringContext *pal_ctx + = GNUNET_new (struct GNUNET_TRANSPORT_PeerMonitoringContext); - client = GNUNET_CLIENT_connect ("transport", cfg); - if (NULL == client) - return NULL; - if (GNUNET_YES != one_shot) - timeout = GNUNET_TIME_UNIT_FOREVER_REL; - pal_ctx = GNUNET_new (struct GNUNET_TRANSPORT_PeerMonitoringContext); pal_ctx->cb = peer_callback; pal_ctx->cb_cls = peer_callback_cls; pal_ctx->cfg = cfg; - pal_ctx->timeout = GNUNET_TIME_relative_to_absolute (timeout); if (NULL != peer) pal_ctx->peer = *peer; pal_ctx->one_shot = one_shot; - pal_ctx->client = client; - send_peer_mon_request (pal_ctx); - + do_peer_connect (pal_ctx); + if (NULL == pal_ctx->mq) + { + GNUNET_free (pal_ctx); + return NULL; + } return pal_ctx; } @@ -448,10 +436,10 @@ GNUNET_TRANSPORT_monitor_peers (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_TRANSPORT_monitor_peers_cancel (struct GNUNET_TRANSPORT_PeerMonitoringContext *pic) { - if (NULL != pic->client) + if (NULL != pic->mq) { - GNUNET_CLIENT_disconnect (pic->client); - pic->client = NULL; + GNUNET_MQ_destroy (pic->mq); + pic->mq = NULL; } if (NULL != pic->reconnect_task) { -- cgit v1.2.3