diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-03 12:50:36 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-03 12:50:36 +0000 |
commit | 5dff30a84e1ac0c52f9bd8b671335b5100d37b0d (patch) | |
tree | 0aa2d10e352d9d523befdc331191084350f0091a | |
parent | 82d0757e1908c04f76dd69016fbb7d538318f003 (diff) | |
download | gnunet-5dff30a84e1ac0c52f9bd8b671335b5100d37b0d.tar.gz gnunet-5dff30a84e1ac0c52f9bd8b671335b5100d37b0d.zip |
convert search to MQ
-rw-r--r-- | src/fs/fs_api.c | 2 | ||||
-rw-r--r-- | src/fs/fs_api.h | 14 | ||||
-rw-r--r-- | src/fs/fs_search.c | 421 |
3 files changed, 214 insertions, 223 deletions
diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c index 0bc07183e..7ebcd093e 100644 --- a/src/fs/fs_api.c +++ b/src/fs/fs_api.c | |||
@@ -2869,7 +2869,7 @@ signal_search_resume (struct GNUNET_FS_SearchContext *sc) | |||
2869 | pi.status = GNUNET_FS_STATUS_SEARCH_RESUME; | 2869 | pi.status = GNUNET_FS_STATUS_SEARCH_RESUME; |
2870 | pi.value.search.specifics.resume.message = sc->emsg; | 2870 | pi.value.search.specifics.resume.message = sc->emsg; |
2871 | pi.value.search.specifics.resume.is_paused = | 2871 | pi.value.search.specifics.resume.is_paused = |
2872 | (NULL == sc->client) ? GNUNET_YES : GNUNET_NO; | 2872 | (NULL == sc->mq) ? GNUNET_YES : GNUNET_NO; |
2873 | sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); | 2873 | sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); |
2874 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | 2874 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, |
2875 | &signal_result_resume, sc); | 2875 | &signal_result_resume, sc); |
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h index 86219b3f8..126f5902e 100644 --- a/src/fs/fs_api.h +++ b/src/fs/fs_api.h | |||
@@ -1571,7 +1571,7 @@ struct GNUNET_FS_SearchContext | |||
1571 | /** | 1571 | /** |
1572 | * Connection to the FS service. | 1572 | * Connection to the FS service. |
1573 | */ | 1573 | */ |
1574 | struct GNUNET_CLIENT_Connection *client; | 1574 | struct GNUNET_MQ_Handle *mq; |
1575 | 1575 | ||
1576 | /** | 1576 | /** |
1577 | * Pointer we keep for the client. | 1577 | * Pointer we keep for the client. |
@@ -1621,18 +1621,6 @@ struct GNUNET_FS_SearchContext | |||
1621 | struct GNUNET_SCHEDULER_Task *task; | 1621 | struct GNUNET_SCHEDULER_Task *task; |
1622 | 1622 | ||
1623 | /** | 1623 | /** |
1624 | * How many of the entries in the search request | ||
1625 | * map have been passed to the service so far? | ||
1626 | */ | ||
1627 | unsigned int search_request_map_offset; | ||
1628 | |||
1629 | /** | ||
1630 | * How many of the keywords in the KSK | ||
1631 | * map have been passed to the service so far? | ||
1632 | */ | ||
1633 | unsigned int keyword_offset; | ||
1634 | |||
1635 | /** | ||
1636 | * Anonymity level for the search. | 1624 | * Anonymity level for the search. |
1637 | */ | 1625 | */ |
1638 | uint32_t anonymity; | 1626 | uint32_t anonymity; |
diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c index 9a1b822e1..8a3652e3f 100644 --- a/src/fs/fs_search.c +++ b/src/fs/fs_search.c | |||
@@ -892,22 +892,46 @@ process_sblock (struct GNUNET_FS_SearchContext *sc, | |||
892 | 892 | ||
893 | 893 | ||
894 | /** | 894 | /** |
895 | * Process a search result. | 895 | * Shutdown any existing connection to the FS |
896 | * service and try to establish a fresh one | ||
897 | * (and then re-transmit our search request). | ||
896 | * | 898 | * |
897 | * @param sc our search context | 899 | * @param sc the search to reconnec |
898 | * @param type type of the result | ||
899 | * @param expiration when it will expire | ||
900 | * @param data the (encrypted) response | ||
901 | * @param size size of @a data | ||
902 | */ | 900 | */ |
903 | static void | 901 | static void |
904 | process_result (struct GNUNET_FS_SearchContext *sc, | 902 | try_reconnect (struct GNUNET_FS_SearchContext *sc); |
905 | enum GNUNET_BLOCK_Type type, | 903 | |
906 | struct GNUNET_TIME_Absolute expiration, | 904 | |
907 | const void *data, | 905 | /** |
908 | size_t size) | 906 | * We check a result message from the service. |
907 | * | ||
908 | * @param cls closure | ||
909 | * @param msg result message received | ||
910 | */ | ||
911 | static int | ||
912 | check_result (void *cls, | ||
913 | const struct ClientPutMessage *cm) | ||
909 | { | 914 | { |
910 | if (GNUNET_TIME_absolute_get_duration (expiration).rel_value_us > 0) | 915 | /* payload of any variable size is OK */ |
916 | return GNUNET_OK; | ||
917 | } | ||
918 | |||
919 | |||
920 | /** | ||
921 | * We process a search result from the service. | ||
922 | * | ||
923 | * @param cls closure | ||
924 | * @param msg result message received | ||
925 | */ | ||
926 | static void | ||
927 | handle_result (void *cls, | ||
928 | const struct ClientPutMessage *cm) | ||
929 | { | ||
930 | struct GNUNET_FS_SearchContext *sc = cls; | ||
931 | uint16_t msize = ntohs (cm->header.size) - sizeof (*cm); | ||
932 | enum GNUNET_BLOCK_Type type = ntohl (cm->type); | ||
933 | |||
934 | if (GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (cm->expiration)).rel_value_us > 0) | ||
911 | { | 935 | { |
912 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 936 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
913 | "Result received has already expired.\n"); | 937 | "Result received has already expired.\n"); |
@@ -917,9 +941,13 @@ process_result (struct GNUNET_FS_SearchContext *sc, | |||
917 | { | 941 | { |
918 | case GNUNET_BLOCK_TYPE_FS_UBLOCK: | 942 | case GNUNET_BLOCK_TYPE_FS_UBLOCK: |
919 | if (GNUNET_FS_URI_SKS == sc->uri->type) | 943 | if (GNUNET_FS_URI_SKS == sc->uri->type) |
920 | process_sblock (sc, data, size); | 944 | process_sblock (sc, |
945 | (const struct UBlock *) &cm[1], | ||
946 | msize); | ||
921 | else | 947 | else |
922 | process_kblock (sc, data, size); | 948 | process_kblock (sc, |
949 | (const struct UBlock *) &cm[1], | ||
950 | msize); | ||
923 | break; | 951 | break; |
924 | case GNUNET_BLOCK_TYPE_ANY: | 952 | case GNUNET_BLOCK_TYPE_ANY: |
925 | GNUNET_break (0); | 953 | GNUNET_break (0); |
@@ -935,61 +963,14 @@ process_result (struct GNUNET_FS_SearchContext *sc, | |||
935 | break; | 963 | break; |
936 | default: | 964 | default: |
937 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 965 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
938 | _("Got result with unknown block type `%d', ignoring"), type); | 966 | _("Got result with unknown block type `%d', ignoring"), |
967 | type); | ||
939 | break; | 968 | break; |
940 | } | 969 | } |
941 | } | 970 | } |
942 | 971 | ||
943 | 972 | ||
944 | /** | 973 | /** |
945 | * Shutdown any existing connection to the FS | ||
946 | * service and try to establish a fresh one | ||
947 | * (and then re-transmit our search request). | ||
948 | * | ||
949 | * @param sc the search to reconnec | ||
950 | */ | ||
951 | static void | ||
952 | try_reconnect (struct GNUNET_FS_SearchContext *sc); | ||
953 | |||
954 | |||
955 | /** | ||
956 | * Type of a function to call when we receive a message | ||
957 | * from the service. | ||
958 | * | ||
959 | * @param cls closure | ||
960 | * @param msg message received, NULL on timeout or fatal error | ||
961 | */ | ||
962 | static void | ||
963 | receive_results (void *cls, | ||
964 | const struct GNUNET_MessageHeader *msg) | ||
965 | { | ||
966 | struct GNUNET_FS_SearchContext *sc = cls; | ||
967 | const struct ClientPutMessage *cm; | ||
968 | uint16_t msize; | ||
969 | |||
970 | if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || | ||
971 | (ntohs (msg->size) <= sizeof (struct ClientPutMessage))) | ||
972 | { | ||
973 | try_reconnect (sc); | ||
974 | return; | ||
975 | } | ||
976 | msize = ntohs (msg->size); | ||
977 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
978 | "Receiving %u bytes of result from fs service\n", | ||
979 | msize); | ||
980 | cm = (const struct ClientPutMessage *) msg; | ||
981 | process_result (sc, ntohl (cm->type), | ||
982 | GNUNET_TIME_absolute_ntoh (cm->expiration), &cm[1], | ||
983 | msize - sizeof (struct ClientPutMessage)); | ||
984 | /* continue receiving */ | ||
985 | GNUNET_CLIENT_receive (sc->client, | ||
986 | &receive_results, | ||
987 | sc, | ||
988 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
989 | } | ||
990 | |||
991 | |||
992 | /** | ||
993 | * Schedule the transmission of the (next) search request | 974 | * Schedule the transmission of the (next) search request |
994 | * to the service. | 975 | * to the service. |
995 | * | 976 | * |
@@ -1058,7 +1039,6 @@ build_result_set (void *cls, | |||
1058 | } | 1039 | } |
1059 | if (0 == mbc->put_cnt) | 1040 | if (0 == mbc->put_cnt) |
1060 | return GNUNET_SYSERR; | 1041 | return GNUNET_SYSERR; |
1061 | mbc->sc->search_request_map_offset++; | ||
1062 | mbc->xoff[--mbc->put_cnt] = *key; | 1042 | mbc->xoff[--mbc->put_cnt] = *key; |
1063 | 1043 | ||
1064 | return GNUNET_OK; | 1044 | return GNUNET_OK; |
@@ -1091,155 +1071,157 @@ find_result_set (void *cls, | |||
1091 | 1071 | ||
1092 | 1072 | ||
1093 | /** | 1073 | /** |
1094 | * We're ready to transmit the search request to the file-sharing | 1074 | * Schedule the transmission of the (next) search request |
1095 | * service. Do it. If the request is too large to fit into a single | 1075 | * to the service. |
1096 | * message, transmit in increments. | ||
1097 | * | 1076 | * |
1098 | * @param cls closure | 1077 | * @param sc context for the search |
1099 | * @param size number of bytes available in @a buf | ||
1100 | * @param buf where the callee should write the message | ||
1101 | * @return number of bytes written to @a buf | ||
1102 | */ | 1078 | */ |
1103 | static size_t | 1079 | static void |
1104 | transmit_search_request (void *cls, | 1080 | schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc) |
1105 | size_t size, | ||
1106 | void *buf) | ||
1107 | { | 1081 | { |
1108 | struct GNUNET_FS_SearchContext *sc = cls; | ||
1109 | struct MessageBuilderContext mbc; | 1082 | struct MessageBuilderContext mbc; |
1110 | size_t msize; | 1083 | struct GNUNET_MQ_Envelope *env; |
1111 | struct SearchMessage *sm; | 1084 | struct SearchMessage *sm; |
1112 | struct GNUNET_CRYPTO_EcdsaPublicKey dpub; | 1085 | struct GNUNET_CRYPTO_EcdsaPublicKey dpub; |
1113 | unsigned int total_seen_results; /* total number of result hashes to send */ | 1086 | unsigned int total_seen_results; /* total number of result hashes to send */ |
1114 | unsigned int message_size_limit; | ||
1115 | uint32_t options; | 1087 | uint32_t options; |
1088 | unsigned int left; | ||
1089 | unsigned int todo; | ||
1090 | unsigned int fit; | ||
1091 | int first_call; | ||
1092 | unsigned int search_request_map_offset; | ||
1093 | unsigned int keyword_offset; | ||
1116 | 1094 | ||
1117 | if (NULL == buf) | 1095 | memset (&mbc, 0, sizeof (mbc)); |
1118 | { | ||
1119 | try_reconnect (sc); | ||
1120 | return 0; | ||
1121 | } | ||
1122 | mbc.sc = sc; | 1096 | mbc.sc = sc; |
1123 | mbc.skip_cnt = sc->search_request_map_offset; | ||
1124 | sm = buf; | ||
1125 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH); | ||
1126 | mbc.xoff = (struct GNUNET_HashCode *) &sm[1]; | ||
1127 | options = SEARCH_MESSAGE_OPTION_NONE; | ||
1128 | if (0 != (sc->options & GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY)) | ||
1129 | options |= SEARCH_MESSAGE_OPTION_LOOPBACK_ONLY; | ||
1130 | if (GNUNET_FS_uri_test_ksk (sc->uri)) | 1097 | if (GNUNET_FS_uri_test_ksk (sc->uri)) |
1131 | { | 1098 | { |
1132 | msize = sizeof (struct SearchMessage); | ||
1133 | GNUNET_assert (size >= msize); | ||
1134 | mbc.keyword_offset = sc->keyword_offset; | ||
1135 | /* calculate total number of known results (in put_cnt => total_seen_results) */ | ||
1136 | mbc.put_cnt = 0; | 1099 | mbc.put_cnt = 0; |
1137 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | 1100 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, |
1138 | &find_result_set, &mbc); | 1101 | &find_result_set, |
1102 | &mbc); | ||
1139 | total_seen_results = mbc.put_cnt; | 1103 | total_seen_results = mbc.put_cnt; |
1140 | /* calculate how many results we can send in this message */ | ||
1141 | message_size_limit = (size - msize) / sizeof (struct GNUNET_HashCode); | ||
1142 | mbc.put_cnt = GNUNET_MIN (message_size_limit, | ||
1143 | total_seen_results - mbc.skip_cnt); | ||
1144 | if (sc->search_request_map_offset < total_seen_results) | ||
1145 | GNUNET_assert (mbc.put_cnt > 0); | ||
1146 | |||
1147 | /* now build message */ | ||
1148 | msize += sizeof (struct GNUNET_HashCode) * mbc.put_cnt; | ||
1149 | sm->header.size = htons (msize); | ||
1150 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); | ||
1151 | sm->anonymity_level = htonl (sc->anonymity); | ||
1152 | memset (&sm->target, 0, sizeof (struct GNUNET_PeerIdentity)); | ||
1153 | sm->query = sc->requests[sc->keyword_offset].uquery; | ||
1154 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | ||
1155 | &build_result_set, &mbc); | ||
1156 | GNUNET_assert (0 == mbc.put_cnt); | ||
1157 | GNUNET_assert (total_seen_results >= sc->search_request_map_offset); | ||
1158 | if (total_seen_results != sc->search_request_map_offset) | ||
1159 | { | ||
1160 | /* more requesting to be done... */ | ||
1161 | sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); | ||
1162 | schedule_transmit_search_request (sc); | ||
1163 | return msize; | ||
1164 | } | ||
1165 | sm->options = htonl (options); | ||
1166 | sc->keyword_offset++; | ||
1167 | sc->search_request_map_offset = 0; | ||
1168 | if (sc->uri->data.ksk.keywordCount != sc->keyword_offset) | ||
1169 | { | ||
1170 | /* more requesting to be done... */ | ||
1171 | schedule_transmit_search_request (sc); | ||
1172 | return msize; | ||
1173 | } | ||
1174 | } | 1104 | } |
1175 | else | 1105 | else |
1176 | { | 1106 | { |
1177 | GNUNET_assert (GNUNET_FS_uri_test_sks (sc->uri)); | 1107 | total_seen_results |
1178 | msize = sizeof (struct SearchMessage); | 1108 | = GNUNET_CONTAINER_multihashmap_size (sc->master_result_map); |
1179 | GNUNET_assert (size >= msize); | 1109 | } |
1180 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); | 1110 | search_request_map_offset = 0; |
1181 | sm->anonymity_level = htonl (sc->anonymity); | 1111 | keyword_offset = 0; |
1182 | memset (&sm->target, 0, sizeof (struct GNUNET_PeerIdentity)); | 1112 | |
1183 | GNUNET_CRYPTO_ecdsa_public_key_derive (&sc->uri->data.sks.ns, | 1113 | first_call = GNUNET_YES; |
1184 | sc->uri->data.sks.identifier, | 1114 | while ( (0 != (left = |
1185 | "fs-ublock", | 1115 | (total_seen_results - search_request_map_offset))) || |
1186 | &dpub); | 1116 | (GNUNET_YES == first_call) ) |
1187 | GNUNET_CRYPTO_hash (&dpub, | 1117 | { |
1188 | sizeof (dpub), | 1118 | first_call = GNUNET_NO; |
1189 | &sm->query); | 1119 | options = SEARCH_MESSAGE_OPTION_NONE; |
1190 | message_size_limit = (size - msize) / sizeof (struct GNUNET_HashCode); | 1120 | if (0 != (sc->options & GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY)) |
1191 | total_seen_results = GNUNET_CONTAINER_multihashmap_size (sc->master_result_map); | 1121 | options |= SEARCH_MESSAGE_OPTION_LOOPBACK_ONLY; |
1192 | mbc.put_cnt = GNUNET_MIN (message_size_limit, | 1122 | |
1193 | total_seen_results - mbc.skip_cnt); | 1123 | fit = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - sizeof (*sm)) / sizeof (struct GNUNET_HashCode); |
1194 | mbc.keyword_offset = 0; | 1124 | todo = GNUNET_MIN (fit, |
1195 | if (sc->search_request_map_offset < total_seen_results) | 1125 | left); |
1196 | GNUNET_assert (mbc.put_cnt > 0); | 1126 | env = GNUNET_MQ_msg_extra (sm, |
1197 | msize += sizeof (struct GNUNET_HashCode) * mbc.put_cnt; | 1127 | sizeof (struct GNUNET_HashCode) * todo, |
1198 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | 1128 | GNUNET_MESSAGE_TYPE_FS_START_SEARCH); |
1199 | &build_result_set, &mbc); | 1129 | mbc.skip_cnt = search_request_map_offset; |
1200 | sm->header.size = htons (msize); | 1130 | mbc.xoff = (struct GNUNET_HashCode *) &sm[1]; |
1201 | GNUNET_assert (total_seen_results >= sc->search_request_map_offset); | 1131 | |
1202 | if (total_seen_results != sc->search_request_map_offset) | 1132 | if (GNUNET_FS_uri_test_ksk (sc->uri)) |
1133 | { | ||
1134 | mbc.keyword_offset = keyword_offset; | ||
1135 | /* calculate how many results we can send in this message */ | ||
1136 | mbc.put_cnt = todo; | ||
1137 | /* now build message */ | ||
1138 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); | ||
1139 | sm->anonymity_level = htonl (sc->anonymity); | ||
1140 | memset (&sm->target, | ||
1141 | 0, | ||
1142 | sizeof (struct GNUNET_PeerIdentity)); | ||
1143 | sm->query = sc->requests[keyword_offset].uquery; | ||
1144 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | ||
1145 | &build_result_set, | ||
1146 | &mbc); | ||
1147 | search_request_map_offset += todo; | ||
1148 | GNUNET_assert (0 == mbc.put_cnt); | ||
1149 | GNUNET_assert (total_seen_results >= search_request_map_offset); | ||
1150 | if (total_seen_results != search_request_map_offset) | ||
1151 | { | ||
1152 | /* more requesting to be done... */ | ||
1153 | sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); | ||
1154 | } | ||
1155 | else | ||
1156 | { | ||
1157 | sm->options = htonl (options); | ||
1158 | keyword_offset++; | ||
1159 | search_request_map_offset = 0; | ||
1160 | if (sc->uri->data.ksk.keywordCount != keyword_offset) | ||
1161 | { | ||
1162 | /* more keywords => more requesting to be done... */ | ||
1163 | first_call = GNUNET_YES; | ||
1164 | } | ||
1165 | } | ||
1166 | } | ||
1167 | else | ||
1203 | { | 1168 | { |
1204 | /* more requesting to be done... */ | 1169 | GNUNET_assert (GNUNET_FS_uri_test_sks (sc->uri)); |
1205 | sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); | 1170 | |
1206 | schedule_transmit_search_request (sc); | 1171 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_UBLOCK); |
1207 | return msize; | 1172 | sm->anonymity_level = htonl (sc->anonymity); |
1173 | memset (&sm->target, | ||
1174 | 0, | ||
1175 | sizeof (struct GNUNET_PeerIdentity)); | ||
1176 | GNUNET_CRYPTO_ecdsa_public_key_derive (&sc->uri->data.sks.ns, | ||
1177 | sc->uri->data.sks.identifier, | ||
1178 | "fs-ublock", | ||
1179 | &dpub); | ||
1180 | GNUNET_CRYPTO_hash (&dpub, | ||
1181 | sizeof (dpub), | ||
1182 | &sm->query); | ||
1183 | mbc.put_cnt = todo; | ||
1184 | mbc.keyword_offset = 0; | ||
1185 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | ||
1186 | &build_result_set, | ||
1187 | &mbc); | ||
1188 | GNUNET_assert (total_seen_results >= search_request_map_offset); | ||
1189 | if (total_seen_results != search_request_map_offset) | ||
1190 | { | ||
1191 | /* more requesting to be done... */ | ||
1192 | sm->options = htonl (options | SEARCH_MESSAGE_OPTION_CONTINUED); | ||
1193 | } | ||
1194 | else | ||
1195 | { | ||
1196 | sm->options = htonl (options); | ||
1197 | } | ||
1208 | } | 1198 | } |
1209 | sm->options = htonl (options); | 1199 | GNUNET_MQ_send (sc->mq, |
1200 | env); | ||
1210 | } | 1201 | } |
1211 | GNUNET_CLIENT_receive (sc->client, | ||
1212 | &receive_results, sc, | ||
1213 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1214 | return msize; | ||
1215 | } | 1202 | } |
1216 | 1203 | ||
1217 | 1204 | ||
1218 | /** | 1205 | /** |
1219 | * Schedule the transmission of the (next) search request | 1206 | * Generic error handler, called with the appropriate error code and |
1220 | * to the service. | 1207 | * the same closure specified at the creation of the message queue. |
1208 | * Not every message queue implementation supports an error handler. | ||
1221 | * | 1209 | * |
1222 | * @param sc context for the search | 1210 | * @param cls closure with the `struct GNUNET_FS_SearchContext *` |
1211 | * @param error error code | ||
1223 | */ | 1212 | */ |
1224 | static void | 1213 | static void |
1225 | schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc) | 1214 | search_mq_error_handler (void *cls, |
1215 | enum GNUNET_MQ_Error error) | ||
1226 | { | 1216 | { |
1227 | size_t size; | 1217 | struct GNUNET_FS_SearchContext *sc = cls; |
1228 | unsigned int left; | 1218 | |
1229 | unsigned int fit; | 1219 | if (NULL != sc->mq) |
1230 | unsigned int request; | 1220 | { |
1231 | 1221 | GNUNET_MQ_destroy (sc->mq); | |
1232 | size = sizeof (struct SearchMessage); | 1222 | sc->mq = NULL; |
1233 | left = | 1223 | } |
1234 | GNUNET_CONTAINER_multihashmap_size (sc->master_result_map) - | 1224 | try_reconnect (sc); |
1235 | sc->search_request_map_offset; | ||
1236 | fit = (GNUNET_SERVER_MAX_MESSAGE_SIZE - 1 - size) / sizeof (struct GNUNET_HashCode); | ||
1237 | request = GNUNET_MIN (fit, left); | ||
1238 | size += sizeof (struct GNUNET_HashCode) * request; | ||
1239 | GNUNET_CLIENT_notify_transmit_ready (sc->client, size, | ||
1240 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1241 | GNUNET_NO, | ||
1242 | &transmit_search_request, sc); | ||
1243 | } | 1225 | } |
1244 | 1226 | ||
1245 | 1227 | ||
@@ -1252,19 +1234,26 @@ schedule_transmit_search_request (struct GNUNET_FS_SearchContext *sc) | |||
1252 | static void | 1234 | static void |
1253 | do_reconnect (void *cls) | 1235 | do_reconnect (void *cls) |
1254 | { | 1236 | { |
1237 | GNUNET_MQ_hd_var_size (result, | ||
1238 | GNUNET_MESSAGE_TYPE_FS_PUT, | ||
1239 | struct ClientPutMessage); | ||
1255 | struct GNUNET_FS_SearchContext *sc = cls; | 1240 | struct GNUNET_FS_SearchContext *sc = cls; |
1256 | struct GNUNET_CLIENT_Connection *client; | 1241 | struct GNUNET_MQ_MessageHandler handlers[] = { |
1242 | make_result_handler (sc), | ||
1243 | GNUNET_MQ_handler_end () | ||
1244 | }; | ||
1257 | 1245 | ||
1258 | sc->task = NULL; | 1246 | sc->task = NULL; |
1259 | client = GNUNET_CLIENT_connect ("fs", sc->h->cfg); | 1247 | sc->mq = GNUNET_CLIENT_connecT (sc->h->cfg, |
1260 | if (NULL == client) | 1248 | "fs", |
1249 | handlers, | ||
1250 | &search_mq_error_handler, | ||
1251 | sc); | ||
1252 | if (NULL == sc->mq) | ||
1261 | { | 1253 | { |
1262 | try_reconnect (sc); | 1254 | try_reconnect (sc); |
1263 | return; | 1255 | return; |
1264 | } | 1256 | } |
1265 | sc->client = client; | ||
1266 | sc->search_request_map_offset = 0; | ||
1267 | sc->keyword_offset = 0; | ||
1268 | schedule_transmit_search_request (sc); | 1257 | schedule_transmit_search_request (sc); |
1269 | } | 1258 | } |
1270 | 1259 | ||
@@ -1279,10 +1268,10 @@ do_reconnect (void *cls) | |||
1279 | static void | 1268 | static void |
1280 | try_reconnect (struct GNUNET_FS_SearchContext *sc) | 1269 | try_reconnect (struct GNUNET_FS_SearchContext *sc) |
1281 | { | 1270 | { |
1282 | if (NULL != sc->client) | 1271 | if (NULL != sc->mq) |
1283 | { | 1272 | { |
1284 | GNUNET_CLIENT_disconnect (sc->client); | 1273 | GNUNET_MQ_destroy (sc->mq); |
1285 | sc->client = NULL; | 1274 | sc->mq = NULL; |
1286 | } | 1275 | } |
1287 | sc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (sc->reconnect_backoff); | 1276 | sc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (sc->reconnect_backoff); |
1288 | sc->task = | 1277 | sc->task = |
@@ -1388,7 +1377,7 @@ GNUNET_FS_search_start_searching_ (struct GNUNET_FS_SearchContext *sc) | |||
1388 | struct GNUNET_CRYPTO_EcdsaPublicKey anon_pub; | 1377 | struct GNUNET_CRYPTO_EcdsaPublicKey anon_pub; |
1389 | struct SearchRequestEntry *sre; | 1378 | struct SearchRequestEntry *sre; |
1390 | 1379 | ||
1391 | GNUNET_assert (NULL == sc->client); | 1380 | GNUNET_assert (NULL == sc->mq); |
1392 | if (GNUNET_FS_uri_test_ksk (sc->uri)) | 1381 | if (GNUNET_FS_uri_test_ksk (sc->uri)) |
1393 | { | 1382 | { |
1394 | GNUNET_assert (0 != sc->uri->data.ksk.keywordCount); | 1383 | GNUNET_assert (0 != sc->uri->data.ksk.keywordCount); |
@@ -1418,11 +1407,14 @@ GNUNET_FS_search_start_searching_ (struct GNUNET_FS_SearchContext *sc) | |||
1418 | &update_sre_result_maps, | 1407 | &update_sre_result_maps, |
1419 | sc); | 1408 | sc); |
1420 | } | 1409 | } |
1421 | sc->client = GNUNET_CLIENT_connect ("fs", sc->h->cfg); | 1410 | GNUNET_assert (NULL == sc->task); |
1422 | if (NULL == sc->client) | 1411 | do_reconnect (sc); |
1412 | if (NULL == sc->mq) | ||
1413 | { | ||
1414 | GNUNET_SCHEDULER_cancel (sc->task); | ||
1415 | sc->task = NULL; | ||
1423 | return GNUNET_SYSERR; | 1416 | return GNUNET_SYSERR; |
1424 | sc->search_request_map_offset = 0; | 1417 | } |
1425 | schedule_transmit_search_request (sc); | ||
1426 | return GNUNET_OK; | 1418 | return GNUNET_OK; |
1427 | } | 1419 | } |
1428 | 1420 | ||
@@ -1552,10 +1544,10 @@ GNUNET_FS_search_signal_suspend_ (void *cls) | |||
1552 | GNUNET_SCHEDULER_cancel (sc->task); | 1544 | GNUNET_SCHEDULER_cancel (sc->task); |
1553 | sc->task = NULL; | 1545 | sc->task = NULL; |
1554 | } | 1546 | } |
1555 | if (NULL != sc->client) | 1547 | if (NULL != sc->mq) |
1556 | { | 1548 | { |
1557 | GNUNET_CLIENT_disconnect (sc->client); | 1549 | GNUNET_MQ_destroy (sc->mq); |
1558 | sc->client = NULL; | 1550 | sc->mq = NULL; |
1559 | } | 1551 | } |
1560 | GNUNET_CONTAINER_multihashmap_destroy (sc->master_result_map); | 1552 | GNUNET_CONTAINER_multihashmap_destroy (sc->master_result_map); |
1561 | if (NULL != sc->requests) | 1553 | if (NULL != sc->requests) |
@@ -1616,14 +1608,19 @@ GNUNET_FS_search_pause (struct GNUNET_FS_SearchContext *sc) | |||
1616 | GNUNET_SCHEDULER_cancel (sc->task); | 1608 | GNUNET_SCHEDULER_cancel (sc->task); |
1617 | sc->task = NULL; | 1609 | sc->task = NULL; |
1618 | } | 1610 | } |
1619 | if (NULL != sc->client) | 1611 | if (NULL != sc->mq) |
1620 | GNUNET_CLIENT_disconnect (sc->client); | 1612 | { |
1621 | sc->client = NULL; | 1613 | GNUNET_MQ_destroy (sc->mq); |
1614 | sc->mq = NULL; | ||
1615 | } | ||
1622 | GNUNET_FS_search_sync_ (sc); | 1616 | GNUNET_FS_search_sync_ (sc); |
1623 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | 1617 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, |
1624 | &search_result_freeze_probes, sc); | 1618 | &search_result_freeze_probes, |
1619 | sc); | ||
1625 | pi.status = GNUNET_FS_STATUS_SEARCH_PAUSED; | 1620 | pi.status = GNUNET_FS_STATUS_SEARCH_PAUSED; |
1626 | sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); | 1621 | sc->client_info = GNUNET_FS_search_make_status_ (&pi, |
1622 | sc->h, | ||
1623 | sc); | ||
1627 | } | 1624 | } |
1628 | 1625 | ||
1629 | 1626 | ||
@@ -1637,7 +1634,7 @@ GNUNET_FS_search_continue (struct GNUNET_FS_SearchContext *sc) | |||
1637 | { | 1634 | { |
1638 | struct GNUNET_FS_ProgressInfo pi; | 1635 | struct GNUNET_FS_ProgressInfo pi; |
1639 | 1636 | ||
1640 | GNUNET_assert (NULL == sc->client); | 1637 | GNUNET_assert (NULL == sc->mq); |
1641 | GNUNET_assert (NULL == sc->task); | 1638 | GNUNET_assert (NULL == sc->task); |
1642 | do_reconnect (sc); | 1639 | do_reconnect (sc); |
1643 | GNUNET_FS_search_sync_ (sc); | 1640 | GNUNET_FS_search_sync_ (sc); |
@@ -1769,9 +1766,15 @@ GNUNET_FS_search_stop (struct GNUNET_FS_SearchContext *sc) | |||
1769 | sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); | 1766 | sc->client_info = GNUNET_FS_search_make_status_ (&pi, sc->h, sc); |
1770 | GNUNET_break (NULL == sc->client_info); | 1767 | GNUNET_break (NULL == sc->client_info); |
1771 | if (NULL != sc->task) | 1768 | if (NULL != sc->task) |
1769 | { | ||
1772 | GNUNET_SCHEDULER_cancel (sc->task); | 1770 | GNUNET_SCHEDULER_cancel (sc->task); |
1773 | if (NULL != sc->client) | 1771 | sc->task = NULL; |
1774 | GNUNET_CLIENT_disconnect (sc->client); | 1772 | } |
1773 | if (NULL != sc->mq) | ||
1774 | { | ||
1775 | GNUNET_MQ_destroy (sc->mq); | ||
1776 | sc->mq = NULL; | ||
1777 | } | ||
1775 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, | 1778 | GNUNET_CONTAINER_multihashmap_iterate (sc->master_result_map, |
1776 | &search_result_free, sc); | 1779 | &search_result_free, sc); |
1777 | GNUNET_CONTAINER_multihashmap_destroy (sc->master_result_map); | 1780 | GNUNET_CONTAINER_multihashmap_destroy (sc->master_result_map); |