aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-07-03 12:05:57 +0000
committerChristian Grothoff <christian@grothoff.org>2016-07-03 12:05:57 +0000
commit82d0757e1908c04f76dd69016fbb7d538318f003 (patch)
tree243464194f9e2148adc905f811d46831dea55001 /src
parent8b46e74546fb643a5d272bb1edd8c909a4ee978d (diff)
downloadgnunet-82d0757e1908c04f76dd69016fbb7d538318f003.tar.gz
gnunet-82d0757e1908c04f76dd69016fbb7d538318f003.zip
convert download API to MQ
Diffstat (limited to 'src')
-rw-r--r--src/fs/fs_api.c62
-rw-r--r--src/fs/fs_api.h81
-rw-r--r--src/fs/fs_download.c474
-rw-r--r--src/fs/fs_search.c2
-rw-r--r--src/fs/fs_unindex.c7
5 files changed, 249 insertions, 377 deletions
diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c
index 2db475528..0bc07183e 100644
--- a/src/fs/fs_api.c
+++ b/src/fs/fs_api.c
@@ -49,14 +49,8 @@
49static void 49static void
50start_job (struct GNUNET_FS_QueueEntry *qe) 50start_job (struct GNUNET_FS_QueueEntry *qe)
51{ 51{
52 GNUNET_assert (NULL == qe->client); 52 qe->active = GNUNET_YES;
53 qe->client = GNUNET_CLIENT_connect ("fs", qe->h->cfg); 53 qe->start (qe->cls);
54 if (NULL == qe->client)
55 {
56 GNUNET_break (0);
57 return;
58 }
59 qe->start (qe->cls, qe->client);
60 qe->start_times++; 54 qe->start_times++;
61 qe->h->active_blocks += qe->blocks; 55 qe->h->active_blocks += qe->blocks;
62 qe->h->active_downloads++; 56 qe->h->active_downloads++;
@@ -84,7 +78,7 @@ start_job (struct GNUNET_FS_QueueEntry *qe)
84static void 78static void
85stop_job (struct GNUNET_FS_QueueEntry *qe) 79stop_job (struct GNUNET_FS_QueueEntry *qe)
86{ 80{
87 qe->client = NULL; 81 qe->active = GNUNET_NO;
88 qe->stop (qe->cls); 82 qe->stop (qe->cls);
89 GNUNET_assert (0 < qe->h->active_downloads); 83 GNUNET_assert (0 < qe->h->active_downloads);
90 qe->h->active_downloads--; 84 qe->h->active_downloads--;
@@ -97,9 +91,13 @@ stop_job (struct GNUNET_FS_QueueEntry *qe)
97 "Stopping job %p (%u active)\n", 91 "Stopping job %p (%u active)\n",
98 qe, 92 qe,
99 qe->h->active_downloads); 93 qe->h->active_downloads);
100 GNUNET_CONTAINER_DLL_remove (qe->h->running_head, qe->h->running_tail, qe); 94 GNUNET_CONTAINER_DLL_remove (qe->h->running_head,
101 GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, qe->h->pending_tail, 95 qe->h->running_tail,
102 qe->h->pending_tail, qe); 96 qe);
97 GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
98 qe->h->pending_tail,
99 qe->h->pending_tail,
100 qe);
103} 101}
104 102
105 103
@@ -328,8 +326,9 @@ process_job_queue (void *cls)
328 */ 326 */
329struct GNUNET_FS_QueueEntry * 327struct GNUNET_FS_QueueEntry *
330GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, 328GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
331 GNUNET_FS_QueueStart start, 329 GNUNET_SCHEDULER_TaskCallback start,
332 GNUNET_FS_QueueStop stop, void *cls, 330 GNUNET_SCHEDULER_TaskCallback stop,
331 void *cls,
333 unsigned int blocks, 332 unsigned int blocks,
334 enum GNUNET_FS_QueuePriority priority) 333 enum GNUNET_FS_QueuePriority priority)
335{ 334{
@@ -369,13 +368,16 @@ GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qe)
369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 368 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
370 "Dequeueing job %p\n", 369 "Dequeueing job %p\n",
371 qe); 370 qe);
372 if (NULL != qe->client) 371 if (GNUNET_YES == qe->active)
373 stop_job (qe); 372 stop_job (qe);
374 GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, qe); 373 GNUNET_CONTAINER_DLL_remove (h->pending_head,
374 h->pending_tail,
375 qe);
375 GNUNET_free (qe); 376 GNUNET_free (qe);
376 if (NULL != h->queue_job) 377 if (NULL != h->queue_job)
377 GNUNET_SCHEDULER_cancel (h->queue_job); 378 GNUNET_SCHEDULER_cancel (h->queue_job);
378 h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, h); 379 h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue,
380 h);
379} 381}
380 382
381 383
@@ -397,7 +399,9 @@ GNUNET_FS_make_top (struct GNUNET_FS_Handle *h,
397 ret = GNUNET_new (struct TopLevelActivity); 399 ret = GNUNET_new (struct TopLevelActivity);
398 ret->ssf = ssf; 400 ret->ssf = ssf;
399 ret->ssf_cls = ssf_cls; 401 ret->ssf_cls = ssf_cls;
400 GNUNET_CONTAINER_DLL_insert (h->top_head, h->top_tail, ret); 402 GNUNET_CONTAINER_DLL_insert (h->top_head,
403 h->top_tail,
404 ret);
401 return ret; 405 return ret;
402} 406}
403 407
@@ -412,7 +416,9 @@ void
412GNUNET_FS_end_top (struct GNUNET_FS_Handle *h, 416GNUNET_FS_end_top (struct GNUNET_FS_Handle *h,
413 struct TopLevelActivity *top) 417 struct TopLevelActivity *top)
414{ 418{
415 GNUNET_CONTAINER_DLL_remove (h->top_head, h->top_tail, top); 419 GNUNET_CONTAINER_DLL_remove (h->top_head,
420 h->top_tail,
421 top);
416 GNUNET_free (top); 422 GNUNET_free (top);
417} 423}
418 424
@@ -2531,8 +2537,7 @@ signal_download_resume (struct GNUNET_FS_DownloadContext *dc)
2531 signal_download_resume (dcc); 2537 signal_download_resume (dcc);
2532 dcc = dcc->next; 2538 dcc = dcc->next;
2533 } 2539 }
2534 if (NULL != dc->pending_head) 2540 GNUNET_FS_download_start_downloading_ (dc);
2535 GNUNET_FS_download_start_downloading_ (dc);
2536} 2541}
2537 2542
2538 2543
@@ -2815,12 +2820,16 @@ deserialize_download (struct GNUNET_FS_Handle *h,
2815 if (NULL != dn) 2820 if (NULL != dn)
2816 { 2821 {
2817 if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES)) 2822 if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES))
2818 GNUNET_DISK_directory_scan (dn, &deserialize_subdownload, dc); 2823 GNUNET_DISK_directory_scan (dn,
2824 &deserialize_subdownload,
2825 dc);
2819 GNUNET_free (dn); 2826 GNUNET_free (dn);
2820 } 2827 }
2821 if (NULL != parent) 2828 if (NULL != parent)
2822 { 2829 {
2823 GNUNET_CONTAINER_DLL_insert (parent->child_head, parent->child_tail, dc); 2830 GNUNET_CONTAINER_DLL_insert (parent->child_head,
2831 parent->child_tail,
2832 dc);
2824 } 2833 }
2825 if (NULL != search) 2834 if (NULL != search)
2826 { 2835 {
@@ -2830,11 +2839,14 @@ deserialize_download (struct GNUNET_FS_Handle *h,
2830 if ((NULL == parent) && (NULL == search)) 2839 if ((NULL == parent) && (NULL == search))
2831 { 2840 {
2832 dc->top = 2841 dc->top =
2833 GNUNET_FS_make_top (dc->h, &GNUNET_FS_download_signal_suspend_, dc); 2842 GNUNET_FS_make_top (dc->h,
2843 &GNUNET_FS_download_signal_suspend_,
2844 dc);
2834 signal_download_resume (dc); 2845 signal_download_resume (dc);
2835 } 2846 }
2836 GNUNET_free (uris); 2847 GNUNET_free (uris);
2837 dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); 2848 dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_,
2849 dc);
2838 return; 2850 return;
2839cleanup: 2851cleanup:
2840 GNUNET_free_non_null (uris); 2852 GNUNET_free_non_null (uris);
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h
index 398de27fd..86219b3f8 100644
--- a/src/fs/fs_api.h
+++ b/src/fs/fs_api.h
@@ -410,27 +410,6 @@ struct GNUNET_FS_FileInformation
410 410
411 411
412/** 412/**
413 * The job is now ready to run and should use the given client
414 * handle to communicate with the FS service.
415 *
416 * @param cls closure
417 * @param client handle to use for FS communication
418 */
419typedef void
420(*GNUNET_FS_QueueStart) (void *cls,
421 struct GNUNET_CLIENT_Connection *client);
422
423
424/**
425 * The job must now stop to run and should destry the client handle as
426 * soon as possible (ideally prior to returning).
427 */
428typedef void
429(*GNUNET_FS_QueueStop) (void *cls);
430
431
432
433/**
434 * Priorities for the queue. 413 * Priorities for the queue.
435 */ 414 */
436enum GNUNET_FS_QueuePriority 415enum GNUNET_FS_QueuePriority
@@ -465,12 +444,12 @@ struct GNUNET_FS_QueueEntry
465 /** 444 /**
466 * Function to call when the job is started. 445 * Function to call when the job is started.
467 */ 446 */
468 GNUNET_FS_QueueStart start; 447 GNUNET_SCHEDULER_TaskCallback start;
469 448
470 /** 449 /**
471 * Function to call when the job needs to stop (or is done / dequeued). 450 * Function to call when the job needs to stop (or is done / dequeued).
472 */ 451 */
473 GNUNET_FS_QueueStop stop; 452 GNUNET_SCHEDULER_TaskCallback stop;
474 453
475 /** 454 /**
476 * Closure for start and stop. 455 * Closure for start and stop.
@@ -483,9 +462,9 @@ struct GNUNET_FS_QueueEntry
483 struct GNUNET_FS_Handle *h; 462 struct GNUNET_FS_Handle *h;
484 463
485 /** 464 /**
486 * Client handle, or NULL if job is not running. 465 * Message queue handle, or NULL if job is not running.
487 */ 466 */
488 struct GNUNET_CLIENT_Connection *client; 467 struct GNUNET_MQ_Handle *mq;
489 468
490 /** 469 /**
491 * Time the job was originally queued. 470 * Time the job was originally queued.
@@ -518,6 +497,11 @@ struct GNUNET_FS_QueueEntry
518 */ 497 */
519 unsigned int start_times; 498 unsigned int start_times;
520 499
500 /**
501 * #GNUNET_YES if the job is active now.
502 */
503 int active;
504
521}; 505};
522 506
523 507
@@ -658,8 +642,8 @@ struct GNUNET_FS_SearchResult
658 */ 642 */
659struct GNUNET_FS_QueueEntry * 643struct GNUNET_FS_QueueEntry *
660GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, 644GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
661 GNUNET_FS_QueueStart start, 645 GNUNET_SCHEDULER_TaskCallback start,
662 GNUNET_FS_QueueStop stop, 646 GNUNET_SCHEDULER_TaskCallback stop,
663 void *cls, 647 void *cls,
664 unsigned int blocks, 648 unsigned int blocks,
665 enum GNUNET_FS_QueuePriority priority); 649 enum GNUNET_FS_QueuePriority priority);
@@ -1221,7 +1205,7 @@ struct GNUNET_FS_PublishContext
1221 /** 1205 /**
1222 * Connection to FS service (only used for LOC URI signing). 1206 * Connection to FS service (only used for LOC URI signing).
1223 */ 1207 */
1224 struct GNUNET_CLIENT_Handle *fs_client; 1208 struct GNUNET_CLIENT_Connection *fs_client;
1225 1209
1226 /** 1210 /**
1227 * Our top-level activity entry (if we are top-level, otherwise NULL). 1211 * Our top-level activity entry (if we are top-level, otherwise NULL).
@@ -1255,7 +1239,7 @@ struct GNUNET_FS_PublishContext
1255 char *serialization; 1239 char *serialization;
1256 1240
1257 /** 1241 /**
1258 * Our own client handle for the FS service; only briefly used when 1242 * Our own message queue for the FS service; only briefly used when
1259 * we start to index a file, otherwise NULL. 1243 * we start to index a file, otherwise NULL.
1260 */ 1244 */
1261 struct GNUNET_CLIENT_Connection *client; 1245 struct GNUNET_CLIENT_Connection *client;
@@ -1740,15 +1724,6 @@ enum BlockRequestState
1740 */ 1724 */
1741struct DownloadRequest 1725struct DownloadRequest
1742{ 1726{
1743 /**
1744 * While pending, we keep all download requests in a doubly-linked list.
1745 */
1746 struct DownloadRequest *next;
1747
1748 /**
1749 * While pending, we keep all download requests in a doubly-linked list.
1750 */
1751 struct DownloadRequest *prev;
1752 1727
1753 /** 1728 /**
1754 * Parent in the CHK-tree. 1729 * Parent in the CHK-tree.
@@ -1774,7 +1749,7 @@ struct DownloadRequest
1774 uint64_t offset; 1749 uint64_t offset;
1775 1750
1776 /** 1751 /**
1777 * Number of entries in 'children' array. 1752 * Number of entries in @e children array.
1778 */ 1753 */
1779 unsigned int num_children; 1754 unsigned int num_children;
1780 1755
@@ -1793,11 +1768,6 @@ struct DownloadRequest
1793 */ 1768 */
1794 enum BlockRequestState state; 1769 enum BlockRequestState state;
1795 1770
1796 /**
1797 * #GNUNET_YES if this entry is in the pending list.
1798 */
1799 int is_pending;
1800
1801}; 1771};
1802 1772
1803 1773
@@ -1838,7 +1808,7 @@ struct GNUNET_FS_DownloadContext
1838 /** 1808 /**
1839 * Connection to the FS service. 1809 * Connection to the FS service.
1840 */ 1810 */
1841 struct GNUNET_CLIENT_Connection *client; 1811 struct GNUNET_MQ_Handle *mq;
1842 1812
1843 /** 1813 /**
1844 * Parent download (used when downloading files 1814 * Parent download (used when downloading files
@@ -1917,12 +1887,6 @@ struct GNUNET_FS_DownloadContext
1917 struct GNUNET_FS_QueueEntry *job_queue; 1887 struct GNUNET_FS_QueueEntry *job_queue;
1918 1888
1919 /** 1889 /**
1920 * Non-NULL if we are currently having a request for
1921 * transmission pending with the client handle.
1922 */
1923 struct GNUNET_CLIENT_TransmitHandle *th;
1924
1925 /**
1926 * Tree encoder used for the reconstruction. 1890 * Tree encoder used for the reconstruction.
1927 */ 1891 */
1928 struct GNUNET_FS_TreeEncoder *te; 1892 struct GNUNET_FS_TreeEncoder *te;
@@ -1940,16 +1904,6 @@ struct GNUNET_FS_DownloadContext
1940 struct GNUNET_CONTAINER_MultiHashMap *active; 1904 struct GNUNET_CONTAINER_MultiHashMap *active;
1941 1905
1942 /** 1906 /**
1943 * Head of linked list of pending requests.
1944 */
1945 struct DownloadRequest *pending_head;
1946
1947 /**
1948 * Head of linked list of pending requests.
1949 */
1950 struct DownloadRequest *pending_tail;
1951
1952 /**
1953 * Top-level download request. 1907 * Top-level download request.
1954 */ 1908 */
1955 struct DownloadRequest *top_request; 1909 struct DownloadRequest *top_request;
@@ -2029,11 +1983,6 @@ struct GNUNET_FS_DownloadContext
2029 int has_finished; 1983 int has_finished;
2030 1984
2031 /** 1985 /**
2032 * Have we started the receive continuation yet?
2033 */
2034 int in_receive;
2035
2036 /**
2037 * Are we ready to issue requests (reconstructions are finished)? 1986 * Are we ready to issue requests (reconstructions are finished)?
2038 */ 1987 */
2039 int issue_requests; 1988 int issue_requests;
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c
index d89d70719..98c76882a 100644
--- a/src/fs/fs_download.c
+++ b/src/fs/fs_download.c
@@ -121,7 +121,7 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi,
121 pi->value.download.anonymity = dc->anonymity; 121 pi->value.download.anonymity = dc->anonymity;
122 pi->value.download.eta = 122 pi->value.download.eta =
123 GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); 123 GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length);
124 pi->value.download.is_active = (NULL == dc->client) ? GNUNET_NO : GNUNET_YES; 124 pi->value.download.is_active = (NULL == dc->mq) ? GNUNET_NO : GNUNET_YES;
125 pi->fsh = dc->h; 125 pi->fsh = dc->h;
126 if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) 126 if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
127 dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); 127 dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi);
@@ -131,21 +131,6 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi,
131 131
132 132
133/** 133/**
134 * We're ready to transmit a search request to the
135 * file-sharing service. Do it. If there is
136 * more than one request pending, try to send
137 * multiple or request another transmission.
138 *
139 * @param cls closure
140 * @param size number of bytes available in buf
141 * @param buf where the callee should write the message
142 * @return number of bytes written to buf
143 */
144static size_t
145transmit_download_request (void *cls, size_t size, void *buf);
146
147
148/**
149 * Closure for iterator processing results. 134 * Closure for iterator processing results.
150 */ 135 */
151struct ProcessResultClosure 136struct ProcessResultClosure
@@ -206,10 +191,11 @@ struct ProcessResultClosure
206 * @param cls closure (our 'struct ProcessResultClosure') 191 * @param cls closure (our 'struct ProcessResultClosure')
207 * @param key query for the given value / request 192 * @param key query for the given value / request
208 * @param value value in the hash map (a 'struct DownloadRequest') 193 * @param value value in the hash map (a 'struct DownloadRequest')
209 * @return GNUNET_YES (we should continue to iterate); unless serious error 194 * @return #GNUNET_YES (we should continue to iterate); unless serious error
210 */ 195 */
211static int 196static int
212process_result_with_request (void *cls, const struct GNUNET_HashCode * key, 197process_result_with_request (void *cls,
198 const struct GNUNET_HashCode * key,
213 void *value); 199 void *value);
214 200
215 201
@@ -722,6 +708,43 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc,
722 708
723 709
724/** 710/**
711 * Add entries to the message queue.
712 *
713 * @param cls our download context
714 * @param key unused
715 * @param entry entry of type `struct DownloadRequest`
716 * @return #GNUNET_OK
717 */
718static int
719retry_entry (void *cls,
720 const struct GNUNET_HashCode *key,
721 void *entry)
722{
723 struct GNUNET_FS_DownloadContext *dc = cls;
724 struct DownloadRequest *dr = entry;
725 struct SearchMessage *sm;
726 struct GNUNET_MQ_Envelope *env;
727
728 env = GNUNET_MQ_msg (sm,
729 GNUNET_MESSAGE_TYPE_FS_START_SEARCH);
730 if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY))
731 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY);
732 else
733 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE);
734 if (0 == dr->depth)
735 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK);
736 else
737 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK);
738 sm->anonymity_level = htonl (dc->anonymity);
739 sm->target = dc->target;
740 sm->query = dr->chk.query;
741 GNUNET_MQ_send (dc->mq,
742 env);
743 return GNUNET_OK;
744}
745
746
747/**
725 * Schedule the download of the specified block in the tree. 748 * Schedule the download of the specified block in the tree.
726 * 749 *
727 * @param dc overall download this block belongs to 750 * @param dc overall download this block belongs to
@@ -763,25 +786,23 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc,
763 } 786 }
764 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 787 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
765 "Scheduling download at offset %llu and depth %u for `%s'\n", 788 "Scheduling download at offset %llu and depth %u for `%s'\n",
766 (unsigned long long) dr->offset, dr->depth, 789 (unsigned long long) dr->offset,
790 dr->depth,
767 GNUNET_h2s (&dr->chk.query)); 791 GNUNET_h2s (&dr->chk.query));
768 if (GNUNET_NO != 792 if (GNUNET_NO !=
769 GNUNET_CONTAINER_multihashmap_contains_value (dc->active, &dr->chk.query, 793 GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
794 &dr->chk.query,
770 dr)) 795 dr))
771 return; /* already active */ 796 return; /* already active */
772 GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr, 797 GNUNET_CONTAINER_multihashmap_put (dc->active,
798 &dr->chk.query,
799 dr,
773 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); 800 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
774 if (NULL == dc->client) 801 if (NULL == dc->mq)
775 return; /* download not active */ 802 return; /* download not active */
776 GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); 803 retry_entry (dc,
777 dr->is_pending = GNUNET_YES; 804 &dr->chk.query,
778 if (NULL == dc->th) 805 dr);
779 dc->th =
780 GNUNET_CLIENT_notify_transmit_ready (dc->client,
781 sizeof (struct SearchMessage),
782 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
783 GNUNET_NO,
784 &transmit_download_request, dc);
785} 806}
786 807
787 808
@@ -947,13 +968,14 @@ GNUNET_FS_free_download_request_ (struct DownloadRequest *dr)
947 * Iterator over entries in the pending requests in the 'active' map for the 968 * Iterator over entries in the pending requests in the 'active' map for the
948 * reply that we just got. 969 * reply that we just got.
949 * 970 *
950 * @param cls closure (our 'struct ProcessResultClosure') 971 * @param cls closure (our `struct ProcessResultClosure`)
951 * @param key query for the given value / request 972 * @param key query for the given value / request
952 * @param value value in the hash map (a 'struct DownloadRequest') 973 * @param value value in the hash map (a `struct DownloadRequest`)
953 * @return #GNUNET_YES (we should continue to iterate); unless serious error 974 * @return #GNUNET_YES (we should continue to iterate); unless serious error
954 */ 975 */
955static int 976static int
956process_result_with_request (void *cls, const struct GNUNET_HashCode * key, 977process_result_with_request (void *cls,
978 const struct GNUNET_HashCode *key,
957 void *value) 979 void *value)
958{ 980{
959 struct ProcessResultClosure *prc = cls; 981 struct ProcessResultClosure *prc = cls;
@@ -974,7 +996,9 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
974 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975 "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", 997 "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n",
976 (unsigned int) prc->size, 998 (unsigned int) prc->size,
977 GNUNET_h2s (key), dr->depth, (unsigned long long) dr->offset, 999 GNUNET_h2s (key),
1000 dr->depth,
1001 (unsigned long long) dr->offset,
978 (unsigned long long) GNUNET_ntohll (dc->uri->data. 1002 (unsigned long long) GNUNET_ntohll (dc->uri->data.
979 chk.file_length)); 1003 chk.file_length));
980 bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll 1004 bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll
@@ -999,15 +1023,17 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
999 goto signal_error; 1023 goto signal_error;
1000 } 1024 }
1001 1025
1002 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &prc->query, dr); 1026 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active,
1003 if (GNUNET_YES == dr->is_pending) 1027 &prc->query,
1004 { 1028 dr);
1005 GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); 1029 GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key,
1006 dr->is_pending = GNUNET_NO; 1030 &skey,
1007 } 1031 &iv);
1008 1032 if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data,
1009 GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, &skey, &iv); 1033 prc->size,
1010 if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, prc->size, &skey, &iv, pt)) 1034 &skey,
1035 &iv,
1036 pt))
1011 { 1037 {
1012 GNUNET_break (0); 1038 GNUNET_break (0);
1013 dc->emsg = GNUNET_strdup (_("internal error decrypting content")); 1039 dc->emsg = GNUNET_strdup (_("internal error decrypting content"));
@@ -1015,7 +1041,8 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
1015 } 1041 }
1016 off = 1042 off =
1017 compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), 1043 compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length),
1018 dr->offset, dr->depth); 1044 dr->offset,
1045 dr->depth);
1019 /* save to disk */ 1046 /* save to disk */
1020 if ((GNUNET_YES == prc->do_store) && 1047 if ((GNUNET_YES == prc->do_store) &&
1021 ((NULL != dc->filename) || (is_recursive_download (dc))) && 1048 ((NULL != dc->filename) || (is_recursive_download (dc))) &&
@@ -1040,21 +1067,25 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key,
1040 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1067 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041 "Saving decrypted block to disk at offset %llu\n", 1068 "Saving decrypted block to disk at offset %llu\n",
1042 (unsigned long long) off); 1069 (unsigned long long) off);
1043 if ((off != GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET))) 1070 if ((off != GNUNET_DISK_file_seek (fh,
1071 off,
1072 GNUNET_DISK_SEEK_SET)))
1044 { 1073 {
1045 GNUNET_asprintf (&dc->emsg, 1074 GNUNET_asprintf (&dc->emsg,
1046 _("Failed to seek to offset %llu in file `%s': %s"), 1075 _("Failed to seek to offset %llu in file `%s': %s"),
1047 (unsigned long long) off, dc->filename, 1076 (unsigned long long) off,
1077 dc->filename,
1048 STRERROR (errno)); 1078 STRERROR (errno));
1049 goto signal_error; 1079 goto signal_error;
1050 } 1080 }
1051 if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) 1081 if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size))
1052 { 1082 {
1053 GNUNET_asprintf (&dc->emsg, 1083 GNUNET_asprintf (&dc->emsg,
1054 _ 1084 _("Failed to write block of %u bytes at offset %llu in file `%s': %s"),
1055 ("Failed to write block of %u bytes at offset %llu in file `%s': %s"), 1085 (unsigned int) prc->size,
1056 (unsigned int) prc->size, (unsigned long long) off, 1086 (unsigned long long) off,
1057 dc->filename, STRERROR (errno)); 1087 dc->filename,
1088 STRERROR (errno));
1058 goto signal_error; 1089 goto signal_error;
1059 } 1090 }
1060 GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); 1091 GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh));
@@ -1193,15 +1224,8 @@ signal_error:
1193 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; 1224 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR;
1194 pi.value.download.specifics.error.message = dc->emsg; 1225 pi.value.download.specifics.error.message = dc->emsg;
1195 GNUNET_FS_download_make_status_ (&pi, dc); 1226 GNUNET_FS_download_make_status_ (&pi, dc);
1196 /* abort all pending requests */ 1227 GNUNET_MQ_destroy (dc->mq);
1197 if (NULL != dc->th) 1228 dc->mq = NULL;
1198 {
1199 GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
1200 dc->th = NULL;
1201 }
1202 GNUNET_CLIENT_disconnect (dc->client);
1203 dc->in_receive = GNUNET_NO;
1204 dc->client = NULL;
1205 GNUNET_FS_free_download_request_ (dc->top_request); 1229 GNUNET_FS_free_download_request_ (dc->top_request);
1206 dc->top_request = NULL; 1230 dc->top_request = NULL;
1207 GNUNET_CONTAINER_multihashmap_destroy (dc->active); 1231 GNUNET_CONTAINER_multihashmap_destroy (dc->active);
@@ -1211,49 +1235,24 @@ signal_error:
1211 GNUNET_FS_dequeue_ (dc->job_queue); 1235 GNUNET_FS_dequeue_ (dc->job_queue);
1212 dc->job_queue = NULL; 1236 dc->job_queue = NULL;
1213 } 1237 }
1214 dc->pending_head = NULL;
1215 dc->pending_tail = NULL;
1216 GNUNET_FS_download_sync_ (dc); 1238 GNUNET_FS_download_sync_ (dc);
1217 return GNUNET_NO; 1239 return GNUNET_NO;
1218} 1240}
1219 1241
1220 1242
1221/** 1243/**
1222 * Process a download result. 1244 * Type of a function to call when we check the PUT message
1245 * from the service.
1223 * 1246 *
1224 * @param dc our download context 1247 * @param cls closure
1225 * @param type type of the result 1248 * @param msg message received
1226 * @param respect_offered how much respect did we offer to get this reply?
1227 * @param num_transmissions how often did we transmit the query?
1228 * @param last_transmission when was this block requested the last time? (FOREVER if unknown/not applicable)
1229 * @param data the (encrypted) response
1230 * @param size size of data
1231 */ 1249 */
1232static void 1250static int
1233process_result (struct GNUNET_FS_DownloadContext *dc, 1251check_put (void *cls,
1234 enum GNUNET_BLOCK_Type type, 1252 const struct ClientPutMessage *cm)
1235 uint32_t respect_offered,
1236 uint32_t num_transmissions,
1237 struct GNUNET_TIME_Absolute last_transmission,
1238 const void *data, size_t size)
1239{ 1253{
1240 struct ProcessResultClosure prc; 1254 /* any varsize length is OK */
1241 1255 return GNUNET_OK;
1242 prc.dc = dc;
1243 prc.data = data;
1244 prc.last_transmission = last_transmission;
1245 prc.size = size;
1246 prc.type = type;
1247 prc.do_store = GNUNET_YES;
1248 prc.respect_offered = respect_offered;
1249 prc.num_transmissions = num_transmissions;
1250 GNUNET_CRYPTO_hash (data, size, &prc.query);
1251 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1252 "Received result for query `%s' from `%s'-service\n",
1253 GNUNET_h2s (&prc.query), "FS");
1254 GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, &prc.query,
1255 &process_result_with_request,
1256 &prc);
1257} 1256}
1258 1257
1259 1258
@@ -1262,109 +1261,59 @@ process_result (struct GNUNET_FS_DownloadContext *dc,
1262 * from the service. 1261 * from the service.
1263 * 1262 *
1264 * @param cls closure 1263 * @param cls closure
1265 * @param msg message received, NULL on timeout or fatal error 1264 * @param msg message received
1266 */ 1265 */
1267static void 1266static void
1268receive_results (void *cls, const struct GNUNET_MessageHeader *msg) 1267handle_put (void *cls,
1268 const struct ClientPutMessage *cm)
1269{ 1269{
1270 struct GNUNET_FS_DownloadContext *dc = cls; 1270 struct GNUNET_FS_DownloadContext *dc = cls;
1271 const struct ClientPutMessage *cm; 1271 uint16_t msize = ntohs (cm->header.size) - sizeof (*cm);
1272 uint16_t msize; 1272 struct ProcessResultClosure prc;
1273 1273
1274 if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || 1274 prc.dc = dc;
1275 (sizeof (struct ClientPutMessage) > ntohs (msg->size))) 1275 prc.data = &cm[1];
1276 { 1276 prc.last_transmission = GNUNET_TIME_absolute_ntoh (cm->last_transmission);
1277 GNUNET_break (NULL == msg); 1277 prc.size = msize;
1278 try_reconnect (dc); 1278 prc.type = ntohl (cm->type);
1279 return; 1279 prc.do_store = GNUNET_YES;
1280 } 1280 prc.respect_offered = ntohl (cm->respect_offered);
1281 msize = ntohs (msg->size); 1281 prc.num_transmissions = ntohl (cm->num_transmissions);
1282 cm = (const struct ClientPutMessage *) msg; 1282 GNUNET_CRYPTO_hash (prc.data,
1283 process_result (dc, ntohl (cm->type), 1283 msize,
1284 ntohl (cm->respect_offered), 1284 &prc.query);
1285 ntohl (cm->num_transmissions), 1285 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286 GNUNET_TIME_absolute_ntoh (cm->last_transmission), &cm[1], 1286 "Received result for query `%s' from FS service\n",
1287 msize - sizeof (struct ClientPutMessage)); 1287 GNUNET_h2s (&prc.query));
1288 if (NULL == dc->client) 1288 GNUNET_CONTAINER_multihashmap_get_multiple (dc->active,
1289 return; /* fatal error */ 1289 &prc.query,
1290 /* continue receiving */ 1290 &process_result_with_request,
1291 GNUNET_CLIENT_receive (dc->client, &receive_results, dc, 1291 &prc);
1292 GNUNET_TIME_UNIT_FOREVER_REL);
1293} 1292}
1294 1293
1295 1294
1296/** 1295/**
1297 * We're ready to transmit a search request to the 1296 * Generic error handler, called with the appropriate error code and
1298 * file-sharing service. Do it. If there is 1297 * the same closure specified at the creation of the message queue.
1299 * more than one request pending, try to send 1298 * Not every message queue implementation supports an error handler.
1300 * multiple or request another transmission.
1301 * 1299 *
1302 * @param cls closure 1300 * @param cls closure with the `struct GNUNET_FS_DownloadContext *`
1303 * @param size number of bytes available in buf 1301 * @param error error code
1304 * @param buf where the callee should write the message
1305 * @return number of bytes written to buf
1306 */ 1302 */
1307static size_t 1303static void
1308transmit_download_request (void *cls, size_t size, void *buf) 1304download_mq_error_handler (void *cls,
1305 enum GNUNET_MQ_Error error)
1309{ 1306{
1310 struct GNUNET_FS_DownloadContext *dc = cls; 1307 struct GNUNET_FS_DownloadContext *dc = cls;
1311 size_t msize;
1312 struct SearchMessage *sm;
1313 struct DownloadRequest *dr;
1314 1308
1315 dc->th = NULL; 1309 if (NULL != dc->mq)
1316 if (NULL == buf)
1317 { 1310 {
1318 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1311 GNUNET_MQ_destroy (dc->mq);
1319 "Transmitting download request failed, trying to reconnect\n"); 1312 dc->mq = NULL;
1320 try_reconnect (dc);
1321 return 0;
1322 } 1313 }
1323 GNUNET_assert (size >= sizeof (struct SearchMessage)); 1314 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324 msize = 0; 1315 "Transmitting download request failed, trying to reconnect\n");
1325 sm = buf; 1316 try_reconnect (dc);
1326 while ((NULL != (dr = dc->pending_head)) &&
1327 (size >= msize + sizeof (struct SearchMessage)))
1328 {
1329 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330 "Transmitting download request for `%s' to `%s'-service\n",
1331 GNUNET_h2s (&dr->chk.query), "FS");
1332 memset (sm, 0, sizeof (struct SearchMessage));
1333 sm->header.size = htons (sizeof (struct SearchMessage));
1334 sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH);
1335 if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY))
1336 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY);
1337 else
1338 sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE);
1339 if (0 == dr->depth)
1340 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK);
1341 else
1342 sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK);
1343 sm->anonymity_level = htonl (dc->anonymity);
1344 sm->target = dc->target;
1345 sm->query = dr->chk.query;
1346 GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
1347 dr->is_pending = GNUNET_NO;
1348 msize += sizeof (struct SearchMessage);
1349 sm++;
1350 }
1351 if (NULL != dc->pending_head)
1352 {
1353 dc->th =
1354 GNUNET_CLIENT_notify_transmit_ready (dc->client,
1355 sizeof (struct SearchMessage),
1356 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1357 GNUNET_NO,
1358 &transmit_download_request, dc);
1359 GNUNET_assert (NULL != dc->th);
1360 }
1361 if (GNUNET_NO == dc->in_receive)
1362 {
1363 dc->in_receive = GNUNET_YES;
1364 GNUNET_CLIENT_receive (dc->client, &receive_results, dc,
1365 GNUNET_TIME_UNIT_FOREVER_REL);
1366 }
1367 return msize;
1368} 1317}
1369 1318
1370 1319
@@ -1376,51 +1325,31 @@ transmit_download_request (void *cls, size_t size, void *buf)
1376static void 1325static void
1377do_reconnect (void *cls) 1326do_reconnect (void *cls)
1378{ 1327{
1328 GNUNET_MQ_hd_var_size (put,
1329 GNUNET_MESSAGE_TYPE_FS_PUT,
1330 struct ClientPutMessage);
1379 struct GNUNET_FS_DownloadContext *dc = cls; 1331 struct GNUNET_FS_DownloadContext *dc = cls;
1380 struct GNUNET_CLIENT_Connection *client; 1332 struct GNUNET_MQ_MessageHandler handlers[] = {
1333 make_put_handler (dc),
1334 GNUNET_MQ_handler_end ()
1335 };
1381 1336
1382 dc->task = NULL; 1337 dc->task = NULL;
1383 client = GNUNET_CLIENT_connect ("fs", dc->h->cfg); 1338 dc->mq = GNUNET_CLIENT_connecT (dc->h->cfg,
1384 if (NULL == client) 1339 "fs",
1340 handlers,
1341 &download_mq_error_handler,
1342 dc);
1343 if (NULL == dc->mq)
1385 { 1344 {
1386 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1345 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1387 "Connecting to `%s'-service failed, will try again.\n", "FS"); 1346 "Connecting to `%s'-service failed, will try again.\n", "FS");
1388 try_reconnect (dc); 1347 try_reconnect (dc);
1389 return; 1348 return;
1390 } 1349 }
1391 dc->client = client; 1350 GNUNET_CONTAINER_multihashmap_iterate (dc->active,
1392 if (NULL != dc->pending_head) 1351 &retry_entry,
1393 { 1352 dc);
1394 dc->th =
1395 GNUNET_CLIENT_notify_transmit_ready (client,
1396 sizeof (struct SearchMessage),
1397 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1398 GNUNET_NO,
1399 &transmit_download_request, dc);
1400 GNUNET_assert (NULL != dc->th);
1401 }
1402}
1403
1404
1405/**
1406 * Add entries to the pending list.
1407 *
1408 * @param cls our download context
1409 * @param key unused
1410 * @param entry entry of type "struct DownloadRequest"
1411 * @return GNUNET_OK
1412 */
1413static int
1414retry_entry (void *cls, const struct GNUNET_HashCode * key, void *entry)
1415{
1416 struct GNUNET_FS_DownloadContext *dc = cls;
1417 struct DownloadRequest *dr = entry;
1418
1419 dr->next = NULL;
1420 dr->prev = NULL;
1421 GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr);
1422 dr->is_pending = GNUNET_YES;
1423 return GNUNET_OK;
1424} 1353}
1425 1354
1426 1355
@@ -1435,30 +1364,22 @@ static void
1435try_reconnect (struct GNUNET_FS_DownloadContext *dc) 1364try_reconnect (struct GNUNET_FS_DownloadContext *dc)
1436{ 1365{
1437 1366
1438 if (NULL != dc->client) 1367 if (NULL != dc->mq)
1439 { 1368 {
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "Moving all requests back to pending list\n"); 1370 "Moving all requests back to pending list\n");
1442 if (NULL != dc->th) 1371 GNUNET_MQ_destroy (dc->mq);
1443 { 1372 dc->mq = NULL;
1444 GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
1445 dc->th = NULL;
1446 }
1447 /* full reset of the pending list */
1448 dc->pending_head = NULL;
1449 dc->pending_tail = NULL;
1450 GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc);
1451 GNUNET_CLIENT_disconnect (dc->client);
1452 dc->in_receive = GNUNET_NO;
1453 dc->client = NULL;
1454 } 1373 }
1455 if (0 == dc->reconnect_backoff.rel_value_us) 1374 if (0 == dc->reconnect_backoff.rel_value_us)
1456 dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; 1375 dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1457 else 1376 else
1458 dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); 1377 dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff);
1459 1378
1460 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will try to reconnect in %s\n", 1379 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1461 GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, GNUNET_YES)); 1380 "Will try to reconnect in %s\n",
1381 GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff,
1382 GNUNET_YES));
1462 dc->task = 1383 dc->task =
1463 GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, 1384 GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff,
1464 &do_reconnect, 1385 &do_reconnect,
@@ -1470,37 +1391,23 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc)
1470 * We're allowed to ask the FS service for our blocks. Start the download. 1391 * We're allowed to ask the FS service for our blocks. Start the download.
1471 * 1392 *
1472 * @param cls the 'struct GNUNET_FS_DownloadContext' 1393 * @param cls the 'struct GNUNET_FS_DownloadContext'
1473 * @param client handle to use for communcation with FS (we must destroy it!) 1394 * @param mq handle to use for communcation with FS (we must destroy it!)
1474 */ 1395 */
1475static void 1396static void
1476activate_fs_download (void *cls, struct GNUNET_CLIENT_Connection *client) 1397activate_fs_download (void *cls)
1477{ 1398{
1478 struct GNUNET_FS_DownloadContext *dc = cls; 1399 struct GNUNET_FS_DownloadContext *dc = cls;
1479 struct GNUNET_FS_ProgressInfo pi; 1400 struct GNUNET_FS_ProgressInfo pi;
1480 1401
1481 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download activated\n"); 1402 GNUNET_assert (NULL == dc->mq);
1482 GNUNET_assert (NULL != client);
1483 GNUNET_assert (NULL == dc->client);
1484 GNUNET_assert (NULL == dc->th);
1485 GNUNET_assert (NULL != dc->active); 1403 GNUNET_assert (NULL != dc->active);
1486 dc->client = client; 1404 do_reconnect (dc);
1405 if (NULL != dc->mq)
1406 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407 "Download activated\n");
1487 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; 1408 pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
1488 GNUNET_FS_download_make_status_ (&pi, dc); 1409 GNUNET_FS_download_make_status_ (&pi,
1489 dc->pending_head = NULL; 1410 dc);
1490 dc->pending_tail = NULL;
1491 GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc);
1492 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493 "Asking for transmission to FS service\n");
1494 if (NULL != dc->pending_head)
1495 {
1496 dc->th =
1497 GNUNET_CLIENT_notify_transmit_ready (dc->client,
1498 sizeof (struct SearchMessage),
1499 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1500 GNUNET_NO,
1501 &transmit_download_request, dc);
1502 GNUNET_assert (NULL != dc->th);
1503 }
1504} 1411}
1505 1412
1506 1413
@@ -1515,22 +1422,16 @@ deactivate_fs_download (void *cls)
1515 struct GNUNET_FS_DownloadContext *dc = cls; 1422 struct GNUNET_FS_DownloadContext *dc = cls;
1516 struct GNUNET_FS_ProgressInfo pi; 1423 struct GNUNET_FS_ProgressInfo pi;
1517 1424
1518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download deactivated\n"); 1425 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1519 if (NULL != dc->th) 1426 "Download deactivated\n");
1520 { 1427 if (NULL != dc->mq)
1521 GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th);
1522 dc->th = NULL;
1523 }
1524 if (NULL != dc->client)
1525 { 1428 {
1526 GNUNET_CLIENT_disconnect (dc->client); 1429 GNUNET_MQ_destroy (dc->mq);
1527 dc->in_receive = GNUNET_NO; 1430 dc->mq = NULL;
1528 dc->client = NULL;
1529 } 1431 }
1530 dc->pending_head = NULL;
1531 dc->pending_tail = NULL;
1532 pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; 1432 pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE;
1533 GNUNET_FS_download_make_status_ (&pi, dc); 1433 GNUNET_FS_download_make_status_ (&pi,
1434 dc);
1534} 1435}
1535 1436
1536 1437
@@ -1557,7 +1458,8 @@ static struct DownloadRequest *
1557create_download_request (struct DownloadRequest *parent, 1458create_download_request (struct DownloadRequest *parent,
1558 unsigned int chk_idx, 1459 unsigned int chk_idx,
1559 unsigned int depth, 1460 unsigned int depth,
1560 uint64_t dr_offset, uint64_t file_start_offset, 1461 uint64_t dr_offset,
1462 uint64_t file_start_offset,
1561 uint64_t desired_length) 1463 uint64_t desired_length)
1562{ 1464{
1563 struct DownloadRequest *dr; 1465 struct DownloadRequest *dr;
@@ -1746,13 +1648,9 @@ reconstruct_cb (void *cls, const struct ContentHashKey *chk, uint64_t offset,
1746 /* block matches, hence tree below matches; 1648 /* block matches, hence tree below matches;
1747 * this request is done! */ 1649 * this request is done! */
1748 dr->state = BRS_DOWNLOAD_UP; 1650 dr->state = BRS_DOWNLOAD_UP;
1749 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &dr->chk.query, dr); 1651 (void) GNUNET_CONTAINER_multihashmap_remove (dc->active,
1750 if (GNUNET_YES == dr->is_pending) 1652 &dr->chk.query,
1751 { 1653 dr);
1752 GNUNET_break (0); /* how did we get here? */
1753 GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr);
1754 dr->is_pending = GNUNET_NO;
1755 }
1756 /* calculate how many bytes of payload this block 1654 /* calculate how many bytes of payload this block
1757 * corresponds to */ 1655 * corresponds to */
1758 blen = GNUNET_FS_tree_compute_tree_size (dr->depth); 1656 blen = GNUNET_FS_tree_compute_tree_size (dr->depth);
@@ -1860,7 +1758,8 @@ GNUNET_FS_download_start_task_ (void *cls)
1860 struct GNUNET_FS_ProgressInfo pi; 1758 struct GNUNET_FS_ProgressInfo pi;
1861 struct GNUNET_DISK_FileHandle *fh; 1759 struct GNUNET_DISK_FileHandle *fh;
1862 1760
1863 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start task running...\n"); 1761 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1762 "Start task running...\n");
1864 dc->task = NULL; 1763 dc->task = NULL;
1865 if (0 == dc->length) 1764 if (0 == dc->length)
1866 { 1765 {
@@ -1978,8 +1877,10 @@ GNUNET_FS_download_start_task_ (void *cls)
1978 dc->te = 1877 dc->te =
1979 GNUNET_FS_tree_encoder_create (dc->h, 1878 GNUNET_FS_tree_encoder_create (dc->h,
1980 GNUNET_FS_uri_chk_get_file_size (dc->uri), 1879 GNUNET_FS_uri_chk_get_file_size (dc->uri),
1981 dc, &fh_reader, 1880 dc,
1982 &reconstruct_cb, NULL, 1881 &fh_reader,
1882 &reconstruct_cb,
1883 NULL,
1983 &reconstruct_cont); 1884 &reconstruct_cont);
1984 dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); 1885 dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc);
1985 } 1886 }
@@ -2079,9 +1980,13 @@ struct GNUNET_FS_DownloadContext *
2079create_download_context (struct GNUNET_FS_Handle *h, 1980create_download_context (struct GNUNET_FS_Handle *h,
2080 const struct GNUNET_FS_Uri *uri, 1981 const struct GNUNET_FS_Uri *uri,
2081 const struct GNUNET_CONTAINER_MetaData *meta, 1982 const struct GNUNET_CONTAINER_MetaData *meta,
2082 const char *filename, const char *tempname, 1983 const char *filename,
2083 uint64_t offset, uint64_t length, uint32_t anonymity, 1984 const char *tempname,
2084 enum GNUNET_FS_DownloadOptions options, void *cctx) 1985 uint64_t offset,
1986 uint64_t length,
1987 uint32_t anonymity,
1988 enum GNUNET_FS_DownloadOptions options,
1989 void *cctx)
2085{ 1990{
2086 struct GNUNET_FS_DownloadContext *dc; 1991 struct GNUNET_FS_DownloadContext *dc;
2087 1992
@@ -2132,7 +2037,8 @@ create_download_context (struct GNUNET_FS_Handle *h,
2132 filename, 2037 filename,
2133 (unsigned long long) length, 2038 (unsigned long long) length,
2134 dc->treedepth); 2039 dc->treedepth);
2135 dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); 2040 dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_,
2041 dc);
2136 return dc; 2042 return dc;
2137} 2043}
2138 2044
@@ -2290,6 +2196,8 @@ GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc)
2290{ 2196{
2291 if (dc->completed == dc->length) 2197 if (dc->completed == dc->length)
2292 return; 2198 return;
2199 if (NULL != dc->mq)
2200 return; /* already running */
2293 GNUNET_assert (NULL == dc->job_queue); 2201 GNUNET_assert (NULL == dc->job_queue);
2294 GNUNET_assert (NULL != dc->active); 2202 GNUNET_assert (NULL != dc->active);
2295 dc->job_queue = 2203 dc->job_queue =
diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c
index f3221ac76..9a1b822e1 100644
--- a/src/fs/fs_search.c
+++ b/src/fs/fs_search.c
@@ -368,7 +368,7 @@ probe_ping_task_cb (void *cls)
368 struct GNUNET_FS_SearchResult *sr; 368 struct GNUNET_FS_SearchResult *sr;
369 369
370 for (sr = h->probes_head; NULL != sr; sr = sr->next) 370 for (sr = h->probes_head; NULL != sr; sr = sr->next)
371 if (NULL != sr->probe_ctx->client) 371 if (NULL != sr->probe_ctx->mq)
372 signal_probe_result (sr); 372 signal_probe_result (sr);
373 h->probe_ping_task 373 h->probe_ping_task
374 = GNUNET_SCHEDULER_add_delayed (GNUNET_FS_PROBE_UPDATE_FREQUENCY, 374 = GNUNET_SCHEDULER_add_delayed (GNUNET_FS_PROBE_UPDATE_FREQUENCY,
diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c
index e614fbe03..f43725a59 100644
--- a/src/fs/fs_unindex.c
+++ b/src/fs/fs_unindex.c
@@ -110,8 +110,11 @@ GNUNET_FS_unindex_make_status_ (struct GNUNET_FS_ProgressInfo *pi,
110 * @param depth depth of the block in the tree, 0 for DBLOCK 110 * @param depth depth of the block in the tree, 0 for DBLOCK
111 */ 111 */
112static void 112static void
113unindex_progress (void *cls, uint64_t offset, const void *pt_block, 113unindex_progress (void *cls,
114 size_t pt_size, unsigned int depth) 114 uint64_t offset,
115 const void *pt_block,
116 size_t pt_size,
117 unsigned int depth)
115{ 118{
116 struct GNUNET_FS_UnindexContext *uc = cls; 119 struct GNUNET_FS_UnindexContext *uc = cls;
117 struct GNUNET_FS_ProgressInfo pi; 120 struct GNUNET_FS_ProgressInfo pi;