diff options
author | Christian Grothoff <christian@grothoff.org> | 2015-01-07 14:04:57 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2015-01-07 14:04:57 +0000 |
commit | f3e4e62791a032420892a4f947125cf1aceb2c36 (patch) | |
tree | 60b279198844fa9ae1ae8815cf1b615e2b419594 /src/datastore/datastore_api.c | |
parent | 22621780b74a570bf160e067b14239602750ee3a (diff) | |
download | gnunet-f3e4e62791a032420892a4f947125cf1aceb2c36.tar.gz gnunet-f3e4e62791a032420892a4f947125cf1aceb2c36.zip |
suspend server while handlers are not in place
Diffstat (limited to 'src/datastore/datastore_api.c')
-rw-r--r-- | src/datastore/datastore_api.c | 95 |
1 files changed, 55 insertions, 40 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 497e654b2..6643f0637 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -546,10 +546,10 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
546 | static void | 546 | static void |
547 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) | 547 | do_disconnect (struct GNUNET_DATASTORE_Handle *h) |
548 | { | 548 | { |
549 | if (h->client == NULL) | 549 | if (NULL == h->client) |
550 | { | 550 | { |
551 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 551 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
552 | "client NULL in disconnect, will not try to reconnect\n"); | 552 | "Client NULL in disconnect, will not try to reconnect\n"); |
553 | return; | 553 | return; |
554 | } | 554 | } |
555 | GNUNET_CLIENT_disconnect (h->client); | 555 | GNUNET_CLIENT_disconnect (h->client); |
@@ -564,17 +564,19 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) | |||
564 | * Function called whenever we receive a message from | 564 | * Function called whenever we receive a message from |
565 | * the service. Calls the appropriate handler. | 565 | * the service. Calls the appropriate handler. |
566 | * | 566 | * |
567 | * @param cls the 'struct GNUNET_DATASTORE_Handle' | 567 | * @param cls the `struct GNUNET_DATASTORE_Handle` |
568 | * @param msg the received message | 568 | * @param msg the received message |
569 | */ | 569 | */ |
570 | static void | 570 | static void |
571 | receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) | 571 | receive_cb (void *cls, |
572 | const struct GNUNET_MessageHeader *msg) | ||
572 | { | 573 | { |
573 | struct GNUNET_DATASTORE_Handle *h = cls; | 574 | struct GNUNET_DATASTORE_Handle *h = cls; |
574 | struct GNUNET_DATASTORE_QueueEntry *qe; | 575 | struct GNUNET_DATASTORE_QueueEntry *qe; |
575 | 576 | ||
576 | h->in_receive = GNUNET_NO; | 577 | h->in_receive = GNUNET_NO; |
577 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); | 578 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
579 | "Receiving reply from datastore\n"); | ||
578 | if (h->skip_next_messages > 0) | 580 | if (h->skip_next_messages > 0) |
579 | { | 581 | { |
580 | h->skip_next_messages--; | 582 | h->skip_next_messages--; |
@@ -600,7 +602,9 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) | |||
600 | * @return number of bytes written to @a buf | 602 | * @return number of bytes written to @a buf |
601 | */ | 603 | */ |
602 | static size_t | 604 | static size_t |
603 | transmit_request (void *cls, size_t size, void *buf) | 605 | transmit_request (void *cls, |
606 | size_t size, | ||
607 | void *buf) | ||
604 | { | 608 | { |
605 | struct GNUNET_DATASTORE_Handle *h = cls; | 609 | struct GNUNET_DATASTORE_Handle *h = cls; |
606 | struct GNUNET_DATASTORE_QueueEntry *qe; | 610 | struct GNUNET_DATASTORE_QueueEntry *qe; |
@@ -609,9 +613,10 @@ transmit_request (void *cls, size_t size, void *buf) | |||
609 | h->th = NULL; | 613 | h->th = NULL; |
610 | if (NULL == (qe = h->queue_head)) | 614 | if (NULL == (qe = h->queue_head)) |
611 | return 0; /* no entry in queue */ | 615 | return 0; /* no entry in queue */ |
612 | if (buf == NULL) | 616 | if (NULL == buf) |
613 | { | 617 | { |
614 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n"); | 618 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
619 | "Failed to transmit request to DATASTORE.\n"); | ||
615 | GNUNET_STATISTICS_update (h->stats, | 620 | GNUNET_STATISTICS_update (h->stats, |
616 | gettext_noop ("# transmission request failures"), | 621 | gettext_noop ("# transmission request failures"), |
617 | 1, GNUNET_NO); | 622 | 1, GNUNET_NO); |
@@ -623,7 +628,8 @@ transmit_request (void *cls, size_t size, void *buf) | |||
623 | process_queue (h); | 628 | process_queue (h); |
624 | return 0; | 629 | return 0; |
625 | } | 630 | } |
626 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n", | 631 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
632 | "Transmitting %u byte request to DATASTORE\n", | ||
627 | msize); | 633 | msize); |
628 | memcpy (buf, &qe[1], msize); | 634 | memcpy (buf, &qe[1], msize); |
629 | qe->was_transmitted = GNUNET_YES; | 635 | qe->was_transmitted = GNUNET_YES; |
@@ -631,7 +637,8 @@ transmit_request (void *cls, size_t size, void *buf) | |||
631 | qe->task = NULL; | 637 | qe->task = NULL; |
632 | GNUNET_assert (GNUNET_NO == h->in_receive); | 638 | GNUNET_assert (GNUNET_NO == h->in_receive); |
633 | h->in_receive = GNUNET_YES; | 639 | h->in_receive = GNUNET_YES; |
634 | GNUNET_CLIENT_receive (h->client, &receive_cb, h, | 640 | GNUNET_CLIENT_receive (h->client, |
641 | &receive_cb, h, | ||
635 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); | 642 | GNUNET_TIME_absolute_get_remaining (qe->timeout)); |
636 | #if INSANE_STATISTICS | 643 | #if INSANE_STATISTICS |
637 | GNUNET_STATISTICS_update (h->stats, | 644 | GNUNET_STATISTICS_update (h->stats, |
@@ -655,35 +662,44 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) | |||
655 | 662 | ||
656 | if (NULL == (qe = h->queue_head)) | 663 | if (NULL == (qe = h->queue_head)) |
657 | { | 664 | { |
658 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); | 665 | /* no entry in queue */ |
659 | return; /* no entry in queue */ | 666 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
667 | "Queue empty\n"); | ||
668 | return; | ||
660 | } | 669 | } |
661 | if (qe->was_transmitted == GNUNET_YES) | 670 | if (GNUNET_YES == qe->was_transmitted) |
662 | { | 671 | { |
663 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); | 672 | /* waiting for replies */ |
664 | return; /* waiting for replies */ | 673 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
674 | "Head request already transmitted\n"); | ||
675 | return; | ||
665 | } | 676 | } |
666 | if (h->th != NULL) | 677 | if (NULL != h->th) |
667 | { | 678 | { |
668 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); | 679 | /* request pending */ |
669 | return; /* request pending */ | 680 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
681 | "Pending transmission request\n"); | ||
682 | return; | ||
670 | } | 683 | } |
671 | if (h->client == NULL) | 684 | if (NULL == h->client) |
672 | { | 685 | { |
673 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); | 686 | /* waiting for reconnect */ |
674 | return; /* waiting for reconnect */ | 687 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
688 | "Not connected\n"); | ||
689 | return; | ||
675 | } | 690 | } |
676 | if (GNUNET_YES == h->in_receive) | 691 | if (GNUNET_YES == h->in_receive) |
677 | { | 692 | { |
678 | /* wait for response to previous query */ | 693 | /* wait for response to previous query */ |
679 | return; | 694 | return; |
680 | } | 695 | } |
681 | LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n", | 696 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
697 | "Queueing %u byte request to DATASTORE\n", | ||
682 | qe->message_size); | 698 | qe->message_size); |
683 | h->th = | 699 | h->th |
684 | GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, | 700 | = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, |
685 | GNUNET_TIME_absolute_get_remaining | 701 | GNUNET_TIME_absolute_get_remaining (qe->timeout), |
686 | (qe->timeout), GNUNET_YES, | 702 | GNUNET_YES, |
687 | &transmit_request, h); | 703 | &transmit_request, h); |
688 | GNUNET_assert (GNUNET_NO == h->in_receive); | 704 | GNUNET_assert (GNUNET_NO == h->in_receive); |
689 | GNUNET_break (NULL != h->th); | 705 | GNUNET_break (NULL != h->th); |
@@ -739,7 +755,8 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) | |||
739 | * @param msg message received, NULL on timeout or fatal error | 755 | * @param msg message received, NULL on timeout or fatal error |
740 | */ | 756 | */ |
741 | static void | 757 | static void |
742 | process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) | 758 | process_status_message (void *cls, |
759 | const struct GNUNET_MessageHeader *msg) | ||
743 | { | 760 | { |
744 | struct GNUNET_DATASTORE_Handle *h = cls; | 761 | struct GNUNET_DATASTORE_Handle *h = cls; |
745 | struct GNUNET_DATASTORE_QueueEntry *qe; | 762 | struct GNUNET_DATASTORE_QueueEntry *qe; |
@@ -756,7 +773,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) | |||
756 | return; | 773 | return; |
757 | } | 774 | } |
758 | rc = qe->qc.sc; | 775 | rc = qe->qc.sc; |
759 | if (msg == NULL) | 776 | if (NULL == msg) |
760 | { | 777 | { |
761 | was_transmitted = qe->was_transmitted; | 778 | was_transmitted = qe->was_transmitted; |
762 | free_queue_entry (qe); | 779 | free_queue_entry (qe); |
@@ -764,7 +781,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) | |||
764 | do_disconnect (h); | 781 | do_disconnect (h); |
765 | else | 782 | else |
766 | process_queue (h); | 783 | process_queue (h); |
767 | if (rc.cont != NULL) | 784 | if (NULL != rc.cont) |
768 | rc.cont (rc.cont_cls, GNUNET_SYSERR, | 785 | rc.cont (rc.cont_cls, GNUNET_SYSERR, |
769 | GNUNET_TIME_UNIT_ZERO_ABS, | 786 | GNUNET_TIME_UNIT_ZERO_ABS, |
770 | _("Failed to receive status response from database.")); | 787 | _("Failed to receive status response from database.")); |
@@ -902,10 +919,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, | |||
902 | * @param h handle to the datastore | 919 | * @param h handle to the datastore |
903 | * @param amount how much space (in bytes) should be reserved (for content only) | 920 | * @param amount how much space (in bytes) should be reserved (for content only) |
904 | * @param entries how many entries will be created (to calculate per-entry overhead) | 921 | * @param entries how many entries will be created (to calculate per-entry overhead) |
905 | * @param queue_priority ranking of this request in the priority queue | ||
906 | * @param max_queue_size at what queue size should this request be dropped | ||
907 | * (if other requests of higher priority are in the queue) | ||
908 | * @param timeout how long to wait at most for a response (or before dying in queue) | ||
909 | * @param cont continuation to call when done; "success" will be set to | 922 | * @param cont continuation to call when done; "success" will be set to |
910 | * a positive reservation value if space could be reserved. | 923 | * a positive reservation value if space could be reserved. |
911 | * @param cont_cls closure for @a cont | 924 | * @param cont_cls closure for @a cont |
@@ -915,9 +928,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid, | |||
915 | */ | 928 | */ |
916 | struct GNUNET_DATASTORE_QueueEntry * | 929 | struct GNUNET_DATASTORE_QueueEntry * |
917 | GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, | 930 | GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, |
918 | uint32_t entries, unsigned int queue_priority, | 931 | uint32_t entries, |
919 | unsigned int max_queue_size, | ||
920 | struct GNUNET_TIME_Relative timeout, | ||
921 | GNUNET_DATASTORE_ContinuationWithStatus cont, | 932 | GNUNET_DATASTORE_ContinuationWithStatus cont, |
922 | void *cont_cls) | 933 | void *cont_cls) |
923 | { | 934 | { |
@@ -925,16 +936,20 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, | |||
925 | struct ReserveMessage *rm; | 936 | struct ReserveMessage *rm; |
926 | union QueueContext qc; | 937 | union QueueContext qc; |
927 | 938 | ||
928 | if (cont == NULL) | 939 | if (NULL == cont) |
929 | cont = &drop_status_cont; | 940 | cont = &drop_status_cont; |
930 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 941 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
931 | "Asked to reserve %llu bytes of data and %u entries\n", | 942 | "Asked to reserve %llu bytes of data and %u entries\n", |
932 | (unsigned long long) amount, (unsigned int) entries); | 943 | (unsigned long long) amount, (unsigned int) entries); |
933 | qc.sc.cont = cont; | 944 | qc.sc.cont = cont; |
934 | qc.sc.cont_cls = cont_cls; | 945 | qc.sc.cont_cls = cont_cls; |
935 | qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority, | 946 | qe = make_queue_entry (h, |
936 | max_queue_size, timeout, &process_status_message, &qc); | 947 | sizeof (struct ReserveMessage), |
937 | if (qe == NULL) | 948 | UINT_MAX, |
949 | UINT_MAX, | ||
950 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
951 | &process_status_message, &qc); | ||
952 | if (NULL == qe) | ||
938 | { | 953 | { |
939 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 954 | LOG (GNUNET_ERROR_TYPE_DEBUG, |
940 | "Could not create queue entry to reserve\n"); | 955 | "Could not create queue entry to reserve\n"); |