aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-06-21 23:05:54 +0000
committerChristian Grothoff <christian@grothoff.org>2012-06-21 23:05:54 +0000
commit4e7a535cc04df4eef21dc1db39d782a3365e9df1 (patch)
tree572db9632938b5b8db59424af03440b8da36734b /src/fs
parente94ad8771c6ffe61b6ffb53c15567793760e75ca (diff)
downloadgnunet-4e7a535cc04df4eef21dc1db39d782a3365e9df1.tar.gz
gnunet-4e7a535cc04df4eef21dc1db39d782a3365e9df1.zip
-work on #2437, plus misc minor bugfixes
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/fs_api.c291
-rw-r--r--src/fs/gnunet-download.c22
2 files changed, 168 insertions, 145 deletions
diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c
index 6d9af1187..5b4bfa020 100644
--- a/src/fs/fs_api.c
+++ b/src/fs/fs_api.c
@@ -59,6 +59,7 @@ start_job (struct GNUNET_FS_QueueEntry *qe)
59 qe->start (qe->cls, qe->client); 59 qe->start (qe->cls, qe->client);
60 qe->start_times++; 60 qe->start_times++;
61 qe->h->active_blocks += qe->blocks; 61 qe->h->active_blocks += qe->blocks;
62 qe->h->active_downloads++;
62 qe->start_time = GNUNET_TIME_absolute_get (); 63 qe->start_time = GNUNET_TIME_absolute_get ();
63 GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, qe->h->pending_tail, qe); 64 GNUNET_CONTAINER_DLL_remove (qe->h->pending_head, qe->h->pending_tail, qe);
64 GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head, qe->h->running_tail, 65 GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head, qe->h->running_tail,
@@ -77,6 +78,7 @@ stop_job (struct GNUNET_FS_QueueEntry *qe)
77{ 78{
78 qe->client = NULL; 79 qe->client = NULL;
79 qe->stop (qe->cls); 80 qe->stop (qe->cls);
81 GNUNET_assert (0 < qe->h->active_downloads);
80 qe->h->active_downloads--; 82 qe->h->active_downloads--;
81 qe->h->active_blocks -= qe->blocks; 83 qe->h->active_blocks -= qe->blocks;
82 qe->run_time = 84 qe->run_time =
@@ -106,20 +108,24 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
106 struct GNUNET_TIME_Relative restart_at; 108 struct GNUNET_TIME_Relative restart_at;
107 struct GNUNET_TIME_Relative rst; 109 struct GNUNET_TIME_Relative rst;
108 struct GNUNET_TIME_Absolute end_time; 110 struct GNUNET_TIME_Absolute end_time;
109 unsigned int num_download_waiting; 111 unsigned int num_downloads_waiting;
110 unsigned int num_download_active; 112 unsigned int num_downloads_active;
111 unsigned int num_download_expired; 113 unsigned int num_downloads_expired;
112 unsigned int num_probes_active; 114 unsigned int num_probes_active;
113 unsigned int num_probes_waiting; 115 unsigned int num_probes_waiting;
114 unsigned int num_probes_expired; 116 unsigned int num_probes_expired;
115 int num_probes_change; 117 int num_probes_change;
116 int num_download_change; 118 int num_downloads_change;
119 int block_limit_hit;
117 120
118 h->queue_job = GNUNET_SCHEDULER_NO_TASK; 121 h->queue_job = GNUNET_SCHEDULER_NO_TASK;
122 /* restart_at will be set to the time when it makes sense to
123 re-evaluate the job queue (unless, of course, jobs complete
124 or are added, then we'll be triggered immediately */
119 restart_at = GNUNET_TIME_UNIT_FOREVER_REL; 125 restart_at = GNUNET_TIME_UNIT_FOREVER_REL;
120 /* first, see if we can start all the jobs */ 126 /* first, calculate some basic statistics on pending jobs */
121 num_probes_waiting = 0; 127 num_probes_waiting = 0;
122 num_download_waiting = 0; 128 num_downloads_waiting = 0;
123 for (qe = h->pending_head; NULL != qe; qe = qe->next) 129 for (qe = h->pending_head; NULL != qe; qe = qe->next)
124 { 130 {
125 switch (qe->priority) 131 switch (qe->priority)
@@ -128,17 +134,18 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
128 num_probes_waiting++; 134 num_probes_waiting++;
129 break; 135 break;
130 case GNUNET_FS_QUEUE_PRIORITY_NORMAL: 136 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
131 num_download_waiting++; 137 num_downloads_waiting++;
132 break; 138 break;
133 default: 139 default:
134 GNUNET_break (0); 140 GNUNET_break (0);
135 break; 141 break;
136 } 142 }
137 } 143 }
144 /* now, calculate some basic statistics on running jobs */
138 num_probes_active = 0; 145 num_probes_active = 0;
139 num_probes_expired = 0; 146 num_probes_expired = 0;
140 num_download_active = 0; 147 num_downloads_active = 0;
141 num_download_expired = 0; 148 num_downloads_expired = 0;
142 for (qe = h->running_head; NULL != qe; qe = qe->next) 149 for (qe = h->running_head; NULL != qe; qe = qe->next)
143 { 150 {
144 run_time = 151 run_time =
@@ -146,48 +153,60 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
146 qe->blocks * qe->start_times); 153 qe->blocks * qe->start_times);
147 switch (qe->priority) 154 switch (qe->priority)
148 { 155 {
149 case GNUNET_FS_QUEUE_PRIORITY_PROBE: 156 case GNUNET_FS_QUEUE_PRIORITY_PROBE:
150 num_probes_active++; 157 num_probes_active++;
151 /* run probes for at most 1s * number-of-restarts; note that 158 /* run probes for at most 1s * number-of-restarts; note that
152 as the total runtime of a probe is limited to 2m, we don't 159 as the total runtime of a probe is limited to 2m, we don't
153 need to additionally limit the total time of a probe to 160 need to additionally limit the total time of a probe to
154 strictly limit its lifetime. */ 161 strictly limit its lifetime. */
155 run_time = GNUNET_TIME_relative_min (run_time, 162 run_time = GNUNET_TIME_relative_min (run_time,
156 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 163 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
157 1 + qe->start_times)); 164 1 + qe->start_times));
158 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); 165 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
159 rst = GNUNET_TIME_absolute_get_remaining (end_time); 166 rst = GNUNET_TIME_absolute_get_remaining (end_time);
160 restart_at = GNUNET_TIME_relative_min (rst, restart_at); 167 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
161 if (0 == rst.rel_value) 168 if (0 == rst.rel_value)
162 num_probes_expired++; 169 num_probes_expired++;
163 break; 170 break;
164 case GNUNET_FS_QUEUE_PRIORITY_NORMAL: 171 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
165 num_download_active++; 172 num_downloads_active++;
166 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); 173 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
167 rst = GNUNET_TIME_absolute_get_remaining (end_time); 174 rst = GNUNET_TIME_absolute_get_remaining (end_time);
168 restart_at = GNUNET_TIME_relative_min (rst, restart_at); 175 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
169 if (0 == rst.rel_value) 176 if (0 == rst.rel_value)
170 num_download_expired++; 177 num_downloads_expired++;
171 break; 178 break;
172 default: 179 default:
173 GNUNET_break (0); 180 GNUNET_break (0);
174 break; 181 break;
175 } 182 }
176 } 183 }
177 184 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
185 "PA: %u, PE: %u, PW: %u; DA: %u, DE: %u, DW: %u\n",
186 num_probes_active,
187 num_probes_expired,
188 num_probes_waiting,
189 num_downloads_active,
190 num_downloads_expired,
191 num_downloads_waiting);
192
178 /* calculate stop decisions */ 193 /* calculate stop decisions */
179 num_probes_change = 0; 194 num_probes_change = 0;
180 num_download_change = 0; 195 num_downloads_change = 0;
181 if (h->active_downloads + num_download_waiting > h->max_parallel_requests) 196 if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests)
182 { 197 {
183 if (num_probes_active > 0) 198 if (num_probes_active > 0)
184 num_probes_change = - GNUNET_MIN (num_probes_active, 199 num_probes_change = - GNUNET_MIN (num_probes_active,
185 h->max_parallel_requests - (h->active_downloads + num_download_waiting)); 200 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
186 else if (h->active_downloads + num_download_waiting > h->max_parallel_requests) 201 else if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests)
187 num_download_change = - GNUNET_MIN (num_download_expired, 202 num_downloads_change = - GNUNET_MIN (num_downloads_expired,
188 h->max_parallel_requests - (h->active_downloads + num_download_waiting)); 203 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
189 } 204 }
190 205
206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
207 "Stopping %d probes and %d downloads\n",
208 num_probes_change,
209 num_downloads_change);
191 /* then, check if we should stop some jobs */ 210 /* then, check if we should stop some jobs */
192 next = h->running_head; 211 next = h->running_head;
193 while (NULL != (qe = next)) 212 while (NULL != (qe = next))
@@ -197,121 +216,113 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
197 GNUNET_TIME_relative_multiply (h->avg_block_latency, 216 GNUNET_TIME_relative_multiply (h->avg_block_latency,
198 qe->blocks * qe->start_times); 217 qe->blocks * qe->start_times);
199 switch (qe->priority) 218 switch (qe->priority)
219 {
220 case GNUNET_FS_QUEUE_PRIORITY_PROBE:
221 /* run probes for at most 1s * number-of-restarts; note that
222 as the total runtime of a probe is limited to 2m, we don't
223 need to additionally limit the total time of a probe to
224 strictly limit its lifetime. */
225 run_time = GNUNET_TIME_relative_min (run_time,
226 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
227 1 + qe->start_times));
228 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
229 rst = GNUNET_TIME_absolute_get_remaining (end_time);
230 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
231 if ( (num_probes_change < 0) &&
232 ( (num_probes_expired < - num_probes_change) ||
233 (0 == rst.rel_value) ) )
200 { 234 {
201 case GNUNET_FS_QUEUE_PRIORITY_PROBE: 235 stop_job (qe);
202 /* run probes for at most 1s * number-of-restarts; note that 236 num_probes_change++;
203 as the total runtime of a probe is limited to 2m, we don't 237 if (0 == rst.rel_value)
204 need to additionally limit the total time of a probe to 238 num_probes_expired--;
205 strictly limit its lifetime. */
206 run_time = GNUNET_TIME_relative_min (run_time,
207 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
208 1 + qe->start_times));
209 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
210 rst = GNUNET_TIME_absolute_get_remaining (end_time);
211 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
212 if ( (num_probes_change < 0) &&
213 ( (num_probes_expired < - num_probes_change) ||
214 (0 == rst.rel_value) ) )
215 {
216 stop_job (qe);
217 num_probes_change++;
218 if (0 == rst.rel_value)
219 num_probes_expired--;
220 }
221 break;
222 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
223 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
224 rst = GNUNET_TIME_absolute_get_remaining (end_time);
225 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
226 if ( (num_download_change < 0) &&
227 ( (num_download_expired < - num_download_change) ||
228 (0 == rst.rel_value) ) )
229 {
230 stop_job (qe);
231 num_download_change++;
232 if (0 == rst.rel_value)
233 num_download_expired--;
234 }
235 break;
236 default:
237 GNUNET_break (0);
238 break;
239 } 239 }
240 break;
241 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
242 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
243 rst = GNUNET_TIME_absolute_get_remaining (end_time);
244 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
245 if ( (num_downloads_change < 0) &&
246 ( (num_downloads_expired < - num_downloads_change) ||
247 (0 == rst.rel_value) ) )
248 {
249 stop_job (qe);
250 num_downloads_change++;
251 if (0 == rst.rel_value)
252 num_downloads_expired--;
253 }
254 break;
255 default:
256 GNUNET_break (0);
257 break;
258 }
240 } 259 }
260 GNUNET_break (0 == num_downloads_change);
261 GNUNET_break (0 == num_probes_change);
241 262
242 /* FIXME: calculate start decisions */ 263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
243 num_probes_change = 0; 264 "AD: %u, MP: %u\n",
244 num_download_change = 0; 265 h->active_downloads,
245 if (h->active_downloads + num_download_waiting < h->max_parallel_requests) 266 h->max_parallel_requests);
246 {
247 num_download_change = num_download_waiting;
248 num_probes_change = GNUNET_MIN (num_probes_waiting,
249 h->max_parallel_requests - (h->active_downloads + num_download_waiting));
250 }
251
252 267
253 next = h->pending_head; 268 /* calculate start decisions */
254 while (NULL != (qe = next)) 269 if (h->active_downloads + num_downloads_waiting < h->max_parallel_requests)
255 { 270 {
256 next = qe->next; 271 /* can start all downloads, fill rest with probes */
257 if (NULL == h->running_head) 272 num_downloads_change = num_downloads_waiting;
258 { 273 num_probes_change = GNUNET_MIN (num_probes_waiting,
259 start_job (qe); 274 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
260 continue;
261 }
262 if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) &&
263 (h->active_downloads < h->max_parallel_downloads))
264 {
265 start_job (qe);
266 continue;
267 }
268 } 275 }
269 if (NULL == h->pending_head) 276 else
270 return; /* no need to stop anything */
271 /* then, check if we should stop some jobs */
272 next = h->running_head;
273 while (NULL != (qe = next))
274 { 277 {
275 next = qe->next; 278 num_downloads_change = h->max_parallel_requests - h->active_downloads;
276 run_time = 279 num_probes_change = 0;
277 GNUNET_TIME_relative_multiply (h->avg_block_latency,
278 qe->blocks * qe->start_times);
279 switch (qe->priority)
280 {
281 case GNUNET_FS_QUEUE_PRIORITY_PROBE:
282 /* run probes for at most 1s * number-of-restarts; note that
283 as the total runtime of a probe is limited to 2m, we don't
284 need to additionally limit the total time of a probe to
285 strictly limit its lifetime. */
286 run_time = GNUNET_TIME_relative_min (run_time,
287 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
288 1 + qe->start_times));
289 break;
290 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
291 break;
292 default:
293 GNUNET_break (0);
294 break;
295 }
296 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
297 rst = GNUNET_TIME_absolute_get_remaining (end_time);
298 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
299 if (rst.rel_value > 0)
300 continue;
301 stop_job (qe);
302 } 280 }
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Starting %d probes and %d downloads\n",
283 num_probes_change,
284 num_downloads_change);
285
303 /* finally, start some more tasks if we now have empty slots */ 286 /* finally, start some more tasks if we now have empty slots */
287 block_limit_hit = GNUNET_NO;
304 next = h->pending_head; 288 next = h->pending_head;
305 while (NULL != (qe = next)) 289 while (NULL != (qe = next))
306 { 290 {
307 next = qe->next; 291 next = qe->next;
308 if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) && 292 switch (qe->priority)
309 (h->active_downloads < h->max_parallel_downloads))
310 { 293 {
311 start_job (qe); 294 case GNUNET_FS_QUEUE_PRIORITY_PROBE:
312 continue; 295 if (num_probes_change > 0)
296 {
297 start_job (qe);
298 num_probes_change--;
299 }
300 break;
301 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
302 if ( (num_downloads_change > 0) &&
303 ( (qe->blocks + h->active_blocks <= h->max_parallel_requests) ||
304 ( (qe->blocks > h->max_parallel_requests) &&
305 (0 == h->active_downloads) ) ) )
306 {
307 start_job (qe);
308 num_downloads_change--;
309 }
310 else if (num_downloads_change > 0)
311 block_limit_hit = GNUNET_YES;
312 break;
313 default:
314 GNUNET_break (0);
315 break;
313 } 316 }
314 } 317 }
318 GNUNET_break ( (0 == num_downloads_change) || (GNUNET_YES == block_limit_hit) );
319 GNUNET_break (0 == num_probes_change);
320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
321 "Left with %d probes and %d downloads to start\n",
322 num_probes_change,
323 num_downloads_change);
324
325 /* make sure we run again */
315 h->queue_job = 326 h->queue_job =
316 GNUNET_SCHEDULER_add_delayed (restart_at, &process_job_queue, h); 327 GNUNET_SCHEDULER_add_delayed (restart_at, &process_job_queue, h);
317} 328}
diff --git a/src/fs/gnunet-download.c b/src/fs/gnunet-download.c
index 43b306def..2293cedd7 100644
--- a/src/fs/gnunet-download.c
+++ b/src/fs/gnunet-download.c
@@ -52,6 +52,7 @@ static char *filename;
52 52
53static int local_only; 53static int local_only;
54 54
55
55static void 56static void
56cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 57cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
57{ 58{
@@ -63,13 +64,10 @@ cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
63static void 64static void
64shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 65shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
65{ 66{
66 struct GNUNET_FS_DownloadContext *d; 67 if (NULL != dc)
67
68 if (dc != NULL)
69 { 68 {
70 d = dc; 69 GNUNET_FS_download_stop (dc, delete_incomplete);
71 dc = NULL; 70 dc = NULL;
72 GNUNET_FS_download_stop (d, delete_incomplete);
73 } 71 }
74} 72}
75 73
@@ -170,6 +168,13 @@ progress_cb (void *cls, const struct GNUNET_FS_ProgressInfo *info)
170 } 168 }
171 break; 169 break;
172 case GNUNET_FS_STATUS_DOWNLOAD_ERROR: 170 case GNUNET_FS_STATUS_DOWNLOAD_ERROR:
171#if !WINDOWS
172 if (0 != isatty (1))
173 fprintf (stdout, "\n");
174#else
175 if (FILE_TYPE_CHAR == GetFileType (GetStdHandle (STD_OUTPUT_HANDLE)))
176 fprintf (stdout, "\n");
177#endif
173 FPRINTF (stderr, _("Error downloading: %s.\n"), 178 FPRINTF (stderr, _("Error downloading: %s.\n"),
174 info->value.download.specifics.error.message); 179 info->value.download.specifics.error.message);
175 GNUNET_SCHEDULER_shutdown (); 180 GNUNET_SCHEDULER_shutdown ();
@@ -178,6 +183,13 @@ progress_cb (void *cls, const struct GNUNET_FS_ProgressInfo *info)
178 s = GNUNET_STRINGS_byte_size_fancy (info->value.download.completed * 1000 / 183 s = GNUNET_STRINGS_byte_size_fancy (info->value.download.completed * 1000 /
179 (info->value.download. 184 (info->value.download.
180 duration.rel_value + 1)); 185 duration.rel_value + 1));
186#if !WINDOWS
187 if (0 != isatty (1))
188 fprintf (stdout, "\n");
189#else
190 if (FILE_TYPE_CHAR == GetFileType (GetStdHandle (STD_OUTPUT_HANDLE)))
191 fprintf (stdout, "\n");
192#endif
181 FPRINTF (stdout, _("Downloading `%s' done (%s/s).\n"), 193 FPRINTF (stdout, _("Downloading `%s' done (%s/s).\n"),
182 info->value.download.filename, s); 194 info->value.download.filename, s);
183 GNUNET_free (s); 195 GNUNET_free (s);