From 8d7168b4fead012aaff73313248415ba2d395082 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 2 Jul 2016 19:21:17 +0000 Subject: rewrite scalarproduct_api to use MQ --- src/scalarproduct/scalarproduct_api.c | 397 ++++++++++++++-------------------- 1 file changed, 164 insertions(+), 233 deletions(-) diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 71db3cd86..8639b4e5a 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2013, 2014 GNUnet e.V. + Copyright (C) 2013, 2014, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -23,8 +23,6 @@ * @author Christian Fuchs * @author Gaurav Kukreja * @author Christian Grothoff - * - * TODO: use MQ */ #include "platform.h" #include "gnunet_util_lib.h" @@ -62,22 +60,7 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle /** * Current connection to the scalarproduct service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Current transmit handle. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * the client's elements which - */ - struct GNUNET_SCALARPRODUCT_Element *elements; - - /** - * Message to be sent to the scalarproduct service - */ - struct GNUNET_MessageHeader *msg; + struct GNUNET_MQ_Handle *mq; /** * Function to call after transmission of the request (Bob). @@ -105,144 +88,28 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle */ struct GNUNET_HashCode key; - /** - * count of all @e elements we offer for computation - */ - uint32_t element_count_total; - - /** - * count of the transfered @e elements we offer for computation - */ - uint32_t element_count_transfered; - - /** - * Type to use for the multipart messages. - */ - uint16_t mp_type; - }; /** - * Called when a response is received from the service. After basic - * check, the handler in `h->response_proc` is called. This functions - * handles the response to the client which used the API. + * Called when a response is received from the service. Perform basic + * check that the message is well-formed. * * @param cls Pointer to the Master Context - * @param msg Pointer to the data received in response + * @param message Pointer to the data received in response + * @return #GNUNET_OK if @a message is well-formed */ -static void -receive_cb (void *cls, - const struct GNUNET_MessageHeader *msg) +static int +check_response (void *cls, + const struct ClientResponseMessage *message) { - struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; - const struct ClientResponseMessage *message; - enum GNUNET_SCALARPRODUCT_ResponseStatus status; - - if (NULL == msg) - { - LOG (GNUNET_ERROR_TYPE_INFO, - "Disconnected from SCALARPRODUCT service.\n"); - h->response_proc (h, - NULL, - GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED); - return; - } - if (ntohs (msg->size) < sizeof (struct ClientResponseMessage)) - { - GNUNET_break (0); - h->response_proc (h, - NULL, - GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE); - return; - } - message = (const struct ClientResponseMessage *) msg; - if (ntohs (msg->size) != + if (ntohs (message->header.size) != ntohl (message->product_length) + sizeof (struct ClientResponseMessage)) { GNUNET_break (0); - h->response_proc (h, - NULL, - GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE); - return; - } - status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status); - h->response_proc (h, - message, - status); -} - - -/** - * Transmits the request to the SCALARPRODUCT service - * - * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle` - * @param size Size of the buffer @a buf - * @param buf Pointer to the buffer - * @return Size of the message sent - */ -static size_t -do_send_message (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; - struct ComputationBobCryptodataMultipartMessage *msg; - size_t ret; - uint32_t nsize; - uint32_t todo; - - h->th = NULL; - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to SCALARPRODUCT.\n"); - /* notify caller about the error, done here */ - h->response_proc (h, NULL, - GNUNET_SCALARPRODUCT_STATUS_FAILURE); - return 0; - } - ret = ntohs (h->msg->size); - memcpy (buf, h->msg, ret); - GNUNET_free (h->msg); - h->msg = NULL; - - /* done sending? */ - if (h->element_count_total == h->element_count_transfered) - { - GNUNET_CLIENT_receive (h->client, - &receive_cb, h, - GNUNET_TIME_UNIT_FOREVER_REL); - return ret; + return GNUNET_SYSERR; } - - todo = h->element_count_total - h->element_count_transfered; - nsize = sizeof (struct ComputationBobCryptodataMultipartMessage) - + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); - if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) - { - /* cannot do all of them, limit to what is possible in one message */ - todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationBobCryptodataMultipartMessage)) - / sizeof (struct GNUNET_SCALARPRODUCT_Element); - nsize = sizeof (struct ComputationBobCryptodataMultipartMessage) - + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); - } - - msg = GNUNET_malloc (nsize); - h->msg = &msg->header; - msg->header.size = htons (nsize); - msg->header.type = htons (h->mp_type); - msg->element_count_contained = htonl (todo); - memcpy (&msg[1], - &h->elements[h->element_count_transfered], - todo * sizeof (struct GNUNET_SCALARPRODUCT_Element)); - h->element_count_transfered += todo; - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &do_send_message, h); - GNUNET_assert (NULL != h->th); - return ret; + return GNUNET_OK; } @@ -267,6 +134,28 @@ process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, } +/** + * Called when a response is received from the service. After basic + * check, the handler in `h->response_proc` is called. This functions + * handles the response to the client which used the API. + * + * @param cls Pointer to the Master Context + * @param msg Pointer to the data received in response + */ +static void +handle_response (void *cls, + const struct ClientResponseMessage *message) +{ + struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; + enum GNUNET_SCALARPRODUCT_ResponseStatus status; + + status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status); + h->response_proc (h, + message, + status); +} + + /** * Check if the keys for all given elements are unique. * @@ -301,6 +190,27 @@ check_unique (const struct GNUNET_SCALARPRODUCT_Element *elements, } +/** + * We encountered an error communicating with the set service while + * performing a set operation. Report to the application. + * + * @param cls the `struct GNUNET_SCALARPRODUCT_ComputationHandle` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; + + LOG (GNUNET_ERROR_TYPE_INFO, + "Disconnected from SCALARPRODUCT service.\n"); + h->response_proc (h, + NULL, + GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED); +} + + /** * Used by Bob's client to cooperate with Alice, * @@ -320,65 +230,79 @@ GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handl GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, void *cont_cls) { - struct GNUNET_SCALARPRODUCT_ComputationHandle *h; + GNUNET_MQ_hd_var_size (response, + GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT, + struct ClientResponseMessage); + struct GNUNET_SCALARPRODUCT_ComputationHandle *h + = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_response_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; struct BobComputationMessage *msg; + struct ComputationBobCryptodataMultipartMessage *mmsg; uint32_t size; uint16_t possible; + uint16_t todo; + uint32_t element_count_transfered; + - if (GNUNET_SYSERR == check_unique (elements, element_count)) + if (GNUNET_SYSERR == check_unique (elements, + element_count)) return NULL; - h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); h->cont_status = cont; h->cont_cls = cont_cls; h->response_proc = &process_status_message; h->cfg = cfg; h->key = *session_key; - h->client = GNUNET_CLIENT_connect ("scalarproduct-bob", cfg); - h->element_count_total = element_count; - h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB; - if (NULL == h->client) + h->mq = GNUNET_CLIENT_connecT (cfg, + "scalarproduct-bob", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) { /* scalarproduct configuration error */ GNUNET_break (0); GNUNET_free (h); return NULL; } - size = sizeof (struct BobComputationMessage) - + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); - if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) - { - possible = element_count; - h->element_count_transfered = element_count; - } - else - { - /* create a multipart msg, first we calculate a new msg size for the head msg */ - possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage)) - / sizeof (struct GNUNET_SCALARPRODUCT_Element); - h->element_count_transfered = possible; - size = sizeof (struct BobComputationMessage) - + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element); - h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); - memcpy (h->elements, - elements, - sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); - } - - msg = GNUNET_malloc (size); - h->msg = &msg->header; - msg->header.size = htons (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); + possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct BobComputationMessage)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); + todo = GNUNET_MIN (possible, + element_count); + size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + env = GNUNET_MQ_msg_extra (msg, + size, + GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); msg->element_count_total = htonl (element_count); - msg->element_count_contained = htonl (possible); + msg->element_count_contained = htonl (todo); msg->session_key = *session_key; memcpy (&msg[1], elements, - possible * sizeof (struct GNUNET_SCALARPRODUCT_Element)); - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, /* retry is OK in the initial stage */ - &do_send_message, h); - GNUNET_assert (NULL != h->th); + size); + element_count_transfered = todo; + GNUNET_MQ_send (h->mq, + env); + possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (*mmsg)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); + while (element_count_transfered < element_count) + { + todo = GNUNET_MIN (possible, + element_count - element_count_transfered); + size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + env = GNUNET_MQ_msg_extra (mmsg, + size, + GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_BOB); + mmsg->element_count_contained = htonl (todo); + memcpy (&mmsg[1], + &elements[element_count_transfered], + size); + element_count_transfered += todo; + GNUNET_MQ_send (h->mq, + env); + } return h; } @@ -464,67 +388,81 @@ GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle GNUNET_SCALARPRODUCT_DatumProcessor cont, void *cont_cls) { - struct GNUNET_SCALARPRODUCT_ComputationHandle *h; + GNUNET_MQ_hd_var_size (response, + GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT, + struct ClientResponseMessage); + struct GNUNET_SCALARPRODUCT_ComputationHandle *h + = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_response_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; struct AliceComputationMessage *msg; + struct ComputationBobCryptodataMultipartMessage *mmsg; uint32_t size; - uint32_t possible; + uint16_t possible; + uint16_t todo; + uint32_t element_count_transfered; - if (GNUNET_SYSERR == check_unique (elements, element_count)) + if (GNUNET_SYSERR == check_unique (elements, + element_count)) return NULL; - h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); - h->client = GNUNET_CLIENT_connect ("scalarproduct-alice", cfg); - if (NULL == h->client) + h->mq = GNUNET_CLIENT_connecT (cfg, + "scalarproduct-alice", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) { /* missconfigured scalarproduct service */ GNUNET_break (0); GNUNET_free (h); return NULL; } - h->element_count_total = element_count; h->cont_datum = cont; h->cont_cls = cont_cls; h->response_proc = &process_result_message; h->cfg = cfg; h->key = *session_key; - h->mp_type = GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE; - size = sizeof (struct AliceComputationMessage) - + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); - if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) - { - possible = element_count; - h->element_count_transfered = element_count; - } - else - { - /* create a multipart msg, first we calculate a new msg size for the head msg */ - possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage)) - / sizeof (struct GNUNET_SCALARPRODUCT_Element); - h->element_count_transfered = possible; - size = sizeof (struct AliceComputationMessage) - + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element); - h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); - memcpy (h->elements, - elements, - sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); - } - msg = GNUNET_malloc (size); - h->msg = &msg->header; - msg->header.size = htons (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); + possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct AliceComputationMessage)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); + todo = GNUNET_MIN (possible, + element_count); + size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + env = GNUNET_MQ_msg_extra (msg, + size, + GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); msg->element_count_total = htonl (element_count); - msg->element_count_contained = htonl (possible); + msg->element_count_contained = htonl (todo); msg->reserved = htonl (0); msg->peer = *peer; msg->session_key = *session_key; memcpy (&msg[1], elements, - sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible); - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, /* retry is OK in the initial stage */ - &do_send_message, h); - GNUNET_assert (NULL != h->th); + size); + GNUNET_MQ_send (h->mq, + env); + element_count_transfered = todo; + possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (*mmsg)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); + while (element_count_transfered < element_count) + { + todo = GNUNET_MIN (possible, + element_count - element_count_transfered); + size = todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + env = GNUNET_MQ_msg_extra (mmsg, + size, + GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART_ALICE); + mmsg->element_count_contained = htonl (todo); + memcpy (&mmsg[1], + &elements[element_count_transfered], + size); + element_count_transfered += todo; + GNUNET_MQ_send (h->mq, + env); + } return h; } @@ -538,17 +476,10 @@ GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle void GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h) { - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - GNUNET_free_non_null (h->elements); - GNUNET_free_non_null (h->msg); - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } GNUNET_free (h); } -- cgit v1.2.3