diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-03 12:05:57 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-03 12:05:57 +0000 |
commit | 82d0757e1908c04f76dd69016fbb7d538318f003 (patch) | |
tree | 243464194f9e2148adc905f811d46831dea55001 /src/fs/fs_download.c | |
parent | 8b46e74546fb643a5d272bb1edd8c909a4ee978d (diff) | |
download | gnunet-82d0757e1908c04f76dd69016fbb7d538318f003.tar.gz gnunet-82d0757e1908c04f76dd69016fbb7d538318f003.zip |
convert download API to MQ
Diffstat (limited to 'src/fs/fs_download.c')
-rw-r--r-- | src/fs/fs_download.c | 474 |
1 files changed, 191 insertions, 283 deletions
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index d89d70719..98c76882a 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c | |||
@@ -121,7 +121,7 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi, | |||
121 | pi->value.download.anonymity = dc->anonymity; | 121 | pi->value.download.anonymity = dc->anonymity; |
122 | pi->value.download.eta = | 122 | pi->value.download.eta = |
123 | GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); | 123 | GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); |
124 | pi->value.download.is_active = (NULL == dc->client) ? GNUNET_NO : GNUNET_YES; | 124 | pi->value.download.is_active = (NULL == dc->mq) ? GNUNET_NO : GNUNET_YES; |
125 | pi->fsh = dc->h; | 125 | pi->fsh = dc->h; |
126 | if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) | 126 | if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) |
127 | dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); | 127 | dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); |
@@ -131,21 +131,6 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi, | |||
131 | 131 | ||
132 | 132 | ||
133 | /** | 133 | /** |
134 | * We're ready to transmit a search request to the | ||
135 | * file-sharing service. Do it. If there is | ||
136 | * more than one request pending, try to send | ||
137 | * multiple or request another transmission. | ||
138 | * | ||
139 | * @param cls closure | ||
140 | * @param size number of bytes available in buf | ||
141 | * @param buf where the callee should write the message | ||
142 | * @return number of bytes written to buf | ||
143 | */ | ||
144 | static size_t | ||
145 | transmit_download_request (void *cls, size_t size, void *buf); | ||
146 | |||
147 | |||
148 | /** | ||
149 | * Closure for iterator processing results. | 134 | * Closure for iterator processing results. |
150 | */ | 135 | */ |
151 | struct ProcessResultClosure | 136 | struct ProcessResultClosure |
@@ -206,10 +191,11 @@ struct ProcessResultClosure | |||
206 | * @param cls closure (our 'struct ProcessResultClosure') | 191 | * @param cls closure (our 'struct ProcessResultClosure') |
207 | * @param key query for the given value / request | 192 | * @param key query for the given value / request |
208 | * @param value value in the hash map (a 'struct DownloadRequest') | 193 | * @param value value in the hash map (a 'struct DownloadRequest') |
209 | * @return GNUNET_YES (we should continue to iterate); unless serious error | 194 | * @return #GNUNET_YES (we should continue to iterate); unless serious error |
210 | */ | 195 | */ |
211 | static int | 196 | static int |
212 | process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | 197 | process_result_with_request (void *cls, |
198 | const struct GNUNET_HashCode * key, | ||
213 | void *value); | 199 | void *value); |
214 | 200 | ||
215 | 201 | ||
@@ -722,6 +708,43 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc, | |||
722 | 708 | ||
723 | 709 | ||
724 | /** | 710 | /** |
711 | * Add entries to the message queue. | ||
712 | * | ||
713 | * @param cls our download context | ||
714 | * @param key unused | ||
715 | * @param entry entry of type `struct DownloadRequest` | ||
716 | * @return #GNUNET_OK | ||
717 | */ | ||
718 | static int | ||
719 | retry_entry (void *cls, | ||
720 | const struct GNUNET_HashCode *key, | ||
721 | void *entry) | ||
722 | { | ||
723 | struct GNUNET_FS_DownloadContext *dc = cls; | ||
724 | struct DownloadRequest *dr = entry; | ||
725 | struct SearchMessage *sm; | ||
726 | struct GNUNET_MQ_Envelope *env; | ||
727 | |||
728 | env = GNUNET_MQ_msg (sm, | ||
729 | GNUNET_MESSAGE_TYPE_FS_START_SEARCH); | ||
730 | if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY)) | ||
731 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY); | ||
732 | else | ||
733 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE); | ||
734 | if (0 == dr->depth) | ||
735 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); | ||
736 | else | ||
737 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); | ||
738 | sm->anonymity_level = htonl (dc->anonymity); | ||
739 | sm->target = dc->target; | ||
740 | sm->query = dr->chk.query; | ||
741 | GNUNET_MQ_send (dc->mq, | ||
742 | env); | ||
743 | return GNUNET_OK; | ||
744 | } | ||
745 | |||
746 | |||
747 | /** | ||
725 | * Schedule the download of the specified block in the tree. | 748 | * Schedule the download of the specified block in the tree. |
726 | * | 749 | * |
727 | * @param dc overall download this block belongs to | 750 | * @param dc overall download this block belongs to |
@@ -763,25 +786,23 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, | |||
763 | } | 786 | } |
764 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 787 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
765 | "Scheduling download at offset %llu and depth %u for `%s'\n", | 788 | "Scheduling download at offset %llu and depth %u for `%s'\n", |
766 | (unsigned long long) dr->offset, dr->depth, | 789 | (unsigned long long) dr->offset, |
790 | dr->depth, | ||
767 | GNUNET_h2s (&dr->chk.query)); | 791 | GNUNET_h2s (&dr->chk.query)); |
768 | if (GNUNET_NO != | 792 | if (GNUNET_NO != |
769 | GNUNET_CONTAINER_multihashmap_contains_value (dc->active, &dr->chk.query, | 793 | GNUNET_CONTAINER_multihashmap_contains_value (dc->active, |
794 | &dr->chk.query, | ||
770 | dr)) | 795 | dr)) |
771 | return; /* already active */ | 796 | return; /* already active */ |
772 | GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr, | 797 | GNUNET_CONTAINER_multihashmap_put (dc->active, |
798 | &dr->chk.query, | ||
799 | dr, | ||
773 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 800 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
774 | if (NULL == dc->client) | 801 | if (NULL == dc->mq) |
775 | return; /* download not active */ | 802 | return; /* download not active */ |
776 | GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); | 803 | retry_entry (dc, |
777 | dr->is_pending = GNUNET_YES; | 804 | &dr->chk.query, |
778 | if (NULL == dc->th) | 805 | dr); |
779 | dc->th = | ||
780 | GNUNET_CLIENT_notify_transmit_ready (dc->client, | ||
781 | sizeof (struct SearchMessage), | ||
782 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
783 | GNUNET_NO, | ||
784 | &transmit_download_request, dc); | ||
785 | } | 806 | } |
786 | 807 | ||
787 | 808 | ||
@@ -947,13 +968,14 @@ GNUNET_FS_free_download_request_ (struct DownloadRequest *dr) | |||
947 | * Iterator over entries in the pending requests in the 'active' map for the | 968 | * Iterator over entries in the pending requests in the 'active' map for the |
948 | * reply that we just got. | 969 | * reply that we just got. |
949 | * | 970 | * |
950 | * @param cls closure (our 'struct ProcessResultClosure') | 971 | * @param cls closure (our `struct ProcessResultClosure`) |
951 | * @param key query for the given value / request | 972 | * @param key query for the given value / request |
952 | * @param value value in the hash map (a 'struct DownloadRequest') | 973 | * @param value value in the hash map (a `struct DownloadRequest`) |
953 | * @return #GNUNET_YES (we should continue to iterate); unless serious error | 974 | * @return #GNUNET_YES (we should continue to iterate); unless serious error |
954 | */ | 975 | */ |
955 | static int | 976 | static int |
956 | process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | 977 | process_result_with_request (void *cls, |
978 | const struct GNUNET_HashCode *key, | ||
957 | void *value) | 979 | void *value) |
958 | { | 980 | { |
959 | struct ProcessResultClosure *prc = cls; | 981 | struct ProcessResultClosure *prc = cls; |
@@ -974,7 +996,9 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
974 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 996 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
975 | "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", | 997 | "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", |
976 | (unsigned int) prc->size, | 998 | (unsigned int) prc->size, |
977 | GNUNET_h2s (key), dr->depth, (unsigned long long) dr->offset, | 999 | GNUNET_h2s (key), |
1000 | dr->depth, | ||
1001 | (unsigned long long) dr->offset, | ||
978 | (unsigned long long) GNUNET_ntohll (dc->uri->data. | 1002 | (unsigned long long) GNUNET_ntohll (dc->uri->data. |
979 | chk.file_length)); | 1003 | chk.file_length)); |
980 | bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll | 1004 | bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll |
@@ -999,15 +1023,17 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
999 | goto signal_error; | 1023 | goto signal_error; |
1000 | } | 1024 | } |
1001 | 1025 | ||
1002 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &prc->query, dr); | 1026 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, |
1003 | if (GNUNET_YES == dr->is_pending) | 1027 | &prc->query, |
1004 | { | 1028 | dr); |
1005 | GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); | 1029 | GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, |
1006 | dr->is_pending = GNUNET_NO; | 1030 | &skey, |
1007 | } | 1031 | &iv); |
1008 | 1032 | if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, | |
1009 | GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, &skey, &iv); | 1033 | prc->size, |
1010 | if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, prc->size, &skey, &iv, pt)) | 1034 | &skey, |
1035 | &iv, | ||
1036 | pt)) | ||
1011 | { | 1037 | { |
1012 | GNUNET_break (0); | 1038 | GNUNET_break (0); |
1013 | dc->emsg = GNUNET_strdup (_("internal error decrypting content")); | 1039 | dc->emsg = GNUNET_strdup (_("internal error decrypting content")); |
@@ -1015,7 +1041,8 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
1015 | } | 1041 | } |
1016 | off = | 1042 | off = |
1017 | compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), | 1043 | compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), |
1018 | dr->offset, dr->depth); | 1044 | dr->offset, |
1045 | dr->depth); | ||
1019 | /* save to disk */ | 1046 | /* save to disk */ |
1020 | if ((GNUNET_YES == prc->do_store) && | 1047 | if ((GNUNET_YES == prc->do_store) && |
1021 | ((NULL != dc->filename) || (is_recursive_download (dc))) && | 1048 | ((NULL != dc->filename) || (is_recursive_download (dc))) && |
@@ -1040,21 +1067,25 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
1040 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1067 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1041 | "Saving decrypted block to disk at offset %llu\n", | 1068 | "Saving decrypted block to disk at offset %llu\n", |
1042 | (unsigned long long) off); | 1069 | (unsigned long long) off); |
1043 | if ((off != GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET))) | 1070 | if ((off != GNUNET_DISK_file_seek (fh, |
1071 | off, | ||
1072 | GNUNET_DISK_SEEK_SET))) | ||
1044 | { | 1073 | { |
1045 | GNUNET_asprintf (&dc->emsg, | 1074 | GNUNET_asprintf (&dc->emsg, |
1046 | _("Failed to seek to offset %llu in file `%s': %s"), | 1075 | _("Failed to seek to offset %llu in file `%s': %s"), |
1047 | (unsigned long long) off, dc->filename, | 1076 | (unsigned long long) off, |
1077 | dc->filename, | ||
1048 | STRERROR (errno)); | 1078 | STRERROR (errno)); |
1049 | goto signal_error; | 1079 | goto signal_error; |
1050 | } | 1080 | } |
1051 | if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) | 1081 | if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) |
1052 | { | 1082 | { |
1053 | GNUNET_asprintf (&dc->emsg, | 1083 | GNUNET_asprintf (&dc->emsg, |
1054 | _ | 1084 | _("Failed to write block of %u bytes at offset %llu in file `%s': %s"), |
1055 | ("Failed to write block of %u bytes at offset %llu in file `%s': %s"), | 1085 | (unsigned int) prc->size, |
1056 | (unsigned int) prc->size, (unsigned long long) off, | 1086 | (unsigned long long) off, |
1057 | dc->filename, STRERROR (errno)); | 1087 | dc->filename, |
1088 | STRERROR (errno)); | ||
1058 | goto signal_error; | 1089 | goto signal_error; |
1059 | } | 1090 | } |
1060 | GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); | 1091 | GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); |
@@ -1193,15 +1224,8 @@ signal_error: | |||
1193 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; | 1224 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; |
1194 | pi.value.download.specifics.error.message = dc->emsg; | 1225 | pi.value.download.specifics.error.message = dc->emsg; |
1195 | GNUNET_FS_download_make_status_ (&pi, dc); | 1226 | GNUNET_FS_download_make_status_ (&pi, dc); |
1196 | /* abort all pending requests */ | 1227 | GNUNET_MQ_destroy (dc->mq); |
1197 | if (NULL != dc->th) | 1228 | dc->mq = NULL; |
1198 | { | ||
1199 | GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); | ||
1200 | dc->th = NULL; | ||
1201 | } | ||
1202 | GNUNET_CLIENT_disconnect (dc->client); | ||
1203 | dc->in_receive = GNUNET_NO; | ||
1204 | dc->client = NULL; | ||
1205 | GNUNET_FS_free_download_request_ (dc->top_request); | 1229 | GNUNET_FS_free_download_request_ (dc->top_request); |
1206 | dc->top_request = NULL; | 1230 | dc->top_request = NULL; |
1207 | GNUNET_CONTAINER_multihashmap_destroy (dc->active); | 1231 | GNUNET_CONTAINER_multihashmap_destroy (dc->active); |
@@ -1211,49 +1235,24 @@ signal_error: | |||
1211 | GNUNET_FS_dequeue_ (dc->job_queue); | 1235 | GNUNET_FS_dequeue_ (dc->job_queue); |
1212 | dc->job_queue = NULL; | 1236 | dc->job_queue = NULL; |
1213 | } | 1237 | } |
1214 | dc->pending_head = NULL; | ||
1215 | dc->pending_tail = NULL; | ||
1216 | GNUNET_FS_download_sync_ (dc); | 1238 | GNUNET_FS_download_sync_ (dc); |
1217 | return GNUNET_NO; | 1239 | return GNUNET_NO; |
1218 | } | 1240 | } |
1219 | 1241 | ||
1220 | 1242 | ||
1221 | /** | 1243 | /** |
1222 | * Process a download result. | 1244 | * Type of a function to call when we check the PUT message |
1245 | * from the service. | ||
1223 | * | 1246 | * |
1224 | * @param dc our download context | 1247 | * @param cls closure |
1225 | * @param type type of the result | 1248 | * @param msg message received |
1226 | * @param respect_offered how much respect did we offer to get this reply? | ||
1227 | * @param num_transmissions how often did we transmit the query? | ||
1228 | * @param last_transmission when was this block requested the last time? (FOREVER if unknown/not applicable) | ||
1229 | * @param data the (encrypted) response | ||
1230 | * @param size size of data | ||
1231 | */ | 1249 | */ |
1232 | static void | 1250 | static int |
1233 | process_result (struct GNUNET_FS_DownloadContext *dc, | 1251 | check_put (void *cls, |
1234 | enum GNUNET_BLOCK_Type type, | 1252 | const struct ClientPutMessage *cm) |
1235 | uint32_t respect_offered, | ||
1236 | uint32_t num_transmissions, | ||
1237 | struct GNUNET_TIME_Absolute last_transmission, | ||
1238 | const void *data, size_t size) | ||
1239 | { | 1253 | { |
1240 | struct ProcessResultClosure prc; | 1254 | /* any varsize length is OK */ |
1241 | 1255 | return GNUNET_OK; | |
1242 | prc.dc = dc; | ||
1243 | prc.data = data; | ||
1244 | prc.last_transmission = last_transmission; | ||
1245 | prc.size = size; | ||
1246 | prc.type = type; | ||
1247 | prc.do_store = GNUNET_YES; | ||
1248 | prc.respect_offered = respect_offered; | ||
1249 | prc.num_transmissions = num_transmissions; | ||
1250 | GNUNET_CRYPTO_hash (data, size, &prc.query); | ||
1251 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1252 | "Received result for query `%s' from `%s'-service\n", | ||
1253 | GNUNET_h2s (&prc.query), "FS"); | ||
1254 | GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, &prc.query, | ||
1255 | &process_result_with_request, | ||
1256 | &prc); | ||
1257 | } | 1256 | } |
1258 | 1257 | ||
1259 | 1258 | ||
@@ -1262,109 +1261,59 @@ process_result (struct GNUNET_FS_DownloadContext *dc, | |||
1262 | * from the service. | 1261 | * from the service. |
1263 | * | 1262 | * |
1264 | * @param cls closure | 1263 | * @param cls closure |
1265 | * @param msg message received, NULL on timeout or fatal error | 1264 | * @param msg message received |
1266 | */ | 1265 | */ |
1267 | static void | 1266 | static void |
1268 | receive_results (void *cls, const struct GNUNET_MessageHeader *msg) | 1267 | handle_put (void *cls, |
1268 | const struct ClientPutMessage *cm) | ||
1269 | { | 1269 | { |
1270 | struct GNUNET_FS_DownloadContext *dc = cls; | 1270 | struct GNUNET_FS_DownloadContext *dc = cls; |
1271 | const struct ClientPutMessage *cm; | 1271 | uint16_t msize = ntohs (cm->header.size) - sizeof (*cm); |
1272 | uint16_t msize; | 1272 | struct ProcessResultClosure prc; |
1273 | 1273 | ||
1274 | if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || | 1274 | prc.dc = dc; |
1275 | (sizeof (struct ClientPutMessage) > ntohs (msg->size))) | 1275 | prc.data = &cm[1]; |
1276 | { | 1276 | prc.last_transmission = GNUNET_TIME_absolute_ntoh (cm->last_transmission); |
1277 | GNUNET_break (NULL == msg); | 1277 | prc.size = msize; |
1278 | try_reconnect (dc); | 1278 | prc.type = ntohl (cm->type); |
1279 | return; | 1279 | prc.do_store = GNUNET_YES; |
1280 | } | 1280 | prc.respect_offered = ntohl (cm->respect_offered); |
1281 | msize = ntohs (msg->size); | 1281 | prc.num_transmissions = ntohl (cm->num_transmissions); |
1282 | cm = (const struct ClientPutMessage *) msg; | 1282 | GNUNET_CRYPTO_hash (prc.data, |
1283 | process_result (dc, ntohl (cm->type), | 1283 | msize, |
1284 | ntohl (cm->respect_offered), | 1284 | &prc.query); |
1285 | ntohl (cm->num_transmissions), | 1285 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1286 | GNUNET_TIME_absolute_ntoh (cm->last_transmission), &cm[1], | 1286 | "Received result for query `%s' from FS service\n", |
1287 | msize - sizeof (struct ClientPutMessage)); | 1287 | GNUNET_h2s (&prc.query)); |
1288 | if (NULL == dc->client) | 1288 | GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, |
1289 | return; /* fatal error */ | 1289 | &prc.query, |
1290 | /* continue receiving */ | 1290 | &process_result_with_request, |
1291 | GNUNET_CLIENT_receive (dc->client, &receive_results, dc, | 1291 | &prc); |
1292 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1293 | } | 1292 | } |
1294 | 1293 | ||
1295 | 1294 | ||
1296 | /** | 1295 | /** |
1297 | * We're ready to transmit a search request to the | 1296 | * Generic error handler, called with the appropriate error code and |
1298 | * file-sharing service. Do it. If there is | 1297 | * the same closure specified at the creation of the message queue. |
1299 | * more than one request pending, try to send | 1298 | * Not every message queue implementation supports an error handler. |
1300 | * multiple or request another transmission. | ||
1301 | * | 1299 | * |
1302 | * @param cls closure | 1300 | * @param cls closure with the `struct GNUNET_FS_DownloadContext *` |
1303 | * @param size number of bytes available in buf | 1301 | * @param error error code |
1304 | * @param buf where the callee should write the message | ||
1305 | * @return number of bytes written to buf | ||
1306 | */ | 1302 | */ |
1307 | static size_t | 1303 | static void |
1308 | transmit_download_request (void *cls, size_t size, void *buf) | 1304 | download_mq_error_handler (void *cls, |
1305 | enum GNUNET_MQ_Error error) | ||
1309 | { | 1306 | { |
1310 | struct GNUNET_FS_DownloadContext *dc = cls; | 1307 | struct GNUNET_FS_DownloadContext *dc = cls; |
1311 | size_t msize; | ||
1312 | struct SearchMessage *sm; | ||
1313 | struct DownloadRequest *dr; | ||
1314 | 1308 | ||
1315 | dc->th = NULL; | 1309 | if (NULL != dc->mq) |
1316 | if (NULL == buf) | ||
1317 | { | 1310 | { |
1318 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1311 | GNUNET_MQ_destroy (dc->mq); |
1319 | "Transmitting download request failed, trying to reconnect\n"); | 1312 | dc->mq = NULL; |
1320 | try_reconnect (dc); | ||
1321 | return 0; | ||
1322 | } | 1313 | } |
1323 | GNUNET_assert (size >= sizeof (struct SearchMessage)); | 1314 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1324 | msize = 0; | 1315 | "Transmitting download request failed, trying to reconnect\n"); |
1325 | sm = buf; | 1316 | try_reconnect (dc); |
1326 | while ((NULL != (dr = dc->pending_head)) && | ||
1327 | (size >= msize + sizeof (struct SearchMessage))) | ||
1328 | { | ||
1329 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1330 | "Transmitting download request for `%s' to `%s'-service\n", | ||
1331 | GNUNET_h2s (&dr->chk.query), "FS"); | ||
1332 | memset (sm, 0, sizeof (struct SearchMessage)); | ||
1333 | sm->header.size = htons (sizeof (struct SearchMessage)); | ||
1334 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH); | ||
1335 | if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY)) | ||
1336 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY); | ||
1337 | else | ||
1338 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE); | ||
1339 | if (0 == dr->depth) | ||
1340 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); | ||
1341 | else | ||
1342 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); | ||
1343 | sm->anonymity_level = htonl (dc->anonymity); | ||
1344 | sm->target = dc->target; | ||
1345 | sm->query = dr->chk.query; | ||
1346 | GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); | ||
1347 | dr->is_pending = GNUNET_NO; | ||
1348 | msize += sizeof (struct SearchMessage); | ||
1349 | sm++; | ||
1350 | } | ||
1351 | if (NULL != dc->pending_head) | ||
1352 | { | ||
1353 | dc->th = | ||
1354 | GNUNET_CLIENT_notify_transmit_ready (dc->client, | ||
1355 | sizeof (struct SearchMessage), | ||
1356 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1357 | GNUNET_NO, | ||
1358 | &transmit_download_request, dc); | ||
1359 | GNUNET_assert (NULL != dc->th); | ||
1360 | } | ||
1361 | if (GNUNET_NO == dc->in_receive) | ||
1362 | { | ||
1363 | dc->in_receive = GNUNET_YES; | ||
1364 | GNUNET_CLIENT_receive (dc->client, &receive_results, dc, | ||
1365 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1366 | } | ||
1367 | return msize; | ||
1368 | } | 1317 | } |
1369 | 1318 | ||
1370 | 1319 | ||
@@ -1376,51 +1325,31 @@ transmit_download_request (void *cls, size_t size, void *buf) | |||
1376 | static void | 1325 | static void |
1377 | do_reconnect (void *cls) | 1326 | do_reconnect (void *cls) |
1378 | { | 1327 | { |
1328 | GNUNET_MQ_hd_var_size (put, | ||
1329 | GNUNET_MESSAGE_TYPE_FS_PUT, | ||
1330 | struct ClientPutMessage); | ||
1379 | struct GNUNET_FS_DownloadContext *dc = cls; | 1331 | struct GNUNET_FS_DownloadContext *dc = cls; |
1380 | struct GNUNET_CLIENT_Connection *client; | 1332 | struct GNUNET_MQ_MessageHandler handlers[] = { |
1333 | make_put_handler (dc), | ||
1334 | GNUNET_MQ_handler_end () | ||
1335 | }; | ||
1381 | 1336 | ||
1382 | dc->task = NULL; | 1337 | dc->task = NULL; |
1383 | client = GNUNET_CLIENT_connect ("fs", dc->h->cfg); | 1338 | dc->mq = GNUNET_CLIENT_connecT (dc->h->cfg, |
1384 | if (NULL == client) | 1339 | "fs", |
1340 | handlers, | ||
1341 | &download_mq_error_handler, | ||
1342 | dc); | ||
1343 | if (NULL == dc->mq) | ||
1385 | { | 1344 | { |
1386 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1345 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1387 | "Connecting to `%s'-service failed, will try again.\n", "FS"); | 1346 | "Connecting to `%s'-service failed, will try again.\n", "FS"); |
1388 | try_reconnect (dc); | 1347 | try_reconnect (dc); |
1389 | return; | 1348 | return; |
1390 | } | 1349 | } |
1391 | dc->client = client; | 1350 | GNUNET_CONTAINER_multihashmap_iterate (dc->active, |
1392 | if (NULL != dc->pending_head) | 1351 | &retry_entry, |
1393 | { | 1352 | dc); |
1394 | dc->th = | ||
1395 | GNUNET_CLIENT_notify_transmit_ready (client, | ||
1396 | sizeof (struct SearchMessage), | ||
1397 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1398 | GNUNET_NO, | ||
1399 | &transmit_download_request, dc); | ||
1400 | GNUNET_assert (NULL != dc->th); | ||
1401 | } | ||
1402 | } | ||
1403 | |||
1404 | |||
1405 | /** | ||
1406 | * Add entries to the pending list. | ||
1407 | * | ||
1408 | * @param cls our download context | ||
1409 | * @param key unused | ||
1410 | * @param entry entry of type "struct DownloadRequest" | ||
1411 | * @return GNUNET_OK | ||
1412 | */ | ||
1413 | static int | ||
1414 | retry_entry (void *cls, const struct GNUNET_HashCode * key, void *entry) | ||
1415 | { | ||
1416 | struct GNUNET_FS_DownloadContext *dc = cls; | ||
1417 | struct DownloadRequest *dr = entry; | ||
1418 | |||
1419 | dr->next = NULL; | ||
1420 | dr->prev = NULL; | ||
1421 | GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); | ||
1422 | dr->is_pending = GNUNET_YES; | ||
1423 | return GNUNET_OK; | ||
1424 | } | 1353 | } |
1425 | 1354 | ||
1426 | 1355 | ||
@@ -1435,30 +1364,22 @@ static void | |||
1435 | try_reconnect (struct GNUNET_FS_DownloadContext *dc) | 1364 | try_reconnect (struct GNUNET_FS_DownloadContext *dc) |
1436 | { | 1365 | { |
1437 | 1366 | ||
1438 | if (NULL != dc->client) | 1367 | if (NULL != dc->mq) |
1439 | { | 1368 | { |
1440 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1369 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1441 | "Moving all requests back to pending list\n"); | 1370 | "Moving all requests back to pending list\n"); |
1442 | if (NULL != dc->th) | 1371 | GNUNET_MQ_destroy (dc->mq); |
1443 | { | 1372 | dc->mq = NULL; |
1444 | GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); | ||
1445 | dc->th = NULL; | ||
1446 | } | ||
1447 | /* full reset of the pending list */ | ||
1448 | dc->pending_head = NULL; | ||
1449 | dc->pending_tail = NULL; | ||
1450 | GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc); | ||
1451 | GNUNET_CLIENT_disconnect (dc->client); | ||
1452 | dc->in_receive = GNUNET_NO; | ||
1453 | dc->client = NULL; | ||
1454 | } | 1373 | } |
1455 | if (0 == dc->reconnect_backoff.rel_value_us) | 1374 | if (0 == dc->reconnect_backoff.rel_value_us) |
1456 | dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 1375 | dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
1457 | else | 1376 | else |
1458 | dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); | 1377 | dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); |
1459 | 1378 | ||
1460 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will try to reconnect in %s\n", | 1379 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1461 | GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, GNUNET_YES)); | 1380 | "Will try to reconnect in %s\n", |
1381 | GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, | ||
1382 | GNUNET_YES)); | ||
1462 | dc->task = | 1383 | dc->task = |
1463 | GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, | 1384 | GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, |
1464 | &do_reconnect, | 1385 | &do_reconnect, |
@@ -1470,37 +1391,23 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc) | |||
1470 | * We're allowed to ask the FS service for our blocks. Start the download. | 1391 | * We're allowed to ask the FS service for our blocks. Start the download. |
1471 | * | 1392 | * |
1472 | * @param cls the 'struct GNUNET_FS_DownloadContext' | 1393 | * @param cls the 'struct GNUNET_FS_DownloadContext' |
1473 | * @param client handle to use for communcation with FS (we must destroy it!) | 1394 | * @param mq handle to use for communcation with FS (we must destroy it!) |
1474 | */ | 1395 | */ |
1475 | static void | 1396 | static void |
1476 | activate_fs_download (void *cls, struct GNUNET_CLIENT_Connection *client) | 1397 | activate_fs_download (void *cls) |
1477 | { | 1398 | { |
1478 | struct GNUNET_FS_DownloadContext *dc = cls; | 1399 | struct GNUNET_FS_DownloadContext *dc = cls; |
1479 | struct GNUNET_FS_ProgressInfo pi; | 1400 | struct GNUNET_FS_ProgressInfo pi; |
1480 | 1401 | ||
1481 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download activated\n"); | 1402 | GNUNET_assert (NULL == dc->mq); |
1482 | GNUNET_assert (NULL != client); | ||
1483 | GNUNET_assert (NULL == dc->client); | ||
1484 | GNUNET_assert (NULL == dc->th); | ||
1485 | GNUNET_assert (NULL != dc->active); | 1403 | GNUNET_assert (NULL != dc->active); |
1486 | dc->client = client; | 1404 | do_reconnect (dc); |
1405 | if (NULL != dc->mq) | ||
1406 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1407 | "Download activated\n"); | ||
1487 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; | 1408 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; |
1488 | GNUNET_FS_download_make_status_ (&pi, dc); | 1409 | GNUNET_FS_download_make_status_ (&pi, |
1489 | dc->pending_head = NULL; | 1410 | dc); |
1490 | dc->pending_tail = NULL; | ||
1491 | GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc); | ||
1492 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1493 | "Asking for transmission to FS service\n"); | ||
1494 | if (NULL != dc->pending_head) | ||
1495 | { | ||
1496 | dc->th = | ||
1497 | GNUNET_CLIENT_notify_transmit_ready (dc->client, | ||
1498 | sizeof (struct SearchMessage), | ||
1499 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1500 | GNUNET_NO, | ||
1501 | &transmit_download_request, dc); | ||
1502 | GNUNET_assert (NULL != dc->th); | ||
1503 | } | ||
1504 | } | 1411 | } |
1505 | 1412 | ||
1506 | 1413 | ||
@@ -1515,22 +1422,16 @@ deactivate_fs_download (void *cls) | |||
1515 | struct GNUNET_FS_DownloadContext *dc = cls; | 1422 | struct GNUNET_FS_DownloadContext *dc = cls; |
1516 | struct GNUNET_FS_ProgressInfo pi; | 1423 | struct GNUNET_FS_ProgressInfo pi; |
1517 | 1424 | ||
1518 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download deactivated\n"); | 1425 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1519 | if (NULL != dc->th) | 1426 | "Download deactivated\n"); |
1520 | { | 1427 | if (NULL != dc->mq) |
1521 | GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); | ||
1522 | dc->th = NULL; | ||
1523 | } | ||
1524 | if (NULL != dc->client) | ||
1525 | { | 1428 | { |
1526 | GNUNET_CLIENT_disconnect (dc->client); | 1429 | GNUNET_MQ_destroy (dc->mq); |
1527 | dc->in_receive = GNUNET_NO; | 1430 | dc->mq = NULL; |
1528 | dc->client = NULL; | ||
1529 | } | 1431 | } |
1530 | dc->pending_head = NULL; | ||
1531 | dc->pending_tail = NULL; | ||
1532 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; | 1432 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; |
1533 | GNUNET_FS_download_make_status_ (&pi, dc); | 1433 | GNUNET_FS_download_make_status_ (&pi, |
1434 | dc); | ||
1534 | } | 1435 | } |
1535 | 1436 | ||
1536 | 1437 | ||
@@ -1557,7 +1458,8 @@ static struct DownloadRequest * | |||
1557 | create_download_request (struct DownloadRequest *parent, | 1458 | create_download_request (struct DownloadRequest *parent, |
1558 | unsigned int chk_idx, | 1459 | unsigned int chk_idx, |
1559 | unsigned int depth, | 1460 | unsigned int depth, |
1560 | uint64_t dr_offset, uint64_t file_start_offset, | 1461 | uint64_t dr_offset, |
1462 | uint64_t file_start_offset, | ||
1561 | uint64_t desired_length) | 1463 | uint64_t desired_length) |
1562 | { | 1464 | { |
1563 | struct DownloadRequest *dr; | 1465 | struct DownloadRequest *dr; |
@@ -1746,13 +1648,9 @@ reconstruct_cb (void *cls, const struct ContentHashKey *chk, uint64_t offset, | |||
1746 | /* block matches, hence tree below matches; | 1648 | /* block matches, hence tree below matches; |
1747 | * this request is done! */ | 1649 | * this request is done! */ |
1748 | dr->state = BRS_DOWNLOAD_UP; | 1650 | dr->state = BRS_DOWNLOAD_UP; |
1749 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &dr->chk.query, dr); | 1651 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, |
1750 | if (GNUNET_YES == dr->is_pending) | 1652 | &dr->chk.query, |
1751 | { | 1653 | dr); |
1752 | GNUNET_break (0); /* how did we get here? */ | ||
1753 | GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); | ||
1754 | dr->is_pending = GNUNET_NO; | ||
1755 | } | ||
1756 | /* calculate how many bytes of payload this block | 1654 | /* calculate how many bytes of payload this block |
1757 | * corresponds to */ | 1655 | * corresponds to */ |
1758 | blen = GNUNET_FS_tree_compute_tree_size (dr->depth); | 1656 | blen = GNUNET_FS_tree_compute_tree_size (dr->depth); |
@@ -1860,7 +1758,8 @@ GNUNET_FS_download_start_task_ (void *cls) | |||
1860 | struct GNUNET_FS_ProgressInfo pi; | 1758 | struct GNUNET_FS_ProgressInfo pi; |
1861 | struct GNUNET_DISK_FileHandle *fh; | 1759 | struct GNUNET_DISK_FileHandle *fh; |
1862 | 1760 | ||
1863 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start task running...\n"); | 1761 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1762 | "Start task running...\n"); | ||
1864 | dc->task = NULL; | 1763 | dc->task = NULL; |
1865 | if (0 == dc->length) | 1764 | if (0 == dc->length) |
1866 | { | 1765 | { |
@@ -1978,8 +1877,10 @@ GNUNET_FS_download_start_task_ (void *cls) | |||
1978 | dc->te = | 1877 | dc->te = |
1979 | GNUNET_FS_tree_encoder_create (dc->h, | 1878 | GNUNET_FS_tree_encoder_create (dc->h, |
1980 | GNUNET_FS_uri_chk_get_file_size (dc->uri), | 1879 | GNUNET_FS_uri_chk_get_file_size (dc->uri), |
1981 | dc, &fh_reader, | 1880 | dc, |
1982 | &reconstruct_cb, NULL, | 1881 | &fh_reader, |
1882 | &reconstruct_cb, | ||
1883 | NULL, | ||
1983 | &reconstruct_cont); | 1884 | &reconstruct_cont); |
1984 | dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); | 1885 | dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); |
1985 | } | 1886 | } |
@@ -2079,9 +1980,13 @@ struct GNUNET_FS_DownloadContext * | |||
2079 | create_download_context (struct GNUNET_FS_Handle *h, | 1980 | create_download_context (struct GNUNET_FS_Handle *h, |
2080 | const struct GNUNET_FS_Uri *uri, | 1981 | const struct GNUNET_FS_Uri *uri, |
2081 | const struct GNUNET_CONTAINER_MetaData *meta, | 1982 | const struct GNUNET_CONTAINER_MetaData *meta, |
2082 | const char *filename, const char *tempname, | 1983 | const char *filename, |
2083 | uint64_t offset, uint64_t length, uint32_t anonymity, | 1984 | const char *tempname, |
2084 | enum GNUNET_FS_DownloadOptions options, void *cctx) | 1985 | uint64_t offset, |
1986 | uint64_t length, | ||
1987 | uint32_t anonymity, | ||
1988 | enum GNUNET_FS_DownloadOptions options, | ||
1989 | void *cctx) | ||
2085 | { | 1990 | { |
2086 | struct GNUNET_FS_DownloadContext *dc; | 1991 | struct GNUNET_FS_DownloadContext *dc; |
2087 | 1992 | ||
@@ -2132,7 +2037,8 @@ create_download_context (struct GNUNET_FS_Handle *h, | |||
2132 | filename, | 2037 | filename, |
2133 | (unsigned long long) length, | 2038 | (unsigned long long) length, |
2134 | dc->treedepth); | 2039 | dc->treedepth); |
2135 | dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); | 2040 | dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, |
2041 | dc); | ||
2136 | return dc; | 2042 | return dc; |
2137 | } | 2043 | } |
2138 | 2044 | ||
@@ -2290,6 +2196,8 @@ GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc) | |||
2290 | { | 2196 | { |
2291 | if (dc->completed == dc->length) | 2197 | if (dc->completed == dc->length) |
2292 | return; | 2198 | return; |
2199 | if (NULL != dc->mq) | ||
2200 | return; /* already running */ | ||
2293 | GNUNET_assert (NULL == dc->job_queue); | 2201 | GNUNET_assert (NULL == dc->job_queue); |
2294 | GNUNET_assert (NULL != dc->active); | 2202 | GNUNET_assert (NULL != dc->active); |
2295 | dc->job_queue = | 2203 | dc->job_queue = |