aboutsummaryrefslogtreecommitdiff
path: root/src/fs/fs_publish.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-03 13:18:48 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-03 13:18:48 +0000
commit07a59d7b5b66e8b59029894b9ee42069abb7a187 (patch)
tree23db38ef9b9b17450d55ecebfbd8c8d60753491e /src/fs/fs_publish.c
parent5dff30a84e1ac0c52f9bd8b671335b5100d37b0d (diff)
downloadgnunet-07a59d7b5b66e8b59029894b9ee42069abb7a187.tar.gz
gnunet-07a59d7b5b66e8b59029894b9ee42069abb7a187.zip
convert fs publish to MQ
Diffstat (limited to 'src/fs/fs_publish.c')
-rw-r--r--src/fs/fs_publish.c284
1 files changed, 186 insertions, 98 deletions
diff --git a/src/fs/fs_publish.c b/src/fs/fs_publish.c
index 89cc2714c..7cf8b4815 100644
--- a/src/fs/fs_publish.c
+++ b/src/fs/fs_publish.c
@@ -92,10 +92,10 @@ publish_cleanup (struct GNUNET_FS_PublishContext *pc)
92 GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO); 92 GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO);
93 pc->dsh = NULL; 93 pc->dsh = NULL;
94 } 94 }
95 if (NULL != pc->client) 95 if (NULL != pc->mq)
96 { 96 {
97 GNUNET_CLIENT_disconnect (pc->client); 97 GNUNET_MQ_destroy (pc->mq);
98 pc->client = NULL; 98 pc->mq = NULL;
99 } 99 }
100 GNUNET_assert (NULL == pc->upload_task); 100 GNUNET_assert (NULL == pc->upload_task);
101 GNUNET_free (pc); 101 GNUNET_free (pc);
@@ -493,7 +493,8 @@ block_proc (void *cls,
493 p = pc->fi_pos; 493 p = pc->fi_pos;
494 if (NULL == pc->dsh) 494 if (NULL == pc->dsh)
495 { 495 {
496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n"); 496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
497 "Waiting for datastore connection\n");
497 GNUNET_assert (NULL == pc->upload_task); 498 GNUNET_assert (NULL == pc->upload_task);
498 pc->upload_task = 499 pc->upload_task =
499 GNUNET_SCHEDULER_add_with_priority 500 GNUNET_SCHEDULER_add_with_priority
@@ -679,53 +680,105 @@ publish_content (struct GNUNET_FS_PublishContext *pc)
679 680
680 681
681/** 682/**
682 * Process the response (or lack thereof) from 683 * Check the response from the "fs" service to our 'start index'
683 * the "fs" service to our 'start index' request. 684 * request.
685 *
686 * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
687 * @param msg the response we got
688 */
689static int
690check_index_start_failed (void *cls,
691 const struct GNUNET_MessageHeader *msg)
692{
693 size_t msize = ntohs (msg->size) - sizeof (*msg);
694 const char *emsg = (const char *) &msg[1];
695
696 if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')
697 {
698 GNUNET_break (0);
699 return GNUNET_SYSERR;
700 }
701 return GNUNET_OK;
702}
703
704
705/**
706 * Process the response from the "fs" service to our 'start index'
707 * request.
684 * 708 *
685 * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) 709 * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
686 * @param msg the response we got 710 * @param msg the response we got
687 */ 711 */
688static void 712static void
689process_index_start_response (void *cls, 713handle_index_start_failed (void *cls,
690 const struct GNUNET_MessageHeader *msg) 714 const struct GNUNET_MessageHeader *msg)
691{ 715{
692 struct GNUNET_FS_PublishContext *pc = cls; 716 struct GNUNET_FS_PublishContext *pc = cls;
693 struct GNUNET_FS_FileInformation *p; 717 struct GNUNET_FS_FileInformation *p;
694 const char *emsg; 718 const char *emsg = (const char *) &msg[1];
695 uint16_t msize;
696 719
697 GNUNET_CLIENT_disconnect (pc->client); 720 GNUNET_MQ_destroy (pc->mq);
698 pc->client = NULL; 721 pc->mq = NULL;
699 p = pc->fi_pos; 722 p = pc->fi_pos;
700 if (NULL == msg) 723 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
701 { 724 _("Can not index file `%s': %s. Will try to insert instead.\n"),
702 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 725 p->filename,
703 _("Can not index file `%s': %s. Will try to insert instead.\n"), 726 gettext (emsg));
704 p->filename, 727 p->data.file.do_index = GNUNET_NO;
705 _("timeout on index-start request to `fs' service")); 728 GNUNET_FS_file_information_sync_ (p);
706 p->data.file.do_index = GNUNET_NO; 729 publish_content (pc);
707 GNUNET_FS_file_information_sync_ (p); 730}
708 publish_content (pc); 731
709 return; 732
710 } 733/**
711 if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK) 734 * Process the response from the "fs" service to our 'start index'
735 * request.
736 *
737 * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
738 * @param msg the response we got
739 */
740static void
741handle_index_start_ok (void *cls,
742 const struct GNUNET_MessageHeader *msg)
743{
744 struct GNUNET_FS_PublishContext *pc = cls;
745 struct GNUNET_FS_FileInformation *p;
746
747 GNUNET_MQ_destroy (pc->mq);
748 pc->mq = NULL;
749 p = pc->fi_pos;
750 p->data.file.index_start_confirmed = GNUNET_YES;
751 GNUNET_FS_file_information_sync_ (p);
752 publish_content (pc);
753}
754
755
756/**
757 * Generic error handler, called with the appropriate error code and
758 * the same closure specified at the creation of the message queue.
759 * Not every message queue implementation supports an error handler.
760 *
761 * @param cls closure with the `struct GNUNET_FS_PublishContext *`
762 * @param error error code
763 */
764static void
765index_mq_error_handler (void *cls,
766 enum GNUNET_MQ_Error error)
767{
768 struct GNUNET_FS_PublishContext *pc = cls;
769 struct GNUNET_FS_FileInformation *p;
770
771 if (NULL != pc->mq)
712 { 772 {
713 msize = ntohs (msg->size); 773 GNUNET_MQ_destroy (pc->mq);
714 emsg = (const char *) &msg[1]; 774 pc->mq = NULL;
715 if ((msize <= sizeof (struct GNUNET_MessageHeader)) ||
716 (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0'))
717 emsg = gettext_noop ("unknown error");
718 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
719 _
720 ("Can not index file `%s': %s. Will try to insert instead.\n"),
721 p->filename, gettext (emsg));
722 p->data.file.do_index = GNUNET_NO;
723 GNUNET_FS_file_information_sync_ (p);
724 publish_content (pc);
725 return;
726 } 775 }
727 p->data.file.index_start_confirmed = GNUNET_YES; 776 p = pc->fi_pos;
728 /* success! continue with indexing */ 777 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
778 _("Can not index file `%s': %s. Will try to insert instead.\n"),
779 p->filename,
780 _("error on index-start request to `fs' service"));
781 p->data.file.do_index = GNUNET_NO;
729 GNUNET_FS_file_information_sync_ (p); 782 GNUNET_FS_file_information_sync_ (p);
730 publish_content (pc); 783 publish_content (pc);
731} 784}
@@ -742,11 +795,22 @@ static void
742hash_for_index_cb (void *cls, 795hash_for_index_cb (void *cls,
743 const struct GNUNET_HashCode *res) 796 const struct GNUNET_HashCode *res)
744{ 797{
798 GNUNET_MQ_hd_fixed_size (index_start_ok,
799 GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK,
800 struct GNUNET_MessageHeader);
801 GNUNET_MQ_hd_var_size (index_start_failed,
802 GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED,
803 struct GNUNET_MessageHeader);
745 struct GNUNET_FS_PublishContext *pc = cls; 804 struct GNUNET_FS_PublishContext *pc = cls;
805 struct GNUNET_MQ_MessageHandler handlers[] = {
806 make_index_start_ok_handler (pc),
807 make_index_start_failed_handler (pc),
808 GNUNET_MQ_handler_end ()
809 };
746 struct GNUNET_FS_FileInformation *p; 810 struct GNUNET_FS_FileInformation *p;
811 struct GNUNET_MQ_Envelope *env;
747 struct IndexStartMessage *ism; 812 struct IndexStartMessage *ism;
748 size_t slen; 813 size_t slen;
749 struct GNUNET_CLIENT_Connection *client;
750 uint64_t dev; 814 uint64_t dev;
751 uint64_t ino; 815 uint64_t ino;
752 char *fn; 816 char *fn;
@@ -785,8 +849,10 @@ hash_for_index_cb (void *cls,
785 publish_content (pc); 849 publish_content (pc);
786 return; 850 return;
787 } 851 }
788 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n", 852 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789 p->filename, GNUNET_h2s (res)); 853 "Hash of indexed file `%s' is `%s'\n",
854 p->filename,
855 GNUNET_h2s (res));
790 if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY)) 856 if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY))
791 { 857 {
792 p->data.file.file_id = *res; 858 p->data.file.file_id = *res;
@@ -797,8 +863,12 @@ hash_for_index_cb (void *cls,
797 GNUNET_free (fn); 863 GNUNET_free (fn);
798 return; 864 return;
799 } 865 }
800 client = GNUNET_CLIENT_connect ("fs", pc->h->cfg); 866 pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
801 if (NULL == client) 867 "fs",
868 handlers,
869 &index_mq_error_handler,
870 pc);
871 if (NULL == pc->mq)
802 { 872 {
803 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 873 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
804 _("Can not index file `%s': %s. Will try to insert instead.\n"), 874 _("Can not index file `%s': %s. Will try to insert instead.\n"),
@@ -815,10 +885,13 @@ hash_for_index_cb (void *cls,
815 p->data.file.have_hash = GNUNET_YES; 885 p->data.file.have_hash = GNUNET_YES;
816 GNUNET_FS_file_information_sync_ (p); 886 GNUNET_FS_file_information_sync_ (p);
817 } 887 }
818 ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen); 888 env = GNUNET_MQ_msg_extra (ism,
819 ism->header.size = htons (sizeof (struct IndexStartMessage) + slen); 889 slen,
820 ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START); 890 GNUNET_MESSAGE_TYPE_FS_INDEX_START);
821 if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino)) 891 if (GNUNET_OK ==
892 GNUNET_DISK_file_get_identifiers (p->filename,
893 &dev,
894 &ino))
822 { 895 {
823 ism->device = GNUNET_htonll (dev); 896 ism->device = GNUNET_htonll (dev);
824 ism->inode = GNUNET_htonll (ino); 897 ism->inode = GNUNET_htonll (ino);
@@ -826,19 +899,16 @@ hash_for_index_cb (void *cls,
826 else 899 else
827 { 900 {
828 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 901 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829 _("Failed to get file identifiers for `%s'\n"), p->filename); 902 _("Failed to get file identifiers for `%s'\n"),
903 p->filename);
830 } 904 }
831 ism->file_id = *res; 905 ism->file_id = *res;
832 memcpy (&ism[1], fn, slen); 906 memcpy (&ism[1],
907 fn,
908 slen);
833 GNUNET_free (fn); 909 GNUNET_free (fn);
834 pc->client = client; 910 GNUNET_MQ_send (pc->mq,
835 GNUNET_break (GNUNET_YES == 911 env);
836 GNUNET_CLIENT_transmit_and_get_response (client, &ism->header,
837 GNUNET_TIME_UNIT_FOREVER_REL,
838 GNUNET_YES,
839 &process_index_start_response,
840 pc));
841 GNUNET_free (ism);
842} 912}
843 913
844 914
@@ -862,7 +932,8 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc)
862 p->chk_uri, 932 p->chk_uri,
863 &p->bo, 933 &p->bo,
864 pc->options, 934 pc->options,
865 &publish_kblocks_cont, pc); 935 &publish_kblocks_cont,
936 pc);
866 } 937 }
867 else 938 else
868 { 939 {
@@ -872,40 +943,24 @@ publish_kblocks (struct GNUNET_FS_PublishContext *pc)
872 943
873 944
874/** 945/**
875 * Process the response (or lack thereof) from 946 * Process the response from the "fs" service to our LOC sign request.
876 * the "fs" service to our LOC sign request.
877 * 947 *
878 * @param cls closure (of type `struct GNUNET_FS_PublishContext *`) 948 * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
879 * @param msg the response we got 949 * @param sig the response we got
880 */ 950 */
881static void 951static void
882process_signature_response (void *cls, 952handle_signature_response (void *cls,
883 const struct GNUNET_MessageHeader *msg) 953 const struct ResponseLocSignatureMessage *sig)
884{ 954{
885 struct GNUNET_FS_PublishContext *pc = cls; 955 struct GNUNET_FS_PublishContext *pc = cls;
886 const struct ResponseLocSignatureMessage *sig;
887 struct GNUNET_FS_FileInformation *p; 956 struct GNUNET_FS_FileInformation *p;
888 957
889 p = pc->fi_pos; 958 p = pc->fi_pos;
890 if (NULL == msg)
891 {
892 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
893 _("Can not create LOC URI. Will continue with CHK instead.\n"));
894 publish_kblocks (pc);
895 return;
896 }
897 if (sizeof (struct ResponseLocSignatureMessage) !=
898 ntohs (msg->size))
899 {
900 GNUNET_break (0);
901 publish_kblocks (pc);
902 return;
903 }
904 sig = (const struct ResponseLocSignatureMessage *) msg;
905 p->chk_uri->type = GNUNET_FS_URI_LOC; 959 p->chk_uri->type = GNUNET_FS_URI_LOC;
906 /* p->data.loc.fi kept from CHK before */ 960 /* p->data.loc.fi kept from CHK before */
907 p->chk_uri->data.loc.peer = sig->peer; 961 p->chk_uri->data.loc.peer = sig->peer;
908 p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh (sig->expiration_time); 962 p->chk_uri->data.loc.expirationTime
963 = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
909 p->chk_uri->data.loc.contentSignature = sig->signature; 964 p->chk_uri->data.loc.contentSignature = sig->signature;
910 GNUNET_FS_file_information_sync_ (p); 965 GNUNET_FS_file_information_sync_ (p);
911 GNUNET_FS_publish_sync_ (pc); 966 GNUNET_FS_publish_sync_ (pc);
@@ -914,6 +969,31 @@ process_signature_response (void *cls,
914 969
915 970
916/** 971/**
972 * Generic error handler, called with the appropriate error code and
973 * the same closure specified at the creation of the message queue.
974 * Not every message queue implementation supports an error handler.
975 *
976 * @param cls closure with the `struct GNUNET_FS_PublishContext *`
977 * @param error error code
978 */
979static void
980loc_mq_error_handler (void *cls,
981 enum GNUNET_MQ_Error error)
982{
983 struct GNUNET_FS_PublishContext *pc = cls;
984
985 if (NULL != pc->mq)
986 {
987 GNUNET_MQ_destroy (pc->mq);
988 pc->mq = NULL;
989 }
990 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
991 _("Can not create LOC URI. Will continue with CHK instead.\n"));
992 publish_kblocks (pc);
993}
994
995
996/**
917 * We're publishing without anonymity. Contact the FS service 997 * We're publishing without anonymity. Contact the FS service
918 * to create a signed LOC URI for further processing, then 998 * to create a signed LOC URI for further processing, then
919 * continue with KSKs. 999 * continue with KSKs.
@@ -923,12 +1003,25 @@ process_signature_response (void *cls,
923static void 1003static void
924create_loc_uri (struct GNUNET_FS_PublishContext *pc) 1004create_loc_uri (struct GNUNET_FS_PublishContext *pc)
925{ 1005{
926 struct RequestLocSignatureMessage req; 1006 GNUNET_MQ_hd_fixed_size (signature_response,
1007 GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE,
1008 struct ResponseLocSignatureMessage);
1009 struct GNUNET_MQ_MessageHandler handlers[] = {
1010 make_signature_response_handler (pc),
1011 GNUNET_MQ_handler_end ()
1012 };
1013 struct GNUNET_MQ_Envelope *env;
1014 struct RequestLocSignatureMessage *req;
927 struct GNUNET_FS_FileInformation *p; 1015 struct GNUNET_FS_FileInformation *p;
928 1016
929 if (NULL == pc->client) 1017 if (NULL != pc->mq)
930 pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg); 1018 GNUNET_MQ_destroy (pc->mq);
931 if (NULL == pc->client) 1019 pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
1020 "fs",
1021 handlers,
1022 &loc_mq_error_handler,
1023 pc);
1024 if (NULL == pc->mq)
932 { 1025 {
933 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1026 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
934 _("Can not create LOC URI. Will continue with CHK instead.\n")); 1027 _("Can not create LOC URI. Will continue with CHK instead.\n"));
@@ -936,19 +1029,14 @@ create_loc_uri (struct GNUNET_FS_PublishContext *pc)
936 return; 1029 return;
937 } 1030 }
938 p = pc->fi_pos; 1031 p = pc->fi_pos;
939 req.header.size = htons (sizeof (struct RequestLocSignatureMessage)); 1032 env = GNUNET_MQ_msg (req,
940 req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN); 1033 GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
941 req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT); 1034 req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
942 req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time); 1035 req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
943 req.chk = p->chk_uri->data.chk.chk; 1036 req->chk = p->chk_uri->data.chk.chk;
944 req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length); 1037 req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
945 GNUNET_break (GNUNET_YES == 1038 GNUNET_MQ_send (pc->mq,
946 GNUNET_CLIENT_transmit_and_get_response (pc->client, 1039 env);
947 &req.header,
948 GNUNET_TIME_UNIT_FOREVER_REL,
949 GNUNET_YES,
950 &process_signature_response,
951 pc));
952} 1040}
953 1041
954 1042