From 38cd63a1e806124d19bdf478d9bfcdafe3bdccbb Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 20 Jun 2016 19:56:20 +0000 Subject: convering nse_api.c to new MQ API --- src/nse/nse_api.c | 203 ++++++++++++++++++++---------------------------------- 1 file changed, 73 insertions(+), 130 deletions(-) (limited to 'src') diff --git a/src/nse/nse_api.c b/src/nse/nse_api.c index 1c260d537..d942d5ec6 100644 --- a/src/nse/nse_api.c +++ b/src/nse/nse_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009, 2010, 2011 GNUnet e.V. + Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -45,19 +45,14 @@ struct GNUNET_NSE_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Message queue (if available). */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** * Task doing exponential back-off trying to reconnect. */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * Time for next connect retry. @@ -80,118 +75,55 @@ struct GNUNET_NSE_Handle /** * Try again to connect to network size estimation service. * - * @param cls the handle to the transport service + * @param cls closure with the `struct GNUNET_NSE_Handle *` */ static void reconnect (void *cls); /** - * Type of a function to call when we receive a message - * from the service. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls closure with the `struct GNUNET_NSE_Handle *` + * @param error error code */ static void -message_handler (void *cls, - const struct GNUNET_MessageHeader *msg) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_NSE_Handle *h = cls; - const struct GNUNET_NSE_ClientMessage *client_msg; - - if (NULL == msg) - { - /* Error, timeout, death */ - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, - &reconnect, h); - return; - } - if ((ntohs (msg->size) != sizeof (struct GNUNET_NSE_ClientMessage)) || - (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_NSE_ESTIMATE)) - { - GNUNET_break (0); - return; - } - client_msg = (const struct GNUNET_NSE_ClientMessage *) msg; - h->recv_cb (h->recv_cb_cls, GNUNET_TIME_absolute_ntoh (client_msg->timestamp), - GNUNET_ntoh_double (client_msg->size_estimate), - GNUNET_ntoh_double (client_msg->std_deviation)); - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); -} - - -/** - * Reschedule a connect attempt to the service. - * - * @param h transport service to reconnect - */ -static void -reschedule_connect (struct GNUNET_NSE_Handle *h) -{ - GNUNET_assert (h->reconnect_task == NULL); - - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (NULL != h->client) - { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to nse service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay, - GNUNET_YES)); - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, - &reconnect, h); + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, + &reconnect, + h); h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); } /** - * Transmit START message to service. + * Type of a function to call when we receive a message + * from the service. * - * @param cls the `struct GNUNET_NSE_Handle *` - * @param size number of bytes available in @a buf - * @param buf where to copy the message - * @return number of bytes copied to @a buf + * @param cls closure + * @param cklient_msg message received */ -static size_t -send_start (void *cls, size_t size, void *buf) +static void +handle_estimate (void *cls, + const struct GNUNET_NSE_ClientMessage *client_msg) { struct GNUNET_NSE_Handle *h = cls; - struct GNUNET_MessageHeader *msg; - - h->th = NULL; - if (NULL == buf) - { - /* Connect error... */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Error while trying to transmit `%s' request.\n", - "START"); - reschedule_connect (h); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting `%s' request.\n", - "START"); - GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader)); - msg = (struct GNUNET_MessageHeader *) buf; - msg->size = htons (sizeof (struct GNUNET_MessageHeader)); - msg->type = htons (GNUNET_MESSAGE_TYPE_NSE_START); - GNUNET_CLIENT_receive (h->client, &message_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - return sizeof (struct GNUNET_MessageHeader); + h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + h->recv_cb (h->recv_cb_cls, + GNUNET_TIME_absolute_ntoh (client_msg->timestamp), + GNUNET_ntoh_double (client_msg->size_estimate), + GNUNET_ntoh_double (client_msg->std_deviation)); } @@ -203,21 +135,32 @@ send_start (void *cls, size_t size, void *buf) static void reconnect (void *cls) { + GNUNET_MQ_hd_fixed_size (estimate, + GNUNET_MESSAGE_TYPE_NSE_ESTIMATE, + struct GNUNET_NSE_ClientMessage); struct GNUNET_NSE_Handle *h = cls; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_estimate_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MessageHeader *msg; + struct GNUNET_MQ_Envelope *env; h->reconnect_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to network size estimation service.\n"); - GNUNET_assert (NULL == h->client); - h->client = GNUNET_CLIENT_connect ("nse", h->cfg); - GNUNET_assert (NULL != h->client); - GNUNET_assert (NULL == h->th); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof (struct GNUNET_MessageHeader), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &send_start, h); - GNUNET_assert (NULL != h->th); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "nse", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) + return; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_NSE_START); + GNUNET_MQ_send (h->mq, + env); } @@ -231,18 +174,24 @@ reconnect (void *cls) */ struct GNUNET_NSE_Handle * GNUNET_NSE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - GNUNET_NSE_Callback func, void *func_cls) + GNUNET_NSE_Callback func, + void *func_cls) { - struct GNUNET_NSE_Handle *ret; - - GNUNET_assert (func != NULL); - ret = GNUNET_new (struct GNUNET_NSE_Handle); - ret->cfg = cfg; - ret->recv_cb = func; - ret->recv_cb_cls = func_cls; - ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret); - return ret; + struct GNUNET_NSE_Handle *h; + + GNUNET_assert (NULL != func); + h = GNUNET_new (struct GNUNET_NSE_Handle); + h->cfg = cfg; + h->recv_cb = func; + h->recv_cb_cls = func_cls; + h->reconnect_delay = GNUNET_TIME_UNIT_ZERO; + reconnect (h); + if (NULL == h->mq) + { + GNUNET_free (h); + return NULL; + } + return h; } @@ -254,21 +203,15 @@ GNUNET_NSE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_NSE_disconnect (struct GNUNET_NSE_Handle *h) { - GNUNET_assert (NULL != h); - if (h->reconnect_task != NULL) + if (NULL != h->reconnect_task) { GNUNET_SCHEDULER_cancel (h->reconnect_task); h->reconnect_task = NULL; } - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } GNUNET_free (h); } -- cgit v1.2.3