From b8eebba5dd3716f4d585e0b6b85d6ba36ef0241e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 3 Jul 2016 11:06:03 +0000 Subject: convert unindex to MQ --- src/fs/fs_unindex.c | 133 +++++++++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 59 deletions(-) (limited to 'src/fs/fs_unindex.c') diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c index 2c4cb6ae6..e614fbe03 100644 --- a/src/fs/fs_unindex.c +++ b/src/fs/fs_unindex.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2003--2013 GNUnet e.V. + Copyright (C) 2003--2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -221,53 +221,57 @@ unindex_process (void *cls, /** - * Function called with the response from the - * FS service to our unindexing request. + * Function called with the response from the FS service to our + * unindexing request. * * @param cls closure, unindex context - * @param msg NULL on timeout, otherwise the response + * @param msg the response */ static void -process_fs_response (void *cls, const struct GNUNET_MessageHeader *msg) +handle_unindex_response (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_FS_UnindexContext *uc = cls; struct GNUNET_FS_ProgressInfo pi; - if (uc->client != NULL) + if (NULL != uc->mq) { - GNUNET_CLIENT_disconnect (uc->client); - uc->client = NULL; - } - if (uc->state != UNINDEX_STATE_FS_NOTIFY) - { - uc->state = UNINDEX_STATE_ERROR; - uc->emsg = - GNUNET_strdup (_("Unexpected time for a response from `fs' service.")); - GNUNET_FS_unindex_sync_ (uc); - signal_unindex_error (uc); - return; - } - if (NULL == msg) - { - uc->state = UNINDEX_STATE_ERROR; - uc->emsg = GNUNET_strdup (_("Timeout waiting for `fs' service.")); - GNUNET_FS_unindex_sync_ (uc); - signal_unindex_error (uc); - return; - } - if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK) - { - uc->state = UNINDEX_STATE_ERROR; - uc->emsg = GNUNET_strdup (_("Invalid response from `fs' service.")); - GNUNET_FS_unindex_sync_ (uc); - signal_unindex_error (uc); - return; + GNUNET_MQ_destroy (uc->mq); + uc->mq = NULL; } uc->state = UNINDEX_STATE_COMPLETE; pi.status = GNUNET_FS_STATUS_UNINDEX_COMPLETED; pi.value.unindex.eta = GNUNET_TIME_UNIT_ZERO; GNUNET_FS_unindex_sync_ (uc); - GNUNET_FS_unindex_make_status_ (&pi, uc, uc->file_size); + GNUNET_FS_unindex_make_status_ (&pi, + uc, + uc->file_size); +} + + +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_FS_UnindexContext *` + * @param error error code + */ +static void +unindex_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_FS_UnindexContext *uc = cls; + + if (NULL != uc->mq) + { + GNUNET_MQ_destroy (uc->mq); + uc->mq = NULL; + } + uc->state = UNINDEX_STATE_ERROR; + uc->emsg = GNUNET_strdup (_("Error communicating with `fs' service.")); + GNUNET_FS_unindex_sync_ (uc); + signal_unindex_error (uc); } @@ -281,12 +285,25 @@ process_fs_response (void *cls, const struct GNUNET_MessageHeader *msg) static void unindex_finish (struct GNUNET_FS_UnindexContext *uc) { + GNUNET_MQ_hd_fixed_size (unindex_response, + GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK, + struct GNUNET_MessageHeader); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_unindex_response_handler (uc), + GNUNET_MQ_handler_end () + }; char *emsg; - struct UnindexMessage req; + struct GNUNET_MQ_Envelope *env; + struct UnindexMessage *req; /* generate final progress message */ - unindex_progress (uc, uc->file_size, NULL, 0, 0); - GNUNET_FS_tree_encoder_finish (uc->tc, &emsg); + unindex_progress (uc, + uc->file_size, + NULL, + 0, + 0); + GNUNET_FS_tree_encoder_finish (uc->tc, + &emsg); uc->tc = NULL; GNUNET_DISK_file_close (uc->fh); uc->fh = NULL; @@ -296,8 +313,12 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc) uc->seen_dh = NULL; uc->state = UNINDEX_STATE_FS_NOTIFY; GNUNET_FS_unindex_sync_ (uc); - uc->client = GNUNET_CLIENT_connect ("fs", uc->h->cfg); - if (uc->client == NULL) + uc->mq = GNUNET_CLIENT_connecT (uc->h->cfg, + "fs", + handlers, + &unindex_mq_error_handler, + uc); + if (NULL == uc->mq) { uc->state = UNINDEX_STATE_ERROR; uc->emsg = @@ -308,21 +329,15 @@ unindex_finish (struct GNUNET_FS_UnindexContext *uc) } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending UNINDEX message to FS service\n"); - req.header.size = htons (sizeof (struct UnindexMessage)); - req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_UNINDEX); - req.reserved = 0; - req.file_id = uc->file_id; - GNUNET_break (GNUNET_OK == - GNUNET_CLIENT_transmit_and_get_response (uc->client, - &req.header, - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_YES, - &process_fs_response, - uc)); + env = GNUNET_MQ_msg (req, + GNUNET_MESSAGE_TYPE_FS_UNINDEX); + req->reserved = 0; + req->file_id = uc->file_id; + GNUNET_MQ_send (uc->mq, + env); } - /** * Function called by the directory scanner as we extract keywords * that we will need to remove UBlocks. @@ -745,10 +760,10 @@ GNUNET_FS_unindex_signal_suspend_ (void *cls) GNUNET_FS_uri_destroy (uc->ksk_uri); uc->ksk_uri = NULL; } - if (uc->client != NULL) + if (NULL != uc->mq) { - GNUNET_CLIENT_disconnect (uc->client); - uc->client = NULL; + GNUNET_MQ_destroy (uc->mq); + uc->mq = NULL; } if (NULL != uc->dsh) { @@ -835,7 +850,7 @@ GNUNET_FS_unindex_stop (struct GNUNET_FS_UnindexContext *uc) { struct GNUNET_FS_ProgressInfo pi; - if (uc->dscan != NULL) + if (NULL != uc->dscan) { GNUNET_FS_directory_scan_abort (uc->dscan); uc->dscan = NULL; @@ -845,15 +860,15 @@ GNUNET_FS_unindex_stop (struct GNUNET_FS_UnindexContext *uc) GNUNET_DATASTORE_cancel (uc->dqe); uc->dqe = NULL; } - if (uc->fhc != NULL) + if (NULL != uc->fhc) { GNUNET_CRYPTO_hash_file_cancel (uc->fhc); uc->fhc = NULL; } - if (uc->client != NULL) + if (NULL != uc->mq) { - GNUNET_CLIENT_disconnect (uc->client); - uc->client = NULL; + GNUNET_MQ_destroy (uc->mq); + uc->mq = NULL; } if (NULL != uc->dsh) { -- cgit v1.2.3