diff options
author | David Barksdale <amatus@amat.us> | 2017-11-10 17:45:13 -0600 |
---|---|---|
committer | David Barksdale <amatus@amat.us> | 2017-11-10 17:45:13 -0600 |
commit | 0d8e5fb7ae0282a1e4779c86e8cc30b803299e3e (patch) | |
tree | ad394d73a0227594fb32c51fb04193141d9dd620 /src/datastore | |
parent | a3dace8401c482f18bddbad37da108433c1b08c7 (diff) | |
download | gnunet-0d8e5fb7ae0282a1e4779c86e8cc30b803299e3e.tar.gz gnunet-0d8e5fb7ae0282a1e4779c86e8cc30b803299e3e.zip |
Fix skip_next_messages counting, combine logic
This fixes messages like these:
Nov 10 08:57:34-927033 fs-22733 ERROR Request 0x6080017479a0 of type 100
at head of datastore queue for more than 1 m
And might fix issue #3903
Diffstat (limited to 'src/datastore')
-rw-r--r-- | src/datastore/datastore_api.c | 115 |
1 files changed, 47 insertions, 68 deletions
diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 31f7a997f..2ad864987 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c | |||
@@ -651,6 +651,46 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) | |||
651 | } | 651 | } |
652 | 652 | ||
653 | 653 | ||
654 | /** | ||
655 | * Get the entry at the head of the message queue. | ||
656 | * | ||
657 | * @param h handle to the datastore | ||
658 | * @param response_type the expected response type | ||
659 | * @return the queue entry | ||
660 | */ | ||
661 | static struct GNUNET_DATASTORE_QueueEntry * | ||
662 | get_queue_head (struct GNUNET_DATASTORE_Handle *h, | ||
663 | uint16_t response_type) | ||
664 | { | ||
665 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
666 | |||
667 | if (h->skip_next_messages > 0) | ||
668 | { | ||
669 | h->skip_next_messages--; | ||
670 | process_queue (h); | ||
671 | return NULL; | ||
672 | } | ||
673 | qe = h->queue_head; | ||
674 | if (NULL == qe) | ||
675 | { | ||
676 | GNUNET_break (0); | ||
677 | do_disconnect (h); | ||
678 | return NULL; | ||
679 | } | ||
680 | if (NULL != qe->env) | ||
681 | { | ||
682 | GNUNET_break (0); | ||
683 | do_disconnect (h); | ||
684 | return NULL; | ||
685 | } | ||
686 | if (response_type != qe->response_type) | ||
687 | { | ||
688 | GNUNET_break (0); | ||
689 | do_disconnect (h); | ||
690 | return NULL; | ||
691 | } | ||
692 | return qe; | ||
693 | } | ||
654 | 694 | ||
655 | 695 | ||
656 | /** | 696 | /** |
@@ -702,30 +742,10 @@ handle_status (void *cls, | |||
702 | const char *emsg; | 742 | const char *emsg; |
703 | int32_t status = ntohl (sm->status); | 743 | int32_t status = ntohl (sm->status); |
704 | 744 | ||
705 | if (h->skip_next_messages > 0) | 745 | qe = get_queue_head (h, |
706 | { | 746 | GNUNET_MESSAGE_TYPE_DATASTORE_STATUS); |
707 | h->skip_next_messages--; | 747 | if (NULL == qe) |
708 | process_queue (h); | ||
709 | return; | ||
710 | } | ||
711 | if (NULL == (qe = h->queue_head)) | ||
712 | { | ||
713 | GNUNET_break (0); | ||
714 | do_disconnect (h); | ||
715 | return; | ||
716 | } | ||
717 | if (NULL != qe->env) | ||
718 | { | ||
719 | GNUNET_break (0); | ||
720 | do_disconnect (h); | ||
721 | return; | ||
722 | } | ||
723 | if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type) | ||
724 | { | ||
725 | GNUNET_break (0); | ||
726 | do_disconnect (h); | ||
727 | return; | 748 | return; |
728 | } | ||
729 | rc = qe->qc.sc; | 749 | rc = qe->qc.sc; |
730 | free_queue_entry (qe); | 750 | free_queue_entry (qe); |
731 | if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) | 751 | if (ntohs (sm->header.size) > sizeof (struct StatusMessage)) |
@@ -785,30 +805,10 @@ handle_data (void *cls, | |||
785 | struct GNUNET_DATASTORE_QueueEntry *qe; | 805 | struct GNUNET_DATASTORE_QueueEntry *qe; |
786 | struct ResultContext rc; | 806 | struct ResultContext rc; |
787 | 807 | ||
788 | if (h->skip_next_messages > 0) | 808 | qe = get_queue_head (h, |
789 | { | 809 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); |
790 | process_queue (h); | ||
791 | return; | ||
792 | } | ||
793 | qe = h->queue_head; | ||
794 | if (NULL == qe) | 810 | if (NULL == qe) |
795 | { | ||
796 | GNUNET_break (0); | ||
797 | do_disconnect (h); | ||
798 | return; | ||
799 | } | ||
800 | if (NULL != qe->env) | ||
801 | { | ||
802 | GNUNET_break (0); | ||
803 | do_disconnect (h); | ||
804 | return; | ||
805 | } | ||
806 | if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) | ||
807 | { | ||
808 | GNUNET_break (0); | ||
809 | do_disconnect (h); | ||
810 | return; | 811 | return; |
811 | } | ||
812 | #if INSANE_STATISTICS | 812 | #if INSANE_STATISTICS |
813 | GNUNET_STATISTICS_update (h->stats, | 813 | GNUNET_STATISTICS_update (h->stats, |
814 | gettext_noop ("# Results received"), | 814 | gettext_noop ("# Results received"), |
@@ -854,31 +854,10 @@ handle_data_end (void *cls, | |||
854 | struct GNUNET_DATASTORE_QueueEntry *qe; | 854 | struct GNUNET_DATASTORE_QueueEntry *qe; |
855 | struct ResultContext rc; | 855 | struct ResultContext rc; |
856 | 856 | ||
857 | if (h->skip_next_messages > 0) | 857 | qe = get_queue_head (h, |
858 | { | 858 | GNUNET_MESSAGE_TYPE_DATASTORE_DATA); |
859 | h->skip_next_messages--; | ||
860 | process_queue (h); | ||
861 | return; | ||
862 | } | ||
863 | qe = h->queue_head; | ||
864 | if (NULL == qe) | 859 | if (NULL == qe) |
865 | { | ||
866 | GNUNET_break (0); | ||
867 | do_disconnect (h); | ||
868 | return; | 860 | return; |
869 | } | ||
870 | if (NULL != qe->env) | ||
871 | { | ||
872 | GNUNET_break (0); | ||
873 | do_disconnect (h); | ||
874 | return; | ||
875 | } | ||
876 | if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type) | ||
877 | { | ||
878 | GNUNET_break (0); | ||
879 | do_disconnect (h); | ||
880 | return; | ||
881 | } | ||
882 | rc = qe->qc.rc; | 861 | rc = qe->qc.rc; |
883 | free_queue_entry (qe); | 862 | free_queue_entry (qe); |
884 | LOG (GNUNET_ERROR_TYPE_DEBUG, | 863 | LOG (GNUNET_ERROR_TYPE_DEBUG, |