aboutsummaryrefslogtreecommitdiff
path: root/src/scalarproduct/scalarproduct_api.c
diff options
context:
space:
mode:
authorChristian Fuchs <christian.fuchs@cfuchs.net>2013-09-02 13:20:36 +0000
committerChristian Fuchs <christian.fuchs@cfuchs.net>2013-09-02 13:20:36 +0000
commit8355426564ccf3afe2e1451cfc6c5b38f6924330 (patch)
tree609f990275fee3c4bae779e04bf35b1199dd77a6 /src/scalarproduct/scalarproduct_api.c
parent105abb1e1fa2d0328bd84a6945f9f045be35dc82 (diff)
downloadgnunet-8355426564ccf3afe2e1451cfc6c5b38f6924330.tar.gz
gnunet-8355426564ccf3afe2e1451cfc6c5b38f6924330.zip
partial rework of the old API, includes simplifications and gets rid of the extra connect. we now establish one client-connection per session.
Diffstat (limited to 'src/scalarproduct/scalarproduct_api.c')
-rw-r--r--src/scalarproduct/scalarproduct_api.c235
1 files changed, 127 insertions, 108 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c
index 928f5397c..3eae200c3 100644
--- a/src/scalarproduct/scalarproduct_api.c
+++ b/src/scalarproduct/scalarproduct_api.c
@@ -94,16 +94,6 @@ struct GNUNET_SCALARPRODUCT_QueueEntry
94 int16_t was_transmitted; 94 int16_t was_transmitted;
95 95
96 /** 96 /**
97 * Timeout for the current operation.
98 */
99 struct GNUNET_TIME_Absolute timeout;
100
101 /**
102 * Task for timeout signaling.
103 */
104 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
105
106 /**
107 * Response Processor for response from the service. This function calls the 97 * Response Processor for response from the service. This function calls the
108 * continuation function provided by the client. 98 * continuation function provided by the client.
109 */ 99 */
@@ -224,6 +214,7 @@ free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h)
224 if (NULL == h->queue_head && NULL == h->queue_tail) 214 if (NULL == h->queue_head && NULL == h->queue_tail)
225 { 215 {
226 // The queue is empty. Just return. 216 // The queue is empty. Just return.
217 qe = NULL;
227 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); 218 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n");
228 } 219 }
229 else if (h->queue_head == h->queue_tail) //only one entry 220 else if (h->queue_head == h->queue_tail) //only one entry
@@ -535,154 +526,182 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h)
535 526
536 527
537/** 528/**
538 * Called by the responder client to prepare response 529 * Used by Bob's client to cooperate with Alice,
539 * 530 *
540 * @param h handle to the master context 531 * @param h handle to the master context
541 * @param key Session key - unique to the requesting client 532 * @param key Session key - unique to the requesting client
542 * @param element_count Number of elements in the vector
543 * @param mask_length number of bytes in the mask
544 * @param elements Array of elements of the vector 533 * @param elements Array of elements of the vector
545 * @param mask Array of the mask 534 * @param element_count Number of elements in the vector
546 * @param timeout Relative timeout for the operation
547 * @param cont Callback function 535 * @param cont Callback function
548 * @param cont_cls Closure for the callback function 536 * @param cont_cls Closure for the callback function
549 */ 537 */
550struct GNUNET_SCALARPRODUCT_QueueEntry * 538struct GNUNET_SCALARPRODUCT_Handle *
551GNUNET_SCALARPRODUCT_prepare_response (struct GNUNET_SCALARPRODUCT_Handle *h, 539GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
552 const struct GNUNET_HashCode * key, 540 const struct GNUNET_HashCode * key,
553 uint16_t element_count, 541 const int32_t * elements,
554 int32_t * elements, 542 uint32_t element_count,
555 struct GNUNET_TIME_Relative timeout, 543 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont,
556 GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, 544 void *cont_cls)
557 void *cont_cls)
558{ 545{
559 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h); 546 struct GNUNET_SCALARPRODUCT_Handle *h;
547 struct GNUNET_SCALARPRODUCT_client_request *msg;
560 int32_t * vector; 548 int32_t * vector;
561 uint16_t size; 549 uint16_t size;
562 unsigned int i; 550 uint64_t i;
563 551
552 GNUNET_assert(key);
553 GNUNET_assert(elements);
554 GNUNET_assert(cont);
555 GNUNET_assert(element_count > 1);
564 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) 556 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
565 +element_count * sizeof (int32_t)); 557 + element_count * sizeof (int32_t));
566 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t); 558 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
567 559 h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
568 qe->message_size = size; 560 if (!h->client)
569 qe->msg = GNUNET_malloc (size); 561 {
570 qe->msg->header.size = htons (size); 562 LOG (GNUNET_ERROR_TYPE_ERROR,
571 qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); 563 _ ("Failed to connect to the scalarproduct service\n"));
572 qe->msg->element_count = htons (element_count); 564 GNUNET_free(h);
573 qe->msg->mask_length = htons (0); 565 return NULL;
574 memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode)); 566 }
575 qe->cont_status = cont; 567 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
576 qe->cont_cls = cont_cls; 568 if (!h->th){
577 qe->was_transmitted = GNUNET_NO; 569 LOG (GNUNET_ERROR_TYPE_ERROR,
578 qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe); 570 _("Failed to send a message to the statistics service\n"));
579 qe->response_proc = &process_status_message; 571 GNUNET_CLIENT_disconnect(h->client);
580 qe->timeout = GNUNET_TIME_relative_to_absolute (timeout); 572 GNUNET_free(h);
581 573 return NULL;
582 vector = (int32_t *) & qe->msg[1]; 574 }
575
576 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t);
577
578 h->cont_datum = cont;
579 h->cont_cls = cont_cls;
580 h->response_proc = &process_result_message;
581 h->cfg = cfg;
582 h->msg = GNUNET_malloc (size);
583 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
584
585 msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
586 msg->header.size = htons (size);
587 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
588 msg->element_count = htonl (element_count);
589
590 vector = (int32_t*) &msg[1];
583 // copy each element over to the message 591 // copy each element over to the message
584 for (i = 0; i < element_count; i++) 592 for (i = 0; i < element_count; i++)
585 vector[i] = htonl (elements[i]); 593 vector[i] = htonl(elements[i]);
586 594
587 process_queue (h); 595 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
588 return qe; 596
597
598 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
599 GNUNET_TIME_UNIT_FOREVER_REL,
600 GNUNET_YES, // retry is OK in the initial stage
601 &transmit_request, h);
602 if (!h->th)
603 {
604 LOG (GNUNET_ERROR_TYPE_ERROR,
605 _ ("Failed to send a message to the scalarproduct service\n"));
606 GNUNET_STATISTICS_destroy(h->GNUNET_YES);
607 GNUNET_CLIENT_disconnect(h->client);
608 GNUNET_free(h->msg);
609 GNUNET_free(h);
610 return NULL;
611 }
612 return h;
589} 613}
590 614
591 615
592/** 616/**
593 * Request the Scalar Product Evaluation 617 * Request by Alice's client for computing a scalar product
594 * 618 *
595 * @param h handle to the master context 619 * @param h handle to the master context
596 * @param key Session key - unique to the requesting client 620 * @param key Session key - unique to the requesting client
597 * @param peer PeerID of the other peer 621 * @param peer PeerID of the other peer
598 * @param element_count Number of elements in the vector
599 * @param mask_length number of bytes in the mask
600 * @param elements Array of elements of the vector 622 * @param elements Array of elements of the vector
623 * @param element_count Number of elements in the vector
601 * @param mask Array of the mask 624 * @param mask Array of the mask
602 * @param timeout Relative timeout for the operation 625 * @param mask_bytes number of bytes in the mask
603 * @param cont Callback function 626 * @param cont Callback function
604 * @param cont_cls Closure for the callback function 627 * @param cont_cls Closure for the callback function
605 */ 628 */
606struct GNUNET_SCALARPRODUCT_QueueEntry * 629struct GNUNET_SCALARPRODUCT_Handle *
607GNUNET_SCALARPRODUCT_request (struct GNUNET_SCALARPRODUCT_Handle *h, 630GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
608 const struct GNUNET_HashCode * key, 631 const struct GNUNET_HashCode * key,
609 const struct GNUNET_PeerIdentity * peer, 632 const struct GNUNET_PeerIdentity *peer,
610 uint16_t element_count, 633 const int32_t * elements,
611 uint16_t mask_length, 634 uint32_t element_count,
612 int32_t * elements,
613 const unsigned char * mask, 635 const unsigned char * mask,
614 struct GNUNET_TIME_Relative timeout, 636 uint32_t mask_bytes,
615 GNUNET_SCALARPRODUCT_DatumProcessor cont, 637 GNUNET_SCALARPRODUCT_DatumProcessor cont,
616 void *cont_cls) 638 void *cont_cls)
617{ 639{
618 struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h); 640 struct GNUNET_CLIENT_Connection *client;
641 struct GNUNET_SCALARPRODUCT_Handle *h;
642 struct GNUNET_SCALARPRODUCT_client_request *msg;
619 int32_t * vector; 643 int32_t * vector;
620 uint16_t size; 644 uint16_t size;
621 unsigned int i; 645 uint64_t i;
622 646
647 GNUNET_assert(key);
648 GNUNET_assert(peer);
649 GNUNET_assert(elements);
650 GNUNET_assert(mask);
651 GNUNET_assert(cont);
652 GNUNET_assert(element_count > 1);
653 GNUNET_assert(mask_bytes != 0);
623 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) 654 GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request)
624 +element_count * sizeof (int32_t) 655 + element_count * sizeof (int32_t)
625 + mask_length); 656 + mask_length);
626 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t) + mask_length;
627
628 qe->message_size = size;
629 qe->msg = GNUNET_malloc (size);
630 qe->msg->header.size = htons (size);
631 qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
632 memcpy (&qe->msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
633 qe->msg->element_count = htons (element_count);
634 qe->msg->mask_length = htons (mask_length);
635 memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode));
636 qe->cont_datum = cont;
637 qe->cont_cls = cont_cls;
638 qe->was_transmitted = GNUNET_NO;
639 qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe);
640 qe->response_proc = &process_result_message;
641 qe->timeout = GNUNET_TIME_relative_to_absolute (timeout);
642
643 vector = (int32_t*) & qe->msg[1];
644 // copy each element over to the message
645 for (i = 0; i < element_count; i++)
646 vector[i] = htonl (elements[i]);
647
648 // fill in the mask
649 memcpy (&vector[element_count], mask, mask_length);
650
651 process_queue (h);
652 return qe;
653}
654
655
656/**
657 * Connect to the scalarproduct service.
658 *
659 * @param cfg configuration to use
660 * @return handle to use to access the service
661 */
662struct GNUNET_SCALARPRODUCT_Handle *
663GNUNET_SCALARPRODUCT_connect (const struct GNUNET_CONFIGURATION_Handle * cfg)
664{
665 struct GNUNET_CLIENT_Connection *client;
666 struct GNUNET_SCALARPRODUCT_Handle *h;
667
668 client = GNUNET_CLIENT_connect ("scalarproduct", cfg); 657 client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
669 658
670 if (NULL == client) 659 if (!client)
671 { 660 {
672 LOG (GNUNET_ERROR_TYPE_ERROR, 661 LOG (GNUNET_ERROR_TYPE_ERROR,
673 _ ("Failed to connect to the scalarproduct service\n")); 662 _ ("Failed to connect to the scalarproduct service\n"));
674 return NULL; 663 return NULL;
675 } 664 }
676 665 size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length;
677 h = GNUNET_malloc (sizeof (struct GNUNET_SCALARPRODUCT_Handle) + 666
678 GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); 667 h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle);
668 h->cont_datum = cont;
669 h->cont_cls = cont_cls;
670 h->response_proc = &process_status_message;
679 h->client = client; 671 h->client = client;
680 h->cfg = cfg; 672 h->cfg = cfg;
673 h->msg = GNUNET_malloc (size);
674 memcpy (&h->key, key, sizeof (struct GNUNET_HashCode));
675
676 msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg;
677 msg->header.size = htons (size);
678 msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
679 msg->element_count = htons (element_count);
680 msg->mask_length = htons (mask_length);
681
682 vector = (int32_t*) &msg[1];
683 // copy each element over to the message
684 for (i = 0; i < element_count; i++)
685 vector[i] = htonl(elements[i]);
686
687 memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
688 memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
689 memcpy (&vector[element_count], mask, mask_length);
690
681 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); 691 h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg);
692 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
693 GNUNET_TIME_UNIT_FOREVER_REL,
694 GNUNET_YES, // retry is OK in the initial stage
695 &transmit_request, h);
696 if ( !h->th)
697 {
698 LOG (GNUNET_ERROR_TYPE_ERROR,
699 _ ("Failed to send a message to the scalarproduct service\n"));
700 return NULL;
701 }
682 return h; 702 return h;
683} 703}
684 704
685
686/** 705/**
687 * Disconnect from the scalarproduct service. 706 * Disconnect from the scalarproduct service.
688 * 707 *