diff options
author | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-09-02 13:20:36 +0000 |
---|---|---|
committer | Christian Fuchs <christian.fuchs@cfuchs.net> | 2013-09-02 13:20:36 +0000 |
commit | 8355426564ccf3afe2e1451cfc6c5b38f6924330 (patch) | |
tree | 609f990275fee3c4bae779e04bf35b1199dd77a6 | |
parent | 105abb1e1fa2d0328bd84a6945f9f045be35dc82 (diff) | |
download | gnunet-8355426564ccf3afe2e1451cfc6c5b38f6924330.tar.gz gnunet-8355426564ccf3afe2e1451cfc6c5b38f6924330.zip |
partial rework of the old API, includes simplifications and gets rid of the extra connect. we now establish one client-connection per session.
-rw-r--r-- | src/scalarproduct/scalarproduct_api.c | 235 |
1 files changed, 127 insertions, 108 deletions
diff --git a/src/scalarproduct/scalarproduct_api.c b/src/scalarproduct/scalarproduct_api.c index 928f5397c..3eae200c3 100644 --- a/src/scalarproduct/scalarproduct_api.c +++ b/src/scalarproduct/scalarproduct_api.c | |||
@@ -94,16 +94,6 @@ struct GNUNET_SCALARPRODUCT_QueueEntry | |||
94 | int16_t was_transmitted; | 94 | int16_t was_transmitted; |
95 | 95 | ||
96 | /** | 96 | /** |
97 | * Timeout for the current operation. | ||
98 | */ | ||
99 | struct GNUNET_TIME_Absolute timeout; | ||
100 | |||
101 | /** | ||
102 | * Task for timeout signaling. | ||
103 | */ | ||
104 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
105 | |||
106 | /** | ||
107 | * Response Processor for response from the service. This function calls the | 97 | * Response Processor for response from the service. This function calls the |
108 | * continuation function provided by the client. | 98 | * continuation function provided by the client. |
109 | */ | 99 | */ |
@@ -224,6 +214,7 @@ free_queue_head_entry (struct GNUNET_SCALARPRODUCT_Handle * h) | |||
224 | if (NULL == h->queue_head && NULL == h->queue_tail) | 214 | if (NULL == h->queue_head && NULL == h->queue_tail) |
225 | { | 215 | { |
226 | // The queue is empty. Just return. | 216 | // The queue is empty. Just return. |
217 | qe = NULL; | ||
227 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); | 218 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue was empty when free_queue_head_entry was called.\n"); |
228 | } | 219 | } |
229 | else if (h->queue_head == h->queue_tail) //only one entry | 220 | else if (h->queue_head == h->queue_tail) //only one entry |
@@ -535,154 +526,182 @@ process_queue (struct GNUNET_SCALARPRODUCT_Handle *h) | |||
535 | 526 | ||
536 | 527 | ||
537 | /** | 528 | /** |
538 | * Called by the responder client to prepare response | 529 | * Used by Bob's client to cooperate with Alice, |
539 | * | 530 | * |
540 | * @param h handle to the master context | 531 | * @param h handle to the master context |
541 | * @param key Session key - unique to the requesting client | 532 | * @param key Session key - unique to the requesting client |
542 | * @param element_count Number of elements in the vector | ||
543 | * @param mask_length number of bytes in the mask | ||
544 | * @param elements Array of elements of the vector | 533 | * @param elements Array of elements of the vector |
545 | * @param mask Array of the mask | 534 | * @param element_count Number of elements in the vector |
546 | * @param timeout Relative timeout for the operation | ||
547 | * @param cont Callback function | 535 | * @param cont Callback function |
548 | * @param cont_cls Closure for the callback function | 536 | * @param cont_cls Closure for the callback function |
549 | */ | 537 | */ |
550 | struct GNUNET_SCALARPRODUCT_QueueEntry * | 538 | struct GNUNET_SCALARPRODUCT_Handle * |
551 | GNUNET_SCALARPRODUCT_prepare_response (struct GNUNET_SCALARPRODUCT_Handle *h, | 539 | GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg, |
552 | const struct GNUNET_HashCode * key, | 540 | const struct GNUNET_HashCode * key, |
553 | uint16_t element_count, | 541 | const int32_t * elements, |
554 | int32_t * elements, | 542 | uint32_t element_count, |
555 | struct GNUNET_TIME_Relative timeout, | 543 | GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, |
556 | GNUNET_SCALARPRODUCT_ContinuationWithStatus cont, | 544 | void *cont_cls) |
557 | void *cont_cls) | ||
558 | { | 545 | { |
559 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h); | 546 | struct GNUNET_SCALARPRODUCT_Handle *h; |
547 | struct GNUNET_SCALARPRODUCT_client_request *msg; | ||
560 | int32_t * vector; | 548 | int32_t * vector; |
561 | uint16_t size; | 549 | uint16_t size; |
562 | unsigned int i; | 550 | uint64_t i; |
563 | 551 | ||
552 | GNUNET_assert(key); | ||
553 | GNUNET_assert(elements); | ||
554 | GNUNET_assert(cont); | ||
555 | GNUNET_assert(element_count > 1); | ||
564 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) | 556 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) |
565 | +element_count * sizeof (int32_t)); | 557 | + element_count * sizeof (int32_t)); |
566 | size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t); | 558 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); |
567 | 559 | h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg); | |
568 | qe->message_size = size; | 560 | if (!h->client) |
569 | qe->msg = GNUNET_malloc (size); | 561 | { |
570 | qe->msg->header.size = htons (size); | 562 | LOG (GNUNET_ERROR_TYPE_ERROR, |
571 | qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_BOB); | 563 | _ ("Failed to connect to the scalarproduct service\n")); |
572 | qe->msg->element_count = htons (element_count); | 564 | GNUNET_free(h); |
573 | qe->msg->mask_length = htons (0); | 565 | return NULL; |
574 | memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode)); | 566 | } |
575 | qe->cont_status = cont; | 567 | h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); |
576 | qe->cont_cls = cont_cls; | 568 | if (!h->th){ |
577 | qe->was_transmitted = GNUNET_NO; | 569 | LOG (GNUNET_ERROR_TYPE_ERROR, |
578 | qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe); | 570 | _("Failed to send a message to the statistics service\n")); |
579 | qe->response_proc = &process_status_message; | 571 | GNUNET_CLIENT_disconnect(h->client); |
580 | qe->timeout = GNUNET_TIME_relative_to_absolute (timeout); | 572 | GNUNET_free(h); |
581 | 573 | return NULL; | |
582 | vector = (int32_t *) & qe->msg[1]; | 574 | } |
575 | |||
576 | size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t); | ||
577 | |||
578 | h->cont_datum = cont; | ||
579 | h->cont_cls = cont_cls; | ||
580 | h->response_proc = &process_result_message; | ||
581 | h->cfg = cfg; | ||
582 | h->msg = GNUNET_malloc (size); | ||
583 | memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); | ||
584 | |||
585 | msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg; | ||
586 | msg->header.size = htons (size); | ||
587 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); | ||
588 | msg->element_count = htonl (element_count); | ||
589 | |||
590 | vector = (int32_t*) &msg[1]; | ||
583 | // copy each element over to the message | 591 | // copy each element over to the message |
584 | for (i = 0; i < element_count; i++) | 592 | for (i = 0; i < element_count; i++) |
585 | vector[i] = htonl (elements[i]); | 593 | vector[i] = htonl(elements[i]); |
586 | 594 | ||
587 | process_queue (h); | 595 | memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); |
588 | return qe; | 596 | |
597 | |||
598 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, | ||
599 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
600 | GNUNET_YES, // retry is OK in the initial stage | ||
601 | &transmit_request, h); | ||
602 | if (!h->th) | ||
603 | { | ||
604 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
605 | _ ("Failed to send a message to the scalarproduct service\n")); | ||
606 | GNUNET_STATISTICS_destroy(h->GNUNET_YES); | ||
607 | GNUNET_CLIENT_disconnect(h->client); | ||
608 | GNUNET_free(h->msg); | ||
609 | GNUNET_free(h); | ||
610 | return NULL; | ||
611 | } | ||
612 | return h; | ||
589 | } | 613 | } |
590 | 614 | ||
591 | 615 | ||
592 | /** | 616 | /** |
593 | * Request the Scalar Product Evaluation | 617 | * Request by Alice's client for computing a scalar product |
594 | * | 618 | * |
595 | * @param h handle to the master context | 619 | * @param h handle to the master context |
596 | * @param key Session key - unique to the requesting client | 620 | * @param key Session key - unique to the requesting client |
597 | * @param peer PeerID of the other peer | 621 | * @param peer PeerID of the other peer |
598 | * @param element_count Number of elements in the vector | ||
599 | * @param mask_length number of bytes in the mask | ||
600 | * @param elements Array of elements of the vector | 622 | * @param elements Array of elements of the vector |
623 | * @param element_count Number of elements in the vector | ||
601 | * @param mask Array of the mask | 624 | * @param mask Array of the mask |
602 | * @param timeout Relative timeout for the operation | 625 | * @param mask_bytes number of bytes in the mask |
603 | * @param cont Callback function | 626 | * @param cont Callback function |
604 | * @param cont_cls Closure for the callback function | 627 | * @param cont_cls Closure for the callback function |
605 | */ | 628 | */ |
606 | struct GNUNET_SCALARPRODUCT_QueueEntry * | 629 | struct GNUNET_SCALARPRODUCT_Handle * |
607 | GNUNET_SCALARPRODUCT_request (struct GNUNET_SCALARPRODUCT_Handle *h, | 630 | GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg, |
608 | const struct GNUNET_HashCode * key, | 631 | const struct GNUNET_HashCode * key, |
609 | const struct GNUNET_PeerIdentity * peer, | 632 | const struct GNUNET_PeerIdentity *peer, |
610 | uint16_t element_count, | 633 | const int32_t * elements, |
611 | uint16_t mask_length, | 634 | uint32_t element_count, |
612 | int32_t * elements, | ||
613 | const unsigned char * mask, | 635 | const unsigned char * mask, |
614 | struct GNUNET_TIME_Relative timeout, | 636 | uint32_t mask_bytes, |
615 | GNUNET_SCALARPRODUCT_DatumProcessor cont, | 637 | GNUNET_SCALARPRODUCT_DatumProcessor cont, |
616 | void *cont_cls) | 638 | void *cont_cls) |
617 | { | 639 | { |
618 | struct GNUNET_SCALARPRODUCT_QueueEntry *qe = make_queue_entry (h); | 640 | struct GNUNET_CLIENT_Connection *client; |
641 | struct GNUNET_SCALARPRODUCT_Handle *h; | ||
642 | struct GNUNET_SCALARPRODUCT_client_request *msg; | ||
619 | int32_t * vector; | 643 | int32_t * vector; |
620 | uint16_t size; | 644 | uint16_t size; |
621 | unsigned int i; | 645 | uint64_t i; |
622 | 646 | ||
647 | GNUNET_assert(key); | ||
648 | GNUNET_assert(peer); | ||
649 | GNUNET_assert(elements); | ||
650 | GNUNET_assert(mask); | ||
651 | GNUNET_assert(cont); | ||
652 | GNUNET_assert(element_count > 1); | ||
653 | GNUNET_assert(mask_bytes != 0); | ||
623 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) | 654 | GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct GNUNET_SCALARPRODUCT_client_request) |
624 | +element_count * sizeof (int32_t) | 655 | + element_count * sizeof (int32_t) |
625 | + mask_length); | 656 | + mask_length); |
626 | size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) +element_count * sizeof (int32_t) + mask_length; | ||
627 | |||
628 | qe->message_size = size; | ||
629 | qe->msg = GNUNET_malloc (size); | ||
630 | qe->msg->header.size = htons (size); | ||
631 | qe->msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); | ||
632 | memcpy (&qe->msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); | ||
633 | qe->msg->element_count = htons (element_count); | ||
634 | qe->msg->mask_length = htons (mask_length); | ||
635 | memcpy (&qe->msg->key, key, sizeof (struct GNUNET_HashCode)); | ||
636 | qe->cont_datum = cont; | ||
637 | qe->cont_cls = cont_cls; | ||
638 | qe->was_transmitted = GNUNET_NO; | ||
639 | qe->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, qe); | ||
640 | qe->response_proc = &process_result_message; | ||
641 | qe->timeout = GNUNET_TIME_relative_to_absolute (timeout); | ||
642 | |||
643 | vector = (int32_t*) & qe->msg[1]; | ||
644 | // copy each element over to the message | ||
645 | for (i = 0; i < element_count; i++) | ||
646 | vector[i] = htonl (elements[i]); | ||
647 | |||
648 | // fill in the mask | ||
649 | memcpy (&vector[element_count], mask, mask_length); | ||
650 | |||
651 | process_queue (h); | ||
652 | return qe; | ||
653 | } | ||
654 | |||
655 | |||
656 | /** | ||
657 | * Connect to the scalarproduct service. | ||
658 | * | ||
659 | * @param cfg configuration to use | ||
660 | * @return handle to use to access the service | ||
661 | */ | ||
662 | struct GNUNET_SCALARPRODUCT_Handle * | ||
663 | GNUNET_SCALARPRODUCT_connect (const struct GNUNET_CONFIGURATION_Handle * cfg) | ||
664 | { | ||
665 | struct GNUNET_CLIENT_Connection *client; | ||
666 | struct GNUNET_SCALARPRODUCT_Handle *h; | ||
667 | |||
668 | client = GNUNET_CLIENT_connect ("scalarproduct", cfg); | 657 | client = GNUNET_CLIENT_connect ("scalarproduct", cfg); |
669 | 658 | ||
670 | if (NULL == client) | 659 | if (!client) |
671 | { | 660 | { |
672 | LOG (GNUNET_ERROR_TYPE_ERROR, | 661 | LOG (GNUNET_ERROR_TYPE_ERROR, |
673 | _ ("Failed to connect to the scalarproduct service\n")); | 662 | _ ("Failed to connect to the scalarproduct service\n")); |
674 | return NULL; | 663 | return NULL; |
675 | } | 664 | } |
676 | 665 | size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * sizeof (int32_t) + mask_length; | |
677 | h = GNUNET_malloc (sizeof (struct GNUNET_SCALARPRODUCT_Handle) + | 666 | |
678 | GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); | 667 | h = GNUNET_new (struct GNUNET_SCALARPRODUCT_Handle); |
668 | h->cont_datum = cont; | ||
669 | h->cont_cls = cont_cls; | ||
670 | h->response_proc = &process_status_message; | ||
679 | h->client = client; | 671 | h->client = client; |
680 | h->cfg = cfg; | 672 | h->cfg = cfg; |
673 | h->msg = GNUNET_malloc (size); | ||
674 | memcpy (&h->key, key, sizeof (struct GNUNET_HashCode)); | ||
675 | |||
676 | msg = (struct GNUNET_SCALARPRODUCT_client_request*) h->msg; | ||
677 | msg->header.size = htons (size); | ||
678 | msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE); | ||
679 | msg->element_count = htons (element_count); | ||
680 | msg->mask_length = htons (mask_length); | ||
681 | |||
682 | vector = (int32_t*) &msg[1]; | ||
683 | // copy each element over to the message | ||
684 | for (i = 0; i < element_count; i++) | ||
685 | vector[i] = htonl(elements[i]); | ||
686 | |||
687 | memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity)); | ||
688 | memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode)); | ||
689 | memcpy (&vector[element_count], mask, mask_length); | ||
690 | |||
681 | h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); | 691 | h->stats = GNUNET_STATISTICS_create ("scalarproduct-api", cfg); |
692 | h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size, | ||
693 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
694 | GNUNET_YES, // retry is OK in the initial stage | ||
695 | &transmit_request, h); | ||
696 | if ( !h->th) | ||
697 | { | ||
698 | LOG (GNUNET_ERROR_TYPE_ERROR, | ||
699 | _ ("Failed to send a message to the scalarproduct service\n")); | ||
700 | return NULL; | ||
701 | } | ||
682 | return h; | 702 | return h; |
683 | } | 703 | } |
684 | 704 | ||
685 | |||
686 | /** | 705 | /** |
687 | * Disconnect from the scalarproduct service. | 706 | * Disconnect from the scalarproduct service. |
688 | * | 707 | * |