From de62048ab84c178fbf57682b6f9310904ff38964 Mon Sep 17 00:00:00 2001 From: Christian Fuchs Date: Mon, 2 Sep 2013 21:09:53 +0000 Subject: rewrote API minus cancel function --- src/scalarproduct/scalarproduct_api.c | 180 +++++++++++++++------------------- 1 file changed, 79 insertions(+), 101 deletions(-) (limited to 'src/scalarproduct/scalarproduct_api.c') diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 7c1f5394e..118de4a04 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -119,11 +119,11 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle /** * Head of the active sessions queue */ -struct GNUNET_SCALARPRODUCT_ComputationHandle *head; +static struct GNUNET_SCALARPRODUCT_ComputationHandle *head; /** * Tail of the active sessions queue */ -struct GNUNET_SCALARPRODUCT_ComputationHandle *tail; +static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail; /************************************************************** *** Function Declarations ********** @@ -170,10 +170,7 @@ process_status_message (void *cls, { struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; - GNUNET_assert (qe != NULL); - - if (qe->cont_status != NULL) - qe->cont_status (qe->cont_cls, &qe->msg->key, status); + qe->cont_status (qe->cont_cls, status); } @@ -190,17 +187,38 @@ process_result_message (void *cls, enum GNUNET_SCALARPRODUCT_ResponseStatus status) { struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; + const struct GNUNET_SCALARPRODUCT_client_response *message = + (const struct GNUNET_SCALARPRODUCT_client_response *) msg; + gcry_mpi_t result = NULL; - GNUNET_assert (qe != NULL); - - if (msg == NULL && qe->cont_datum != NULL) + if (GNUNET_SCALARPRODUCT_Status_Success == status + && qe->cont_datum != NULL) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n"); - } - if (qe->cont_datum != NULL) - { - qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg); + size_t product_len = ntohl(message->product_length); + result = gcry_mpi_new(0); + + if (0 < product_len) + { + gcry_mpi_t num; + size_t read = 0; + + if (0 != gcry_mpi_scan (&num, GCRYMPI_FMT_USG, &msg[1], product_len, &read)){ + LOG (GNUNET_ERROR_TYPE_ERROR, "Could not convert to mpi to value!\n"); + gcry_mpi_release(result); + result = NULL; + status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; + } + else + { + if (message->range > 0) + gcry_mpi_add(result, result, num); + else + gcry_mpi_sub(result, result, num); + gcry_mpi_release(num); + } + } } + qe->cont_datum (qe->cont_cls, status, result); } @@ -215,76 +233,34 @@ process_result_message (void *cls, static void receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; - struct GNUNET_SCALARPRODUCT_ComputationHandle *qe; - int16_t was_transmitted; - struct GNUNET_SCALARPRODUCT_client_response *message = - (struct GNUNET_SCALARPRODUCT_client_response *) msg; - - h->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n"); - - if (NULL == (qe = free_queue_head_entry (h))) - { - /** - * The queue head will be NULL if the client disconnected, - * * In case of Alice, client disconnected after sending request, before receiving response - * * In case of Bob, client disconnected after preparing response, before getting request from Alice. - */ - process_queue (h); - return; - } - - if (h->client == NULL) - { - // GKUKREJA : handle this correctly - /** - * The queue head will be NULL if the client disconnected, - * * In case of Alice, client disconnected after sending request, before receiving response - * * In case of Bob, client disconnected after preparing response, before getting request from Alice. - */ - process_queue (h); - return; - } - - was_transmitted = qe->was_transmitted; - // Control will only come here, when the request was transmitted to service, - // and service responded. - GNUNET_assert (was_transmitted == GNUNET_YES); + struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; + const struct GNUNET_SCALARPRODUCT_client_response *message = + (const struct GNUNET_SCALARPRODUCT_client_response *) msg; + enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; - if (msg == NULL) + if (NULL == msg) { - LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n"); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure); + LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n"); + status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected; } - else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT)) + else if ( GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type)) { - LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n"); - qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse); + LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n"); } - else if (ntohl (message->product_length) == 0) + else if (0 < ntohl (message->product_length) || (0 == message->range)) { // response for the responder client, successful - GNUNET_STATISTICS_update (h->stats, + GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# SUC responder result messages received"), 1, GNUNET_NO); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n"); - qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success); - } - else if (ntohl (message->product_length) > 0) - { - // response for the requester client, successful - GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# SUC requester result messages received"), 1, - GNUNET_NO); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n"); - qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success); + status = GNUNET_SCALARPRODUCT_Status_Success; } + + if (qe->cont_datum != NULL) + qe->response_proc (qe, msg, status); GNUNET_free (qe); - process_queue (h); } @@ -302,31 +278,31 @@ transmit_request (void *cls, size_t size, void *buf) { struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; - size_t msize; - if (buf == NULL) + if (NULL == buf) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# transmission request failures"), 1, GNUNET_NO); - GNUNET_SCALARPRODUCT_disconnect (qe); + + // notify caller about the error, done here. + if (qe->cont_datum != NULL) + qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure); + GNUNET_SCALARPRODUCT_cancel(cls); return 0; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n", - msize); - memcpy (buf, qe->msg, size); + GNUNET_free (qe->msg); - qe->was_transmitted = GNUNET_YES; - + qe->msg = NULL; qe->th = NULL; - GNUNET_CLIENT_receive (h->client, &receive_cb, h, + GNUNET_CLIENT_receive (qe->client, &receive_cb, qe, GNUNET_TIME_UNIT_FOREVER_REL); #if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, + GNUNET_STATISTICS_update (qe->stats, gettext_noop ("# bytes sent to scalarproduct"), 1, GNUNET_NO); #endif @@ -389,7 +365,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t); - h->cont_datum = cont; + h->cont_status = cont; h->cont_cls = cont_cls; h->response_proc = &process_result_message; h->cfg = cfg; @@ -416,7 +392,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to send a message to the scalarproduct service\n")); - GNUNET_STATISTICS_destroy(h->GNUNET_YES); + GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES); GNUNET_CLIENT_disconnect(h->client); GNUNET_free(h->msg); GNUNET_free(h); @@ -459,7 +435,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) - + mask_length); + + mask_bytes); h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); @@ -479,7 +455,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, return NULL; } - size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; + size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes; h->cont_datum = cont; h->cont_cls = cont_cls; @@ -492,7 +468,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, msg->header.size = htons (size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); msg->element_count = htons (element_count); - msg->mask_length = htons (mask_length); + msg->mask_length = htons (mask_bytes); vector = (int32_t*) &msg[1]; // copy each element over to the message @@ -501,7 +477,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); - memcpy (&vector[element_count], mask, mask_length); + memcpy (&vector[element_count], mask, mask_bytes); h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, GNUNET_TIME_UNIT_FOREVER_REL, @@ -511,7 +487,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, { LOG (GNUNET_ERROR_TYPE_ERROR, _ ("Failed to send a message to the scalarproduct service\n")); - GNUNET_STATISTICS_destroy(h->GNUNET_YES); + GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES); GNUNET_CLIENT_disconnect(h->client); GNUNET_free(h->msg); GNUNET_free(h); @@ -524,26 +500,28 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Disconnect from the scalarproduct service. * - * @param h handle to the scalarproduct + * @param h a computation handle to cancel */ void -GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) +GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) { struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; - LOG (GNUNET_ERROR_TYPE_INFO, - "Disconnecting from VectorProduct\n"); - for (qe = head; head != NULL; qe = head) { - GNUNET_CONTAINER_DLL_remove (head, tail, qe); - if (NULL == qe->th) - GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th); - GNUNET_CLIENT_disconnect (h->client); - GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); - qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); - GNUNET_free(qe->msg); - GNUNET_free(qe); + if (qe == h) + { + GNUNET_CONTAINER_DLL_remove (head, tail, qe); + LOG (GNUNET_ERROR_TYPE_INFO, + "Disconnecting from VectorProduct\n"); + if (NULL == qe->th) + GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th); + GNUNET_CLIENT_disconnect (h->client); + GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); + GNUNET_free (qe->msg); + GNUNET_free (qe); + break; + } } } -- cgit v1.2.3