aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/util/Scheduler.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/gnunet/util/Scheduler.java')
-rw-r--r--src/main/java/org/gnunet/util/Scheduler.java466
1 files changed, 238 insertions, 228 deletions
diff --git a/src/main/java/org/gnunet/util/Scheduler.java b/src/main/java/org/gnunet/util/Scheduler.java
index f5eee8c..1f8d9dc 100644
--- a/src/main/java/org/gnunet/util/Scheduler.java
+++ b/src/main/java/org/gnunet/util/Scheduler.java
@@ -20,6 +20,7 @@
20 20
21package org.gnunet.util; 21package org.gnunet.util;
22 22
23import com.google.common.collect.Lists;
23import org.slf4j.Logger; 24import org.slf4j.Logger;
24import org.slf4j.LoggerFactory; 25import org.slf4j.LoggerFactory;
25 26
@@ -28,10 +29,13 @@ import java.nio.ByteBuffer;
28import java.nio.channels.*; 29import java.nio.channels.*;
29import java.nio.channels.spi.SelectorProvider; 30import java.nio.channels.spi.SelectorProvider;
30import java.util.*; 31import java.util.*;
31import java.util.concurrent.locks.ReentrantLock;
32 32
33/** 33/**
34 * Schedule computations using continuation passing style. 34 * Schedule computations using continuation passing style.
35 * <p/>
36 * By design, it is not safe to scheduler tasks from different threads.
37 * When threads can't be avoided, they should communicate with the scheduler by
38 * a pipe.
35 * 39 *
36 * @author Florian Dold 40 * @author Florian Dold
37 */ 41 */
@@ -39,12 +43,10 @@ public class Scheduler {
39 private static final Logger logger = LoggerFactory 43 private static final Logger logger = LoggerFactory
40 .getLogger(Scheduler.class); 44 .getLogger(Scheduler.class);
41 45
42 private static ReentrantLock lock = new ReentrantLock(true);
43
44 /** 46 /**
45 * Task that we are currently executing, or null if no task is currently running. 47 * Task that we are currently executing, or null if no task is currently running.
46 */ 48 */
47 private static TaskConfiguration activeTask = null; 49 private static TaskIdentifier activeTask = null;
48 50
49 /** 51 /**
50 * Number of tasks in the ready lists, that is, number of tasks that is ready to run 52 * Number of tasks in the ready lists, that is, number of tasks that is ready to run
@@ -69,21 +71,14 @@ public class Scheduler {
69 * For every priority, there is a list of tasks that is definitely ready to run. 71 * For every priority, there is a list of tasks that is definitely ready to run.
70 */ 72 */
71 @SuppressWarnings("unchecked") 73 @SuppressWarnings("unchecked")
72 final private static LinkedList<TaskConfiguration>[] readyLists = new LinkedList[Priority.numberOfPriorities]; 74 final private static LinkedList<TaskIdentifier>[] readyLists = new LinkedList[Priority.numberOfPriorities];
73 75
74 static { 76 static {
75 for (int i = 0; i < Priority.numberOfPriorities; ++i) { 77 for (int i = 0; i < Priority.numberOfPriorities; ++i) {
76 readyLists[i] = new LinkedList<TaskConfiguration>(); 78 readyLists[i] = new LinkedList<TaskIdentifier>();
77 } 79 }
78 } 80 }
79 81
80 private static final int EVENT_READ = 0, EVENT_WRITE = 1, EVENT_ACCEPT = 2, EVENT_CONNECT = 3;
81 private static final int[] eventToInterestOp = new int[]{SelectionKey.OP_READ, SelectionKey.OP_WRITE,
82 SelectionKey.OP_ACCEPT, SelectionKey.OP_CONNECT};
83 private static final Reason[] eventToReason = new Reason[]{Reason.READ_READY, Reason.WRITE_READY,
84 Reason.ACCEPT_READY, Reason.CONNECT_READY};
85
86
87 /** 82 /**
88 * Selector, used to check file descriptors for readiness. 83 * Selector, used to check file descriptors for readiness.
89 */ 84 */
@@ -102,37 +97,20 @@ public class Scheduler {
102 /** 97 /**
103 * true iff the scheduler is currently running. 98 * true iff the scheduler is currently running.
104 */ 99 */
105 private static boolean scheduler_running = false; 100 private static boolean schedulerRunning = false;
106
107 101
108 /** 102 /**
109 * Pending tasks are waiting for an event. Each pending task has a (possibly infinitely long) 103 * Pending tasks are waiting for an event. Each pending task has a (possibly infinitely long)
110 * deadline after which the task is executed regardless of the prerequisites. 104 * deadline after which the task is executed regardless of the prerequisites.
111 */ 105 */
112 final private static Queue<TaskConfiguration> pending = new PriorityQueue<TaskConfiguration>(5, new Comparator 106 final private static Queue<TaskIdentifier> pending = new PriorityQueue<TaskIdentifier>(5, new Comparator
113 <TaskConfiguration>() { 107 <TaskIdentifier>() {
114 @Override 108 @Override
115 public int compare(TaskConfiguration a, TaskConfiguration b) { 109 public int compare(TaskIdentifier a, TaskIdentifier b) {
116 return a.deadline.compareTo(b.deadline); 110 return a.deadline.compareTo(b.deadline);
117 } 111 }
118 }); 112 });
119 113
120
121 /**
122 * Reset the scheduler forcefully.
123 * Intended to be used internally in the Scheduler, as well as in test teardown.
124 */
125 public static void forceReset() {
126 scheduler_running = false;
127 readyCount = 0;
128 activeTask = null;
129 for (int i = 0; i < Priority.numberOfPriorities; ++i) {
130 readyLists[i] = new LinkedList<TaskConfiguration>();
131 }
132 pending.clear();
133 }
134
135
136 /** 114 /**
137 * Reasons for executing a task. 115 * Reasons for executing a task.
138 */ 116 */
@@ -147,94 +125,133 @@ public class Scheduler {
147 /** 125 /**
148 * The reason this task has been called by the scheduler. 126 * The reason this task has been called by the scheduler.
149 */ 127 */
150 public Set<Reason> reasons = EnumSet.noneOf(Reason.class); 128 public EnumSet<Reason> reasons = EnumSet.noneOf(Reason.class);
151 } 129 }
152 130
153 /** 131 /**
154 * A task is the basic unit of work that is managed by the scheduler. 132 * Which operations is a task identifier interested in?
155 */ 133 */
156 public static interface Task { 134 private static class TaskInterestOps {
157 public void run(RunContext ctx); 135 TaskIdentifier tid;
136 int interestOps;
158 } 137 }
159 138
160 /** 139 /**
161 * A TaskConfiguration represents a Task that will execute or has already been executed. 140 * Manage subscriptions for selection events on channels.
162 */ 141 */
163 public static class TaskConfiguration implements Cancelable { 142 private static class Subscriptions {
164 private final Task task; 143 private static class ChannelInterest {
165 private RunContext ctx = new RunContext(); 144 SelectableChannel channel;
166 private boolean lifeness = true; 145 int interestOps;
167 private Priority priority;
168 private final AbsoluteTime deadline;
169
170 private ArrayList<SelectableChannel> eventChannels = null;
171 private ArrayList<Integer> eventTypes = null;
172
173 private boolean hasRun = false;
174 private boolean isCanceled = false;
175
176 /**
177 * Create a TaskIdentifier.
178 *
179 * @param delay when will the task be run?
180 * may be null to indicate that this task may not be run
181 * (but only queued directly)
182 * @param task task to run with this TaskIdentifier
183 */
184 TaskConfiguration(RelativeTime delay, Task task) {
185 this.task = task;
186 if (delay == null)
187 this.deadline = null;
188 else
189 this.deadline = delay.toAbsolute();
190 } 146 }
191 147
192 /** 148 List<ChannelInterest> channelInterests = Lists.newLinkedList();
193 * Register the task configuration to run once the given event completed. 149
194 * 150 void add(SelectableChannel channel, int interestOps) {
195 * @param channel channel that we wait for an event on 151 boolean found = false;
196 * @param eventType the event we wait on 152 for (ChannelInterest ci : channelInterests) {
197 */ 153 if (ci.channel == channel) {
198 private void addChannelEvent(SelectableChannel channel, int eventType) { 154 ci.interestOps |= interestOps;
199 if (channel == null) { 155 if ((ci.interestOps | SelectionKey.OP_CONNECT | SelectionKey.OP_READ) != 0) {
200 throw new AssertionError("channel must be non-null"); 156 throw new AssertionError("OP_CONNECT and OP_READ are incompatible in java");
157 }
158 found = true;
159 break;
160 }
201 } 161 }
202 if (eventChannels == null) { 162 if (!found) {
203 eventChannels = new ArrayList<SelectableChannel>(); 163 ChannelInterest ci = new ChannelInterest();
204 eventTypes = new ArrayList<Integer>(); 164 ci.channel = channel;
165 ci.interestOps = interestOps;
166 channelInterests.add(ci);
205 } 167 }
206 eventChannels.add(channel); 168 }
207 eventTypes.add(eventType);
208 169
209 int interestOp = eventToInterestOp[eventType]; 170 void apply(TaskIdentifier tid) {
171 for (ChannelInterest ci : channelInterests) {
172 SelectionKey key = ci.channel.keyFor(selector);
173 if (key == null || !key.isValid()) {
174 try {
175 key = ci.channel.register(selector, ci.interestOps);
176 key.attach(new LinkedList());
177 } catch (ClosedChannelException e) {
178 throw new IOError(e);
179 }
180 } else {
181 key.interestOps(key.interestOps() | ci.interestOps);
182 }
183 @SuppressWarnings("unchecked")
184 LinkedList<TaskInterestOps> opl = (LinkedList<TaskInterestOps>) key.attachment();
185 TaskInterestOps tio = new TaskInterestOps();
186 tio.tid = tid;
187 tio.interestOps = ci.interestOps;
188 opl.add(tio);
189 }
190 }
210 191
211 SelectionKey key = channel.keyFor(selector); 192 void stop(TaskIdentifier tid) {
212 if (key == null || !key.isValid()) { 193 for (ChannelInterest ci : channelInterests) {
213 try { 194 SelectionKey key = ci.channel.keyFor(selector);
214 // tread safety ... avoid deadlock 195 if (key == null || !key.isValid()) {
215 selector.wakeup(); 196 logger.warn("missing selection key");
216 key = channel.register(selector, interestOp, new TaskConfiguration[4]); 197 return;
217 } catch (ClosedChannelException e) {
218 throw new IOError(e);
219 } 198 }
220 } else { 199 @SuppressWarnings("unchecked")
221 if ((key.interestOps() & interestOp) != 0) { 200 LinkedList<TaskInterestOps> interestList = (LinkedList<TaskInterestOps>) key.attachment();
222 throw new AssertionError("interest op registered twice"); 201 Iterator<TaskInterestOps> it = interestList.iterator();
202 int remainingInterestOps = 0;
203 while (it.hasNext()) {
204 TaskInterestOps ops = it.next();
205 if (ops.tid == tid) {
206 it.remove();
207 } else {
208 remainingInterestOps |= ops.interestOps;
209 }
223 } 210 }
224 key.interestOps(key.interestOps() | interestOp); 211 key.interestOps(remainingInterestOps);
225 } 212 }
213 }
214 }
226 215
227 TaskConfiguration[] subscribers = (TaskConfiguration[]) key.attachment(); 216 /**
228 if (subscribers[eventType] != null) { 217 * A task is the basic unit of work that is managed by the scheduler.
229 throw new AssertionError("subscriber registered twice"); 218 */
230 } 219 public static interface Task {
231 subscribers[eventType] = this; 220 public void run(RunContext ctx);
221 }
232 222
233 if (subscribers[EVENT_CONNECT] != null && subscribers[EVENT_READ] != null) { 223 /**
234 throw new AssertionError("OP_CONNECT and OP_READ are incompatible in java"); 224 * Representation of a task that has been scheduled, and can be canceled
235 } 225 * until the task has run.
226 */
227 public static class TaskIdentifier implements Cancelable {
228 private boolean hasRun = false;
229 private boolean isCanceled = false;
230 private final Task task;
231 private final RunContext ctx = new RunContext();
232 private final boolean lifeness;
233 private final Priority priority;
234 private final AbsoluteTime deadline;
235 private final Subscriptions subscriptions;
236
237 public TaskIdentifier(Task task, EnumSet<Reason> reasons) {
238 this.ctx.reasons = reasons;
239 this.task = task;
240 lifeness = true;
241 priority = Priority.DEFAULT;
242 deadline = null;
243 subscriptions = null;
244 }
245
246 public TaskIdentifier(TaskConfiguration tc) {
247 this.task = tc.task;
248 this.subscriptions = tc.subscriptions;
249 this.deadline = tc.deadline;
250 this.priority = tc.priority;
251 this.lifeness = tc.lifeness;
236 } 252 }
237 253
254
238 private void run() { 255 private void run() {
239 if (hasRun) { 256 if (hasRun) {
240 throw new AssertionError("same task ran twice"); 257 throw new AssertionError("same task ran twice");
@@ -242,14 +259,18 @@ public class Scheduler {
242 if (isCanceled) { 259 if (isCanceled) {
243 return; 260 return;
244 } 261 }
245 TaskConfiguration old = activeTask; 262 TaskIdentifier old = activeTask;
246 activeTask = this; 263 activeTask = this;
247 task.run(ctx); 264 task.run(ctx);
248 hasRun = true; 265 hasRun = true;
249 activeTask = old; 266 activeTask = old;
250 } 267 }
251 268
269 @Override
252 public void cancel() { 270 public void cancel() {
271 if (hasRun) {
272 throw new AssertionError("can't cancel task that already ran");
273 }
253 if (isCanceled) { 274 if (isCanceled) {
254 throw new AssertionError("task canceled twice"); 275 throw new AssertionError("task canceled twice");
255 } 276 }
@@ -257,55 +278,59 @@ public class Scheduler {
257 pending.remove(this); 278 pending.remove(this);
258 } 279 }
259 280
260 public Cancelable schedule() { 281 public void deregister() {
261 lock.lock(); 282 if (subscriptions != null) {
262 if (this.deadline == null) 283 subscriptions.stop(this);
263 throw new AssertionError("a task without deadline may not be scheduled");
264 if (priority == null) {
265 if (activeTask != null) {
266 priority = activeTask.priority;
267 } else {
268 priority = Priority.DEFAULT;
269 }
270 } 284 }
271 pending.add(this);
272 lock.unlock();
273 return this;
274 } 285 }
286 }
275 287
276 private void deregister() { 288 /**
277 if (eventChannels == null) { 289 * A TaskConfiguration contains all information to schedule a task.
278 return; 290 */
279 } 291 public static class TaskConfiguration {
280 lock.lock(); 292 private final Task task;
281 for (int i = 0; i < eventChannels.size(); ++i) { 293 private boolean lifeness = true;
282 SelectionKey key = eventChannels.get(i).keyFor(selector); 294 private Priority priority;
283 TaskConfiguration[] subscribers = (TaskConfiguration[]) key.attachment(); 295 private final AbsoluteTime deadline;
284 int interestOp = eventToInterestOp[eventTypes.get(i)];
285 if (subscribers[eventTypes.get(i)] == null || (key.interestOps() | interestOp) == 0) {
286 throw new AssertionError("deregistering event that has not been registered");
287 }
288 subscribers[eventTypes.get(i)] = null;
289 key.interestOps(key.interestOps() & (~interestOp));
290 }
291 296
292 lock.unlock(); 297 private Subscriptions subscriptions;
293 }
294 298
295 public void selectRead(SelectableChannel channel) { 299 /**
296 addChannelEvent(channel, EVENT_READ); 300 * Create a TaskIdentifier.
301 *
302 * @param delay when will the task be run?
303 * may be null to indicate that this task may not be run
304 * (but only queued directly)
305 * @param task task to run with this TaskIdentifier
306 */
307 TaskConfiguration(RelativeTime delay, Task task) {
308 if (delay == null)
309 throw new AssertionError("task delay may not be 'null'");
310 this.task = task;
311 this.deadline = delay.toAbsolute();
297 } 312 }
298 313
299 public void selectWrite(SelectableChannel channel) { 314 public TaskIdentifier schedule() {
300 addChannelEvent(channel, EVENT_WRITE);
301 }
302 315
303 public void selectConnect(SelectableChannel channel) { 316 if (priority == null) {
304 addChannelEvent(channel, EVENT_CONNECT); 317 if (activeTask != null) {
318 priority = activeTask.priority;
319 } else {
320 priority = Priority.DEFAULT;
321 }
322 }
323 TaskIdentifier tid = new TaskIdentifier(this);
324 if (subscriptions != null)
325 subscriptions.apply(tid);
326 pending.add(tid);
327 return tid;
305 } 328 }
306 329
307 public void selectAccept(SelectableChannel channel) { 330 public void addSelectEvent(SelectableChannel channel, int event) {
308 addChannelEvent(channel, EVENT_ACCEPT); 331 if (subscriptions == null)
332 subscriptions = new Subscriptions();
333 subscriptions.add(channel, event);
309 } 334 }
310 } 335 }
311 336
@@ -314,14 +339,7 @@ public class Scheduler {
314 * the same priority. 339 * the same priority.
315 */ 340 */
316 public static void addContinuation(Task task, EnumSet<Reason> reasons) { 341 public static void addContinuation(Task task, EnumSet<Reason> reasons) {
317 lock.lock(); 342 queueReady(new TaskIdentifier(task, reasons));
318 TaskConfiguration t = new TaskConfiguration(null, task);
319 t.ctx.reasons = reasons;
320 t.priority = Priority.DEFAULT;
321 queueReady(t);
322 logger.debug("about to wake up");
323 lock.unlock();
324 selector.wakeup();
325 } 343 }
326 344
327 /** 345 /**
@@ -343,26 +361,41 @@ public class Scheduler {
343 * @param task the task to run after delay 361 * @param task the task to run after delay
344 * @return the TaskIdentifier, can be used to cancel the task until it has been executed. 362 * @return the TaskIdentifier, can be used to cancel the task until it has been executed.
345 */ 363 */
346 public static TaskConfiguration addDelayed(RelativeTime delay, Task task) { 364 public static TaskIdentifier addDelayed(RelativeTime delay, Task task) {
347 TaskConfiguration tid = new TaskConfiguration(delay, task); 365 TaskConfiguration tid = new TaskConfiguration(delay, task);
348 tid.schedule(); 366 return tid.schedule();
349 return tid;
350 } 367 }
351 368
352 public static TaskConfiguration addRead(RelativeTime timeout, 369 /**
353 SelectableChannel chan, Task task) { 370 * Add a task to run after the specified delay, or after the given channel
371 * is ready to read, whichever occurs first.
372 *
373 * @param timeout time to wait until running the task
374 * @param chan chennel of interest
375 * @param task task to run
376 * @return task identifier
377 */
378 public static TaskIdentifier addRead(RelativeTime timeout,
379 SelectableChannel chan, Task task) {
354 TaskConfiguration tid = new TaskConfiguration(timeout, task); 380 TaskConfiguration tid = new TaskConfiguration(timeout, task);
355 tid.addChannelEvent(chan, EVENT_READ); 381 tid.addSelectEvent(chan, SelectionKey.OP_READ);
356 tid.schedule(); 382 return tid.schedule();
357 return tid;
358 } 383 }
359 384
360 public static TaskConfiguration addWrite(RelativeTime timeout, 385 /**
361 SelectableChannel chan, Task task) { 386 * Add a task to run after the specified delay, or after the given channel
387 * is ready to write, whichever occurs first.
388 *
389 * @param timeout to wait until running the task
390 * @param chan channel of interest
391 * @param task task to run
392 * @return task identifier
393 */
394 public static TaskIdentifier addWrite(RelativeTime timeout,
395 SelectableChannel chan, Task task) {
362 TaskConfiguration tid = new TaskConfiguration(timeout, task); 396 TaskConfiguration tid = new TaskConfiguration(timeout, task);
363 tid.addChannelEvent(chan, EVENT_WRITE); 397 tid.addSelectEvent(chan, SelectionKey.OP_WRITE);
364 tid.schedule(); 398 return tid.schedule();
365 return tid;
366 } 399 }
367 400
368 /** 401 /**
@@ -372,14 +405,11 @@ public class Scheduler {
372 * @return true to continue the main loop, false to exit 405 * @return true to continue the main loop, false to exit
373 */ 406 */
374 private static boolean checkLiveness() { 407 private static boolean checkLiveness() {
375 lock.lock();
376 if (readyCount > 0) { 408 if (readyCount > 0) {
377 lock.unlock();
378 return true; 409 return true;
379 } 410 }
380 for (TaskConfiguration t : pending) { 411 for (TaskIdentifier t : pending) {
381 if (t.lifeness) { 412 if (t.lifeness) {
382 lock.unlock();
383 return true; 413 return true;
384 } 414 }
385 } 415 }
@@ -387,30 +417,23 @@ public class Scheduler {
387 if (!pending.isEmpty()) { 417 if (!pending.isEmpty()) {
388 logger.debug("tasks pending but not alive -- disconnect"); 418 logger.debug("tasks pending but not alive -- disconnect");
389 shutdown(); 419 shutdown();
390 lock.unlock();
391 return true; 420 return true;
392 } 421 }
393
394 lock.unlock();
395 return false; 422 return false;
396 } 423 }
397 424
398
399 /** 425 /**
400 * Queue a Task for execution. 426 * Queue a Task for execution.
401 * 427 *
402 * @param tid TaskIdentifier of the ready task 428 * @param tid TaskIdentifier of the ready task
403 */ 429 */
404 private static void queueReady(TaskConfiguration tid) { 430 private static void queueReady(TaskIdentifier tid) {
405 lock.lock();
406 int idx = tid.priority.ordinal(); 431 int idx = tid.priority.ordinal();
407 readyLists[idx].add(tid); 432 readyLists[idx].add(tid);
408 readyCount++; 433 readyCount++;
409 pending.remove(tid); 434 pending.remove(tid);
410 lock.unlock();
411 } 435 }
412 436
413
414 /** 437 /**
415 * Queue all tasks with expired timeout. 438 * Queue all tasks with expired timeout.
416 * 439 *
@@ -418,11 +441,10 @@ public class Scheduler {
418 */ 441 */
419 private static RelativeTime handleTimeouts() { 442 private static RelativeTime handleTimeouts() {
420 RelativeTime timeout = RelativeTime.FOREVER; 443 RelativeTime timeout = RelativeTime.FOREVER;
421 lock.lock();
422 444
423 // check if any timeouts occurred 445 // check if any timeouts occurred
424 while (true) { 446 while (true) {
425 TaskConfiguration t = pending.peek(); 447 TaskIdentifier t = pending.peek();
426 if (t == null) { 448 if (t == null) {
427 break; 449 break;
428 } 450 }
@@ -436,25 +458,19 @@ public class Scheduler {
436 break; 458 break;
437 } 459 }
438 } 460 }
439 lock.unlock();
440 return timeout; 461 return timeout;
441 } 462 }
442 463
443 /** 464
444 * If there is a subscribing task for the given event type, add it to the set of executable tasks. 465 private static void addReasonsFromInterestOp(EnumSet<Reason> reasons, int interestOps) {
445 * 466 if ((interestOps & SelectionKey.OP_READ) != 0)
446 * @param executableTasks set of executable tasks 467 reasons.add(Reason.READ_READY);
447 * @param subscribers subscriber set, one subscriber for each event type 468 if ((interestOps & SelectionKey.OP_WRITE) != 0)
448 * @param eventType event type we are interested in 469 reasons.add(Reason.WRITE_READY);
449 */ 470 if ((interestOps & SelectionKey.OP_CONNECT) != 0)
450 private static void addSubscriberTask(Collection<TaskConfiguration> executableTasks, 471 reasons.add(Reason.CONNECT_READY);
451 TaskConfiguration[] subscribers, int eventType) { 472 if ((interestOps & SelectionKey.OP_ACCEPT) != 0)
452 TaskConfiguration tc = subscribers[eventType]; 473 reasons.add(Reason.ACCEPT_READY);
453 if (tc == null) {
454 return;
455 }
456 executableTasks.add(tc);
457 tc.ctx.reasons.add(eventToReason[eventType]);
458 } 474 }
459 475
460 /** 476 /**
@@ -463,10 +479,8 @@ public class Scheduler {
463 * @param timeout timeout for select 479 * @param timeout timeout for select
464 */ 480 */
465 private static void handleSelect(RelativeTime timeout) { 481 private static void handleSelect(RelativeTime timeout) {
466 if (!lock.isHeldByCurrentThread()) 482 // gnunet-java uses microseconds, but the select api uses milliseconds
467 throw new AssertionError();
468 long timeout_ms = timeout.getMicroseconds() / 1000; 483 long timeout_ms = timeout.getMicroseconds() / 1000;
469 lock.unlock();
470 try { 484 try {
471 // selector.select(0) would block indefinitely (counter-intuitive, java's fault) 485 // selector.select(0) would block indefinitely (counter-intuitive, java's fault)
472 if (timeout_ms == 0) { 486 if (timeout_ms == 0) {
@@ -480,39 +494,29 @@ public class Scheduler {
480 } 494 }
481 } catch (IOException e) { 495 } catch (IOException e) {
482 throw new IOError(e); 496 throw new IOError(e);
483 } finally {
484 lock.lock();
485 } 497 }
486 498
487 logger.debug("select over"); 499 logger.debug("select over");
488 500
489 // we use a set so that we don't execute any task twice 501 // we use a set so that we don't execute any task twice
490 Collection<TaskConfiguration> executableTasks = new HashSet<TaskConfiguration>(); 502 Collection<TaskIdentifier> executableTasks = new HashSet<TaskIdentifier>();
491 for (SelectionKey sk : selector.selectedKeys()) { 503 for (SelectionKey sk : selector.selectedKeys()) {
492 TaskConfiguration[] subscribers = (TaskConfiguration[]) sk.attachment(); 504 @SuppressWarnings("unchecked")
493 505 LinkedList<TaskInterestOps> subscribers = (LinkedList<TaskInterestOps>) sk.attachment();
494 if (sk.isReadable()) { 506 for (TaskInterestOps ops : subscribers) {
495 addSubscriberTask(executableTasks, subscribers, EVENT_READ); 507 if ((sk.readyOps() & ops.interestOps) != 0) {
496 } 508 executableTasks.add(ops.tid);
497 if (sk.isWritable()) { 509 addReasonsFromInterestOp(ops.tid.ctx.reasons, sk.readyOps() & ops.interestOps);
498 addSubscriberTask(executableTasks, subscribers, EVENT_WRITE); 510 }
499 }
500 if (sk.isAcceptable()) {
501 addSubscriberTask(executableTasks, subscribers, EVENT_ACCEPT);
502 }
503 if (sk.isConnectable()) {
504 addSubscriberTask(executableTasks, subscribers, EVENT_CONNECT);
505 } 511 }
506
507 } 512 }
508 for (TaskConfiguration tt : executableTasks) { 513 for (TaskIdentifier tt : executableTasks) {
509 // cancel subscriptions to other events, we can execute now! 514 // cancel subscriptions to other events, we can execute now!
510 tt.deregister(); 515 tt.deregister();
511 queueReady(tt); 516 queueReady(tt);
512 } 517 }
513 } 518 }
514 519
515
516 /** 520 /**
517 * Initialize and run scheduler. This function will return when all tasks 521 * Initialize and run scheduler. This function will return when all tasks
518 * have completed. 522 * have completed.
@@ -529,10 +533,10 @@ public class Scheduler {
529 */ 533 */
530 public static void run(Task initialTask) { 534 public static void run(Task initialTask) {
531 logger.debug("running scheduler"); 535 logger.debug("running scheduler");
532 if (scheduler_running) { 536 if (schedulerRunning) {
533 throw new AssertionError("Scheduler already running"); 537 throw new AssertionError("Scheduler already running");
534 } 538 }
535 scheduler_running = true; 539 schedulerRunning = true;
536 try { 540 try {
537 run_unchecked(initialTask); 541 run_unchecked(initialTask);
538 } finally { 542 } finally {
@@ -543,7 +547,6 @@ public class Scheduler {
543 } 547 }
544 } 548 }
545 549
546
547 /** 550 /**
548 * Initialize and run scheduler. This function will return when all tasks 551 * Initialize and run scheduler. This function will return when all tasks
549 * have completed. Don't check if the scheduler is already running or not. 552 * have completed. Don't check if the scheduler is already running or not.
@@ -556,12 +559,7 @@ public class Scheduler {
556 } 559 }
557 560
558 // the gnunet main loop 561 // the gnunet main loop
559 while (true) { 562 while (checkLiveness()) {
560 lock.lock();
561 if (checkLiveness() == false) {
562 lock.unlock();
563 break;
564 }
565 RelativeTime nextTimeout = handleTimeouts(); 563 RelativeTime nextTimeout = handleTimeouts();
566 if (nextTimeout.getMicroseconds() < 0) { 564 if (nextTimeout.getMicroseconds() < 0) {
567 logger.warn("negative timeout for select"); 565 logger.warn("negative timeout for select");
@@ -569,7 +567,6 @@ public class Scheduler {
569 567
570 // don't select if there are no tasks; we are done! 568 // don't select if there are no tasks; we are done!
571 if (readyCount == 0 && pending.isEmpty()) { 569 if (readyCount == 0 && pending.isEmpty()) {
572 lock.unlock();
573 return; 570 return;
574 } 571 }
575 572
@@ -580,7 +577,6 @@ public class Scheduler {
580 handleSelect(nextTimeout); 577 handleSelect(nextTimeout);
581 } 578 }
582 runReady(); 579 runReady();
583 lock.unlock();
584 } 580 }
585 581
586 if (readyCount != 0) { 582 if (readyCount != 0) {
@@ -618,9 +614,9 @@ public class Scheduler {
618 // start executing from the highest priority down to 0 614 // start executing from the highest priority down to 0
619 for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) { 615 for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) {
620 // execute all tasks with priority p 616 // execute all tasks with priority p
621 LinkedList<TaskConfiguration> queue = readyLists[p]; 617 LinkedList<TaskIdentifier> queue = readyLists[p];
622 while (!queue.isEmpty()) { 618 while (!queue.isEmpty()) {
623 TaskConfiguration tid = queue.removeFirst(); 619 TaskIdentifier tid = queue.removeFirst();
624 readyCount--; 620 readyCount--;
625 tid.run(); 621 tid.run();
626 } 622 }
@@ -637,13 +633,27 @@ public class Scheduler {
637 */ 633 */
638 public static void shutdown() { 634 public static void shutdown() {
639 // queueReady() while iterating would yield concurrent modification exn otherwise 635 // queueReady() while iterating would yield concurrent modification exn otherwise
640 for (TaskConfiguration tid : new ArrayList<TaskConfiguration>(pending)) { 636 for (TaskIdentifier tid : new ArrayList<TaskIdentifier>(pending)) {
641 tid.ctx.reasons.add(Reason.SHUTDOWN); 637 tid.ctx.reasons.add(Reason.SHUTDOWN);
642 queueReady(tid); 638 queueReady(tid);
643 } 639 }
644 pending.clear(); 640 pending.clear();
645 } 641 }
646 642
643 /**
644 * Reset the scheduler forcefully.
645 * Intended to be used internally in the Scheduler, as well as in test teardown.
646 */
647 public static void forceReset() {
648 schedulerRunning = false;
649 readyCount = 0;
650 activeTask = null;
651 for (int i = 0; i < Priority.numberOfPriorities; ++i) {
652 readyLists[i] = Lists.newLinkedList();
653 }
654 pending.clear();
655 }
656
647 657
648 /** 658 /**
649 * A handle to a file system object that can be selected on. 659 * A handle to a file system object that can be selected on.