diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-07-03 12:05:57 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-07-03 12:05:57 +0000 |
commit | 82d0757e1908c04f76dd69016fbb7d538318f003 (patch) | |
tree | 243464194f9e2148adc905f811d46831dea55001 /src | |
parent | 8b46e74546fb643a5d272bb1edd8c909a4ee978d (diff) | |
download | gnunet-82d0757e1908c04f76dd69016fbb7d538318f003.tar.gz gnunet-82d0757e1908c04f76dd69016fbb7d538318f003.zip |
convert download API to MQ
Diffstat (limited to 'src')
-rw-r--r-- | src/fs/fs_api.c | 62 | ||||
-rw-r--r-- | src/fs/fs_api.h | 81 | ||||
-rw-r--r-- | src/fs/fs_download.c | 474 | ||||
-rw-r--r-- | src/fs/fs_search.c | 2 | ||||
-rw-r--r-- | src/fs/fs_unindex.c | 7 |
5 files changed, 249 insertions, 377 deletions
diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c index 2db475528..0bc07183e 100644 --- a/src/fs/fs_api.c +++ b/src/fs/fs_api.c | |||
@@ -49,14 +49,8 @@ | |||
49 | static void | 49 | static void |
50 | start_job (struct GNUNET_FS_QueueEntry *qe) | 50 | start_job (struct GNUNET_FS_QueueEntry *qe) |
51 | { | 51 | { |
52 | GNUNET_assert (NULL == qe->client); | 52 | qe->active = GNUNET_YES; |
53 | qe->client = GNUNET_CLIENT_connect ("fs", qe->h->cfg); | 53 | qe->start (qe->cls); |
54 | if (NULL == qe->client) | ||
55 | { | ||
56 | GNUNET_break (0); | ||
57 | return; | ||
58 | } | ||
59 | qe->start (qe->cls, qe->client); | ||
60 | qe->start_times++; | 54 | qe->start_times++; |
61 | qe->h->active_blocks += qe->blocks; | 55 | qe->h->active_blocks += qe->blocks; |
62 | qe->h->active_downloads++; | 56 | qe->h->active_downloads++; |
@@ -84,7 +78,7 @@ start_job (struct GNUNET_FS_QueueEntry *qe) | |||
84 | static void | 78 | static void |
85 | stop_job (struct GNUNET_FS_QueueEntry *qe) | 79 | stop_job (struct GNUNET_FS_QueueEntry *qe) |
86 | { | 80 | { |
87 | qe->client = NULL; | 81 | qe->active = GNUNET_NO; |
88 | qe->stop (qe->cls); | 82 | qe->stop (qe->cls); |
89 | GNUNET_assert (0 < qe->h->active_downloads); | 83 | GNUNET_assert (0 < qe->h->active_downloads); |
90 | qe->h->active_downloads--; | 84 | qe->h->active_downloads--; |
@@ -97,9 +91,13 @@ stop_job (struct GNUNET_FS_QueueEntry *qe) | |||
97 | "Stopping job %p (%u active)\n", | 91 | "Stopping job %p (%u active)\n", |
98 | qe, | 92 | qe, |
99 | qe->h->active_downloads); | 93 | qe->h->active_downloads); |
100 | GNUNET_CONTAINER_DLL_remove (qe->h->running_head, qe->h->running_tail, qe); | 94 | GNUNET_CONTAINER_DLL_remove (qe->h->running_head, |
101 | GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, qe->h->pending_tail, | 95 | qe->h->running_tail, |
102 | qe->h->pending_tail, qe); | 96 | qe); |
97 | GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head, | ||
98 | qe->h->pending_tail, | ||
99 | qe->h->pending_tail, | ||
100 | qe); | ||
103 | } | 101 | } |
104 | 102 | ||
105 | 103 | ||
@@ -328,8 +326,9 @@ process_job_queue (void *cls) | |||
328 | */ | 326 | */ |
329 | struct GNUNET_FS_QueueEntry * | 327 | struct GNUNET_FS_QueueEntry * |
330 | GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, | 328 | GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, |
331 | GNUNET_FS_QueueStart start, | 329 | GNUNET_SCHEDULER_TaskCallback start, |
332 | GNUNET_FS_QueueStop stop, void *cls, | 330 | GNUNET_SCHEDULER_TaskCallback stop, |
331 | void *cls, | ||
333 | unsigned int blocks, | 332 | unsigned int blocks, |
334 | enum GNUNET_FS_QueuePriority priority) | 333 | enum GNUNET_FS_QueuePriority priority) |
335 | { | 334 | { |
@@ -369,13 +368,16 @@ GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qe) | |||
369 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 368 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
370 | "Dequeueing job %p\n", | 369 | "Dequeueing job %p\n", |
371 | qe); | 370 | qe); |
372 | if (NULL != qe->client) | 371 | if (GNUNET_YES == qe->active) |
373 | stop_job (qe); | 372 | stop_job (qe); |
374 | GNUNET_CONTAINER_DLL_remove (h->pending_head, h->pending_tail, qe); | 373 | GNUNET_CONTAINER_DLL_remove (h->pending_head, |
374 | h->pending_tail, | ||
375 | qe); | ||
375 | GNUNET_free (qe); | 376 | GNUNET_free (qe); |
376 | if (NULL != h->queue_job) | 377 | if (NULL != h->queue_job) |
377 | GNUNET_SCHEDULER_cancel (h->queue_job); | 378 | GNUNET_SCHEDULER_cancel (h->queue_job); |
378 | h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, h); | 379 | h->queue_job = GNUNET_SCHEDULER_add_now (&process_job_queue, |
380 | h); | ||
379 | } | 381 | } |
380 | 382 | ||
381 | 383 | ||
@@ -397,7 +399,9 @@ GNUNET_FS_make_top (struct GNUNET_FS_Handle *h, | |||
397 | ret = GNUNET_new (struct TopLevelActivity); | 399 | ret = GNUNET_new (struct TopLevelActivity); |
398 | ret->ssf = ssf; | 400 | ret->ssf = ssf; |
399 | ret->ssf_cls = ssf_cls; | 401 | ret->ssf_cls = ssf_cls; |
400 | GNUNET_CONTAINER_DLL_insert (h->top_head, h->top_tail, ret); | 402 | GNUNET_CONTAINER_DLL_insert (h->top_head, |
403 | h->top_tail, | ||
404 | ret); | ||
401 | return ret; | 405 | return ret; |
402 | } | 406 | } |
403 | 407 | ||
@@ -412,7 +416,9 @@ void | |||
412 | GNUNET_FS_end_top (struct GNUNET_FS_Handle *h, | 416 | GNUNET_FS_end_top (struct GNUNET_FS_Handle *h, |
413 | struct TopLevelActivity *top) | 417 | struct TopLevelActivity *top) |
414 | { | 418 | { |
415 | GNUNET_CONTAINER_DLL_remove (h->top_head, h->top_tail, top); | 419 | GNUNET_CONTAINER_DLL_remove (h->top_head, |
420 | h->top_tail, | ||
421 | top); | ||
416 | GNUNET_free (top); | 422 | GNUNET_free (top); |
417 | } | 423 | } |
418 | 424 | ||
@@ -2531,8 +2537,7 @@ signal_download_resume (struct GNUNET_FS_DownloadContext *dc) | |||
2531 | signal_download_resume (dcc); | 2537 | signal_download_resume (dcc); |
2532 | dcc = dcc->next; | 2538 | dcc = dcc->next; |
2533 | } | 2539 | } |
2534 | if (NULL != dc->pending_head) | 2540 | GNUNET_FS_download_start_downloading_ (dc); |
2535 | GNUNET_FS_download_start_downloading_ (dc); | ||
2536 | } | 2541 | } |
2537 | 2542 | ||
2538 | 2543 | ||
@@ -2815,12 +2820,16 @@ deserialize_download (struct GNUNET_FS_Handle *h, | |||
2815 | if (NULL != dn) | 2820 | if (NULL != dn) |
2816 | { | 2821 | { |
2817 | if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES)) | 2822 | if (GNUNET_YES == GNUNET_DISK_directory_test (dn, GNUNET_YES)) |
2818 | GNUNET_DISK_directory_scan (dn, &deserialize_subdownload, dc); | 2823 | GNUNET_DISK_directory_scan (dn, |
2824 | &deserialize_subdownload, | ||
2825 | dc); | ||
2819 | GNUNET_free (dn); | 2826 | GNUNET_free (dn); |
2820 | } | 2827 | } |
2821 | if (NULL != parent) | 2828 | if (NULL != parent) |
2822 | { | 2829 | { |
2823 | GNUNET_CONTAINER_DLL_insert (parent->child_head, parent->child_tail, dc); | 2830 | GNUNET_CONTAINER_DLL_insert (parent->child_head, |
2831 | parent->child_tail, | ||
2832 | dc); | ||
2824 | } | 2833 | } |
2825 | if (NULL != search) | 2834 | if (NULL != search) |
2826 | { | 2835 | { |
@@ -2830,11 +2839,14 @@ deserialize_download (struct GNUNET_FS_Handle *h, | |||
2830 | if ((NULL == parent) && (NULL == search)) | 2839 | if ((NULL == parent) && (NULL == search)) |
2831 | { | 2840 | { |
2832 | dc->top = | 2841 | dc->top = |
2833 | GNUNET_FS_make_top (dc->h, &GNUNET_FS_download_signal_suspend_, dc); | 2842 | GNUNET_FS_make_top (dc->h, |
2843 | &GNUNET_FS_download_signal_suspend_, | ||
2844 | dc); | ||
2834 | signal_download_resume (dc); | 2845 | signal_download_resume (dc); |
2835 | } | 2846 | } |
2836 | GNUNET_free (uris); | 2847 | GNUNET_free (uris); |
2837 | dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); | 2848 | dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, |
2849 | dc); | ||
2838 | return; | 2850 | return; |
2839 | cleanup: | 2851 | cleanup: |
2840 | GNUNET_free_non_null (uris); | 2852 | GNUNET_free_non_null (uris); |
diff --git a/src/fs/fs_api.h b/src/fs/fs_api.h index 398de27fd..86219b3f8 100644 --- a/src/fs/fs_api.h +++ b/src/fs/fs_api.h | |||
@@ -410,27 +410,6 @@ struct GNUNET_FS_FileInformation | |||
410 | 410 | ||
411 | 411 | ||
412 | /** | 412 | /** |
413 | * The job is now ready to run and should use the given client | ||
414 | * handle to communicate with the FS service. | ||
415 | * | ||
416 | * @param cls closure | ||
417 | * @param client handle to use for FS communication | ||
418 | */ | ||
419 | typedef void | ||
420 | (*GNUNET_FS_QueueStart) (void *cls, | ||
421 | struct GNUNET_CLIENT_Connection *client); | ||
422 | |||
423 | |||
424 | /** | ||
425 | * The job must now stop to run and should destry the client handle as | ||
426 | * soon as possible (ideally prior to returning). | ||
427 | */ | ||
428 | typedef void | ||
429 | (*GNUNET_FS_QueueStop) (void *cls); | ||
430 | |||
431 | |||
432 | |||
433 | /** | ||
434 | * Priorities for the queue. | 413 | * Priorities for the queue. |
435 | */ | 414 | */ |
436 | enum GNUNET_FS_QueuePriority | 415 | enum GNUNET_FS_QueuePriority |
@@ -465,12 +444,12 @@ struct GNUNET_FS_QueueEntry | |||
465 | /** | 444 | /** |
466 | * Function to call when the job is started. | 445 | * Function to call when the job is started. |
467 | */ | 446 | */ |
468 | GNUNET_FS_QueueStart start; | 447 | GNUNET_SCHEDULER_TaskCallback start; |
469 | 448 | ||
470 | /** | 449 | /** |
471 | * Function to call when the job needs to stop (or is done / dequeued). | 450 | * Function to call when the job needs to stop (or is done / dequeued). |
472 | */ | 451 | */ |
473 | GNUNET_FS_QueueStop stop; | 452 | GNUNET_SCHEDULER_TaskCallback stop; |
474 | 453 | ||
475 | /** | 454 | /** |
476 | * Closure for start and stop. | 455 | * Closure for start and stop. |
@@ -483,9 +462,9 @@ struct GNUNET_FS_QueueEntry | |||
483 | struct GNUNET_FS_Handle *h; | 462 | struct GNUNET_FS_Handle *h; |
484 | 463 | ||
485 | /** | 464 | /** |
486 | * Client handle, or NULL if job is not running. | 465 | * Message queue handle, or NULL if job is not running. |
487 | */ | 466 | */ |
488 | struct GNUNET_CLIENT_Connection *client; | 467 | struct GNUNET_MQ_Handle *mq; |
489 | 468 | ||
490 | /** | 469 | /** |
491 | * Time the job was originally queued. | 470 | * Time the job was originally queued. |
@@ -518,6 +497,11 @@ struct GNUNET_FS_QueueEntry | |||
518 | */ | 497 | */ |
519 | unsigned int start_times; | 498 | unsigned int start_times; |
520 | 499 | ||
500 | /** | ||
501 | * #GNUNET_YES if the job is active now. | ||
502 | */ | ||
503 | int active; | ||
504 | |||
521 | }; | 505 | }; |
522 | 506 | ||
523 | 507 | ||
@@ -658,8 +642,8 @@ struct GNUNET_FS_SearchResult | |||
658 | */ | 642 | */ |
659 | struct GNUNET_FS_QueueEntry * | 643 | struct GNUNET_FS_QueueEntry * |
660 | GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, | 644 | GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, |
661 | GNUNET_FS_QueueStart start, | 645 | GNUNET_SCHEDULER_TaskCallback start, |
662 | GNUNET_FS_QueueStop stop, | 646 | GNUNET_SCHEDULER_TaskCallback stop, |
663 | void *cls, | 647 | void *cls, |
664 | unsigned int blocks, | 648 | unsigned int blocks, |
665 | enum GNUNET_FS_QueuePriority priority); | 649 | enum GNUNET_FS_QueuePriority priority); |
@@ -1221,7 +1205,7 @@ struct GNUNET_FS_PublishContext | |||
1221 | /** | 1205 | /** |
1222 | * Connection to FS service (only used for LOC URI signing). | 1206 | * Connection to FS service (only used for LOC URI signing). |
1223 | */ | 1207 | */ |
1224 | struct GNUNET_CLIENT_Handle *fs_client; | 1208 | struct GNUNET_CLIENT_Connection *fs_client; |
1225 | 1209 | ||
1226 | /** | 1210 | /** |
1227 | * Our top-level activity entry (if we are top-level, otherwise NULL). | 1211 | * Our top-level activity entry (if we are top-level, otherwise NULL). |
@@ -1255,7 +1239,7 @@ struct GNUNET_FS_PublishContext | |||
1255 | char *serialization; | 1239 | char *serialization; |
1256 | 1240 | ||
1257 | /** | 1241 | /** |
1258 | * Our own client handle for the FS service; only briefly used when | 1242 | * Our own message queue for the FS service; only briefly used when |
1259 | * we start to index a file, otherwise NULL. | 1243 | * we start to index a file, otherwise NULL. |
1260 | */ | 1244 | */ |
1261 | struct GNUNET_CLIENT_Connection *client; | 1245 | struct GNUNET_CLIENT_Connection *client; |
@@ -1740,15 +1724,6 @@ enum BlockRequestState | |||
1740 | */ | 1724 | */ |
1741 | struct DownloadRequest | 1725 | struct DownloadRequest |
1742 | { | 1726 | { |
1743 | /** | ||
1744 | * While pending, we keep all download requests in a doubly-linked list. | ||
1745 | */ | ||
1746 | struct DownloadRequest *next; | ||
1747 | |||
1748 | /** | ||
1749 | * While pending, we keep all download requests in a doubly-linked list. | ||
1750 | */ | ||
1751 | struct DownloadRequest *prev; | ||
1752 | 1727 | ||
1753 | /** | 1728 | /** |
1754 | * Parent in the CHK-tree. | 1729 | * Parent in the CHK-tree. |
@@ -1774,7 +1749,7 @@ struct DownloadRequest | |||
1774 | uint64_t offset; | 1749 | uint64_t offset; |
1775 | 1750 | ||
1776 | /** | 1751 | /** |
1777 | * Number of entries in 'children' array. | 1752 | * Number of entries in @e children array. |
1778 | */ | 1753 | */ |
1779 | unsigned int num_children; | 1754 | unsigned int num_children; |
1780 | 1755 | ||
@@ -1793,11 +1768,6 @@ struct DownloadRequest | |||
1793 | */ | 1768 | */ |
1794 | enum BlockRequestState state; | 1769 | enum BlockRequestState state; |
1795 | 1770 | ||
1796 | /** | ||
1797 | * #GNUNET_YES if this entry is in the pending list. | ||
1798 | */ | ||
1799 | int is_pending; | ||
1800 | |||
1801 | }; | 1771 | }; |
1802 | 1772 | ||
1803 | 1773 | ||
@@ -1838,7 +1808,7 @@ struct GNUNET_FS_DownloadContext | |||
1838 | /** | 1808 | /** |
1839 | * Connection to the FS service. | 1809 | * Connection to the FS service. |
1840 | */ | 1810 | */ |
1841 | struct GNUNET_CLIENT_Connection *client; | 1811 | struct GNUNET_MQ_Handle *mq; |
1842 | 1812 | ||
1843 | /** | 1813 | /** |
1844 | * Parent download (used when downloading files | 1814 | * Parent download (used when downloading files |
@@ -1917,12 +1887,6 @@ struct GNUNET_FS_DownloadContext | |||
1917 | struct GNUNET_FS_QueueEntry *job_queue; | 1887 | struct GNUNET_FS_QueueEntry *job_queue; |
1918 | 1888 | ||
1919 | /** | 1889 | /** |
1920 | * Non-NULL if we are currently having a request for | ||
1921 | * transmission pending with the client handle. | ||
1922 | */ | ||
1923 | struct GNUNET_CLIENT_TransmitHandle *th; | ||
1924 | |||
1925 | /** | ||
1926 | * Tree encoder used for the reconstruction. | 1890 | * Tree encoder used for the reconstruction. |
1927 | */ | 1891 | */ |
1928 | struct GNUNET_FS_TreeEncoder *te; | 1892 | struct GNUNET_FS_TreeEncoder *te; |
@@ -1940,16 +1904,6 @@ struct GNUNET_FS_DownloadContext | |||
1940 | struct GNUNET_CONTAINER_MultiHashMap *active; | 1904 | struct GNUNET_CONTAINER_MultiHashMap *active; |
1941 | 1905 | ||
1942 | /** | 1906 | /** |
1943 | * Head of linked list of pending requests. | ||
1944 | */ | ||
1945 | struct DownloadRequest *pending_head; | ||
1946 | |||
1947 | /** | ||
1948 | * Head of linked list of pending requests. | ||
1949 | */ | ||
1950 | struct DownloadRequest *pending_tail; | ||
1951 | |||
1952 | /** | ||
1953 | * Top-level download request. | 1907 | * Top-level download request. |
1954 | */ | 1908 | */ |
1955 | struct DownloadRequest *top_request; | 1909 | struct DownloadRequest *top_request; |
@@ -2029,11 +1983,6 @@ struct GNUNET_FS_DownloadContext | |||
2029 | int has_finished; | 1983 | int has_finished; |
2030 | 1984 | ||
2031 | /** | 1985 | /** |
2032 | * Have we started the receive continuation yet? | ||
2033 | */ | ||
2034 | int in_receive; | ||
2035 | |||
2036 | /** | ||
2037 | * Are we ready to issue requests (reconstructions are finished)? | 1986 | * Are we ready to issue requests (reconstructions are finished)? |
2038 | */ | 1987 | */ |
2039 | int issue_requests; | 1988 | int issue_requests; |
diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c index d89d70719..98c76882a 100644 --- a/src/fs/fs_download.c +++ b/src/fs/fs_download.c | |||
@@ -121,7 +121,7 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi, | |||
121 | pi->value.download.anonymity = dc->anonymity; | 121 | pi->value.download.anonymity = dc->anonymity; |
122 | pi->value.download.eta = | 122 | pi->value.download.eta = |
123 | GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); | 123 | GNUNET_TIME_calculate_eta (dc->start_time, dc->completed, dc->length); |
124 | pi->value.download.is_active = (NULL == dc->client) ? GNUNET_NO : GNUNET_YES; | 124 | pi->value.download.is_active = (NULL == dc->mq) ? GNUNET_NO : GNUNET_YES; |
125 | pi->fsh = dc->h; | 125 | pi->fsh = dc->h; |
126 | if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) | 126 | if (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) |
127 | dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); | 127 | dc->client_info = dc->h->upcb (dc->h->upcb_cls, pi); |
@@ -131,21 +131,6 @@ GNUNET_FS_download_make_status_ (struct GNUNET_FS_ProgressInfo *pi, | |||
131 | 131 | ||
132 | 132 | ||
133 | /** | 133 | /** |
134 | * We're ready to transmit a search request to the | ||
135 | * file-sharing service. Do it. If there is | ||
136 | * more than one request pending, try to send | ||
137 | * multiple or request another transmission. | ||
138 | * | ||
139 | * @param cls closure | ||
140 | * @param size number of bytes available in buf | ||
141 | * @param buf where the callee should write the message | ||
142 | * @return number of bytes written to buf | ||
143 | */ | ||
144 | static size_t | ||
145 | transmit_download_request (void *cls, size_t size, void *buf); | ||
146 | |||
147 | |||
148 | /** | ||
149 | * Closure for iterator processing results. | 134 | * Closure for iterator processing results. |
150 | */ | 135 | */ |
151 | struct ProcessResultClosure | 136 | struct ProcessResultClosure |
@@ -206,10 +191,11 @@ struct ProcessResultClosure | |||
206 | * @param cls closure (our 'struct ProcessResultClosure') | 191 | * @param cls closure (our 'struct ProcessResultClosure') |
207 | * @param key query for the given value / request | 192 | * @param key query for the given value / request |
208 | * @param value value in the hash map (a 'struct DownloadRequest') | 193 | * @param value value in the hash map (a 'struct DownloadRequest') |
209 | * @return GNUNET_YES (we should continue to iterate); unless serious error | 194 | * @return #GNUNET_YES (we should continue to iterate); unless serious error |
210 | */ | 195 | */ |
211 | static int | 196 | static int |
212 | process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | 197 | process_result_with_request (void *cls, |
198 | const struct GNUNET_HashCode * key, | ||
213 | void *value); | 199 | void *value); |
214 | 200 | ||
215 | 201 | ||
@@ -722,6 +708,43 @@ try_top_down_reconstruction (struct GNUNET_FS_DownloadContext *dc, | |||
722 | 708 | ||
723 | 709 | ||
724 | /** | 710 | /** |
711 | * Add entries to the message queue. | ||
712 | * | ||
713 | * @param cls our download context | ||
714 | * @param key unused | ||
715 | * @param entry entry of type `struct DownloadRequest` | ||
716 | * @return #GNUNET_OK | ||
717 | */ | ||
718 | static int | ||
719 | retry_entry (void *cls, | ||
720 | const struct GNUNET_HashCode *key, | ||
721 | void *entry) | ||
722 | { | ||
723 | struct GNUNET_FS_DownloadContext *dc = cls; | ||
724 | struct DownloadRequest *dr = entry; | ||
725 | struct SearchMessage *sm; | ||
726 | struct GNUNET_MQ_Envelope *env; | ||
727 | |||
728 | env = GNUNET_MQ_msg (sm, | ||
729 | GNUNET_MESSAGE_TYPE_FS_START_SEARCH); | ||
730 | if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY)) | ||
731 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY); | ||
732 | else | ||
733 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE); | ||
734 | if (0 == dr->depth) | ||
735 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); | ||
736 | else | ||
737 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); | ||
738 | sm->anonymity_level = htonl (dc->anonymity); | ||
739 | sm->target = dc->target; | ||
740 | sm->query = dr->chk.query; | ||
741 | GNUNET_MQ_send (dc->mq, | ||
742 | env); | ||
743 | return GNUNET_OK; | ||
744 | } | ||
745 | |||
746 | |||
747 | /** | ||
725 | * Schedule the download of the specified block in the tree. | 748 | * Schedule the download of the specified block in the tree. |
726 | * | 749 | * |
727 | * @param dc overall download this block belongs to | 750 | * @param dc overall download this block belongs to |
@@ -763,25 +786,23 @@ schedule_block_download (struct GNUNET_FS_DownloadContext *dc, | |||
763 | } | 786 | } |
764 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 787 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
765 | "Scheduling download at offset %llu and depth %u for `%s'\n", | 788 | "Scheduling download at offset %llu and depth %u for `%s'\n", |
766 | (unsigned long long) dr->offset, dr->depth, | 789 | (unsigned long long) dr->offset, |
790 | dr->depth, | ||
767 | GNUNET_h2s (&dr->chk.query)); | 791 | GNUNET_h2s (&dr->chk.query)); |
768 | if (GNUNET_NO != | 792 | if (GNUNET_NO != |
769 | GNUNET_CONTAINER_multihashmap_contains_value (dc->active, &dr->chk.query, | 793 | GNUNET_CONTAINER_multihashmap_contains_value (dc->active, |
794 | &dr->chk.query, | ||
770 | dr)) | 795 | dr)) |
771 | return; /* already active */ | 796 | return; /* already active */ |
772 | GNUNET_CONTAINER_multihashmap_put (dc->active, &dr->chk.query, dr, | 797 | GNUNET_CONTAINER_multihashmap_put (dc->active, |
798 | &dr->chk.query, | ||
799 | dr, | ||
773 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | 800 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); |
774 | if (NULL == dc->client) | 801 | if (NULL == dc->mq) |
775 | return; /* download not active */ | 802 | return; /* download not active */ |
776 | GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); | 803 | retry_entry (dc, |
777 | dr->is_pending = GNUNET_YES; | 804 | &dr->chk.query, |
778 | if (NULL == dc->th) | 805 | dr); |
779 | dc->th = | ||
780 | GNUNET_CLIENT_notify_transmit_ready (dc->client, | ||
781 | sizeof (struct SearchMessage), | ||
782 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
783 | GNUNET_NO, | ||
784 | &transmit_download_request, dc); | ||
785 | } | 806 | } |
786 | 807 | ||
787 | 808 | ||
@@ -947,13 +968,14 @@ GNUNET_FS_free_download_request_ (struct DownloadRequest *dr) | |||
947 | * Iterator over entries in the pending requests in the 'active' map for the | 968 | * Iterator over entries in the pending requests in the 'active' map for the |
948 | * reply that we just got. | 969 | * reply that we just got. |
949 | * | 970 | * |
950 | * @param cls closure (our 'struct ProcessResultClosure') | 971 | * @param cls closure (our `struct ProcessResultClosure`) |
951 | * @param key query for the given value / request | 972 | * @param key query for the given value / request |
952 | * @param value value in the hash map (a 'struct DownloadRequest') | 973 | * @param value value in the hash map (a `struct DownloadRequest`) |
953 | * @return #GNUNET_YES (we should continue to iterate); unless serious error | 974 | * @return #GNUNET_YES (we should continue to iterate); unless serious error |
954 | */ | 975 | */ |
955 | static int | 976 | static int |
956 | process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | 977 | process_result_with_request (void *cls, |
978 | const struct GNUNET_HashCode *key, | ||
957 | void *value) | 979 | void *value) |
958 | { | 980 | { |
959 | struct ProcessResultClosure *prc = cls; | 981 | struct ProcessResultClosure *prc = cls; |
@@ -974,7 +996,9 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
974 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 996 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
975 | "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", | 997 | "Received %u byte block `%s' matching pending request at depth %u and offset %llu/%llu\n", |
976 | (unsigned int) prc->size, | 998 | (unsigned int) prc->size, |
977 | GNUNET_h2s (key), dr->depth, (unsigned long long) dr->offset, | 999 | GNUNET_h2s (key), |
1000 | dr->depth, | ||
1001 | (unsigned long long) dr->offset, | ||
978 | (unsigned long long) GNUNET_ntohll (dc->uri->data. | 1002 | (unsigned long long) GNUNET_ntohll (dc->uri->data. |
979 | chk.file_length)); | 1003 | chk.file_length)); |
980 | bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll | 1004 | bs = GNUNET_FS_tree_calculate_block_size (GNUNET_ntohll |
@@ -999,15 +1023,17 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
999 | goto signal_error; | 1023 | goto signal_error; |
1000 | } | 1024 | } |
1001 | 1025 | ||
1002 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &prc->query, dr); | 1026 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, |
1003 | if (GNUNET_YES == dr->is_pending) | 1027 | &prc->query, |
1004 | { | 1028 | dr); |
1005 | GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); | 1029 | GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, |
1006 | dr->is_pending = GNUNET_NO; | 1030 | &skey, |
1007 | } | 1031 | &iv); |
1008 | 1032 | if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, | |
1009 | GNUNET_CRYPTO_hash_to_aes_key (&dr->chk.key, &skey, &iv); | 1033 | prc->size, |
1010 | if (-1 == GNUNET_CRYPTO_symmetric_decrypt (prc->data, prc->size, &skey, &iv, pt)) | 1034 | &skey, |
1035 | &iv, | ||
1036 | pt)) | ||
1011 | { | 1037 | { |
1012 | GNUNET_break (0); | 1038 | GNUNET_break (0); |
1013 | dc->emsg = GNUNET_strdup (_("internal error decrypting content")); | 1039 | dc->emsg = GNUNET_strdup (_("internal error decrypting content")); |
@@ -1015,7 +1041,8 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
1015 | } | 1041 | } |
1016 | off = | 1042 | off = |
1017 | compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), | 1043 | compute_disk_offset (GNUNET_ntohll (dc->uri->data.chk.file_length), |
1018 | dr->offset, dr->depth); | 1044 | dr->offset, |
1045 | dr->depth); | ||
1019 | /* save to disk */ | 1046 | /* save to disk */ |
1020 | if ((GNUNET_YES == prc->do_store) && | 1047 | if ((GNUNET_YES == prc->do_store) && |
1021 | ((NULL != dc->filename) || (is_recursive_download (dc))) && | 1048 | ((NULL != dc->filename) || (is_recursive_download (dc))) && |
@@ -1040,21 +1067,25 @@ process_result_with_request (void *cls, const struct GNUNET_HashCode * key, | |||
1040 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1067 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1041 | "Saving decrypted block to disk at offset %llu\n", | 1068 | "Saving decrypted block to disk at offset %llu\n", |
1042 | (unsigned long long) off); | 1069 | (unsigned long long) off); |
1043 | if ((off != GNUNET_DISK_file_seek (fh, off, GNUNET_DISK_SEEK_SET))) | 1070 | if ((off != GNUNET_DISK_file_seek (fh, |
1071 | off, | ||
1072 | GNUNET_DISK_SEEK_SET))) | ||
1044 | { | 1073 | { |
1045 | GNUNET_asprintf (&dc->emsg, | 1074 | GNUNET_asprintf (&dc->emsg, |
1046 | _("Failed to seek to offset %llu in file `%s': %s"), | 1075 | _("Failed to seek to offset %llu in file `%s': %s"), |
1047 | (unsigned long long) off, dc->filename, | 1076 | (unsigned long long) off, |
1077 | dc->filename, | ||
1048 | STRERROR (errno)); | 1078 | STRERROR (errno)); |
1049 | goto signal_error; | 1079 | goto signal_error; |
1050 | } | 1080 | } |
1051 | if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) | 1081 | if (prc->size != GNUNET_DISK_file_write (fh, pt, prc->size)) |
1052 | { | 1082 | { |
1053 | GNUNET_asprintf (&dc->emsg, | 1083 | GNUNET_asprintf (&dc->emsg, |
1054 | _ | 1084 | _("Failed to write block of %u bytes at offset %llu in file `%s': %s"), |
1055 | ("Failed to write block of %u bytes at offset %llu in file `%s': %s"), | 1085 | (unsigned int) prc->size, |
1056 | (unsigned int) prc->size, (unsigned long long) off, | 1086 | (unsigned long long) off, |
1057 | dc->filename, STRERROR (errno)); | 1087 | dc->filename, |
1088 | STRERROR (errno)); | ||
1058 | goto signal_error; | 1089 | goto signal_error; |
1059 | } | 1090 | } |
1060 | GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); | 1091 | GNUNET_break (GNUNET_OK == GNUNET_DISK_file_close (fh)); |
@@ -1193,15 +1224,8 @@ signal_error: | |||
1193 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; | 1224 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ERROR; |
1194 | pi.value.download.specifics.error.message = dc->emsg; | 1225 | pi.value.download.specifics.error.message = dc->emsg; |
1195 | GNUNET_FS_download_make_status_ (&pi, dc); | 1226 | GNUNET_FS_download_make_status_ (&pi, dc); |
1196 | /* abort all pending requests */ | 1227 | GNUNET_MQ_destroy (dc->mq); |
1197 | if (NULL != dc->th) | 1228 | dc->mq = NULL; |
1198 | { | ||
1199 | GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); | ||
1200 | dc->th = NULL; | ||
1201 | } | ||
1202 | GNUNET_CLIENT_disconnect (dc->client); | ||
1203 | dc->in_receive = GNUNET_NO; | ||
1204 | dc->client = NULL; | ||
1205 | GNUNET_FS_free_download_request_ (dc->top_request); | 1229 | GNUNET_FS_free_download_request_ (dc->top_request); |
1206 | dc->top_request = NULL; | 1230 | dc->top_request = NULL; |
1207 | GNUNET_CONTAINER_multihashmap_destroy (dc->active); | 1231 | GNUNET_CONTAINER_multihashmap_destroy (dc->active); |
@@ -1211,49 +1235,24 @@ signal_error: | |||
1211 | GNUNET_FS_dequeue_ (dc->job_queue); | 1235 | GNUNET_FS_dequeue_ (dc->job_queue); |
1212 | dc->job_queue = NULL; | 1236 | dc->job_queue = NULL; |
1213 | } | 1237 | } |
1214 | dc->pending_head = NULL; | ||
1215 | dc->pending_tail = NULL; | ||
1216 | GNUNET_FS_download_sync_ (dc); | 1238 | GNUNET_FS_download_sync_ (dc); |
1217 | return GNUNET_NO; | 1239 | return GNUNET_NO; |
1218 | } | 1240 | } |
1219 | 1241 | ||
1220 | 1242 | ||
1221 | /** | 1243 | /** |
1222 | * Process a download result. | 1244 | * Type of a function to call when we check the PUT message |
1245 | * from the service. | ||
1223 | * | 1246 | * |
1224 | * @param dc our download context | 1247 | * @param cls closure |
1225 | * @param type type of the result | 1248 | * @param msg message received |
1226 | * @param respect_offered how much respect did we offer to get this reply? | ||
1227 | * @param num_transmissions how often did we transmit the query? | ||
1228 | * @param last_transmission when was this block requested the last time? (FOREVER if unknown/not applicable) | ||
1229 | * @param data the (encrypted) response | ||
1230 | * @param size size of data | ||
1231 | */ | 1249 | */ |
1232 | static void | 1250 | static int |
1233 | process_result (struct GNUNET_FS_DownloadContext *dc, | 1251 | check_put (void *cls, |
1234 | enum GNUNET_BLOCK_Type type, | 1252 | const struct ClientPutMessage *cm) |
1235 | uint32_t respect_offered, | ||
1236 | uint32_t num_transmissions, | ||
1237 | struct GNUNET_TIME_Absolute last_transmission, | ||
1238 | const void *data, size_t size) | ||
1239 | { | 1253 | { |
1240 | struct ProcessResultClosure prc; | 1254 | /* any varsize length is OK */ |
1241 | 1255 | return GNUNET_OK; | |
1242 | prc.dc = dc; | ||
1243 | prc.data = data; | ||
1244 | prc.last_transmission = last_transmission; | ||
1245 | prc.size = size; | ||
1246 | prc.type = type; | ||
1247 | prc.do_store = GNUNET_YES; | ||
1248 | prc.respect_offered = respect_offered; | ||
1249 | prc.num_transmissions = num_transmissions; | ||
1250 | GNUNET_CRYPTO_hash (data, size, &prc.query); | ||
1251 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1252 | "Received result for query `%s' from `%s'-service\n", | ||
1253 | GNUNET_h2s (&prc.query), "FS"); | ||
1254 | GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, &prc.query, | ||
1255 | &process_result_with_request, | ||
1256 | &prc); | ||
1257 | } | 1256 | } |
1258 | 1257 | ||
1259 | 1258 | ||
@@ -1262,109 +1261,59 @@ process_result (struct GNUNET_FS_DownloadContext *dc, | |||
1262 | * from the service. | 1261 | * from the service. |
1263 | * | 1262 | * |
1264 | * @param cls closure | 1263 | * @param cls closure |
1265 | * @param msg message received, NULL on timeout or fatal error | 1264 | * @param msg message received |
1266 | */ | 1265 | */ |
1267 | static void | 1266 | static void |
1268 | receive_results (void *cls, const struct GNUNET_MessageHeader *msg) | 1267 | handle_put (void *cls, |
1268 | const struct ClientPutMessage *cm) | ||
1269 | { | 1269 | { |
1270 | struct GNUNET_FS_DownloadContext *dc = cls; | 1270 | struct GNUNET_FS_DownloadContext *dc = cls; |
1271 | const struct ClientPutMessage *cm; | 1271 | uint16_t msize = ntohs (cm->header.size) - sizeof (*cm); |
1272 | uint16_t msize; | 1272 | struct ProcessResultClosure prc; |
1273 | 1273 | ||
1274 | if ((NULL == msg) || (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_PUT) || | 1274 | prc.dc = dc; |
1275 | (sizeof (struct ClientPutMessage) > ntohs (msg->size))) | 1275 | prc.data = &cm[1]; |
1276 | { | 1276 | prc.last_transmission = GNUNET_TIME_absolute_ntoh (cm->last_transmission); |
1277 | GNUNET_break (NULL == msg); | 1277 | prc.size = msize; |
1278 | try_reconnect (dc); | 1278 | prc.type = ntohl (cm->type); |
1279 | return; | 1279 | prc.do_store = GNUNET_YES; |
1280 | } | 1280 | prc.respect_offered = ntohl (cm->respect_offered); |
1281 | msize = ntohs (msg->size); | 1281 | prc.num_transmissions = ntohl (cm->num_transmissions); |
1282 | cm = (const struct ClientPutMessage *) msg; | 1282 | GNUNET_CRYPTO_hash (prc.data, |
1283 | process_result (dc, ntohl (cm->type), | 1283 | msize, |
1284 | ntohl (cm->respect_offered), | 1284 | &prc.query); |
1285 | ntohl (cm->num_transmissions), | 1285 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1286 | GNUNET_TIME_absolute_ntoh (cm->last_transmission), &cm[1], | 1286 | "Received result for query `%s' from FS service\n", |
1287 | msize - sizeof (struct ClientPutMessage)); | 1287 | GNUNET_h2s (&prc.query)); |
1288 | if (NULL == dc->client) | 1288 | GNUNET_CONTAINER_multihashmap_get_multiple (dc->active, |
1289 | return; /* fatal error */ | 1289 | &prc.query, |
1290 | /* continue receiving */ | 1290 | &process_result_with_request, |
1291 | GNUNET_CLIENT_receive (dc->client, &receive_results, dc, | 1291 | &prc); |
1292 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1293 | } | 1292 | } |
1294 | 1293 | ||
1295 | 1294 | ||
1296 | /** | 1295 | /** |
1297 | * We're ready to transmit a search request to the | 1296 | * Generic error handler, called with the appropriate error code and |
1298 | * file-sharing service. Do it. If there is | 1297 | * the same closure specified at the creation of the message queue. |
1299 | * more than one request pending, try to send | 1298 | * Not every message queue implementation supports an error handler. |
1300 | * multiple or request another transmission. | ||
1301 | * | 1299 | * |
1302 | * @param cls closure | 1300 | * @param cls closure with the `struct GNUNET_FS_DownloadContext *` |
1303 | * @param size number of bytes available in buf | 1301 | * @param error error code |
1304 | * @param buf where the callee should write the message | ||
1305 | * @return number of bytes written to buf | ||
1306 | */ | 1302 | */ |
1307 | static size_t | 1303 | static void |
1308 | transmit_download_request (void *cls, size_t size, void *buf) | 1304 | download_mq_error_handler (void *cls, |
1305 | enum GNUNET_MQ_Error error) | ||
1309 | { | 1306 | { |
1310 | struct GNUNET_FS_DownloadContext *dc = cls; | 1307 | struct GNUNET_FS_DownloadContext *dc = cls; |
1311 | size_t msize; | ||
1312 | struct SearchMessage *sm; | ||
1313 | struct DownloadRequest *dr; | ||
1314 | 1308 | ||
1315 | dc->th = NULL; | 1309 | if (NULL != dc->mq) |
1316 | if (NULL == buf) | ||
1317 | { | 1310 | { |
1318 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1311 | GNUNET_MQ_destroy (dc->mq); |
1319 | "Transmitting download request failed, trying to reconnect\n"); | 1312 | dc->mq = NULL; |
1320 | try_reconnect (dc); | ||
1321 | return 0; | ||
1322 | } | 1313 | } |
1323 | GNUNET_assert (size >= sizeof (struct SearchMessage)); | 1314 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1324 | msize = 0; | 1315 | "Transmitting download request failed, trying to reconnect\n"); |
1325 | sm = buf; | 1316 | try_reconnect (dc); |
1326 | while ((NULL != (dr = dc->pending_head)) && | ||
1327 | (size >= msize + sizeof (struct SearchMessage))) | ||
1328 | { | ||
1329 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1330 | "Transmitting download request for `%s' to `%s'-service\n", | ||
1331 | GNUNET_h2s (&dr->chk.query), "FS"); | ||
1332 | memset (sm, 0, sizeof (struct SearchMessage)); | ||
1333 | sm->header.size = htons (sizeof (struct SearchMessage)); | ||
1334 | sm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_START_SEARCH); | ||
1335 | if (0 != (dc->options & GNUNET_FS_DOWNLOAD_OPTION_LOOPBACK_ONLY)) | ||
1336 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_LOOPBACK_ONLY); | ||
1337 | else | ||
1338 | sm->options = htonl (GNUNET_FS_SEARCH_OPTION_NONE); | ||
1339 | if (0 == dr->depth) | ||
1340 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_DBLOCK); | ||
1341 | else | ||
1342 | sm->type = htonl (GNUNET_BLOCK_TYPE_FS_IBLOCK); | ||
1343 | sm->anonymity_level = htonl (dc->anonymity); | ||
1344 | sm->target = dc->target; | ||
1345 | sm->query = dr->chk.query; | ||
1346 | GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); | ||
1347 | dr->is_pending = GNUNET_NO; | ||
1348 | msize += sizeof (struct SearchMessage); | ||
1349 | sm++; | ||
1350 | } | ||
1351 | if (NULL != dc->pending_head) | ||
1352 | { | ||
1353 | dc->th = | ||
1354 | GNUNET_CLIENT_notify_transmit_ready (dc->client, | ||
1355 | sizeof (struct SearchMessage), | ||
1356 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1357 | GNUNET_NO, | ||
1358 | &transmit_download_request, dc); | ||
1359 | GNUNET_assert (NULL != dc->th); | ||
1360 | } | ||
1361 | if (GNUNET_NO == dc->in_receive) | ||
1362 | { | ||
1363 | dc->in_receive = GNUNET_YES; | ||
1364 | GNUNET_CLIENT_receive (dc->client, &receive_results, dc, | ||
1365 | GNUNET_TIME_UNIT_FOREVER_REL); | ||
1366 | } | ||
1367 | return msize; | ||
1368 | } | 1317 | } |
1369 | 1318 | ||
1370 | 1319 | ||
@@ -1376,51 +1325,31 @@ transmit_download_request (void *cls, size_t size, void *buf) | |||
1376 | static void | 1325 | static void |
1377 | do_reconnect (void *cls) | 1326 | do_reconnect (void *cls) |
1378 | { | 1327 | { |
1328 | GNUNET_MQ_hd_var_size (put, | ||
1329 | GNUNET_MESSAGE_TYPE_FS_PUT, | ||
1330 | struct ClientPutMessage); | ||
1379 | struct GNUNET_FS_DownloadContext *dc = cls; | 1331 | struct GNUNET_FS_DownloadContext *dc = cls; |
1380 | struct GNUNET_CLIENT_Connection *client; | 1332 | struct GNUNET_MQ_MessageHandler handlers[] = { |
1333 | make_put_handler (dc), | ||
1334 | GNUNET_MQ_handler_end () | ||
1335 | }; | ||
1381 | 1336 | ||
1382 | dc->task = NULL; | 1337 | dc->task = NULL; |
1383 | client = GNUNET_CLIENT_connect ("fs", dc->h->cfg); | 1338 | dc->mq = GNUNET_CLIENT_connecT (dc->h->cfg, |
1384 | if (NULL == client) | 1339 | "fs", |
1340 | handlers, | ||
1341 | &download_mq_error_handler, | ||
1342 | dc); | ||
1343 | if (NULL == dc->mq) | ||
1385 | { | 1344 | { |
1386 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 1345 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
1387 | "Connecting to `%s'-service failed, will try again.\n", "FS"); | 1346 | "Connecting to `%s'-service failed, will try again.\n", "FS"); |
1388 | try_reconnect (dc); | 1347 | try_reconnect (dc); |
1389 | return; | 1348 | return; |
1390 | } | 1349 | } |
1391 | dc->client = client; | 1350 | GNUNET_CONTAINER_multihashmap_iterate (dc->active, |
1392 | if (NULL != dc->pending_head) | 1351 | &retry_entry, |
1393 | { | 1352 | dc); |
1394 | dc->th = | ||
1395 | GNUNET_CLIENT_notify_transmit_ready (client, | ||
1396 | sizeof (struct SearchMessage), | ||
1397 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1398 | GNUNET_NO, | ||
1399 | &transmit_download_request, dc); | ||
1400 | GNUNET_assert (NULL != dc->th); | ||
1401 | } | ||
1402 | } | ||
1403 | |||
1404 | |||
1405 | /** | ||
1406 | * Add entries to the pending list. | ||
1407 | * | ||
1408 | * @param cls our download context | ||
1409 | * @param key unused | ||
1410 | * @param entry entry of type "struct DownloadRequest" | ||
1411 | * @return GNUNET_OK | ||
1412 | */ | ||
1413 | static int | ||
1414 | retry_entry (void *cls, const struct GNUNET_HashCode * key, void *entry) | ||
1415 | { | ||
1416 | struct GNUNET_FS_DownloadContext *dc = cls; | ||
1417 | struct DownloadRequest *dr = entry; | ||
1418 | |||
1419 | dr->next = NULL; | ||
1420 | dr->prev = NULL; | ||
1421 | GNUNET_CONTAINER_DLL_insert (dc->pending_head, dc->pending_tail, dr); | ||
1422 | dr->is_pending = GNUNET_YES; | ||
1423 | return GNUNET_OK; | ||
1424 | } | 1353 | } |
1425 | 1354 | ||
1426 | 1355 | ||
@@ -1435,30 +1364,22 @@ static void | |||
1435 | try_reconnect (struct GNUNET_FS_DownloadContext *dc) | 1364 | try_reconnect (struct GNUNET_FS_DownloadContext *dc) |
1436 | { | 1365 | { |
1437 | 1366 | ||
1438 | if (NULL != dc->client) | 1367 | if (NULL != dc->mq) |
1439 | { | 1368 | { |
1440 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | 1369 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1441 | "Moving all requests back to pending list\n"); | 1370 | "Moving all requests back to pending list\n"); |
1442 | if (NULL != dc->th) | 1371 | GNUNET_MQ_destroy (dc->mq); |
1443 | { | 1372 | dc->mq = NULL; |
1444 | GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); | ||
1445 | dc->th = NULL; | ||
1446 | } | ||
1447 | /* full reset of the pending list */ | ||
1448 | dc->pending_head = NULL; | ||
1449 | dc->pending_tail = NULL; | ||
1450 | GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc); | ||
1451 | GNUNET_CLIENT_disconnect (dc->client); | ||
1452 | dc->in_receive = GNUNET_NO; | ||
1453 | dc->client = NULL; | ||
1454 | } | 1373 | } |
1455 | if (0 == dc->reconnect_backoff.rel_value_us) | 1374 | if (0 == dc->reconnect_backoff.rel_value_us) |
1456 | dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; | 1375 | dc->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; |
1457 | else | 1376 | else |
1458 | dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); | 1377 | dc->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (dc->reconnect_backoff); |
1459 | 1378 | ||
1460 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will try to reconnect in %s\n", | 1379 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1461 | GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, GNUNET_YES)); | 1380 | "Will try to reconnect in %s\n", |
1381 | GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff, | ||
1382 | GNUNET_YES)); | ||
1462 | dc->task = | 1383 | dc->task = |
1463 | GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, | 1384 | GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff, |
1464 | &do_reconnect, | 1385 | &do_reconnect, |
@@ -1470,37 +1391,23 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc) | |||
1470 | * We're allowed to ask the FS service for our blocks. Start the download. | 1391 | * We're allowed to ask the FS service for our blocks. Start the download. |
1471 | * | 1392 | * |
1472 | * @param cls the 'struct GNUNET_FS_DownloadContext' | 1393 | * @param cls the 'struct GNUNET_FS_DownloadContext' |
1473 | * @param client handle to use for communcation with FS (we must destroy it!) | 1394 | * @param mq handle to use for communcation with FS (we must destroy it!) |
1474 | */ | 1395 | */ |
1475 | static void | 1396 | static void |
1476 | activate_fs_download (void *cls, struct GNUNET_CLIENT_Connection *client) | 1397 | activate_fs_download (void *cls) |
1477 | { | 1398 | { |
1478 | struct GNUNET_FS_DownloadContext *dc = cls; | 1399 | struct GNUNET_FS_DownloadContext *dc = cls; |
1479 | struct GNUNET_FS_ProgressInfo pi; | 1400 | struct GNUNET_FS_ProgressInfo pi; |
1480 | 1401 | ||
1481 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download activated\n"); | 1402 | GNUNET_assert (NULL == dc->mq); |
1482 | GNUNET_assert (NULL != client); | ||
1483 | GNUNET_assert (NULL == dc->client); | ||
1484 | GNUNET_assert (NULL == dc->th); | ||
1485 | GNUNET_assert (NULL != dc->active); | 1403 | GNUNET_assert (NULL != dc->active); |
1486 | dc->client = client; | 1404 | do_reconnect (dc); |
1405 | if (NULL != dc->mq) | ||
1406 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1407 | "Download activated\n"); | ||
1487 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; | 1408 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE; |
1488 | GNUNET_FS_download_make_status_ (&pi, dc); | 1409 | GNUNET_FS_download_make_status_ (&pi, |
1489 | dc->pending_head = NULL; | 1410 | dc); |
1490 | dc->pending_tail = NULL; | ||
1491 | GNUNET_CONTAINER_multihashmap_iterate (dc->active, &retry_entry, dc); | ||
1492 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1493 | "Asking for transmission to FS service\n"); | ||
1494 | if (NULL != dc->pending_head) | ||
1495 | { | ||
1496 | dc->th = | ||
1497 | GNUNET_CLIENT_notify_transmit_ready (dc->client, | ||
1498 | sizeof (struct SearchMessage), | ||
1499 | GNUNET_CONSTANTS_SERVICE_TIMEOUT, | ||
1500 | GNUNET_NO, | ||
1501 | &transmit_download_request, dc); | ||
1502 | GNUNET_assert (NULL != dc->th); | ||
1503 | } | ||
1504 | } | 1411 | } |
1505 | 1412 | ||
1506 | 1413 | ||
@@ -1515,22 +1422,16 @@ deactivate_fs_download (void *cls) | |||
1515 | struct GNUNET_FS_DownloadContext *dc = cls; | 1422 | struct GNUNET_FS_DownloadContext *dc = cls; |
1516 | struct GNUNET_FS_ProgressInfo pi; | 1423 | struct GNUNET_FS_ProgressInfo pi; |
1517 | 1424 | ||
1518 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Download deactivated\n"); | 1425 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1519 | if (NULL != dc->th) | 1426 | "Download deactivated\n"); |
1520 | { | 1427 | if (NULL != dc->mq) |
1521 | GNUNET_CLIENT_notify_transmit_ready_cancel (dc->th); | ||
1522 | dc->th = NULL; | ||
1523 | } | ||
1524 | if (NULL != dc->client) | ||
1525 | { | 1428 | { |
1526 | GNUNET_CLIENT_disconnect (dc->client); | 1429 | GNUNET_MQ_destroy (dc->mq); |
1527 | dc->in_receive = GNUNET_NO; | 1430 | dc->mq = NULL; |
1528 | dc->client = NULL; | ||
1529 | } | 1431 | } |
1530 | dc->pending_head = NULL; | ||
1531 | dc->pending_tail = NULL; | ||
1532 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; | 1432 | pi.status = GNUNET_FS_STATUS_DOWNLOAD_INACTIVE; |
1533 | GNUNET_FS_download_make_status_ (&pi, dc); | 1433 | GNUNET_FS_download_make_status_ (&pi, |
1434 | dc); | ||
1534 | } | 1435 | } |
1535 | 1436 | ||
1536 | 1437 | ||
@@ -1557,7 +1458,8 @@ static struct DownloadRequest * | |||
1557 | create_download_request (struct DownloadRequest *parent, | 1458 | create_download_request (struct DownloadRequest *parent, |
1558 | unsigned int chk_idx, | 1459 | unsigned int chk_idx, |
1559 | unsigned int depth, | 1460 | unsigned int depth, |
1560 | uint64_t dr_offset, uint64_t file_start_offset, | 1461 | uint64_t dr_offset, |
1462 | uint64_t file_start_offset, | ||
1561 | uint64_t desired_length) | 1463 | uint64_t desired_length) |
1562 | { | 1464 | { |
1563 | struct DownloadRequest *dr; | 1465 | struct DownloadRequest *dr; |
@@ -1746,13 +1648,9 @@ reconstruct_cb (void *cls, const struct ContentHashKey *chk, uint64_t offset, | |||
1746 | /* block matches, hence tree below matches; | 1648 | /* block matches, hence tree below matches; |
1747 | * this request is done! */ | 1649 | * this request is done! */ |
1748 | dr->state = BRS_DOWNLOAD_UP; | 1650 | dr->state = BRS_DOWNLOAD_UP; |
1749 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, &dr->chk.query, dr); | 1651 | (void) GNUNET_CONTAINER_multihashmap_remove (dc->active, |
1750 | if (GNUNET_YES == dr->is_pending) | 1652 | &dr->chk.query, |
1751 | { | 1653 | dr); |
1752 | GNUNET_break (0); /* how did we get here? */ | ||
1753 | GNUNET_CONTAINER_DLL_remove (dc->pending_head, dc->pending_tail, dr); | ||
1754 | dr->is_pending = GNUNET_NO; | ||
1755 | } | ||
1756 | /* calculate how many bytes of payload this block | 1654 | /* calculate how many bytes of payload this block |
1757 | * corresponds to */ | 1655 | * corresponds to */ |
1758 | blen = GNUNET_FS_tree_compute_tree_size (dr->depth); | 1656 | blen = GNUNET_FS_tree_compute_tree_size (dr->depth); |
@@ -1860,7 +1758,8 @@ GNUNET_FS_download_start_task_ (void *cls) | |||
1860 | struct GNUNET_FS_ProgressInfo pi; | 1758 | struct GNUNET_FS_ProgressInfo pi; |
1861 | struct GNUNET_DISK_FileHandle *fh; | 1759 | struct GNUNET_DISK_FileHandle *fh; |
1862 | 1760 | ||
1863 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Start task running...\n"); | 1761 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
1762 | "Start task running...\n"); | ||
1864 | dc->task = NULL; | 1763 | dc->task = NULL; |
1865 | if (0 == dc->length) | 1764 | if (0 == dc->length) |
1866 | { | 1765 | { |
@@ -1978,8 +1877,10 @@ GNUNET_FS_download_start_task_ (void *cls) | |||
1978 | dc->te = | 1877 | dc->te = |
1979 | GNUNET_FS_tree_encoder_create (dc->h, | 1878 | GNUNET_FS_tree_encoder_create (dc->h, |
1980 | GNUNET_FS_uri_chk_get_file_size (dc->uri), | 1879 | GNUNET_FS_uri_chk_get_file_size (dc->uri), |
1981 | dc, &fh_reader, | 1880 | dc, |
1982 | &reconstruct_cb, NULL, | 1881 | &fh_reader, |
1882 | &reconstruct_cb, | ||
1883 | NULL, | ||
1983 | &reconstruct_cont); | 1884 | &reconstruct_cont); |
1984 | dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); | 1885 | dc->task = GNUNET_SCHEDULER_add_now (&get_next_block, dc); |
1985 | } | 1886 | } |
@@ -2079,9 +1980,13 @@ struct GNUNET_FS_DownloadContext * | |||
2079 | create_download_context (struct GNUNET_FS_Handle *h, | 1980 | create_download_context (struct GNUNET_FS_Handle *h, |
2080 | const struct GNUNET_FS_Uri *uri, | 1981 | const struct GNUNET_FS_Uri *uri, |
2081 | const struct GNUNET_CONTAINER_MetaData *meta, | 1982 | const struct GNUNET_CONTAINER_MetaData *meta, |
2082 | const char *filename, const char *tempname, | 1983 | const char *filename, |
2083 | uint64_t offset, uint64_t length, uint32_t anonymity, | 1984 | const char *tempname, |
2084 | enum GNUNET_FS_DownloadOptions options, void *cctx) | 1985 | uint64_t offset, |
1986 | uint64_t length, | ||
1987 | uint32_t anonymity, | ||
1988 | enum GNUNET_FS_DownloadOptions options, | ||
1989 | void *cctx) | ||
2085 | { | 1990 | { |
2086 | struct GNUNET_FS_DownloadContext *dc; | 1991 | struct GNUNET_FS_DownloadContext *dc; |
2087 | 1992 | ||
@@ -2132,7 +2037,8 @@ create_download_context (struct GNUNET_FS_Handle *h, | |||
2132 | filename, | 2037 | filename, |
2133 | (unsigned long long) length, | 2038 | (unsigned long long) length, |
2134 | dc->treedepth); | 2039 | dc->treedepth); |
2135 | dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, dc); | 2040 | dc->task = GNUNET_SCHEDULER_add_now (&GNUNET_FS_download_start_task_, |
2041 | dc); | ||
2136 | return dc; | 2042 | return dc; |
2137 | } | 2043 | } |
2138 | 2044 | ||
@@ -2290,6 +2196,8 @@ GNUNET_FS_download_start_downloading_ (struct GNUNET_FS_DownloadContext *dc) | |||
2290 | { | 2196 | { |
2291 | if (dc->completed == dc->length) | 2197 | if (dc->completed == dc->length) |
2292 | return; | 2198 | return; |
2199 | if (NULL != dc->mq) | ||
2200 | return; /* already running */ | ||
2293 | GNUNET_assert (NULL == dc->job_queue); | 2201 | GNUNET_assert (NULL == dc->job_queue); |
2294 | GNUNET_assert (NULL != dc->active); | 2202 | GNUNET_assert (NULL != dc->active); |
2295 | dc->job_queue = | 2203 | dc->job_queue = |
diff --git a/src/fs/fs_search.c b/src/fs/fs_search.c index f3221ac76..9a1b822e1 100644 --- a/src/fs/fs_search.c +++ b/src/fs/fs_search.c | |||
@@ -368,7 +368,7 @@ probe_ping_task_cb (void *cls) | |||
368 | struct GNUNET_FS_SearchResult *sr; | 368 | struct GNUNET_FS_SearchResult *sr; |
369 | 369 | ||
370 | for (sr = h->probes_head; NULL != sr; sr = sr->next) | 370 | for (sr = h->probes_head; NULL != sr; sr = sr->next) |
371 | if (NULL != sr->probe_ctx->client) | 371 | if (NULL != sr->probe_ctx->mq) |
372 | signal_probe_result (sr); | 372 | signal_probe_result (sr); |
373 | h->probe_ping_task | 373 | h->probe_ping_task |
374 | = GNUNET_SCHEDULER_add_delayed (GNUNET_FS_PROBE_UPDATE_FREQUENCY, | 374 | = GNUNET_SCHEDULER_add_delayed (GNUNET_FS_PROBE_UPDATE_FREQUENCY, |
diff --git a/src/fs/fs_unindex.c b/src/fs/fs_unindex.c index e614fbe03..f43725a59 100644 --- a/src/fs/fs_unindex.c +++ b/src/fs/fs_unindex.c | |||
@@ -110,8 +110,11 @@ GNUNET_FS_unindex_make_status_ (struct GNUNET_FS_ProgressInfo *pi, | |||
110 | * @param depth depth of the block in the tree, 0 for DBLOCK | 110 | * @param depth depth of the block in the tree, 0 for DBLOCK |
111 | */ | 111 | */ |
112 | static void | 112 | static void |
113 | unindex_progress (void *cls, uint64_t offset, const void *pt_block, | 113 | unindex_progress (void *cls, |
114 | size_t pt_size, unsigned int depth) | 114 | uint64_t offset, |
115 | const void *pt_block, | ||
116 | size_t pt_size, | ||
117 | unsigned int depth) | ||
115 | { | 118 | { |
116 | struct GNUNET_FS_UnindexContext *uc = cls; | 119 | struct GNUNET_FS_UnindexContext *uc = cls; |
117 | struct GNUNET_FS_ProgressInfo pi; | 120 | struct GNUNET_FS_ProgressInfo pi; |