aboutsummaryrefslogtreecommitdiff
path: root/src/fs/fs_download.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-03 12:05:57 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-03 12:05:57 +0000
commit82d0757e1908c04f76dd69016fbb7d538318f003 (patch)
tree243464194f9e2148adc905f811d46831dea55001 /src/fs/fs_download.c
parent8b46e74546fb643a5d272bb1edd8c909a4ee978d (diff)
downloadgnunet-82d0757e1908c04f76dd69016fbb7d538318f003.tar.gz
gnunet-82d0757e1908c04f76dd69016fbb7d538318f003.zip
convert download API to MQ
Diffstat (limited to 'src/fs/fs_download.c')
-rw-r--r--src/fs/fs_download.c474
1 files changed, 191 insertions, 283 deletions
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c
index d89d70719..98c76882a 100644
--- a/src/fs/fs_download.c
+++ b/src/fs/fs_download.c
@@ -121,7 +121,7 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi,
121 pi->value.download.anonymity = dc->anonymity; 121 pi->value.download.anonymity = dc->anonymity;
122 pi->value.download.eta = 122 pi->value.download.eta =
123 GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); 123 GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length);
124 pi->value.download.is_active = (NULL == dc->client) ? GNUNET_NO : GNUNET_YES; 124 pi->value.download.is_active = (NULL == dc->mq) ? GNUNET_NO : GNUNET_YES;
125 pi->fsh = dc->h; 125 pi->fsh = dc->h;
126 if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) 126 if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
127 dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); 127 dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi);
@@ -131,21 +131,6 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi,
131 131
132 132
133/** 133/**
134 * We're ready to transmit a search request to the
135 * file-sharing service. Do it. If there is
136 * more than one request pending, try to send
137 * multiple or request another transmission.
138 *
139 * @param cls closure
140 * @param size number of bytes available in buf
141 * @param buf where the callee should write the message
142 * @return number of bytes written to buf
143 */
144static size_t
145transmit_download_request (void *cls, size_t size, void *buf);
146
147
148/**
149 * Closure for iterator processing results. 134 * Closure for iterator processing results.
150 */ 135 */
151struct ProcessResultClosure 136struct ProcessResultClosure
@@ -206,10 +191,11 @@ struct ProcessResultClosure
206 * @param cls closure (our 'struct ProcessResultClosure') 191 * @param cls closure (our 'struct ProcessResultClosure')
207 * @param key query for the given value / request 192 * @param key query for the given value / request
208 * @param value value in the hash map (a 'struct DownloadRequest') 193 * @param value value in the hash map (a 'struct DownloadRequest')
209 * @return GNUNET_YES (we should continue to iterate); unless serious error 194 * @return #GNUNET_YES (we should continue to iterate); unless serious error
210 */ 195 */
211static int 196static int
212process_result_with_request (void *cls, const struct GNUNET_HashCode * key, 197process_result_with_request (void *cls,
198 const struct GNUNET_HashCode * key,
213 void *value); 199 void *value);
214 200
215 201
@@ -722,6 +708,43 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc,
722 708
723 709
724/** 710/**
711 * Add entries to the message queue.
712 *
713 * @param cls our download context
714 * @param key unused
715 * @param entry entry of type `struct DownloadRequest`
716 * @return #GNUNET_OK
717 */
718static int
719retry_entry (void *cls,
720 const struct GNUNET_HashCode *key,
721 void *entry)
722{
723 struct GNUNET_FS_DownloadContext *dc = cls;
724 struct DownloadRequest *dr = entry;
725 struct SearchMessage *sm;
726 struct GNUNET_MQ_Envelope *env;
727
728 env = GNUNET_MQ_msg (sm,
729 GNUNET_MESSAGE_TYPE_FS_START_SEARCH);
730 if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY))
731 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY);
732 else
733 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE);
734 if (0 == dr->depth)
735 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK);
736 else
737 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK);
738 sm->anonymity_level = htonl (dc->anonymity);
739 sm->target = dc->target;
740 sm->query = dr->chk.query;
741 GNUNET_MQ_send (dc->mq,
742 env);
743 return GNUNET_OK;
744}
745
746
747/**
725 * Schedule the download of the specified block in the tree. 748 * Schedule the download of the specified block in the tree.
726 * 749 *
727 * @param dc overall download this block belongs to 750 * @param dc overall download this block belongs to
@@ -763,25 +786,23 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc,
763 } 786 }
764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765 "Scheduling download at offset %llu and depth %u for `%s'\n", 788 "Scheduling download at offset %llu and depth %u for `%s'\n",
766 (unsigned long long) dr->offset, dr->depth, 789 (unsigned long long) dr->offset,
790 dr->depth,
767 GNUNET_h2s (&dr->chk.query)); 791 GNUNET_h2s (&dr->chk.query));
768 if (GNUNET_NO != 792 if (GNUNET_NO !=
769 GNUNET_CONTAINER_multihashmap_contains_value (dc->active, &dr->chk.query, 793 GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
794 &dr->chk.query,
770 dr)) 795 dr))
771 return; /* already active */ 796 return; /* already active */
772 GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr, 797 GNUNET_CONTAINER_multihashmap_put (dc->active,
798 &dr->chk.query,
799 dr,
773 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 800 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
774 if (NULL == dc->client) 801 if (NULL == dc->mq)
775 return; /* download not active */ 802 return; /* download not active */
776 GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); 803 retry_entry (dc,
777 dr->is_pending = GNUNET_YES; 804 &dr->chk.query,
778 if (NULL == dc->th) 805 dr);
779 dc->th =
780 GNUNET_CLIENT_notify_transmit_ready (dc->client,
781 sizeof (struct SearchMessage),
782 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
783 GNUNET_NO,
784 &transmit_download_request, dc);
785} 806}
786 807
787 808
@@ -947,13 +968,14 @@ GNUNET_FS_free_download_request_ (struct DownloadRequest *dr)
947 * Iterator over entries in the pending requests in the 'active' map for the 968 * Iterator over entries in the pending requests in the 'active' map for the
948 * reply that we just got. 969 * reply that we just got.
949 * 970 *
950 * @param cls closure (our 'struct ProcessResultClosure') 971 * @param cls closure (our `struct ProcessResultClosure`)
951 * @param key query for the given value / request 972 * @param key query for the given value / request
952 * @param value value in the hash map (a 'struct DownloadRequest') 973 * @param value value in the hash map (a `struct DownloadRequest`)
953 * @return #GNUNET_YES (we should continue to iterate); unless serious error 974 * @return #GNUNET_YES (we should continue to iterate); unless serious error
954 */ 975 */
955static int 976static int
956process_result_with_request (void *cls, const struct GNUNET_HashCode * key, 977process_result_with_request (void *cls,
978 const struct GNUNET_HashCode *key,
957 void *value) 979 void *value)
958{ 980{
959 struct ProcessResultClosure *prc = cls; 981 struct ProcessResultClosure *prc = cls;
@@ -974,7 +996,9 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975 "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", 997 "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n",
976 (unsigned int) prc->size, 998 (unsigned int) prc->size,
977 GNUNET_h2s (key), dr->depth, (unsigned long long) dr->offset, 999 GNUNET_h2s (key),
1000 dr->depth,
1001 (unsigned long long) dr->offset,
978 (unsigned long long) GNUNET_ntohll (dc->uri->data. 1002 (unsigned long long) GNUNET_ntohll (dc->uri->data.
979 chk.file_length)); 1003 chk.file_length));
980 bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll 1004 bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll
@@ -999,15 +1023,17 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
999 goto signal_error; 1023 goto signal_error;
1000 } 1024 }
1001 1025
1002 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &prc->query, dr); 1026 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active,
1003 if (GNUNET_YES == dr->is_pending) 1027 &prc->query,
1004 { 1028 dr);
1005 GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); 1029 GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key,
1006 dr->is_pending = GNUNET_NO; 1030 &skey,
1007 } 1031 &iv);
1008 1032 if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data,
1009 GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, &skey, &iv); 1033 prc->size,
1010 if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, prc->size, &skey, &iv, pt)) 1034 &skey,
1035 &iv,
1036 pt))
1011 { 1037 {
1012 GNUNET_break (0); 1038 GNUNET_break (0);
1013 dc->emsg = GNUNET_strdup (_("internal error decrypting content")); 1039 dc->emsg = GNUNET_strdup (_("internal error decrypting content"));
@@ -1015,7 +1041,8 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
1015 } 1041 }
1016 off = 1042 off =
1017 compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), 1043 compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length),
1018 dr->offset, dr->depth); 1044 dr->offset,
1045 dr->depth);
1019 /* save to disk */ 1046 /* save to disk */
1020 if ((GNUNET_YES == prc->do_store) && 1047 if ((GNUNET_YES == prc->do_store) &&
1021 ((NULL != dc->filename) || (is_recursive_download (dc))) && 1048 ((NULL != dc->filename) || (is_recursive_download (dc))) &&
@@ -1040,21 +1067,25 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041 "Saving decrypted block to disk at offset %llu\n", 1068 "Saving decrypted block to disk at offset %llu\n",
1042 (unsigned long long) off); 1069 (unsigned long long) off);
1043 if ((off != GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET))) 1070 if ((off != GNUNET_DISK_file_seek (fh,
1071 off,
1072 GNUNET_DISK_SEEK_SET)))
1044 { 1073 {
1045 GNUNET_asprintf (&dc->emsg, 1074 GNUNET_asprintf (&dc->emsg,
1046 _("Failed to seek to offset %llu in file `%s': %s"), 1075 _("Failed to seek to offset %llu in file `%s': %s"),
1047 (unsigned long long) off, dc->filename, 1076 (unsigned long long) off,
1077 dc->filename,
1048 STRERROR (errno)); 1078 STRERROR (errno));
1049 goto signal_error; 1079 goto signal_error;
1050 } 1080 }
1051 if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) 1081 if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size))
1052 { 1082 {
1053 GNUNET_asprintf (&dc->emsg, 1083 GNUNET_asprintf (&dc->emsg,
1054 _ 1084 _("Failed to write block of %u bytes at offset %llu in file `%s': %s"),
1055 ("Failed to write block of %u bytes at offset %llu in file `%s': %s"), 1085 (unsigned int) prc->size,
1056 (unsigned int) prc->size, (unsigned long long) off, 1086 (unsigned long long) off,
1057 dc->filename, STRERROR (errno)); 1087 dc->filename,
1088 STRERROR (errno));
1058 goto signal_error; 1089 goto signal_error;
1059 } 1090 }
1060 GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1091 GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh));
@@ -1193,15 +1224,8 @@ signal_error:
1193 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; 1224 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR;
1194 pi.value.download.specifics.error.message = dc->emsg; 1225 pi.value.download.specifics.error.message = dc->emsg;
1195 GNUNET_FS_download_make_status_ (&pi, dc); 1226 GNUNET_FS_download_make_status_ (&pi, dc);
1196 /* abort all pending requests */ 1227 GNUNET_MQ_destroy (dc->mq);
1197 if (NULL != dc->th) 1228 dc->mq = NULL;
1198 {
1199 GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
1200 dc->th = NULL;
1201 }
1202 GNUNET_CLIENT_disconnect (dc->client);
1203 dc->in_receive = GNUNET_NO;
1204 dc->client = NULL;
1205 GNUNET_FS_free_download_request_ (dc->top_request); 1229 GNUNET_FS_free_download_request_ (dc->top_request);
1206 dc->top_request = NULL; 1230 dc->top_request = NULL;
1207 GNUNET_CONTAINER_multihashmap_destroy (dc->active); 1231 GNUNET_CONTAINER_multihashmap_destroy (dc->active);
@@ -1211,49 +1235,24 @@ signal_error:
1211 GNUNET_FS_dequeue_ (dc->job_queue); 1235 GNUNET_FS_dequeue_ (dc->job_queue);
1212 dc->job_queue = NULL; 1236 dc->job_queue = NULL;
1213 } 1237 }
1214 dc->pending_head = NULL;
1215 dc->pending_tail = NULL;
1216 GNUNET_FS_download_sync_ (dc); 1238 GNUNET_FS_download_sync_ (dc);
1217 return GNUNET_NO; 1239 return GNUNET_NO;
1218} 1240}
1219 1241
1220 1242
1221/** 1243/**
1222 * Process a download result. 1244 * Type of a function to call when we check the PUT message
1245 * from the service.
1223 * 1246 *
1224 * @param dc our download context 1247 * @param cls closure
1225 * @param type type of the result 1248 * @param msg message received
1226 * @param respect_offered how much respect did we offer to get this reply?
1227 * @param num_transmissions how often did we transmit the query?
1228 * @param last_transmission when was this block requested the last time? (FOREVER if unknown/not applicable)
1229 * @param data the (encrypted) response
1230 * @param size size of data
1231 */ 1249 */
1232static void 1250static int
1233process_result (struct GNUNET_FS_DownloadContext *dc, 1251check_put (void *cls,
1234 enum GNUNET_BLOCK_Type type, 1252 const struct ClientPutMessage *cm)
1235 uint32_t respect_offered,
1236 uint32_t num_transmissions,
1237 struct GNUNET_TIME_Absolute last_transmission,
1238 const void *data, size_t size)
1239{ 1253{
1240 struct ProcessResultClosure prc; 1254 /* any varsize length is OK */
1241 1255 return GNUNET_OK;
1242 prc.dc = dc;
1243 prc.data = data;
1244 prc.last_transmission = last_transmission;
1245 prc.size = size;
1246 prc.type = type;
1247 prc.do_store = GNUNET_YES;
1248 prc.respect_offered = respect_offered;
1249 prc.num_transmissions = num_transmissions;
1250 GNUNET_CRYPTO_hash (data, size, &prc.query);
1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252 "Received result for query `%s' from `%s'-service\n",
1253 GNUNET_h2s (&prc.query), "FS");
1254 GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, &prc.query,
1255 &process_result_with_request,
1256 &prc);
1257} 1256}
1258 1257
1259 1258
@@ -1262,109 +1261,59 @@ process_result (struct GNUNET_FS_DownloadContext *dc,
1262 * from the service. 1261 * from the service.
1263 * 1262 *
1264 * @param cls closure 1263 * @param cls closure
1265 * @param msg message received, NULL on timeout or fatal error 1264 * @param msg message received
1266 */ 1265 */
1267static void 1266static void
1268receive_results (void *cls, const struct GNUNET_MessageHeader *msg) 1267handle_put (void *cls,
1268 const struct ClientPutMessage *cm)
1269{ 1269{
1270 struct GNUNET_FS_DownloadContext *dc = cls; 1270 struct GNUNET_FS_DownloadContext *dc = cls;
1271 const struct ClientPutMessage *cm; 1271 uint16_t msize = ntohs (cm->header.size) - sizeof (*cm);
1272 uint16_t msize; 1272 struct ProcessResultClosure prc;
1273 1273
1274 if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || 1274 prc.dc = dc;
1275 (sizeof (struct ClientPutMessage) > ntohs (msg->size))) 1275 prc.data = &cm[1];
1276 { 1276 prc.last_transmission = GNUNET_TIME_absolute_ntoh (cm->last_transmission);
1277 GNUNET_break (NULL == msg); 1277 prc.size = msize;
1278 try_reconnect (dc); 1278 prc.type = ntohl (cm->type);
1279 return; 1279 prc.do_store = GNUNET_YES;
1280 } 1280 prc.respect_offered = ntohl (cm->respect_offered);
1281 msize = ntohs (msg->size); 1281 prc.num_transmissions = ntohl (cm->num_transmissions);
1282 cm = (const struct ClientPutMessage *) msg; 1282 GNUNET_CRYPTO_hash (prc.data,
1283 process_result (dc, ntohl (cm->type), 1283 msize,
1284 ntohl (cm->respect_offered), 1284 &prc.query);
1285 ntohl (cm->num_transmissions), 1285 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286 GNUNET_TIME_absolute_ntoh (cm->last_transmission), &cm[1], 1286 "Received result for query `%s' from FS service\n",
1287 msize - sizeof (struct ClientPutMessage)); 1287 GNUNET_h2s (&prc.query));
1288 if (NULL == dc->client) 1288 GNUNET_CONTAINER_multihashmap_get_multiple (dc->active,
1289 return; /* fatal error */ 1289 &prc.query,
1290 /* continue receiving */ 1290 &process_result_with_request,
1291 GNUNET_CLIENT_receive (dc->client, &receive_results, dc, 1291 &prc);
1292 GNUNET_TIME_UNIT_FOREVER_REL);
1293} 1292}
1294 1293
1295 1294
1296/** 1295/**
1297 * We're ready to transmit a search request to the 1296 * Generic error handler, called with the appropriate error code and
1298 * file-sharing service. Do it. If there is 1297 * the same closure specified at the creation of the message queue.
1299 * more than one request pending, try to send 1298 * Not every message queue implementation supports an error handler.
1300 * multiple or request another transmission.
1301 * 1299 *
1302 * @param cls closure 1300 * @param cls closure with the `struct GNUNET_FS_DownloadContext *`
1303 * @param size number of bytes available in buf 1301 * @param error error code
1304 * @param buf where the callee should write the message
1305 * @return number of bytes written to buf
1306 */ 1302 */
1307static size_t 1303static void
1308transmit_download_request (void *cls, size_t size, void *buf) 1304download_mq_error_handler (void *cls,
1305 enum GNUNET_MQ_Error error)
1309{ 1306{
1310 struct GNUNET_FS_DownloadContext *dc = cls; 1307 struct GNUNET_FS_DownloadContext *dc = cls;
1311 size_t msize;
1312 struct SearchMessage *sm;
1313 struct DownloadRequest *dr;
1314 1308
1315 dc->th = NULL; 1309 if (NULL != dc->mq)
1316 if (NULL == buf)
1317 { 1310 {
1318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1311 GNUNET_MQ_destroy (dc->mq);
1319 "Transmitting download request failed, trying to reconnect\n"); 1312 dc->mq = NULL;
1320 try_reconnect (dc);
1321 return 0;
1322 } 1313 }
1323 GNUNET_assert (size >= sizeof (struct SearchMessage)); 1314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324 msize = 0; 1315 "Transmitting download request failed, trying to reconnect\n");
1325 sm = buf; 1316 try_reconnect (dc);
1326 while ((NULL != (dr = dc->pending_head)) &&
1327 (size >= msize + sizeof (struct SearchMessage)))
1328 {
1329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330 "Transmitting download request for `%s' to `%s'-service\n",
1331 GNUNET_h2s (&dr->chk.query), "FS");
1332 memset (sm, 0, sizeof (struct SearchMessage));
1333 sm->header.size = htons (sizeof (struct SearchMessage));
1334 sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH);
1335 if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY))
1336 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY);
1337 else
1338 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE);
1339 if (0 == dr->depth)
1340 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK);
1341 else
1342 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK);
1343 sm->anonymity_level = htonl (dc->anonymity);
1344 sm->target = dc->target;
1345 sm->query = dr->chk.query;
1346 GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
1347 dr->is_pending = GNUNET_NO;
1348 msize += sizeof (struct SearchMessage);
1349 sm++;
1350 }
1351 if (NULL != dc->pending_head)
1352 {
1353 dc->th =
1354 GNUNET_CLIENT_notify_transmit_ready (dc->client,
1355 sizeof (struct SearchMessage),
1356 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1357 GNUNET_NO,
1358 &transmit_download_request, dc);
1359 GNUNET_assert (NULL != dc->th);
1360 }
1361 if (GNUNET_NO == dc->in_receive)
1362 {
1363 dc->in_receive = GNUNET_YES;
1364 GNUNET_CLIENT_receive (dc->client, &receive_results, dc,
1365 GNUNET_TIME_UNIT_FOREVER_REL);
1366 }
1367 return msize;
1368} 1317}
1369 1318
1370 1319
@@ -1376,51 +1325,31 @@ transmit_download_request (void *cls, size_t size, void *buf)
1376static void 1325static void
1377do_reconnect (void *cls) 1326do_reconnect (void *cls)
1378{ 1327{
1328 GNUNET_MQ_hd_var_size (put,
1329 GNUNET_MESSAGE_TYPE_FS_PUT,
1330 struct ClientPutMessage);
1379 struct GNUNET_FS_DownloadContext *dc = cls; 1331 struct GNUNET_FS_DownloadContext *dc = cls;
1380 struct GNUNET_CLIENT_Connection *client; 1332 struct GNUNET_MQ_MessageHandler handlers[] = {
1333 make_put_handler (dc),
1334 GNUNET_MQ_handler_end ()
1335 };
1381 1336
1382 dc->task = NULL; 1337 dc->task = NULL;
1383 client = GNUNET_CLIENT_connect ("fs", dc->h->cfg); 1338 dc->mq = GNUNET_CLIENT_connecT (dc->h->cfg,
1384 if (NULL == client) 1339 "fs",
1340 handlers,
1341 &download_mq_error_handler,
1342 dc);
1343 if (NULL == dc->mq)
1385 { 1344 {
1386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1345 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1387 "Connecting to `%s'-service failed, will try again.\n", "FS"); 1346 "Connecting to `%s'-service failed, will try again.\n", "FS");
1388 try_reconnect (dc); 1347 try_reconnect (dc);
1389 return; 1348 return;
1390 } 1349 }
1391 dc->client = client; 1350 GNUNET_CONTAINER_multihashmap_iterate (dc->active,
1392 if (NULL != dc->pending_head) 1351 &retry_entry,
1393 { 1352 dc);
1394 dc->th =
1395 GNUNET_CLIENT_notify_transmit_ready (client,
1396 sizeof (struct SearchMessage),
1397 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1398 GNUNET_NO,
1399 &transmit_download_request, dc);
1400 GNUNET_assert (NULL != dc->th);
1401 }
1402}
1403
1404
1405/**
1406 * Add entries to the pending list.
1407 *
1408 * @param cls our download context
1409 * @param key unused
1410 * @param entry entry of type "struct DownloadRequest"
1411 * @return GNUNET_OK
1412 */
1413static int
1414retry_entry (void *cls, const struct GNUNET_HashCode * key, void *entry)
1415{
1416 struct GNUNET_FS_DownloadContext *dc = cls;
1417 struct DownloadRequest *dr = entry;
1418
1419 dr->next = NULL;
1420 dr->prev = NULL;
1421 GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr);
1422 dr->is_pending = GNUNET_YES;
1423 return GNUNET_OK;
1424} 1353}
1425 1354
1426 1355
@@ -1435,30 +1364,22 @@ static void
1435try_reconnect (struct GNUNET_FS_DownloadContext *dc) 1364try_reconnect (struct GNUNET_FS_DownloadContext *dc)
1436{ 1365{
1437 1366
1438 if (NULL != dc->client) 1367 if (NULL != dc->mq)
1439 { 1368 {
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "Moving all requests back to pending list\n"); 1370 "Moving all requests back to pending list\n");
1442 if (NULL != dc->th) 1371 GNUNET_MQ_destroy (dc->mq);
1443 { 1372 dc->mq = NULL;
1444 GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
1445 dc->th = NULL;
1446 }
1447 /* full reset of the pending list */
1448 dc->pending_head = NULL;
1449 dc->pending_tail = NULL;
1450 GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc);
1451 GNUNET_CLIENT_disconnect (dc->client);
1452 dc->in_receive = GNUNET_NO;
1453 dc->client = NULL;
1454 } 1373 }
1455 if (0 == dc->reconnect_backoff.rel_value_us) 1374 if (0 == dc->reconnect_backoff.rel_value_us)
1456 dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 1375 dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1457 else 1376 else
1458 dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); 1377 dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff);
1459 1378
1460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will try to reconnect in %s\n", 1379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461 GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, GNUNET_YES)); 1380 "Will try to reconnect in %s\n",
1381 GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff,
1382 GNUNET_YES));
1462 dc->task = 1383 dc->task =
1463 GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, 1384 GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff,
1464 &do_reconnect, 1385 &do_reconnect,
@@ -1470,37 +1391,23 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc)
1470 * We're allowed to ask the FS service for our blocks. Start the download. 1391 * We're allowed to ask the FS service for our blocks. Start the download.
1471 * 1392 *
1472 * @param cls the 'struct GNUNET_FS_DownloadContext' 1393 * @param cls the 'struct GNUNET_FS_DownloadContext'
1473 * @param client handle to use for communcation with FS (we must destroy it!) 1394 * @param mq handle to use for communcation with FS (we must destroy it!)
1474 */ 1395 */
1475static void 1396static void
1476activate_fs_download (void *cls, struct GNUNET_CLIENT_Connection *client) 1397activate_fs_download (void *cls)
1477{ 1398{
1478 struct GNUNET_FS_DownloadContext *dc = cls; 1399 struct GNUNET_FS_DownloadContext *dc = cls;
1479 struct GNUNET_FS_ProgressInfo pi; 1400 struct GNUNET_FS_ProgressInfo pi;
1480 1401
1481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download activated\n"); 1402 GNUNET_assert (NULL == dc->mq);
1482 GNUNET_assert (NULL != client);
1483 GNUNET_assert (NULL == dc->client);
1484 GNUNET_assert (NULL == dc->th);
1485 GNUNET_assert (NULL != dc->active); 1403 GNUNET_assert (NULL != dc->active);
1486 dc->client = client; 1404 do_reconnect (dc);
1405 if (NULL != dc->mq)
1406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407 "Download activated\n");
1487 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; 1408 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
1488 GNUNET_FS_download_make_status_ (&pi, dc); 1409 GNUNET_FS_download_make_status_ (&pi,
1489 dc->pending_head = NULL; 1410 dc);
1490 dc->pending_tail = NULL;
1491 GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc);
1492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493 "Asking for transmission to FS service\n");
1494 if (NULL != dc->pending_head)
1495 {
1496 dc->th =
1497 GNUNET_CLIENT_notify_transmit_ready (dc->client,
1498 sizeof (struct SearchMessage),
1499 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1500 GNUNET_NO,
1501 &transmit_download_request, dc);
1502 GNUNET_assert (NULL != dc->th);
1503 }
1504} 1411}
1505 1412
1506 1413
@@ -1515,22 +1422,16 @@ deactivate_fs_download (void *cls)
1515 struct GNUNET_FS_DownloadContext *dc = cls; 1422 struct GNUNET_FS_DownloadContext *dc = cls;
1516 struct GNUNET_FS_ProgressInfo pi; 1423 struct GNUNET_FS_ProgressInfo pi;
1517 1424
1518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download deactivated\n"); 1425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1519 if (NULL != dc->th) 1426 "Download deactivated\n");
1520 { 1427 if (NULL != dc->mq)
1521 GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
1522 dc->th = NULL;
1523 }
1524 if (NULL != dc->client)
1525 { 1428 {
1526 GNUNET_CLIENT_disconnect (dc->client); 1429 GNUNET_MQ_destroy (dc->mq);
1527 dc->in_receive = GNUNET_NO; 1430 dc->mq = NULL;
1528 dc->client = NULL;
1529 } 1431 }
1530 dc->pending_head = NULL;
1531 dc->pending_tail = NULL;
1532 pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; 1432 pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE;
1533 GNUNET_FS_download_make_status_ (&pi, dc); 1433 GNUNET_FS_download_make_status_ (&pi,
1434 dc);
1534} 1435}
1535 1436
1536 1437
@@ -1557,7 +1458,8 @@ static struct DownloadRequest *
1557create_download_request (struct DownloadRequest *parent, 1458create_download_request (struct DownloadRequest *parent,
1558 unsigned int chk_idx, 1459 unsigned int chk_idx,
1559 unsigned int depth, 1460 unsigned int depth,
1560 uint64_t dr_offset, uint64_t file_start_offset, 1461 uint64_t dr_offset,
1462 uint64_t file_start_offset,
1561 uint64_t desired_length) 1463 uint64_t desired_length)
1562{ 1464{
1563 struct DownloadRequest *dr; 1465 struct DownloadRequest *dr;
@@ -1746,13 +1648,9 @@ reconstruct_cb (void *cls, const struct ContentHashKey *chk, uint64_t offset,
1746 /* block matches, hence tree below matches; 1648 /* block matches, hence tree below matches;
1747 * this request is done! */ 1649 * this request is done! */
1748 dr->state = BRS_DOWNLOAD_UP; 1650 dr->state = BRS_DOWNLOAD_UP;
1749 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &dr->chk.query, dr); 1651 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active,
1750 if (GNUNET_YES == dr->is_pending) 1652 &dr->chk.query,
1751 { 1653 dr);
1752 GNUNET_break (0); /* how did we get here? */
1753 GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
1754 dr->is_pending = GNUNET_NO;
1755 }
1756 /* calculate how many bytes of payload this block 1654 /* calculate how many bytes of payload this block
1757 * corresponds to */ 1655 * corresponds to */
1758 blen = GNUNET_FS_tree_compute_tree_size (dr->depth); 1656 blen = GNUNET_FS_tree_compute_tree_size (dr->depth);
@@ -1860,7 +1758,8 @@ GNUNET_FS_download_start_task_ (void *cls)
1860 struct GNUNET_FS_ProgressInfo pi; 1758 struct GNUNET_FS_ProgressInfo pi;
1861 struct GNUNET_DISK_FileHandle *fh; 1759 struct GNUNET_DISK_FileHandle *fh;
1862 1760
1863 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start task running...\n"); 1761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1762 "Start task running...\n");
1864 dc->task = NULL; 1763 dc->task = NULL;
1865 if (0 == dc->length) 1764 if (0 == dc->length)
1866 { 1765 {
@@ -1978,8 +1877,10 @@ GNUNET_FS_download_start_task_ (void *cls)
1978 dc->te = 1877 dc->te =
1979 GNUNET_FS_tree_encoder_create (dc->h, 1878 GNUNET_FS_tree_encoder_create (dc->h,
1980 GNUNET_FS_uri_chk_get_file_size (dc->uri), 1879 GNUNET_FS_uri_chk_get_file_size (dc->uri),
1981 dc, &fh_reader, 1880 dc,
1982 &reconstruct_cb, NULL, 1881 &fh_reader,
1882 &reconstruct_cb,
1883 NULL,
1983 &reconstruct_cont); 1884 &reconstruct_cont);
1984 dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); 1885 dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc);
1985 } 1886 }
@@ -2079,9 +1980,13 @@ struct GNUNET_FS_DownloadContext *
2079create_download_context (struct GNUNET_FS_Handle *h, 1980create_download_context (struct GNUNET_FS_Handle *h,
2080 const struct GNUNET_FS_Uri *uri, 1981 const struct GNUNET_FS_Uri *uri,
2081 const struct GNUNET_CONTAINER_MetaData *meta, 1982 const struct GNUNET_CONTAINER_MetaData *meta,
2082 const char *filename, const char *tempname, 1983 const char *filename,
2083 uint64_t offset, uint64_t length, uint32_t anonymity, 1984 const char *tempname,
2084 enum GNUNET_FS_DownloadOptions options, void *cctx) 1985 uint64_t offset,
1986 uint64_t length,
1987 uint32_t anonymity,
1988 enum GNUNET_FS_DownloadOptions options,
1989 void *cctx)
2085{ 1990{
2086 struct GNUNET_FS_DownloadContext *dc; 1991 struct GNUNET_FS_DownloadContext *dc;
2087 1992
@@ -2132,7 +2037,8 @@ create_download_context (struct GNUNET_FS_Handle *h,
2132 filename, 2037 filename,
2133 (unsigned long long) length, 2038 (unsigned long long) length,
2134 dc->treedepth); 2039 dc->treedepth);
2135 dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); 2040 dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_,
2041 dc);
2136 return dc; 2042 return dc;
2137} 2043}
2138 2044
@@ -2290,6 +2196,8 @@ GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc)
2290{ 2196{
2291 if (dc->completed == dc->length) 2197 if (dc->completed == dc->length)
2292 return; 2198 return;
2199 if (NULL != dc->mq)
2200 return; /* already running */
2293 GNUNET_assert (NULL == dc->job_queue); 2201 GNUNET_assert (NULL == dc->job_queue);
2294 GNUNET_assert (NULL != dc->active); 2202 GNUNET_assert (NULL != dc->active);
2295 dc->job_queue = 2203 dc->job_queue =