From 82d0757e1908c04f76dd69016fbb7d538318f003 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 3 Jul 2016 12:05:57 +0000 Subject: convert download API to MQ --- src/fs/fs_download.c | 474 +++++++++++++++++++++------------------------------ 1 file changed, 191 insertions(+), 283 deletions(-) (limited to 'src/fs/fs_download.c') diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index d89d70719..98c76882a 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c @@ -121,7 +121,7 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi, pi->value.download.anonymity = dc->anonymity; pi->value.download.eta = GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); - pi->value.download.is_active = (NULL == dc->client) ? GNUNET_NO : GNUNET_YES; + pi->value.download.is_active = (NULL == dc->mq) ? GNUNET_NO : GNUNET_YES; pi->fsh = dc->h; if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); @@ -130,21 +130,6 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi, } -/** - * We're ready to transmit a search request to the - * file-sharing service. Do it. If there is - * more than one request pending, try to send - * multiple or request another transmission. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_download_request (void *cls, size_t size, void *buf); - - /** * Closure for iterator processing results. */ @@ -206,10 +191,11 @@ struct ProcessResultClosure * @param cls closure (our 'struct ProcessResultClosure') * @param key query for the given value / request * @param value value in the hash map (a 'struct DownloadRequest') - * @return GNUNET_YES (we should continue to iterate); unless serious error + * @return #GNUNET_YES (we should continue to iterate); unless serious error */ static int -process_result_with_request (void *cls, const struct GNUNET_HashCode * key, +process_result_with_request (void *cls, + const struct GNUNET_HashCode * key, void *value); @@ -721,6 +707,43 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc, } +/** + * Add entries to the message queue. + * + * @param cls our download context + * @param key unused + * @param entry entry of type `struct DownloadRequest` + * @return #GNUNET_OK + */ +static int +retry_entry (void *cls, + const struct GNUNET_HashCode *key, + void *entry) +{ + struct GNUNET_FS_DownloadContext *dc = cls; + struct DownloadRequest *dr = entry; + struct SearchMessage *sm; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (sm, + GNUNET_MESSAGE_TYPE_FS_START_SEARCH); + if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY)) + sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY); + else + sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE); + if (0 == dr->depth) + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); + else + sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); + sm->anonymity_level = htonl (dc->anonymity); + sm->target = dc->target; + sm->query = dr->chk.query; + GNUNET_MQ_send (dc->mq, + env); + return GNUNET_OK; +} + + /** * Schedule the download of the specified block in the tree. * @@ -763,25 +786,23 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling download at offset %llu and depth %u for `%s'\n", - (unsigned long long) dr->offset, dr->depth, + (unsigned long long) dr->offset, + dr->depth, GNUNET_h2s (&dr->chk.query)); if (GNUNET_NO != - GNUNET_CONTAINER_multihashmap_contains_value (dc->active, &dr->chk.query, + GNUNET_CONTAINER_multihashmap_contains_value (dc->active, + &dr->chk.query, dr)) return; /* already active */ - GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr, + GNUNET_CONTAINER_multihashmap_put (dc->active, + &dr->chk.query, + dr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - if (NULL == dc->client) + if (NULL == dc->mq) return; /* download not active */ - GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); - dr->is_pending = GNUNET_YES; - if (NULL == dc->th) - dc->th = - GNUNET_CLIENT_notify_transmit_ready (dc->client, - sizeof (struct SearchMessage), - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_NO, - &transmit_download_request, dc); + retry_entry (dc, + &dr->chk.query, + dr); } @@ -947,13 +968,14 @@ GNUNET_FS_free_download_request_ (struct DownloadRequest *dr) * Iterator over entries in the pending requests in the 'active' map for the * reply that we just got. * - * @param cls closure (our 'struct ProcessResultClosure') + * @param cls closure (our `struct ProcessResultClosure`) * @param key query for the given value / request - * @param value value in the hash map (a 'struct DownloadRequest') + * @param value value in the hash map (a `struct DownloadRequest`) * @return #GNUNET_YES (we should continue to iterate); unless serious error */ static int -process_result_with_request (void *cls, const struct GNUNET_HashCode * key, +process_result_with_request (void *cls, + const struct GNUNET_HashCode *key, void *value) { struct ProcessResultClosure *prc = cls; @@ -974,7 +996,9 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", (unsigned int) prc->size, - GNUNET_h2s (key), dr->depth, (unsigned long long) dr->offset, + GNUNET_h2s (key), + dr->depth, + (unsigned long long) dr->offset, (unsigned long long) GNUNET_ntohll (dc->uri->data. chk.file_length)); bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll @@ -999,15 +1023,17 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, goto signal_error; } - (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &prc->query, dr); - if (GNUNET_YES == dr->is_pending) - { - GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); - dr->is_pending = GNUNET_NO; - } - - GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, &skey, &iv); - if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, prc->size, &skey, &iv, pt)) + (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, + &prc->query, + dr); + GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, + &skey, + &iv); + if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, + prc->size, + &skey, + &iv, + pt)) { GNUNET_break (0); dc->emsg = GNUNET_strdup (_("internal error decrypting content")); @@ -1015,7 +1041,8 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, } off = compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), - dr->offset, dr->depth); + dr->offset, + dr->depth); /* save to disk */ if ((GNUNET_YES == prc->do_store) && ((NULL != dc->filename) || (is_recursive_download (dc))) && @@ -1040,21 +1067,25 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Saving decrypted block to disk at offset %llu\n", (unsigned long long) off); - if ((off != GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET))) + if ((off != GNUNET_DISK_file_seek (fh, + off, + GNUNET_DISK_SEEK_SET))) { GNUNET_asprintf (&dc->emsg, _("Failed to seek to offset %llu in file `%s': %s"), - (unsigned long long) off, dc->filename, + (unsigned long long) off, + dc->filename, STRERROR (errno)); goto signal_error; } if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) { GNUNET_asprintf (&dc->emsg, - _ - ("Failed to write block of %u bytes at offset %llu in file `%s': %s"), - (unsigned int) prc->size, (unsigned long long) off, - dc->filename, STRERROR (errno)); + _("Failed to write block of %u bytes at offset %llu in file `%s': %s"), + (unsigned int) prc->size, + (unsigned long long) off, + dc->filename, + STRERROR (errno)); goto signal_error; } GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); @@ -1193,15 +1224,8 @@ signal_error: pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; pi.value.download.specifics.error.message = dc->emsg; GNUNET_FS_download_make_status_ (&pi, dc); - /* abort all pending requests */ - if (NULL != dc->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); - dc->th = NULL; - } - GNUNET_CLIENT_disconnect (dc->client); - dc->in_receive = GNUNET_NO; - dc->client = NULL; + GNUNET_MQ_destroy (dc->mq); + dc->mq = NULL; GNUNET_FS_free_download_request_ (dc->top_request); dc->top_request = NULL; GNUNET_CONTAINER_multihashmap_destroy (dc->active); @@ -1211,49 +1235,24 @@ signal_error: GNUNET_FS_dequeue_ (dc->job_queue); dc->job_queue = NULL; } - dc->pending_head = NULL; - dc->pending_tail = NULL; GNUNET_FS_download_sync_ (dc); return GNUNET_NO; } /** - * Process a download result. + * Type of a function to call when we check the PUT message + * from the service. * - * @param dc our download context - * @param type type of the result - * @param respect_offered how much respect did we offer to get this reply? - * @param num_transmissions how often did we transmit the query? - * @param last_transmission when was this block requested the last time? (FOREVER if unknown/not applicable) - * @param data the (encrypted) response - * @param size size of data + * @param cls closure + * @param msg message received */ -static void -process_result (struct GNUNET_FS_DownloadContext *dc, - enum GNUNET_BLOCK_Type type, - uint32_t respect_offered, - uint32_t num_transmissions, - struct GNUNET_TIME_Absolute last_transmission, - const void *data, size_t size) +static int +check_put (void *cls, + const struct ClientPutMessage *cm) { - struct ProcessResultClosure prc; - - prc.dc = dc; - prc.data = data; - prc.last_transmission = last_transmission; - prc.size = size; - prc.type = type; - prc.do_store = GNUNET_YES; - prc.respect_offered = respect_offered; - prc.num_transmissions = num_transmissions; - GNUNET_CRYPTO_hash (data, size, &prc.query); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received result for query `%s' from `%s'-service\n", - GNUNET_h2s (&prc.query), "FS"); - GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, &prc.query, - &process_result_with_request, - &prc); + /* any varsize length is OK */ + return GNUNET_OK; } @@ -1262,109 +1261,59 @@ process_result (struct GNUNET_FS_DownloadContext *dc, * from the service. * * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param msg message received */ static void -receive_results (void *cls, const struct GNUNET_MessageHeader *msg) +handle_put (void *cls, + const struct ClientPutMessage *cm) { struct GNUNET_FS_DownloadContext *dc = cls; - const struct ClientPutMessage *cm; - uint16_t msize; + uint16_t msize = ntohs (cm->header.size) - sizeof (*cm); + struct ProcessResultClosure prc; - if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || - (sizeof (struct ClientPutMessage) > ntohs (msg->size))) - { - GNUNET_break (NULL == msg); - try_reconnect (dc); - return; - } - msize = ntohs (msg->size); - cm = (const struct ClientPutMessage *) msg; - process_result (dc, ntohl (cm->type), - ntohl (cm->respect_offered), - ntohl (cm->num_transmissions), - GNUNET_TIME_absolute_ntoh (cm->last_transmission), &cm[1], - msize - sizeof (struct ClientPutMessage)); - if (NULL == dc->client) - return; /* fatal error */ - /* continue receiving */ - GNUNET_CLIENT_receive (dc->client, &receive_results, dc, - GNUNET_TIME_UNIT_FOREVER_REL); + prc.dc = dc; + prc.data = &cm[1]; + prc.last_transmission = GNUNET_TIME_absolute_ntoh (cm->last_transmission); + prc.size = msize; + prc.type = ntohl (cm->type); + prc.do_store = GNUNET_YES; + prc.respect_offered = ntohl (cm->respect_offered); + prc.num_transmissions = ntohl (cm->num_transmissions); + GNUNET_CRYPTO_hash (prc.data, + msize, + &prc.query); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received result for query `%s' from FS service\n", + GNUNET_h2s (&prc.query)); + GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, + &prc.query, + &process_result_with_request, + &prc); } /** - * We're ready to transmit a search request to the - * file-sharing service. Do it. If there is - * more than one request pending, try to send - * multiple or request another transmission. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls closure with the `struct GNUNET_FS_DownloadContext *` + * @param error error code */ -static size_t -transmit_download_request (void *cls, size_t size, void *buf) +static void +download_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_FS_DownloadContext *dc = cls; - size_t msize; - struct SearchMessage *sm; - struct DownloadRequest *dr; - dc->th = NULL; - if (NULL == buf) + if (NULL != dc->mq) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting download request failed, trying to reconnect\n"); - try_reconnect (dc); - return 0; + GNUNET_MQ_destroy (dc->mq); + dc->mq = NULL; } - GNUNET_assert (size >= sizeof (struct SearchMessage)); - msize = 0; - sm = buf; - while ((NULL != (dr = dc->pending_head)) && - (size >= msize + sizeof (struct SearchMessage))) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting download request for `%s' to `%s'-service\n", - GNUNET_h2s (&dr->chk.query), "FS"); - memset (sm, 0, sizeof (struct SearchMessage)); - sm->header.size = htons (sizeof (struct SearchMessage)); - sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH); - if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY)) - sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY); - else - sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE); - if (0 == dr->depth) - sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); - else - sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); - sm->anonymity_level = htonl (dc->anonymity); - sm->target = dc->target; - sm->query = dr->chk.query; - GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); - dr->is_pending = GNUNET_NO; - msize += sizeof (struct SearchMessage); - sm++; - } - if (NULL != dc->pending_head) - { - dc->th = - GNUNET_CLIENT_notify_transmit_ready (dc->client, - sizeof (struct SearchMessage), - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_NO, - &transmit_download_request, dc); - GNUNET_assert (NULL != dc->th); - } - if (GNUNET_NO == dc->in_receive) - { - dc->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (dc->client, &receive_results, dc, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return msize; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting download request failed, trying to reconnect\n"); + try_reconnect (dc); } @@ -1376,51 +1325,31 @@ transmit_download_request (void *cls, size_t size, void *buf) static void do_reconnect (void *cls) { + GNUNET_MQ_hd_var_size (put, + GNUNET_MESSAGE_TYPE_FS_PUT, + struct ClientPutMessage); struct GNUNET_FS_DownloadContext *dc = cls; - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_MessageHandler handlers[] = { + make_put_handler (dc), + GNUNET_MQ_handler_end () + }; dc->task = NULL; - client = GNUNET_CLIENT_connect ("fs", dc->h->cfg); - if (NULL == client) + dc->mq = GNUNET_CLIENT_connecT (dc->h->cfg, + "fs", + handlers, + &download_mq_error_handler, + dc); + if (NULL == dc->mq) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Connecting to `%s'-service failed, will try again.\n", "FS"); try_reconnect (dc); return; } - dc->client = client; - if (NULL != dc->pending_head) - { - dc->th = - GNUNET_CLIENT_notify_transmit_ready (client, - sizeof (struct SearchMessage), - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_NO, - &transmit_download_request, dc); - GNUNET_assert (NULL != dc->th); - } -} - - -/** - * Add entries to the pending list. - * - * @param cls our download context - * @param key unused - * @param entry entry of type "struct DownloadRequest" - * @return GNUNET_OK - */ -static int -retry_entry (void *cls, const struct GNUNET_HashCode * key, void *entry) -{ - struct GNUNET_FS_DownloadContext *dc = cls; - struct DownloadRequest *dr = entry; - - dr->next = NULL; - dr->prev = NULL; - GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); - dr->is_pending = GNUNET_YES; - return GNUNET_OK; + GNUNET_CONTAINER_multihashmap_iterate (dc->active, + &retry_entry, + dc); } @@ -1435,30 +1364,22 @@ static void try_reconnect (struct GNUNET_FS_DownloadContext *dc) { - if (NULL != dc->client) + if (NULL != dc->mq) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving all requests back to pending list\n"); - if (NULL != dc->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); - dc->th = NULL; - } - /* full reset of the pending list */ - dc->pending_head = NULL; - dc->pending_tail = NULL; - GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc); - GNUNET_CLIENT_disconnect (dc->client); - dc->in_receive = GNUNET_NO; - dc->client = NULL; + GNUNET_MQ_destroy (dc->mq); + dc->mq = NULL; } if (0 == dc->reconnect_backoff.rel_value_us) dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; else dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will try to reconnect in %s\n", - GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, GNUNET_YES)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Will try to reconnect in %s\n", + GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, + GNUNET_YES)); dc->task = GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, &do_reconnect, @@ -1470,37 +1391,23 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc) * We're allowed to ask the FS service for our blocks. Start the download. * * @param cls the 'struct GNUNET_FS_DownloadContext' - * @param client handle to use for communcation with FS (we must destroy it!) + * @param mq handle to use for communcation with FS (we must destroy it!) */ static void -activate_fs_download (void *cls, struct GNUNET_CLIENT_Connection *client) +activate_fs_download (void *cls) { struct GNUNET_FS_DownloadContext *dc = cls; struct GNUNET_FS_ProgressInfo pi; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download activated\n"); - GNUNET_assert (NULL != client); - GNUNET_assert (NULL == dc->client); - GNUNET_assert (NULL == dc->th); + GNUNET_assert (NULL == dc->mq); GNUNET_assert (NULL != dc->active); - dc->client = client; + do_reconnect (dc); + if (NULL != dc->mq) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Download activated\n"); pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; - GNUNET_FS_download_make_status_ (&pi, dc); - dc->pending_head = NULL; - dc->pending_tail = NULL; - GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking for transmission to FS service\n"); - if (NULL != dc->pending_head) - { - dc->th = - GNUNET_CLIENT_notify_transmit_ready (dc->client, - sizeof (struct SearchMessage), - GNUNET_CONSTANTS_SERVICE_TIMEOUT, - GNUNET_NO, - &transmit_download_request, dc); - GNUNET_assert (NULL != dc->th); - } + GNUNET_FS_download_make_status_ (&pi, + dc); } @@ -1515,22 +1422,16 @@ deactivate_fs_download (void *cls) struct GNUNET_FS_DownloadContext *dc = cls; struct GNUNET_FS_ProgressInfo pi; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download deactivated\n"); - if (NULL != dc->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); - dc->th = NULL; - } - if (NULL != dc->client) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Download deactivated\n"); + if (NULL != dc->mq) { - GNUNET_CLIENT_disconnect (dc->client); - dc->in_receive = GNUNET_NO; - dc->client = NULL; + GNUNET_MQ_destroy (dc->mq); + dc->mq = NULL; } - dc->pending_head = NULL; - dc->pending_tail = NULL; pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; - GNUNET_FS_download_make_status_ (&pi, dc); + GNUNET_FS_download_make_status_ (&pi, + dc); } @@ -1557,7 +1458,8 @@ static struct DownloadRequest * create_download_request (struct DownloadRequest *parent, unsigned int chk_idx, unsigned int depth, - uint64_t dr_offset, uint64_t file_start_offset, + uint64_t dr_offset, + uint64_t file_start_offset, uint64_t desired_length) { struct DownloadRequest *dr; @@ -1746,13 +1648,9 @@ reconstruct_cb (void *cls, const struct ContentHashKey *chk, uint64_t offset, /* block matches, hence tree below matches; * this request is done! */ dr->state = BRS_DOWNLOAD_UP; - (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &dr->chk.query, dr); - if (GNUNET_YES == dr->is_pending) - { - GNUNET_break (0); /* how did we get here? */ - GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); - dr->is_pending = GNUNET_NO; - } + (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, + &dr->chk.query, + dr); /* calculate how many bytes of payload this block * corresponds to */ blen = GNUNET_FS_tree_compute_tree_size (dr->depth); @@ -1860,7 +1758,8 @@ GNUNET_FS_download_start_task_ (void *cls) struct GNUNET_FS_ProgressInfo pi; struct GNUNET_DISK_FileHandle *fh; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start task running...\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Start task running...\n"); dc->task = NULL; if (0 == dc->length) { @@ -1978,8 +1877,10 @@ GNUNET_FS_download_start_task_ (void *cls) dc->te = GNUNET_FS_tree_encoder_create (dc->h, GNUNET_FS_uri_chk_get_file_size (dc->uri), - dc, &fh_reader, - &reconstruct_cb, NULL, + dc, + &fh_reader, + &reconstruct_cb, + NULL, &reconstruct_cont); dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); } @@ -2079,9 +1980,13 @@ struct GNUNET_FS_DownloadContext * create_download_context (struct GNUNET_FS_Handle *h, const struct GNUNET_FS_Uri *uri, const struct GNUNET_CONTAINER_MetaData *meta, - const char *filename, const char *tempname, - uint64_t offset, uint64_t length, uint32_t anonymity, - enum GNUNET_FS_DownloadOptions options, void *cctx) + const char *filename, + const char *tempname, + uint64_t offset, + uint64_t length, + uint32_t anonymity, + enum GNUNET_FS_DownloadOptions options, + void *cctx) { struct GNUNET_FS_DownloadContext *dc; @@ -2132,7 +2037,8 @@ create_download_context (struct GNUNET_FS_Handle *h, filename, (unsigned long long) length, dc->treedepth); - dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); + dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, + dc); return dc; } @@ -2290,6 +2196,8 @@ GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc) { if (dc->completed == dc->length) return; + if (NULL != dc->mq) + return; /* already running */ GNUNET_assert (NULL == dc->job_queue); GNUNET_assert (NULL != dc->active); dc->job_queue = -- cgit v1.2.3