From 31536a9bb75502f4c090472f188e1eec138515f7 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 24 May 2014 19:57:15 +0000 Subject: cleaning up scalar product client API --- src/scalarproduct/scalarproduct_api.c | 589 +++++++++++++++------------------- 1 file changed, 258 insertions(+), 331 deletions(-) (limited to 'src/scalarproduct/scalarproduct_api.c') diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 3b92af579..ea62027ab 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2013 Christian Grothoff (and other contributing authors) + (C) 2013, 2014 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -23,7 +23,7 @@ * @brief API for the scalarproduct * @author Christian Fuchs * @author Gaurav Kukreja - * + * @author Christian Grothoff */ #include "platform.h" #include "gnunet_util_lib.h" @@ -34,16 +34,19 @@ #define LOG(kind,...) GNUNET_log_from (kind, "scalarproduct-api",__VA_ARGS__) -/************************************************************** - *** Datatype Declarations ********** - **************************************************************/ /** - * the abstraction function for our internal callback + * The abstraction function for our internal callback + * + * @param h computation handle + * @param msg response we got, NULL on errors + * @param status processing status code */ -typedef void (*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (void *cls, - const struct GNUNET_MessageHeader *msg, - enum GNUNET_SCALARPRODUCT_ResponseStatus status); +typedef void +(*GNUNET_SCALARPRODUCT_ResponseMessageHandler) (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, + const struct ClientResponseMessage *msg, + enum GNUNET_SCALARPRODUCT_ResponseStatus status); + /** * A handle returned for each computation @@ -60,11 +63,6 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle */ struct GNUNET_CLIENT_Connection *client; - /** - * Handle for statistics. - */ - struct GNUNET_STATISTICS_Handle *stats; - /** * The shared session key identifying this computation */ @@ -76,30 +74,25 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle struct GNUNET_CLIENT_TransmitHandle *th; /** - * count of all elements we offer for computation + * count of all @e elements we offer for computation */ uint32_t element_count_total; /** - * count of the transfered elements we offer for computation + * count of the transfered @e elements we offer for computation */ uint32_t element_count_transfered; - + /** - * the client's elements which + * the client's elements which */ - struct GNUNET_SCALARPRODUCT_Element * elements; - + struct GNUNET_SCALARPRODUCT_Element *elements; + /** * Message to be sent to the scalarproduct service */ - void * msg; + struct GNUNET_MessageHeader *msg; - /** - * The client's msg handler callback - */ - union - { /** * Function to call after transmission of the request (Bob). */ @@ -109,101 +102,99 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle * Function to call after transmission of the request (Alice). */ GNUNET_SCALARPRODUCT_DatumProcessor cont_datum; - }; /** - * Closure for 'cont'. + * Closure for @e cont_status or @e cont_datum. */ void *cont_cls; /** - * API internal callback for results and failures to be forwarded to the client + * API internal callback for results and failures to be forwarded to + * the client. */ GNUNET_SCALARPRODUCT_ResponseMessageHandler response_proc; - - /** - * - */ - GNUNET_SCHEDULER_TaskIdentifier cont_multipart; -}; - -/************************************************************** - *** Forward Function Declarations ********** - **************************************************************/ - -void -GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h); -static size_t do_send_message (void *cls, size_t size, void *buf); -/************************************************************** - *** Static Function Declarations ********** - **************************************************************/ +}; /** - * Handles the STATUS received from the service for a response, does not contain a payload + * Handles the STATUS received from the service for a response, does + * not contain a payload. * - * @param cls our Handle + * @param h our Handle * @param msg Pointer to the response received * @param status the condition the request was terminated with (eg: disconnect) */ static void -process_status_message (void *cls, - const struct GNUNET_MessageHeader *msg, +process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, + const struct ClientResponseMessage *msg, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { - struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; - - qe->cont_status (qe->cont_cls, status); + if (NULL != h->cont_status) + h->cont_status (h->cont_cls, + status); + GNUNET_SCALARPRODUCT_cancel (h); } /** - * Handles the RESULT received from the service for a request, should contain a result MPI value + * Handles the RESULT received from the service for a request, should + * contain a result MPI value * - * @param cls our Handle + * @param h our Handle * @param msg Pointer to the response received * @param status the condition the request was terminated with (eg: disconnect) */ static void -process_result_message (void *cls, - const struct GNUNET_MessageHeader *msg, +process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, + const struct ClientResponseMessage *msg, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { - struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; - const struct GNUNET_SCALARPRODUCT_client_response *message = - (const struct GNUNET_SCALARPRODUCT_client_response *) msg; + size_t product_len = ntohl (msg->product_length); gcry_mpi_t result = NULL; gcry_error_t rc; + gcry_mpi_t num; + size_t rsize; + if (ntohs (msg->header.size) - sizeof (struct ClientResponseMessage) + != product_len) + { + GNUNET_break (0); + status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; + } if (GNUNET_SCALARPRODUCT_Status_Success == status) + { + result = gcry_mpi_new (0); + + if (0 < product_len) { - size_t product_len = ntohl (message->product_length); - result = gcry_mpi_new (0); - - if (0 < product_len) - { - gcry_mpi_t num; - size_t read = 0; - - if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, &message[1], product_len, &read))) - { - LOG_GCRY(GNUNET_ERROR_TYPE_ERROR, "gcry_mpi_scan", rc); - gcry_mpi_release (result); - result = NULL; - status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; - } - else - { - if (0 < message->range) - gcry_mpi_add (result, result, num); - else if (0 > message->range) - gcry_mpi_sub (result, result, num); - gcry_mpi_release (num); - } - } + rsize = 0; + if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, + &msg[1], + product_len, + &rsize))) + { + LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, + "gcry_mpi_scan", + rc); + gcry_mpi_release (result); + result = NULL; + status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; + } + else + { + if (0 < ntohl (msg->range)) + gcry_mpi_add (result, result, num); + else if (0 > ntohl (msg->range)) + gcry_mpi_sub (result, result, num); + gcry_mpi_release (num); + } } - qe->cont_datum (qe->cont_cls, status, result); + } + h->cont_datum (h->cont_cls, status, result); + if (NULL != result) + gcry_mpi_release (result); + GNUNET_SCALARPRODUCT_cancel (h); } @@ -216,144 +207,114 @@ process_result_message (void *cls, * @param msg Pointer to the data received in response */ static void -receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) +receive_cb (void *cls, + const struct GNUNET_MessageHeader *msg) { struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; - const struct GNUNET_SCALARPRODUCT_client_response *message = - (const struct GNUNET_SCALARPRODUCT_client_response *) msg; - enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; + const struct ClientResponseMessage *message; if (NULL == msg) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n"); - status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected; - } - else if ((GNUNET_SYSERR != message->status) && (0 < message->product_length )) - { - // response for the responder client, successful - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# SUC responder result messages received"), 1, - GNUNET_NO); - - status = GNUNET_SCALARPRODUCT_Status_Success; - } - else if (message->status == GNUNET_SYSERR){ - // service signaled an error - status = GNUNET_SCALARPRODUCT_Status_Failure; + { + LOG (GNUNET_ERROR_TYPE_INFO, + "Disconnected from SCALARPRODUCT service.\n"); + h->response_proc (h, + NULL, + GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); + return; } - - if (h->cont_status != NULL) - h->response_proc (h, msg, status); - - GNUNET_free (h); -} - - -static void -send_multipart (void * cls, const struct GNUNET_SCHEDULER_TaskContext * tc) -{ - struct GNUNET_SCALARPRODUCT_ComputationHandle *h = (struct GNUNET_SCALARPRODUCT_ComputationHandle *) cls; - struct GNUNET_SCALARPRODUCT_computation_message_multipart *msg; - uint32_t size; - uint32_t todo; - - h->cont_multipart = GNUNET_SCHEDULER_NO_TASK; - - todo = h->element_count_total - h->element_count_transfered; - size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); - if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) { - //create a multipart msg, first we calculate a new msg size for the head msg - todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart)) / sizeof (struct GNUNET_SCALARPRODUCT_Element); - size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message_multipart) +todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + if (ntohs (msg->size) != sizeof (struct ClientResponseMessage)) + { + GNUNET_break (0); + h->response_proc (h, + NULL, + GNUNET_SCALARPRODUCT_Status_InvalidResponse); + return; } - - msg = (struct GNUNET_SCALARPRODUCT_computation_message_multipart*) GNUNET_malloc (size); - h->msg = msg; - msg->header.size = htons (size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART); - msg->element_count_contained = htonl (todo); - - memcpy (&msg[1], &h->elements[h->element_count_transfered], todo * sizeof (struct GNUNET_SCALARPRODUCT_Element)); - h->element_count_transfered += todo; - - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, // retry is OK in the initial stage - &do_send_message, h); - - if (!h->th) { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a multipart message to the scalarproduct service\n")); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_free (h->msg); - h->msg = NULL; - if (h->cont_status != NULL) - h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure); - - GNUNET_SCALARPRODUCT_cancel (cls); + message = (const struct ClientResponseMessage *) msg; + if (GNUNET_SYSERR == ntohl (message->status)) + { + h->response_proc (h, + NULL, + GNUNET_SCALARPRODUCT_Status_Failure); + return; } + h->response_proc (h, + message, + GNUNET_SCALARPRODUCT_Status_Success); } + /** - * Transmits the request to the VectorProduct Service + * Transmits the request to the SCALARPRODUCT service * - * @param cls Closure - * @param size Size of the buffer + * @param cls Closure with the `struct GNUNET_SCALARPRODUCT_ComputationHandle` + * @param size Size of the buffer @a buf * @param buf Pointer to the buffer - * * @return Size of the message sent */ static size_t -do_send_message (void *cls, size_t size, +do_send_message (void *cls, + size_t size, void *buf) { struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; + struct ComputationMultipartMessage *msg; + size_t ret; + uint32_t nsize; + uint32_t todo; - if (NULL == buf) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - - // notify caller about the error, done here. - if (h->cont_status != NULL) - h->response_proc (h, NULL, GNUNET_SCALARPRODUCT_Status_Failure); - - GNUNET_SCALARPRODUCT_cancel (cls); + h->th = NULL; + if (NULL == buf) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to transmit request to SCALARPRODUCT.\n"); + /* notify caller about the error, done here */ + h->response_proc (h, NULL, + GNUNET_SCALARPRODUCT_Status_Failure); return 0; } - memcpy (buf, h->msg, size); - + ret = ntohs (h->msg->size); + memcpy (buf, h->msg, ret); GNUNET_free (h->msg); h->msg = NULL; - h->th = NULL; - -#if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# bytes sent to scalarproduct"), 1, - GNUNET_NO); -#endif - /* done sending */ - if (h->element_count_total == h->element_count_transfered) { - GNUNET_CLIENT_receive (h->client, &receive_cb, h, + /* done sending? */ + if (h->element_count_total == h->element_count_transfered) + { + GNUNET_CLIENT_receive (h->client, + &receive_cb, h, GNUNET_TIME_UNIT_FOREVER_REL); - return size; + return ret; } - - h->cont_multipart = GNUNET_SCHEDULER_add_now (&send_multipart, h); - - return size; -} + todo = h->element_count_total - h->element_count_transfered; + nsize = sizeof (struct ComputationMultipartMessage) + + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + if (GNUNET_SERVER_MAX_MESSAGE_SIZE <= size) + { + /* cannot do all of them, limit to what is possible in one message */ + todo = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMultipartMessage)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); + nsize = sizeof (struct ComputationMultipartMessage) + + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element); + } -/************************************************************** - *** API ********** - **************************************************************/ + msg = GNUNET_malloc (nsize); + h->msg = &msg->header; + msg->header.size = htons (nsize); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_MUTLIPART); + msg->element_count_contained = htonl (todo); + memcpy (&msg[1], + &h->elements[h->element_count_transfered], + todo * sizeof (struct GNUNET_SCALARPRODUCT_Element)); + h->element_count_transfered += todo; + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, nsize, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, + &do_send_message, h); + GNUNET_assert (NULL != h->th); + return ret; +} /** @@ -362,96 +323,75 @@ do_send_message (void *cls, size_t size, * @param cfg the gnunet configuration handle * @param key Session key unique to the requesting client * @param elements Array of elements of the vector - * @param element_count Number of elements in the vector + * @param element_count Number of elements in the @a elements vector * @param cont Callback function - * @param cont_cls Closure for the callback function - * + * @param cont_cls Closure for @a cont * @return a new handle for this computation */ struct GNUNET_SCALARPRODUCT_ComputationHandle * -GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle * cfg, - const struct GNUNET_HashCode * session_key, - const struct GNUNET_SCALARPRODUCT_Element * elements, - uint32_t element_count, - GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, - void * cont_cls) +GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode *session_key, + const struct GNUNET_SCALARPRODUCT_Element *elements, + uint32_t element_count, + GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, + void *cont_cls) { struct GNUNET_SCALARPRODUCT_ComputationHandle *h; - struct GNUNET_SCALARPRODUCT_computation_message *msg; + struct ComputationMessage *msg; uint32_t size; uint16_t possible; - GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_computation_message) - + element_count * sizeof (int32_t)); h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); + h->cont_status = cont; + h->cont_cls = cont_cls; + h->response_proc = &process_status_message; + h->cfg = cfg; + h->key = *session_key; h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); - if (!h->client) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to connect to the scalarproduct service\n")); - GNUNET_free (h); - return NULL; - } - h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); - if (!h->stats) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a message to the statistics service\n")); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_free (h); - return NULL; - } - h->element_count_total = element_count; - size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); - if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) { + if (NULL == h->client) + { + /* scalarproduct configuration error */ + GNUNET_break (0); + GNUNET_free (h); + return NULL; + } + size = sizeof (struct ComputationMessage) + + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); + if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) + { possible = element_count; h->element_count_transfered = element_count; } - else { - //create a multipart msg, first we calculate a new msg size for the head msg - possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element); + else + { + /* create a multipart msg, first we calculate a new msg size for the head msg */ + possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMessage)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); h->element_count_transfered = possible; - size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element); - h->elements = (struct GNUNET_SCALARPRODUCT_Element*) - GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); - memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element)*element_count); + size = sizeof (struct ComputationMessage) + + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element); + h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); + memcpy (h->elements, + elements, + sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); } - h->cont_status = cont; - h->cont_cls = cont_cls; - h->response_proc = &process_status_message; - h->cfg = cfg; - memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode)); - - msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size); - h->msg = msg; + msg = GNUNET_malloc (size); + h->msg = &msg->header; msg->header.size = htons (size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); msg->element_count_total = htonl (element_count); msg->element_count_contained = htonl (possible); - - memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode)); - memcpy (&msg[1], elements, possible); - + msg->session_key = *session_key; + memcpy (&msg[1], + elements, + possible); h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, // retry is OK in the initial stage + GNUNET_YES, /* retry is OK in the initial stage */ &do_send_message, h); - if (!h->th) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a message to the scalarproduct service\n")); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_free (h->msg); - GNUNET_free_non_null (h->elements); - GNUNET_free (h); - return NULL; - } + GNUNET_assert (NULL != h->th); return h; } @@ -463,99 +403,82 @@ GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handl * @param session_key Session key should be unique to the requesting client * @param peer PeerID of the other peer * @param elements Array of elements of the vector - * @param element_count Number of elements in the vector + * @param element_count Number of elements in the @a elements vector * @param cont Callback function - * @param cont_cls Closure for the callback function - * + * @param cont_cls Closure for @a cont * @return a new handle for this computation */ struct GNUNET_SCALARPRODUCT_ComputationHandle * -GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle * cfg, - const struct GNUNET_HashCode * session_key, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_SCALARPRODUCT_Element * elements, - uint32_t element_count, - GNUNET_SCALARPRODUCT_DatumProcessor cont, - void * cont_cls) +GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle *cfg, + const struct GNUNET_HashCode *session_key, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_SCALARPRODUCT_Element *elements, + uint32_t element_count, + GNUNET_SCALARPRODUCT_DatumProcessor cont, + void *cont_cls) { struct GNUNET_SCALARPRODUCT_ComputationHandle *h; - struct GNUNET_SCALARPRODUCT_computation_message *msg; + struct ComputationMessage *msg; uint32_t size; - uint16_t possible; + uint32_t possible; h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); - if (!h->client) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to connect to the scalarproduct service\n")); - GNUNET_free (h); - return NULL; - } - h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); - if (!h->stats) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a message to the statistics service\n")); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_free (h); - return NULL; - } - + if (NULL == h->client) + { + /* missconfigured scalarproduct service */ + GNUNET_break (0); + GNUNET_free (h); + return NULL; + } h->element_count_total = element_count; - size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); - if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) { + h->cont_datum = cont; + h->cont_cls = cont_cls; + h->response_proc = &process_result_message; + h->cfg = cfg; + h->key = *session_key; + size = sizeof (struct ComputationMessage) + + element_count * sizeof (struct GNUNET_SCALARPRODUCT_Element); + if (GNUNET_SERVER_MAX_MESSAGE_SIZE > size) + { possible = element_count; h->element_count_transfered = element_count; } - else { - //create a multipart msg, first we calculate a new msg size for the head msg - possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct GNUNET_SCALARPRODUCT_computation_message)) / sizeof (struct GNUNET_SCALARPRODUCT_Element); + else + { + /* create a multipart msg, first we calculate a new msg size for the head msg */ + possible = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (struct ComputationMessage)) + / sizeof (struct GNUNET_SCALARPRODUCT_Element); h->element_count_transfered = possible; - size = sizeof (struct GNUNET_SCALARPRODUCT_computation_message) + possible*sizeof (struct GNUNET_SCALARPRODUCT_Element); - h->elements = (struct GNUNET_SCALARPRODUCT_Element*) - GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); - memcpy (h->elements, elements, sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); + size = sizeof (struct ComputationMessage) + + possible * sizeof (struct GNUNET_SCALARPRODUCT_Element); + h->elements = GNUNET_malloc (sizeof(struct GNUNET_SCALARPRODUCT_Element) * element_count); + memcpy (h->elements, + elements, + sizeof (struct GNUNET_SCALARPRODUCT_Element) * element_count); } - - h->cont_datum = cont; - h->cont_cls = cont_cls; - h->response_proc = &process_result_message; - h->cfg = cfg; - memcpy (&h->key, session_key, sizeof (struct GNUNET_HashCode)); - msg = (struct GNUNET_SCALARPRODUCT_computation_message*) GNUNET_malloc (size); - h->msg = msg; + msg = GNUNET_malloc (size); + h->msg = &msg->header; msg->header.size = htons (size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); msg->element_count_total = htonl (element_count); msg->element_count_contained = htonl (possible); - - memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); - memcpy (&msg->session_key, session_key, sizeof (struct GNUNET_HashCode)); - memcpy (&msg[1], elements, sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible); - + msg->reserved = htonl (0); + msg->peer = *peer; + msg->session_key = *session_key; + memcpy (&msg[1], + elements, + sizeof (struct GNUNET_SCALARPRODUCT_Element) * possible); h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, // retry is OK in the initial stage + GNUNET_YES, /* retry is OK in the initial stage */ &do_send_message, h); - if (!h->th) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - _ ("Failed to send a message to the scalarproduct service\n")); - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# transmission request failures"), - 1, GNUNET_NO); - GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_free (h->msg); - GNUNET_free_non_null (h->elements); - GNUNET_free (h); - return NULL; - } + GNUNET_assert (NULL != h->th); return h; } + /** * Cancel an ongoing computation or revoke our collaboration offer. * Closes the connection to the service @@ -563,16 +486,20 @@ GNUNET_SCALARPRODUCT_start_computation (const struct GNUNET_CONFIGURATION_Handle * @param h computation handle to terminate */ void -GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) +GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle *h) { if (NULL != h->th) + { GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - if (GNUNET_SCHEDULER_NO_TASK != h->cont_multipart) - GNUNET_SCHEDULER_cancel (h->cont_multipart); + h->th = NULL; + } GNUNET_free_non_null (h->elements); GNUNET_free_non_null (h->msg); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); + if (NULL != h->client) + { + GNUNET_CLIENT_disconnect (h->client); + h->client = NULL; + } GNUNET_free (h); } -- cgit v1.2.3