aboutsummaryrefslogtreecommitdiff
path: root/src/fs/fs_api.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2012-10-06 19:52:19 +0000
committerChristian Grothoff <christian@grothoff.org>2012-10-06 19:52:19 +0000
commit34a2835c1a3746d8db8428651437f44eb49ca336 (patch)
tree9e50bedc1170bfdf2b578fa82f7e7747af33b406 /src/fs/fs_api.c
parente2bd3276c59e502e19e4c8a4469ff8d8f2e2c202 (diff)
downloadgnunet-34a2835c1a3746d8db8428651437f44eb49ca336.tar.gz
gnunet-34a2835c1a3746d8db8428651437f44eb49ca336.zip
-fixing #2578
Diffstat (limited to 'src/fs/fs_api.c')
-rw-r--r--src/fs/fs_api.c157
1 files changed, 58 insertions, 99 deletions
diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c
index d6a434e6e..c99131d94 100644
--- a/src/fs/fs_api.c
+++ b/src/fs/fs_api.c
@@ -146,35 +146,43 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
146 num_probes_expired = 0; 146 num_probes_expired = 0;
147 num_downloads_active = 0; 147 num_downloads_active = 0;
148 num_downloads_expired = 0; 148 num_downloads_expired = 0;
149 for (qe = h->running_head; NULL != qe; qe = qe->next) 149 next = h->running_head;
150 while (NULL != (qe = next))
150 { 151 {
151 run_time = 152 next = qe->next;
152 GNUNET_TIME_relative_multiply (h->avg_block_latency,
153 qe->blocks * qe->start_times);
154 switch (qe->priority) 153 switch (qe->priority)
155 { 154 {
156 case GNUNET_FS_QUEUE_PRIORITY_PROBE: 155 case GNUNET_FS_QUEUE_PRIORITY_PROBE:
157 num_probes_active++; 156 run_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2);
158 /* run probes for at most 1s * number-of-restarts; note that
159 as the total runtime of a probe is limited to 2m, we don't
160 need to additionally limit the total time of a probe to
161 strictly limit its lifetime. */
162 run_time = GNUNET_TIME_relative_min (run_time,
163 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
164 1 + qe->start_times));
165 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); 157 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
166 rst = GNUNET_TIME_absolute_get_remaining (end_time); 158 rst = GNUNET_TIME_absolute_get_remaining (end_time);
167 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
168 if (0 == rst.rel_value) 159 if (0 == rst.rel_value)
160 {
169 num_probes_expired++; 161 num_probes_expired++;
162 stop_job (qe);
163 }
164 else
165 {
166 num_probes_active++;
167 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
168 }
170 break; 169 break;
171 case GNUNET_FS_QUEUE_PRIORITY_NORMAL: 170 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
172 num_downloads_active++; 171 run_time =
172 GNUNET_TIME_relative_multiply (h->avg_block_latency,
173 qe->blocks * qe->start_times);
173 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); 174 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
174 rst = GNUNET_TIME_absolute_get_remaining (end_time); 175 rst = GNUNET_TIME_absolute_get_remaining (end_time);
175 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
176 if (0 == rst.rel_value) 176 if (0 == rst.rel_value)
177 {
177 num_downloads_expired++; 178 num_downloads_expired++;
179 stop_job (qe);
180 }
181 else
182 {
183 num_downloads_active++;
184 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
185 }
178 break; 186 break;
179 default: 187 default:
180 GNUNET_break (0); 188 GNUNET_break (0);
@@ -189,104 +197,49 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
189 num_downloads_active, 197 num_downloads_active,
190 num_downloads_expired, 198 num_downloads_expired,
191 num_downloads_waiting); 199 num_downloads_waiting);
192 200 /* calculate start/stop decisions */
193 /* calculate stop decisions */
194 num_probes_change = 0;
195 num_downloads_change = 0;
196 if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests) 201 if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests)
197 { 202 {
198 if (num_probes_active > 0) 203 /* stop probes if possible */
199 num_probes_change = - GNUNET_MIN (num_probes_active, 204 num_probes_change = - num_probes_active;
200 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting)); 205 num_downloads_change = h->max_parallel_requests - h->active_downloads;
201 else if (h->active_downloads + num_downloads_waiting > h->max_parallel_requests) 206 }
202 num_downloads_change = - GNUNET_MIN (num_downloads_expired, 207 else
203 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting)); 208 {
209 /* start all downloads */
210 num_downloads_change = num_downloads_waiting;
211 /* start as many probes as we can */
212 num_probes_change = GNUNET_MIN (num_probes_waiting,
213 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
204 } 214 }
205 215
206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 216 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
207 "Stopping %d probes and %d downloads\n", 217 "Changing %d probes and %d downloads\n",
208 num_probes_change, 218 num_probes_change,
209 num_downloads_change); 219 num_downloads_change);
210 /* then, check if we should stop some jobs */ 220 /* actually stop probes */
211 next = h->running_head; 221 next = h->running_head;
212 while (NULL != (qe = next)) 222 while (NULL != (qe = next))
213 { 223 {
214 next = qe->next; 224 next = qe->next;
215 run_time = 225 if (GNUNET_FS_QUEUE_PRIORITY_PROBE != qe->priority)
216 GNUNET_TIME_relative_multiply (h->avg_block_latency, 226 continue;
217 qe->blocks * qe->start_times); 227 if (num_probes_change < 0)
218 switch (qe->priority)
219 { 228 {
220 case GNUNET_FS_QUEUE_PRIORITY_PROBE: 229 stop_job (qe);
221 /* run probes for at most 1s * number-of-restarts; note that 230 num_probes_change++;
222 as the total runtime of a probe is limited to 2m, we don't 231 if (0 == num_probes_change)
223 need to additionally limit the total time of a probe to 232 break;
224 strictly limit its lifetime. */
225 run_time = GNUNET_TIME_relative_min (run_time,
226 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
227 1 + qe->start_times));
228 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
229 rst = GNUNET_TIME_absolute_get_remaining (end_time);
230 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
231 if ( (num_probes_change < 0) &&
232 ( (num_probes_expired < - num_probes_change) ||
233 (0 == rst.rel_value) ) )
234 {
235 stop_job (qe);
236 num_probes_change++;
237 if (0 == rst.rel_value)
238 num_probes_expired--;
239 }
240 break;
241 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
242 end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time);
243 rst = GNUNET_TIME_absolute_get_remaining (end_time);
244 restart_at = GNUNET_TIME_relative_min (rst, restart_at);
245 if ( (num_downloads_change < 0) &&
246 ( (num_downloads_expired < - num_downloads_change) ||
247 (0 == rst.rel_value) ) )
248 {
249 stop_job (qe);
250 num_downloads_change++;
251 if (0 == rst.rel_value)
252 num_downloads_expired--;
253 }
254 break;
255 default:
256 GNUNET_break (0);
257 break;
258 } 233 }
259 } 234 }
260 GNUNET_break (0 == num_downloads_change); 235 GNUNET_break (0 <= num_probes_change);
261 GNUNET_break (0 == num_probes_change);
262
263 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
264 "AD: %u, MP: %u\n",
265 h->active_downloads,
266 h->max_parallel_requests);
267
268 /* calculate start decisions */
269 if (h->active_downloads + num_downloads_waiting < h->max_parallel_requests)
270 {
271 /* can start all downloads, fill rest with probes */
272 num_downloads_change = num_downloads_waiting;
273 num_probes_change = GNUNET_MIN (num_probes_waiting,
274 h->max_parallel_requests - (h->active_downloads + num_downloads_waiting));
275 }
276 else
277 {
278 num_downloads_change = h->max_parallel_requests - h->active_downloads;
279 num_probes_change = 0;
280 }
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Starting %d probes and %d downloads\n",
283 num_probes_change,
284 num_downloads_change);
285 236
286 /* finally, start some more tasks if we now have empty slots */ 237 /* start some more tasks if we now have empty slots */
287 block_limit_hit = GNUNET_NO; 238 block_limit_hit = GNUNET_NO;
288 next = h->pending_head; 239 next = h->pending_head;
289 while (NULL != (qe = next)) 240 while ( (NULL != (qe = next)) &&
241 ( (num_probes_change > 0) ||
242 (num_downloads_change > 0) ) )
290 { 243 {
291 next = qe->next; 244 next = qe->next;
292 switch (qe->priority) 245 switch (qe->priority)
@@ -296,6 +249,8 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
296 { 249 {
297 start_job (qe); 250 start_job (qe);
298 num_probes_change--; 251 num_probes_change--;
252 run_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2);
253 restart_at = GNUNET_TIME_relative_min (run_time, restart_at);
299 } 254 }
300 break; 255 break;
301 case GNUNET_FS_QUEUE_PRIORITY_NORMAL: 256 case GNUNET_FS_QUEUE_PRIORITY_NORMAL:
@@ -317,10 +272,14 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
317 } 272 }
318 GNUNET_break ( (0 == num_downloads_change) || (GNUNET_YES == block_limit_hit) ); 273 GNUNET_break ( (0 == num_downloads_change) || (GNUNET_YES == block_limit_hit) );
319 GNUNET_break (0 == num_probes_change); 274 GNUNET_break (0 == num_probes_change);
275
320 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 276 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
321 "Left with %d probes and %d downloads to start\n", 277 "AD: %u, MP: %u; %d probes and %d downloads to start, will run again in %s\n",
278 h->active_downloads,
279 h->max_parallel_requests,
322 num_probes_change, 280 num_probes_change,
323 num_downloads_change); 281 num_downloads_change,
282 GNUNET_STRINGS_relative_time_to_string (restart_at, GNUNET_YES));
324 283
325 /* make sure we run again */ 284 /* make sure we run again */
326 h->queue_job = 285 h->queue_job =