aboutsummaryrefslogtreecommitdiff
path: root/src/datastore
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2015-01-07 14:04:57 +0000
committerChristian Grothoff <christian@grothoff.org>2015-01-07 14:04:57 +0000
commitf3e4e62791a032420892a4f947125cf1aceb2c36 (patch)
tree60b279198844fa9ae1ae8815cf1b615e2b419594 /src/datastore
parent22621780b74a570bf160e067b14239602750ee3a (diff)
downloadgnunet-f3e4e62791a032420892a4f947125cf1aceb2c36.tar.gz
gnunet-f3e4e62791a032420892a4f947125cf1aceb2c36.zip
suspend server while handlers are not in place
Diffstat (limited to 'src/datastore')
-rw-r--r--src/datastore/datastore_api.c95
-rw-r--r--src/datastore/gnunet-service-datastore.c6
-rw-r--r--src/datastore/test_datastore_api.c2
3 files changed, 60 insertions, 43 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c
index 497e654b2..6643f0637 100644
--- a/src/datastore/datastore_api.c
+++ b/src/datastore/datastore_api.c
@@ -546,10 +546,10 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
546static void 546static void
547do_disconnect (struct GNUNET_DATASTORE_Handle *h) 547do_disconnect (struct GNUNET_DATASTORE_Handle *h)
548{ 548{
549 if (h->client == NULL) 549 if (NULL == h->client)
550 { 550 {
551 LOG (GNUNET_ERROR_TYPE_DEBUG, 551 LOG (GNUNET_ERROR_TYPE_DEBUG,
552 "client NULL in disconnect, will not try to reconnect\n"); 552 "Client NULL in disconnect, will not try to reconnect\n");
553 return; 553 return;
554 } 554 }
555 GNUNET_CLIENT_disconnect (h->client); 555 GNUNET_CLIENT_disconnect (h->client);
@@ -564,17 +564,19 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
564 * Function called whenever we receive a message from 564 * Function called whenever we receive a message from
565 * the service. Calls the appropriate handler. 565 * the service. Calls the appropriate handler.
566 * 566 *
567 * @param cls the 'struct GNUNET_DATASTORE_Handle' 567 * @param cls the `struct GNUNET_DATASTORE_Handle`
568 * @param msg the received message 568 * @param msg the received message
569 */ 569 */
570static void 570static void
571receive_cb (void *cls, const struct GNUNET_MessageHeader *msg) 571receive_cb (void *cls,
572 const struct GNUNET_MessageHeader *msg)
572{ 573{
573 struct GNUNET_DATASTORE_Handle *h = cls; 574 struct GNUNET_DATASTORE_Handle *h = cls;
574 struct GNUNET_DATASTORE_QueueEntry *qe; 575 struct GNUNET_DATASTORE_QueueEntry *qe;
575 576
576 h->in_receive = GNUNET_NO; 577 h->in_receive = GNUNET_NO;
577 LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n"); 578 LOG (GNUNET_ERROR_TYPE_DEBUG,
579 "Receiving reply from datastore\n");
578 if (h->skip_next_messages > 0) 580 if (h->skip_next_messages > 0)
579 { 581 {
580 h->skip_next_messages--; 582 h->skip_next_messages--;
@@ -600,7 +602,9 @@ receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
600 * @return number of bytes written to @a buf 602 * @return number of bytes written to @a buf
601 */ 603 */
602static size_t 604static size_t
603transmit_request (void *cls, size_t size, void *buf) 605transmit_request (void *cls,
606 size_t size,
607 void *buf)
604{ 608{
605 struct GNUNET_DATASTORE_Handle *h = cls; 609 struct GNUNET_DATASTORE_Handle *h = cls;
606 struct GNUNET_DATASTORE_QueueEntry *qe; 610 struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -609,9 +613,10 @@ transmit_request (void *cls, size_t size, void *buf)
609 h->th = NULL; 613 h->th = NULL;
610 if (NULL == (qe = h->queue_head)) 614 if (NULL == (qe = h->queue_head))
611 return 0; /* no entry in queue */ 615 return 0; /* no entry in queue */
612 if (buf == NULL) 616 if (NULL == buf)
613 { 617 {
614 LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n"); 618 LOG (GNUNET_ERROR_TYPE_DEBUG,
619 "Failed to transmit request to DATASTORE.\n");
615 GNUNET_STATISTICS_update (h->stats, 620 GNUNET_STATISTICS_update (h->stats,
616 gettext_noop ("# transmission request failures"), 621 gettext_noop ("# transmission request failures"),
617 1, GNUNET_NO); 622 1, GNUNET_NO);
@@ -623,7 +628,8 @@ transmit_request (void *cls, size_t size, void *buf)
623 process_queue (h); 628 process_queue (h);
624 return 0; 629 return 0;
625 } 630 }
626 LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n", 631 LOG (GNUNET_ERROR_TYPE_DEBUG,
632 "Transmitting %u byte request to DATASTORE\n",
627 msize); 633 msize);
628 memcpy (buf, &qe[1], msize); 634 memcpy (buf, &qe[1], msize);
629 qe->was_transmitted = GNUNET_YES; 635 qe->was_transmitted = GNUNET_YES;
@@ -631,7 +637,8 @@ transmit_request (void *cls, size_t size, void *buf)
631 qe->task = NULL; 637 qe->task = NULL;
632 GNUNET_assert (GNUNET_NO == h->in_receive); 638 GNUNET_assert (GNUNET_NO == h->in_receive);
633 h->in_receive = GNUNET_YES; 639 h->in_receive = GNUNET_YES;
634 GNUNET_CLIENT_receive (h->client, &receive_cb, h, 640 GNUNET_CLIENT_receive (h->client,
641 &receive_cb, h,
635 GNUNET_TIME_absolute_get_remaining (qe->timeout)); 642 GNUNET_TIME_absolute_get_remaining (qe->timeout));
636#if INSANE_STATISTICS 643#if INSANE_STATISTICS
637 GNUNET_STATISTICS_update (h->stats, 644 GNUNET_STATISTICS_update (h->stats,
@@ -655,35 +662,44 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
655 662
656 if (NULL == (qe = h->queue_head)) 663 if (NULL == (qe = h->queue_head))
657 { 664 {
658 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n"); 665 /* no entry in queue */
659 return; /* no entry in queue */ 666 LOG (GNUNET_ERROR_TYPE_DEBUG,
667 "Queue empty\n");
668 return;
660 } 669 }
661 if (qe->was_transmitted == GNUNET_YES) 670 if (GNUNET_YES == qe->was_transmitted)
662 { 671 {
663 LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n"); 672 /* waiting for replies */
664 return; /* waiting for replies */ 673 LOG (GNUNET_ERROR_TYPE_DEBUG,
674 "Head request already transmitted\n");
675 return;
665 } 676 }
666 if (h->th != NULL) 677 if (NULL != h->th)
667 { 678 {
668 LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n"); 679 /* request pending */
669 return; /* request pending */ 680 LOG (GNUNET_ERROR_TYPE_DEBUG,
681 "Pending transmission request\n");
682 return;
670 } 683 }
671 if (h->client == NULL) 684 if (NULL == h->client)
672 { 685 {
673 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n"); 686 /* waiting for reconnect */
674 return; /* waiting for reconnect */ 687 LOG (GNUNET_ERROR_TYPE_DEBUG,
688 "Not connected\n");
689 return;
675 } 690 }
676 if (GNUNET_YES == h->in_receive) 691 if (GNUNET_YES == h->in_receive)
677 { 692 {
678 /* wait for response to previous query */ 693 /* wait for response to previous query */
679 return; 694 return;
680 } 695 }
681 LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n", 696 LOG (GNUNET_ERROR_TYPE_DEBUG,
697 "Queueing %u byte request to DATASTORE\n",
682 qe->message_size); 698 qe->message_size);
683 h->th = 699 h->th
684 GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, 700 = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
685 GNUNET_TIME_absolute_get_remaining 701 GNUNET_TIME_absolute_get_remaining (qe->timeout),
686 (qe->timeout), GNUNET_YES, 702 GNUNET_YES,
687 &transmit_request, h); 703 &transmit_request, h);
688 GNUNET_assert (GNUNET_NO == h->in_receive); 704 GNUNET_assert (GNUNET_NO == h->in_receive);
689 GNUNET_break (NULL != h->th); 705 GNUNET_break (NULL != h->th);
@@ -739,7 +755,8 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
739 * @param msg message received, NULL on timeout or fatal error 755 * @param msg message received, NULL on timeout or fatal error
740 */ 756 */
741static void 757static void
742process_status_message (void *cls, const struct GNUNET_MessageHeader *msg) 758process_status_message (void *cls,
759 const struct GNUNET_MessageHeader *msg)
743{ 760{
744 struct GNUNET_DATASTORE_Handle *h = cls; 761 struct GNUNET_DATASTORE_Handle *h = cls;
745 struct GNUNET_DATASTORE_QueueEntry *qe; 762 struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -756,7 +773,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
756 return; 773 return;
757 } 774 }
758 rc = qe->qc.sc; 775 rc = qe->qc.sc;
759 if (msg == NULL) 776 if (NULL == msg)
760 { 777 {
761 was_transmitted = qe->was_transmitted; 778 was_transmitted = qe->was_transmitted;
762 free_queue_entry (qe); 779 free_queue_entry (qe);
@@ -764,7 +781,7 @@ process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
764 do_disconnect (h); 781 do_disconnect (h);
765 else 782 else
766 process_queue (h); 783 process_queue (h);
767 if (rc.cont != NULL) 784 if (NULL != rc.cont)
768 rc.cont (rc.cont_cls, GNUNET_SYSERR, 785 rc.cont (rc.cont_cls, GNUNET_SYSERR,
769 GNUNET_TIME_UNIT_ZERO_ABS, 786 GNUNET_TIME_UNIT_ZERO_ABS,
770 _("Failed to receive status response from database.")); 787 _("Failed to receive status response from database."));
@@ -902,10 +919,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
902 * @param h handle to the datastore 919 * @param h handle to the datastore
903 * @param amount how much space (in bytes) should be reserved (for content only) 920 * @param amount how much space (in bytes) should be reserved (for content only)
904 * @param entries how many entries will be created (to calculate per-entry overhead) 921 * @param entries how many entries will be created (to calculate per-entry overhead)
905 * @param queue_priority ranking of this request in the priority queue
906 * @param max_queue_size at what queue size should this request be dropped
907 * (if other requests of higher priority are in the queue)
908 * @param timeout how long to wait at most for a response (or before dying in queue)
909 * @param cont continuation to call when done; "success" will be set to 922 * @param cont continuation to call when done; "success" will be set to
910 * a positive reservation value if space could be reserved. 923 * a positive reservation value if space could be reserved.
911 * @param cont_cls closure for @a cont 924 * @param cont_cls closure for @a cont
@@ -915,9 +928,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
915 */ 928 */
916struct GNUNET_DATASTORE_QueueEntry * 929struct GNUNET_DATASTORE_QueueEntry *
917GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, 930GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
918 uint32_t entries, unsigned int queue_priority, 931 uint32_t entries,
919 unsigned int max_queue_size,
920 struct GNUNET_TIME_Relative timeout,
921 GNUNET_DATASTORE_ContinuationWithStatus cont, 932 GNUNET_DATASTORE_ContinuationWithStatus cont,
922 void *cont_cls) 933 void *cont_cls)
923{ 934{
@@ -925,16 +936,20 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
925 struct ReserveMessage *rm; 936 struct ReserveMessage *rm;
926 union QueueContext qc; 937 union QueueContext qc;
927 938
928 if (cont == NULL) 939 if (NULL == cont)
929 cont = &drop_status_cont; 940 cont = &drop_status_cont;
930 LOG (GNUNET_ERROR_TYPE_DEBUG, 941 LOG (GNUNET_ERROR_TYPE_DEBUG,
931 "Asked to reserve %llu bytes of data and %u entries\n", 942 "Asked to reserve %llu bytes of data and %u entries\n",
932 (unsigned long long) amount, (unsigned int) entries); 943 (unsigned long long) amount, (unsigned int) entries);
933 qc.sc.cont = cont; 944 qc.sc.cont = cont;
934 qc.sc.cont_cls = cont_cls; 945 qc.sc.cont_cls = cont_cls;
935 qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority, 946 qe = make_queue_entry (h,
936 max_queue_size, timeout, &process_status_message, &qc); 947 sizeof (struct ReserveMessage),
937 if (qe == NULL) 948 UINT_MAX,
949 UINT_MAX,
950 GNUNET_TIME_UNIT_FOREVER_REL,
951 &process_status_message, &qc);
952 if (NULL == qe)
938 { 953 {
939 LOG (GNUNET_ERROR_TYPE_DEBUG, 954 LOG (GNUNET_ERROR_TYPE_DEBUG,
940 "Could not create queue entry to reserve\n"); 955 "Could not create queue entry to reserve\n");
diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c
index 37df441c4..dc411bb59 100644
--- a/src/datastore/gnunet-service-datastore.c
+++ b/src/datastore/gnunet-service-datastore.c
@@ -1423,6 +1423,7 @@ process_stat_done (void *cls, int success)
1423 } 1423 }
1424 1424
1425 GNUNET_SERVER_add_handlers (server, handlers); 1425 GNUNET_SERVER_add_handlers (server, handlers);
1426 GNUNET_SERVER_resume (server);
1426 expired_kill_task 1427 expired_kill_task
1427 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE, 1428 = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1428 &delete_expired, 1429 &delete_expired,
@@ -1451,7 +1452,7 @@ cleaning_task (void *cls,
1451 GNUNET_free (tcc->msg); 1452 GNUNET_free (tcc->msg);
1452 GNUNET_free (tcc); 1453 GNUNET_free (tcc);
1453 } 1454 }
1454 if (expired_kill_task != NULL) 1455 if (NULL != expired_kill_task)
1455 { 1456 {
1456 GNUNET_SCHEDULER_cancel (expired_kill_task); 1457 GNUNET_SCHEDULER_cancel (expired_kill_task);
1457 expired_kill_task = NULL; 1458 expired_kill_task = NULL;
@@ -1509,7 +1510,7 @@ cleanup_reservations (void *cls,
1509 next = pos->next; 1510 next = pos->next;
1510 if (pos->client == client) 1511 if (pos->client == client)
1511 { 1512 {
1512 if (prev == NULL) 1513 if (NULL == prev)
1513 reservations = next; 1514 reservations = next;
1514 else 1515 else
1515 prev->next = next; 1516 prev->next = next;
@@ -1653,6 +1654,7 @@ run (void *cls,
1653 } 1654 }
1654 return; 1655 return;
1655 } 1656 }
1657 GNUNET_SERVER_suspend (server);
1656 stat_get = 1658 stat_get =
1657 GNUNET_STATISTICS_get (stats, 1659 GNUNET_STATISTICS_get (stats,
1658 "datastore", 1660 "datastore",
diff --git a/src/datastore/test_datastore_api.c b/src/datastore/test_datastore_api.c
index 2bc29b9fe..12ff5f3d1 100644
--- a/src/datastore/test_datastore_api.c
+++ b/src/datastore/test_datastore_api.c
@@ -418,7 +418,7 @@ run_continuation (void *cls,
418 break; 418 break;
419 case RP_RESERVE: 419 case RP_RESERVE:
420 crc->phase = RP_PUT_MULTIPLE; 420 crc->phase = RP_PUT_MULTIPLE;
421 GNUNET_DATASTORE_reserve (datastore, 128 * 1024, 2, 1, 1, TIMEOUT, 421 GNUNET_DATASTORE_reserve (datastore, 128 * 1024, 2,
422 &get_reserved, crc); 422 &get_reserved, crc);
423 break; 423 break;
424 case RP_PUT_MULTIPLE: 424 case RP_PUT_MULTIPLE: