From 840ea5278ccb38427c76c6c14682664c643150f9 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 7 Dec 2014 22:54:20 +0000 Subject: -style, actually call GNUNET_CADET_receive_done to receive more messages --- src/scalarproduct/gnunet-scalarproduct.c | 16 +- .../gnunet-service-scalarproduct_alice.c | 36 ++-- .../gnunet-service-scalarproduct_bob.c | 28 ++- src/scalarproduct/scalarproduct.h | 5 +- src/scalarproduct/scalarproduct_api.c | 192 ++++++++++----------- src/scalarproduct/test_scalarproduct.conf | 8 +- 6 files changed, 154 insertions(+), 131 deletions(-) (limited to 'src/scalarproduct') diff --git a/src/scalarproduct/gnunet-scalarproduct.c b/src/scalarproduct/gnunet-scalarproduct.c index f6e7d6af2..e812ff444 100644 --- a/src/scalarproduct/gnunet-scalarproduct.c +++ b/src/scalarproduct/gnunet-scalarproduct.c @@ -84,23 +84,23 @@ responder_callback (void *cls, { switch (status) { - case GNUNET_SCALARPRODUCT_Status_Success: + case GNUNET_SCALARPRODUCT_STATUS_SUCCESS: ret = 0; LOG (GNUNET_ERROR_TYPE_INFO, "Session %s concluded.\n", GNUNET_h2s (&session_key)); break; - case GNUNET_SCALARPRODUCT_Status_InvalidResponse: + case GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE: LOG (GNUNET_ERROR_TYPE_ERROR, "Session %s failed: invalid response\n", GNUNET_h2s (&session_key)); break; - case GNUNET_SCALARPRODUCT_Status_Failure: + case GNUNET_SCALARPRODUCT_STATUS_FAILURE: LOG (GNUNET_ERROR_TYPE_ERROR, "Session %s failed: service failure\n", GNUNET_h2s (&session_key)); break; - case GNUNET_SCALARPRODUCT_Status_ServiceDisconnected: + case GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED: LOG (GNUNET_ERROR_TYPE_ERROR, "Session %s failed: service disconnect!\n", GNUNET_h2s (&session_key)); @@ -133,7 +133,7 @@ requester_callback (void *cls, switch (status) { - case GNUNET_SCALARPRODUCT_Status_Success: + case GNUNET_SCALARPRODUCT_STATUS_SUCCESS: if (0 == (rc = gcry_mpi_aprint (GCRYMPI_FMT_HEX, &buf, NULL, result))) { ret = 0; @@ -144,19 +144,19 @@ requester_callback (void *cls, "gcry_mpi_aprint", rc); break; - case GNUNET_SCALARPRODUCT_Status_InvalidResponse: + case GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE: LOG (GNUNET_ERROR_TYPE_ERROR, "Session %s with peer %s failed: invalid response received\n", GNUNET_h2s (&session_key), GNUNET_i2s (&peer_id)); break; - case GNUNET_SCALARPRODUCT_Status_Failure: + case GNUNET_SCALARPRODUCT_STATUS_FAILURE: LOG (GNUNET_ERROR_TYPE_ERROR, "Session %s with peer %s failed: API failure\n", GNUNET_h2s (&session_key), GNUNET_i2s (&peer_id)); break; - case GNUNET_SCALARPRODUCT_Status_ServiceDisconnected: + case GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED: LOG (GNUNET_ERROR_TYPE_ERROR, "Session %s with peer %s was disconnected from service.\n", GNUNET_h2s (&session_key), diff --git a/src/scalarproduct/gnunet-service-scalarproduct_alice.c b/src/scalarproduct/gnunet-service-scalarproduct_alice.c index ce796aa05..b797a378d 100644 --- a/src/scalarproduct/gnunet-service-scalarproduct_alice.c +++ b/src/scalarproduct/gnunet-service-scalarproduct_alice.c @@ -167,11 +167,12 @@ struct AliceServiceSession uint32_t transferred_element_count; /** - * Is this session active (#GNUNET_YES), Concluded (#GNUNET_NO), or - * had an error (#GNUNET_SYSERR). - * FIXME: replace with proper enum for status codes! + * State of this session. In + * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is + * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or + * #GNUNET_SCALARPRODUCT_STATUS_FAILURE. */ - int32_t active; + enum GNUNET_SCALARPRODUCT_ResponseStatus status; /** * Flag to prevent recursive calls to #destroy_service_session() from @@ -326,12 +327,12 @@ prepare_client_end_notification (struct AliceServiceSession *session) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending session-end notification with status %d to client for session %s\n", - session->active, + session->status, GNUNET_h2s (&session->session_id)); e = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT); msg->product_length = htonl (0); - msg->status = htonl (session->active); + msg->status = htonl (session->status); GNUNET_MQ_send (session->client_mq, e); } @@ -439,11 +440,16 @@ cb_channel_destruction (void *cls, "Peer disconnected, terminating session %s with peer %s\n", GNUNET_h2s (&s->session_id), GNUNET_i2s (&s->peer)); + if (NULL != s->cadet_mq) + { + GNUNET_MQ_destroy (s->cadet_mq); + s->cadet_mq = NULL; + } s->channel = NULL; - if (GNUNET_YES == s->active) + if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status) { /* We didn't get an answer yet, fail with error */ - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); } } @@ -685,6 +691,7 @@ handle_bobs_cryptodata_multipart (void *cls, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); } s->transferred_element_count += contained; + GNUNET_CADET_receive_done (s->channel); if (s->transferred_element_count != s->used_element_count) return GNUNET_OK; @@ -775,6 +782,7 @@ handle_bobs_cryptodata_message (void *cls, sizeof (struct GNUNET_CRYPTO_PaillierCiphertext)); } s->transferred_element_count = contained; + GNUNET_CADET_receive_done (s->channel); if (s->transferred_element_count != s->used_element_count) { @@ -964,7 +972,7 @@ cb_intersection_element_removed (void *cls, } s->intersection_op = NULL; s->intersection_set = NULL; - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); return; default: @@ -1012,7 +1020,7 @@ cb_intersection_request_alice (void *cls, if (NULL == s->intersection_op) { GNUNET_break (0); - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); return; } @@ -1020,7 +1028,7 @@ cb_intersection_request_alice (void *cls, GNUNET_SET_commit (s->intersection_op, s->intersection_set)) { - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); return; } @@ -1051,7 +1059,7 @@ client_request_complete_alice (struct AliceServiceSession *s) GNUNET_CADET_OPTION_RELIABLE); if (NULL == s->channel) { - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); return; } @@ -1064,7 +1072,7 @@ client_request_complete_alice (struct AliceServiceSession *s) s); if (NULL == s->intersection_listen) { - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; GNUNET_CADET_channel_destroy (s->channel); s->channel = NULL; prepare_client_end_notification (s); @@ -1227,7 +1235,7 @@ GSS_handle_alice_client_message (void *cls, s = GNUNET_new (struct AliceServiceSession); s->peer = msg->peer; - s->active = GNUNET_YES; + s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE; s->client = client; s->client_mq = GNUNET_MQ_queue_for_server_client (client); s->total = total_count; diff --git a/src/scalarproduct/gnunet-service-scalarproduct_bob.c b/src/scalarproduct/gnunet-service-scalarproduct_bob.c index 8f946f7a3..fc0fd5bb3 100644 --- a/src/scalarproduct/gnunet-service-scalarproduct_bob.c +++ b/src/scalarproduct/gnunet-service-scalarproduct_bob.c @@ -175,9 +175,12 @@ struct BobServiceSession uint32_t cadet_transmitted_element_count; /** - * Is this session active (#GNUNET_YES), Concluded (#GNUNET_NO), or had an error (#GNUNET_SYSERR) + * State of this session. In + * #GNUNET_SCALARPRODUCT_STATUS_ACTIVE while operation is + * ongoing, afterwards in #GNUNET_SCALARPRODUCT_STATUS_SUCCESS or + * #GNUNET_SCALARPRODUCT_STATUS_FAILURE. */ - int32_t active; + enum GNUNET_SCALARPRODUCT_ResponseStatus status; /** * Are we already in #destroy_service_session()? @@ -448,13 +451,13 @@ prepare_client_end_notification (struct BobServiceSession *session) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending session-end notification with status %d to client for session %s\n", - session->active, + session->status, GNUNET_h2s (&session->session_id)); e = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SCALARPRODUCT_RESULT); msg->range = 0; msg->product_length = htonl (0); - msg->status = htonl (session->active); + msg->status = htonl (session->status); GNUNET_MQ_send (session->client_mq, e); } @@ -483,12 +486,17 @@ cb_channel_destruction (void *cls, "Peer disconnected, terminating session %s with peer %s\n", GNUNET_h2s (&in->session_id), GNUNET_i2s (&in->peer)); + if (NULL != in->cadet_mq) + { + GNUNET_MQ_destroy (in->cadet_mq); + in->cadet_mq = NULL; + } in->channel = NULL; if (NULL != (s = in->s)) { - if (GNUNET_YES == s->active) + if (GNUNET_SCALARPRODUCT_STATUS_ACTIVE == s->status) { - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); } } @@ -505,7 +513,7 @@ bob_cadet_done_cb (void *cls) { struct BobServiceSession *session = cls; - session->active = GNUNET_NO; /* that means, done */ + session->status = GNUNET_SCALARPRODUCT_STATUS_SUCCESS; prepare_client_end_notification (session); } @@ -944,6 +952,7 @@ handle_alices_cryptodata_message (void *cls, CADET response(s) */ transmit_cryptographic_reply (s); } + GNUNET_CADET_receive_done (s->cadet->channel); return GNUNET_OK; } @@ -984,6 +993,7 @@ cb_intersection_element_removed (void *cls, case GNUNET_SET_STATUS_DONE: s->intersection_op = NULL; s->intersection_set = NULL; + GNUNET_CADET_receive_done (s->cadet->channel); LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished intersection, %d items remain\n", GNUNET_CONTAINER_multihashmap_size (s->intersected_elements)); @@ -1005,7 +1015,7 @@ cb_intersection_element_removed (void *cls, "Set intersection failed!\n"); s->intersection_op = NULL; s->intersection_set = NULL; - s->active = GNUNET_SYSERR; + s->status = GNUNET_SCALARPRODUCT_STATUS_FAILURE; prepare_client_end_notification (s); return; default: @@ -1297,7 +1307,7 @@ GSS_handle_bob_client_message (void *cls, } s = GNUNET_new (struct BobServiceSession); - s->active = GNUNET_YES; + s->status = GNUNET_SCALARPRODUCT_STATUS_ACTIVE; s->client = client; s->client_mq = GNUNET_MQ_queue_for_server_client (client); s->total = total_count; diff --git a/src/scalarproduct/scalarproduct.h b/src/scalarproduct/scalarproduct.h index a59f7641d..8f5abb3e0 100644 --- a/src/scalarproduct/scalarproduct.h +++ b/src/scalarproduct/scalarproduct.h @@ -154,9 +154,10 @@ struct ClientResponseMessage uint32_t product_length GNUNET_PACKED; /** - * status information about the outcome of this session + * Status information about the outcome of this session, + * An `enum GNUNET_SCALARPRODUCT_ResponseStatus` (in NBO). */ - int32_t status GNUNET_PACKED; + uint32_t status GNUNET_PACKED; /** * Workaround for libgcrypt: -1 if negative, 0 if zero, else 1 diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index eea17c8f6..cae045d62 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c @@ -124,96 +124,9 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle /** - * Handles the STATUS received from the service for a response, does - * not contain a payload. Called when we participate as "Bob" via - * #GNUNET_SCALARPRODUCT_accept_computation(). - * - * @param h our Handle - * @param msg the response received - * @param status the condition the request was terminated with (eg: disconnect) - */ -static void -process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, - const struct ClientResponseMessage *msg, - enum GNUNET_SCALARPRODUCT_ResponseStatus status) -{ - if (NULL != h->cont_status) - h->cont_status (h->cont_cls, - status); - GNUNET_SCALARPRODUCT_cancel (h); -} - - -/** - * Handles the RESULT received from the service for a request, should - * contain a result MPI value. Called when we participate as "Alice" via - * #GNUNET_SCALARPRODUCT_start_computation(). - * - * @param h our Handle - * @param msg Pointer to the response received - * @param status the condition the request was terminated with (eg: disconnect) - */ -static void -process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, - const struct ClientResponseMessage *msg, - enum GNUNET_SCALARPRODUCT_ResponseStatus status) -{ - size_t product_len; - gcry_mpi_t result = NULL; - gcry_error_t rc; - gcry_mpi_t num; - size_t rsize; - - if ( (GNUNET_SCALARPRODUCT_Status_Success == status) && - ( (NULL == msg) || - ( (ntohs (msg->header.size) - sizeof (struct ClientResponseMessage) - != (product_len = ntohl (msg->product_length))) ) ) ) - { - GNUNET_break (0); - status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; - } - if (GNUNET_SCALARPRODUCT_Status_Success == status) - { - result = gcry_mpi_new (0); - - if (0 < product_len) - { - rsize = 0; - if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, - &msg[1], - product_len, - &rsize))) - { - LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, - "gcry_mpi_scan", - rc); - gcry_mpi_release (result); - result = NULL; - status = GNUNET_SCALARPRODUCT_Status_InvalidResponse; - } - else - { - if (0 < ntohl (msg->range)) - gcry_mpi_add (result, result, num); - else if (0 > ntohl (msg->range)) - gcry_mpi_sub (result, result, num); - gcry_mpi_release (num); - } - } - } - h->cont_datum (h->cont_cls, - status, - result); - if (NULL != result) - gcry_mpi_release (result); - GNUNET_SCALARPRODUCT_cancel (h); -} - - -/** - * Called when a response is received from the service. After basic check, the - * handler in qe->response_proc is called. This functions handles the response - * to the client which used the API. + * Called when a response is received from the service. After basic + * check, the handler in `h->response_proc` is called. This functions + * handles the response to the client which used the API. * * @param cls Pointer to the Master Context * @param msg Pointer to the data received in response @@ -224,6 +137,7 @@ receive_cb (void *cls, { struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; const struct ClientResponseMessage *message; + enum GNUNET_SCALARPRODUCT_ResponseStatus status; if (NULL == msg) { @@ -231,28 +145,31 @@ receive_cb (void *cls, "Disconnected from SCALARPRODUCT service.\n"); h->response_proc (h, NULL, - GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); + GNUNET_SCALARPRODUCT_STATUS_DISCONNECTED); return; } - if (ntohs (msg->size) != sizeof (struct ClientResponseMessage)) + if (ntohs (msg->size) < sizeof (struct ClientResponseMessage)) { GNUNET_break (0); h->response_proc (h, NULL, - GNUNET_SCALARPRODUCT_Status_InvalidResponse); + GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE); return; } message = (const struct ClientResponseMessage *) msg; - if (GNUNET_SYSERR == ntohl (message->status)) + if (ntohs (msg->size) != + ntohl (message->product_length) + sizeof (struct ClientResponseMessage)) { + GNUNET_break (0); h->response_proc (h, NULL, - GNUNET_SCALARPRODUCT_Status_Failure); + GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE); return; } + status = (enum GNUNET_SCALARPRODUCT_ResponseStatus) ntohl (message->status); h->response_proc (h, message, - GNUNET_SCALARPRODUCT_Status_Success); + status); } @@ -282,7 +199,7 @@ do_send_message (void *cls, "Failed to transmit request to SCALARPRODUCT.\n"); /* notify caller about the error, done here */ h->response_proc (h, NULL, - GNUNET_SCALARPRODUCT_Status_Failure); + GNUNET_SCALARPRODUCT_STATUS_FAILURE); return 0; } ret = ntohs (h->msg->size); @@ -329,6 +246,27 @@ do_send_message (void *cls, } +/** + * Handles the STATUS received from the service for a response, does + * not contain a payload. Called when we participate as "Bob" via + * #GNUNET_SCALARPRODUCT_accept_computation(). + * + * @param h our Handle + * @param msg the response received + * @param status the condition the request was terminated with (eg: disconnect) + */ +static void +process_status_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, + const struct ClientResponseMessage *msg, + enum GNUNET_SCALARPRODUCT_ResponseStatus status) +{ + if (NULL != h->cont_status) + h->cont_status (h->cont_cls, + status); + GNUNET_SCALARPRODUCT_cancel (h); +} + + /** * Used by Bob's client to cooperate with Alice, * @@ -409,6 +347,66 @@ GNUNET_SCALARPRODUCT_accept_computation (const struct GNUNET_CONFIGURATION_Handl } +/** + * Handles the RESULT received from the service for a request, should + * contain a result MPI value. Called when we participate as "Alice" via + * #GNUNET_SCALARPRODUCT_start_computation(). + * + * @param h our Handle + * @param msg Pointer to the response received + * @param status the condition the request was terminated with (eg: disconnect) + */ +static void +process_result_message (struct GNUNET_SCALARPRODUCT_ComputationHandle *h, + const struct ClientResponseMessage *msg, + enum GNUNET_SCALARPRODUCT_ResponseStatus status) +{ + uint32_t product_len; + gcry_mpi_t result = NULL; + gcry_error_t rc; + gcry_mpi_t num; + size_t rsize; + + if (GNUNET_SCALARPRODUCT_STATUS_SUCCESS == status) + { + result = gcry_mpi_new (0); + + product_len = ntohl (msg->product_length); + if (0 < product_len) + { + rsize = 0; + if (0 != (rc = gcry_mpi_scan (&num, GCRYMPI_FMT_STD, + &msg[1], + product_len, + &rsize))) + { + LOG_GCRY (GNUNET_ERROR_TYPE_ERROR, + "gcry_mpi_scan", + rc); + gcry_mpi_release (result); + result = NULL; + status = GNUNET_SCALARPRODUCT_STATUS_INVALID_RESPONSE; + } + else + { + if (0 < ntohl (msg->range)) + gcry_mpi_add (result, result, num); + else if (0 > ntohl (msg->range)) + gcry_mpi_sub (result, result, num); + gcry_mpi_release (num); + } + } + } + if (NULL != h->cont_datum) + h->cont_datum (h->cont_cls, + status, + result); + if (NULL != result) + gcry_mpi_release (result); + GNUNET_SCALARPRODUCT_cancel (h); +} + + /** * Request by Alice's client for computing a scalar product * diff --git a/src/scalarproduct/test_scalarproduct.conf b/src/scalarproduct/test_scalarproduct.conf index 0083bc202..96a487a57 100644 --- a/src/scalarproduct/test_scalarproduct.conf +++ b/src/scalarproduct/test_scalarproduct.conf @@ -9,4 +9,10 @@ GNUNET_TEST_HOME = /tmp/test-scalarproduct/ OVERLAY_TOPOLOGY = CLIQUE [nse] -WORKBITS=0 \ No newline at end of file +WORKBITS=0 + +[scalarproduct-bob] +#PREFIX = valgrind + +[scalarproduct-alice] +#PREFIX = valgrind \ No newline at end of file -- cgit v1.2.3