aboutsummaryrefslogtreecommitdiff
path: root/src/scalarproduct/scalarproduct_api.c
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-09-02 21:09:53 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-09-02 21:09:53 +0000
commitde62048ab84c178fbf57682b6f9310904ff38964 (patch)
tree5cc573f309e15d42a8fced9146d0cb6348d0d687 /src/scalarproduct/scalarproduct_api.c
parent9f064f70f7aff38119ebf1b4345118cb61302f2d (diff)
downloadgnunet-de62048ab84c178fbf57682b6f9310904ff38964.tar.gz
gnunet-de62048ab84c178fbf57682b6f9310904ff38964.zip
rewrote API minus cancel function
Diffstat (limited to 'src/scalarproduct/scalarproduct_api.c')
-rw-r--r--src/scalarproduct/scalarproduct_api.c180
1 files changed, 79 insertions, 101 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c
index 7c1f5394e..118de4a04 100644
--- a/src/scalarproduct/scalarproduct_api.c
+++ b/src/scalarproduct/scalarproduct_api.c
@@ -119,11 +119,11 @@ struct GNUNET_SCALARPRODUCT_ComputationHandle
119/** 119/**
120 * Head of the active sessions queue 120 * Head of the active sessions queue
121 */ 121 */
122struct GNUNET_SCALARPRODUCT_ComputationHandle *head; 122static struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
123/** 123/**
124 * Tail of the active sessions queue 124 * Tail of the active sessions queue
125 */ 125 */
126struct GNUNET_SCALARPRODUCT_ComputationHandle *tail; 126static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
127 127
128/************************************************************** 128/**************************************************************
129 *** Function Declarations ********** 129 *** Function Declarations **********
@@ -170,10 +170,7 @@ process_status_message (void *cls,
170{ 170{
171 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 171 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
172 172
173 GNUNET_assert (qe != NULL); 173 qe->cont_status (qe->cont_cls, status);
174
175 if (qe->cont_status != NULL)
176 qe->cont_status (qe->cont_cls, &qe->msg->key, status);
177} 174}
178 175
179 176
@@ -190,17 +187,38 @@ process_result_message (void *cls,
190 enum GNUNET_SCALARPRODUCT_ResponseStatus status) 187 enum GNUNET_SCALARPRODUCT_ResponseStatus status)
191{ 188{
192 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 189 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
190 const struct GNUNET_SCALARPRODUCT_client_response *message =
191 (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
192 gcry_mpi_t result = NULL;
193 193
194 GNUNET_assert (qe != NULL); 194 if (GNUNET_SCALARPRODUCT_Status_Success == status
195 195 && qe->cont_datum != NULL)
196 if (msg == NULL && qe->cont_datum != NULL)
197 { 196 {
198 LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session terminated.\n"); 197 size_t product_len = ntohl(message->product_length);
199 } 198 result = gcry_mpi_new(0);
200 if (qe->cont_datum != NULL) 199
201 { 200 if (0 < product_len)
202 qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, (struct GNUNET_SCALARPRODUCT_client_response *) msg); 201 {
202 gcry_mpi_t num;
203 size_t read = 0;
204
205 if (0 != gcry_mpi_scan (&num, GCRYMPI_FMT_USG, &msg[1], product_len, &read)){
206 LOG (GNUNET_ERROR_TYPE_ERROR, "Could not convert to mpi to value!\n");
207 gcry_mpi_release(result);
208 result = NULL;
209 status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
210 }
211 else
212 {
213 if (message->range > 0)
214 gcry_mpi_add(result, result, num);
215 else
216 gcry_mpi_sub(result, result, num);
217 gcry_mpi_release(num);
218 }
219 }
203 } 220 }
221 qe->cont_datum (qe->cont_cls, status, result);
204} 222}
205 223
206 224
@@ -215,76 +233,34 @@ process_result_message (void *cls,
215static void 233static void
216receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) 234receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
217{ 235{
218 struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls; 236 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
219 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe; 237 const struct GNUNET_SCALARPRODUCT_client_response *message =
220 int16_t was_transmitted; 238 (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
221 struct GNUNET_SCALARPRODUCT_client_response *message = 239 enum GNUNET_SCALARPRODUCT_ResponseStatus status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
222 (struct GNUNET_SCALARPRODUCT_client_response *) msg;
223
224 h->in_receive = GNUNET_NO;
225 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
226
227 if (NULL == (qe = free_queue_head_entry (h)))
228 {
229 /**
230 * The queue head will be NULL if the client disconnected,
231 * * In case of Alice, client disconnected after sending request, before receiving response
232 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
233 */
234 process_queue (h);
235 return;
236 }
237
238 if (h->client == NULL)
239 {
240 // GKUKREJA : handle this correctly
241 /**
242 * The queue head will be NULL if the client disconnected,
243 * * In case of Alice, client disconnected after sending request, before receiving response
244 * * In case of Bob, client disconnected after preparing response, before getting request from Alice.
245 */
246 process_queue (h);
247 return;
248 }
249
250 was_transmitted = qe->was_transmitted;
251 // Control will only come here, when the request was transmitted to service,
252 // and service responded.
253 GNUNET_assert (was_transmitted == GNUNET_YES);
254 240
255 if (msg == NULL) 241 if (NULL == msg)
256 { 242 {
257 LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n"); 243 LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
258 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure); 244 status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
259 } 245 }
260 else if ((ntohs (msg->type) != GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT)) 246 else if ( GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs (msg->type))
261 { 247 {
262 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n"); 248 LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n");
263 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
264 } 249 }
265 else if (ntohl (message->product_length) == 0) 250 else if (0 < ntohl (message->product_length) || (0 == message->range))
266 { 251 {
267 // response for the responder client, successful 252 // response for the responder client, successful
268 GNUNET_STATISTICS_update (h->stats, 253 GNUNET_STATISTICS_update (qe->stats,
269 gettext_noop ("# SUC responder result messages received"), 1, 254 gettext_noop ("# SUC responder result messages received"), 1,
270 GNUNET_NO); 255 GNUNET_NO);
271 256
272 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without product attached.\n"); 257 status = GNUNET_SCALARPRODUCT_Status_Success;
273 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
274 }
275 else if (ntohl (message->product_length) > 0)
276 {
277 // response for the requester client, successful
278 GNUNET_STATISTICS_update (h->stats,
279 gettext_noop ("# SUC requester result messages received"), 1,
280 GNUNET_NO);
281
282 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service for requester client.\n");
283 qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
284 } 258 }
259
260 if (qe->cont_datum != NULL)
261 qe->response_proc (qe, msg, status);
285 262
286 GNUNET_free (qe); 263 GNUNET_free (qe);
287 process_queue (h);
288} 264}
289 265
290 266
@@ -302,31 +278,31 @@ transmit_request (void *cls, size_t size,
302 void *buf) 278 void *buf)
303{ 279{
304 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls; 280 struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
305 size_t msize;
306 281
307 if (buf == NULL) 282 if (NULL == buf)
308 { 283 {
309 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n"); 284 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to SCALARPRODUCT.\n");
310 GNUNET_STATISTICS_update (qe->stats, 285 GNUNET_STATISTICS_update (qe->stats,
311 gettext_noop ("# transmission request failures"), 286 gettext_noop ("# transmission request failures"),
312 1, GNUNET_NO); 287 1, GNUNET_NO);
313 GNUNET_SCALARPRODUCT_disconnect (qe); 288
289 // notify caller about the error, done here.
290 if (qe->cont_datum != NULL)
291 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
292 GNUNET_SCALARPRODUCT_cancel(cls);
314 return 0; 293 return 0;
315 } 294 }
316 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to SCALARPRODUCT\n",
317 msize);
318
319 memcpy (buf, qe->msg, size); 295 memcpy (buf, qe->msg, size);
296
320 GNUNET_free (qe->msg); 297 GNUNET_free (qe->msg);
321 qe->was_transmitted = GNUNET_YES; 298 qe->msg = NULL;
322
323 qe->th = NULL; 299 qe->th = NULL;
324 300
325 GNUNET_CLIENT_receive (h->client, &receive_cb, h, 301 GNUNET_CLIENT_receive (qe->client, &receive_cb, qe,
326 GNUNET_TIME_UNIT_FOREVER_REL); 302 GNUNET_TIME_UNIT_FOREVER_REL);
327 303
328#if INSANE_STATISTICS 304#if INSANE_STATISTICS
329 GNUNET_STATISTICS_update (h->stats, 305 GNUNET_STATISTICS_update (qe->stats,
330 gettext_noop ("# bytes sent to scalarproduct"), 1, 306 gettext_noop ("# bytes sent to scalarproduct"), 1,
331 GNUNET_NO); 307 GNUNET_NO);
332#endif 308#endif
@@ -389,7 +365,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
389 365
390 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t); 366 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
391 367
392 h->cont_datum = cont; 368 h->cont_status = cont;
393 h->cont_cls = cont_cls; 369 h->cont_cls = cont_cls;
394 h->response_proc = &process_result_message; 370 h->response_proc = &process_result_message;
395 h->cfg = cfg; 371 h->cfg = cfg;
@@ -416,7 +392,7 @@ GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
416 { 392 {
417 LOG (GNUNET_ERROR_TYPE_ERROR, 393 LOG (GNUNET_ERROR_TYPE_ERROR,
418 _ ("Failed to send a message to the scalarproduct service\n")); 394 _ ("Failed to send a message to the scalarproduct service\n"));
419 GNUNET_STATISTICS_destroy(h->GNUNET_YES); 395 GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES);
420 GNUNET_CLIENT_disconnect(h->client); 396 GNUNET_CLIENT_disconnect(h->client);
421 GNUNET_free(h->msg); 397 GNUNET_free(h->msg);
422 GNUNET_free(h); 398 GNUNET_free(h);
@@ -459,7 +435,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
459 435
460 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) 436 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
461 + element_count * sizeof (int32_t) 437 + element_count * sizeof (int32_t)
462 + mask_length); 438 + mask_bytes);
463 439
464 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle); 440 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
465 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 441 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
@@ -479,7 +455,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
479 return NULL; 455 return NULL;
480 } 456 }
481 457
482 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; 458 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_bytes;
483 459
484 h->cont_datum = cont; 460 h->cont_datum = cont;
485 h->cont_cls = cont_cls; 461 h->cont_cls = cont_cls;
@@ -492,7 +468,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
492 msg->header.size = htons (size); 468 msg->header.size = htons (size);
493 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); 469 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
494 msg->element_count = htons (element_count); 470 msg->element_count = htons (element_count);
495 msg->mask_length = htons (mask_length); 471 msg->mask_length = htons (mask_bytes);
496 472
497 vector = (int32_t*) &msg[1]; 473 vector = (int32_t*) &msg[1];
498 // copy each element over to the message 474 // copy each element over to the message
@@ -501,7 +477,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
501 477
502 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); 478 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
503 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); 479 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
504 memcpy (&vector[element_count], mask, mask_length); 480 memcpy (&vector[element_count], mask, mask_bytes);
505 481
506 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, 482 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
507 GNUNET_TIME_UNIT_FOREVER_REL, 483 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -511,7 +487,7 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
511 { 487 {
512 LOG (GNUNET_ERROR_TYPE_ERROR, 488 LOG (GNUNET_ERROR_TYPE_ERROR,
513 _ ("Failed to send a message to the scalarproduct service\n")); 489 _ ("Failed to send a message to the scalarproduct service\n"));
514 GNUNET_STATISTICS_destroy(h->GNUNET_YES); 490 GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES);
515 GNUNET_CLIENT_disconnect(h->client); 491 GNUNET_CLIENT_disconnect(h->client);
516 GNUNET_free(h->msg); 492 GNUNET_free(h->msg);
517 GNUNET_free(h); 493 GNUNET_free(h);
@@ -524,26 +500,28 @@ GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
524/** 500/**
525 * Disconnect from the scalarproduct service. 501 * Disconnect from the scalarproduct service.
526 * 502 *
527 * @param h handle to the scalarproduct 503 * @param h a computation handle to cancel
528 */ 504 */
529void 505void
530GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle * h) 506GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
531{ 507{
532 struct GNUNET_SCALARPRODUCT_ComputationHandle * qe; 508 struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
533 509
534 LOG (GNUNET_ERROR_TYPE_INFO,
535 "Disconnecting from VectorProduct\n");
536
537 for (qe = head; head != NULL; qe = head) 510 for (qe = head; head != NULL; qe = head)
538 { 511 {
539 GNUNET_CONTAINER_DLL_remove (head, tail, qe); 512 if (qe == h)
540 if (NULL == qe->th) 513 {
541 GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th); 514 GNUNET_CONTAINER_DLL_remove (head, tail, qe);
542 GNUNET_CLIENT_disconnect (h->client); 515 LOG (GNUNET_ERROR_TYPE_INFO,
543 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES); 516 "Disconnecting from VectorProduct\n");
544 qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_ServiceDisconnected); 517 if (NULL == qe->th)
545 GNUNET_free(qe->msg); 518 GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
546 GNUNET_free(qe); 519 GNUNET_CLIENT_disconnect (h->client);
520 GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
521 GNUNET_free (qe->msg);
522 GNUNET_free (qe);
523 break;
524 }
547 } 525 }
548} 526}
549 527