diff options
Diffstat (limited to 'src/main/java/org/gnunet/util/Scheduler.java')
-rw-r--r-- | src/main/java/org/gnunet/util/Scheduler.java | 466 |
1 files changed, 238 insertions, 228 deletions
diff --git a/src/main/java/org/gnunet/util/Scheduler.java b/src/main/java/org/gnunet/util/Scheduler.java index f5eee8c..1f8d9dc 100644 --- a/src/main/java/org/gnunet/util/Scheduler.java +++ b/src/main/java/org/gnunet/util/Scheduler.java | |||
@@ -20,6 +20,7 @@ | |||
20 | 20 | ||
21 | package org.gnunet.util; | 21 | package org.gnunet.util; |
22 | 22 | ||
23 | import com.google.common.collect.Lists; | ||
23 | import org.slf4j.Logger; | 24 | import org.slf4j.Logger; |
24 | import org.slf4j.LoggerFactory; | 25 | import org.slf4j.LoggerFactory; |
25 | 26 | ||
@@ -28,10 +29,13 @@ import java.nio.ByteBuffer; | |||
28 | import java.nio.channels.*; | 29 | import java.nio.channels.*; |
29 | import java.nio.channels.spi.SelectorProvider; | 30 | import java.nio.channels.spi.SelectorProvider; |
30 | import java.util.*; | 31 | import java.util.*; |
31 | import java.util.concurrent.locks.ReentrantLock; | ||
32 | 32 | ||
33 | /** | 33 | /** |
34 | * Schedule computations using continuation passing style. | 34 | * Schedule computations using continuation passing style. |
35 | * <p/> | ||
36 | * By design, it is not safe to scheduler tasks from different threads. | ||
37 | * When threads can't be avoided, they should communicate with the scheduler by | ||
38 | * a pipe. | ||
35 | * | 39 | * |
36 | * @author Florian Dold | 40 | * @author Florian Dold |
37 | */ | 41 | */ |
@@ -39,12 +43,10 @@ public class Scheduler { | |||
39 | private static final Logger logger = LoggerFactory | 43 | private static final Logger logger = LoggerFactory |
40 | .getLogger(Scheduler.class); | 44 | .getLogger(Scheduler.class); |
41 | 45 | ||
42 | private static ReentrantLock lock = new ReentrantLock(true); | ||
43 | |||
44 | /** | 46 | /** |
45 | * Task that we are currently executing, or null if no task is currently running. | 47 | * Task that we are currently executing, or null if no task is currently running. |
46 | */ | 48 | */ |
47 | private static TaskConfiguration activeTask = null; | 49 | private static TaskIdentifier activeTask = null; |
48 | 50 | ||
49 | /** | 51 | /** |
50 | * Number of tasks in the ready lists, that is, number of tasks that is ready to run | 52 | * Number of tasks in the ready lists, that is, number of tasks that is ready to run |
@@ -69,21 +71,14 @@ public class Scheduler { | |||
69 | * For every priority, there is a list of tasks that is definitely ready to run. | 71 | * For every priority, there is a list of tasks that is definitely ready to run. |
70 | */ | 72 | */ |
71 | @SuppressWarnings("unchecked") | 73 | @SuppressWarnings("unchecked") |
72 | final private static LinkedList<TaskConfiguration>[] readyLists = new LinkedList[Priority.numberOfPriorities]; | 74 | final private static LinkedList<TaskIdentifier>[] readyLists = new LinkedList[Priority.numberOfPriorities]; |
73 | 75 | ||
74 | static { | 76 | static { |
75 | for (int i = 0; i < Priority.numberOfPriorities; ++i) { | 77 | for (int i = 0; i < Priority.numberOfPriorities; ++i) { |
76 | readyLists[i] = new LinkedList<TaskConfiguration>(); | 78 | readyLists[i] = new LinkedList<TaskIdentifier>(); |
77 | } | 79 | } |
78 | } | 80 | } |
79 | 81 | ||
80 | private static final int EVENT_READ = 0, EVENT_WRITE = 1, EVENT_ACCEPT = 2, EVENT_CONNECT = 3; | ||
81 | private static final int[] eventToInterestOp = new int[]{SelectionKey.OP_READ, SelectionKey.OP_WRITE, | ||
82 | SelectionKey.OP_ACCEPT, SelectionKey.OP_CONNECT}; | ||
83 | private static final Reason[] eventToReason = new Reason[]{Reason.READ_READY, Reason.WRITE_READY, | ||
84 | Reason.ACCEPT_READY, Reason.CONNECT_READY}; | ||
85 | |||
86 | |||
87 | /** | 82 | /** |
88 | * Selector, used to check file descriptors for readiness. | 83 | * Selector, used to check file descriptors for readiness. |
89 | */ | 84 | */ |
@@ -102,37 +97,20 @@ public class Scheduler { | |||
102 | /** | 97 | /** |
103 | * true iff the scheduler is currently running. | 98 | * true iff the scheduler is currently running. |
104 | */ | 99 | */ |
105 | private static boolean scheduler_running = false; | 100 | private static boolean schedulerRunning = false; |
106 | |||
107 | 101 | ||
108 | /** | 102 | /** |
109 | * Pending tasks are waiting for an event. Each pending task has a (possibly infinitely long) | 103 | * Pending tasks are waiting for an event. Each pending task has a (possibly infinitely long) |
110 | * deadline after which the task is executed regardless of the prerequisites. | 104 | * deadline after which the task is executed regardless of the prerequisites. |
111 | */ | 105 | */ |
112 | final private static Queue<TaskConfiguration> pending = new PriorityQueue<TaskConfiguration>(5, new Comparator | 106 | final private static Queue<TaskIdentifier> pending = new PriorityQueue<TaskIdentifier>(5, new Comparator |
113 | <TaskConfiguration>() { | 107 | <TaskIdentifier>() { |
114 | @Override | 108 | @Override |
115 | public int compare(TaskConfiguration a, TaskConfiguration b) { | 109 | public int compare(TaskIdentifier a, TaskIdentifier b) { |
116 | return a.deadline.compareTo(b.deadline); | 110 | return a.deadline.compareTo(b.deadline); |
117 | } | 111 | } |
118 | }); | 112 | }); |
119 | 113 | ||
120 | |||
121 | /** | ||
122 | * Reset the scheduler forcefully. | ||
123 | * Intended to be used internally in the Scheduler, as well as in test teardown. | ||
124 | */ | ||
125 | public static void forceReset() { | ||
126 | scheduler_running = false; | ||
127 | readyCount = 0; | ||
128 | activeTask = null; | ||
129 | for (int i = 0; i < Priority.numberOfPriorities; ++i) { | ||
130 | readyLists[i] = new LinkedList<TaskConfiguration>(); | ||
131 | } | ||
132 | pending.clear(); | ||
133 | } | ||
134 | |||
135 | |||
136 | /** | 114 | /** |
137 | * Reasons for executing a task. | 115 | * Reasons for executing a task. |
138 | */ | 116 | */ |
@@ -147,94 +125,133 @@ public class Scheduler { | |||
147 | /** | 125 | /** |
148 | * The reason this task has been called by the scheduler. | 126 | * The reason this task has been called by the scheduler. |
149 | */ | 127 | */ |
150 | public Set<Reason> reasons = EnumSet.noneOf(Reason.class); | 128 | public EnumSet<Reason> reasons = EnumSet.noneOf(Reason.class); |
151 | } | 129 | } |
152 | 130 | ||
153 | /** | 131 | /** |
154 | * A task is the basic unit of work that is managed by the scheduler. | 132 | * Which operations is a task identifier interested in? |
155 | */ | 133 | */ |
156 | public static interface Task { | 134 | private static class TaskInterestOps { |
157 | public void run(RunContext ctx); | 135 | TaskIdentifier tid; |
136 | int interestOps; | ||
158 | } | 137 | } |
159 | 138 | ||
160 | /** | 139 | /** |
161 | * A TaskConfiguration represents a Task that will execute or has already been executed. | 140 | * Manage subscriptions for selection events on channels. |
162 | */ | 141 | */ |
163 | public static class TaskConfiguration implements Cancelable { | 142 | private static class Subscriptions { |
164 | private final Task task; | 143 | private static class ChannelInterest { |
165 | private RunContext ctx = new RunContext(); | 144 | SelectableChannel channel; |
166 | private boolean lifeness = true; | 145 | int interestOps; |
167 | private Priority priority; | ||
168 | private final AbsoluteTime deadline; | ||
169 | |||
170 | private ArrayList<SelectableChannel> eventChannels = null; | ||
171 | private ArrayList<Integer> eventTypes = null; | ||
172 | |||
173 | private boolean hasRun = false; | ||
174 | private boolean isCanceled = false; | ||
175 | |||
176 | /** | ||
177 | * Create a TaskIdentifier. | ||
178 | * | ||
179 | * @param delay when will the task be run? | ||
180 | * may be null to indicate that this task may not be run | ||
181 | * (but only queued directly) | ||
182 | * @param task task to run with this TaskIdentifier | ||
183 | */ | ||
184 | TaskConfiguration(RelativeTime delay, Task task) { | ||
185 | this.task = task; | ||
186 | if (delay == null) | ||
187 | this.deadline = null; | ||
188 | else | ||
189 | this.deadline = delay.toAbsolute(); | ||
190 | } | 146 | } |
191 | 147 | ||
192 | /** | 148 | List<ChannelInterest> channelInterests = Lists.newLinkedList(); |
193 | * Register the task configuration to run once the given event completed. | 149 | |
194 | * | 150 | void add(SelectableChannel channel, int interestOps) { |
195 | * @param channel channel that we wait for an event on | 151 | boolean found = false; |
196 | * @param eventType the event we wait on | 152 | for (ChannelInterest ci : channelInterests) { |
197 | */ | 153 | if (ci.channel == channel) { |
198 | private void addChannelEvent(SelectableChannel channel, int eventType) { | 154 | ci.interestOps |= interestOps; |
199 | if (channel == null) { | 155 | if ((ci.interestOps | SelectionKey.OP_CONNECT | SelectionKey.OP_READ) != 0) { |
200 | throw new AssertionError("channel must be non-null"); | 156 | throw new AssertionError("OP_CONNECT and OP_READ are incompatible in java"); |
157 | } | ||
158 | found = true; | ||
159 | break; | ||
160 | } | ||
201 | } | 161 | } |
202 | if (eventChannels == null) { | 162 | if (!found) { |
203 | eventChannels = new ArrayList<SelectableChannel>(); | 163 | ChannelInterest ci = new ChannelInterest(); |
204 | eventTypes = new ArrayList<Integer>(); | 164 | ci.channel = channel; |
165 | ci.interestOps = interestOps; | ||
166 | channelInterests.add(ci); | ||
205 | } | 167 | } |
206 | eventChannels.add(channel); | 168 | } |
207 | eventTypes.add(eventType); | ||
208 | 169 | ||
209 | int interestOp = eventToInterestOp[eventType]; | 170 | void apply(TaskIdentifier tid) { |
171 | for (ChannelInterest ci : channelInterests) { | ||
172 | SelectionKey key = ci.channel.keyFor(selector); | ||
173 | if (key == null || !key.isValid()) { | ||
174 | try { | ||
175 | key = ci.channel.register(selector, ci.interestOps); | ||
176 | key.attach(new LinkedList()); | ||
177 | } catch (ClosedChannelException e) { | ||
178 | throw new IOError(e); | ||
179 | } | ||
180 | } else { | ||
181 | key.interestOps(key.interestOps() | ci.interestOps); | ||
182 | } | ||
183 | @SuppressWarnings("unchecked") | ||
184 | LinkedList<TaskInterestOps> opl = (LinkedList<TaskInterestOps>) key.attachment(); | ||
185 | TaskInterestOps tio = new TaskInterestOps(); | ||
186 | tio.tid = tid; | ||
187 | tio.interestOps = ci.interestOps; | ||
188 | opl.add(tio); | ||
189 | } | ||
190 | } | ||
210 | 191 | ||
211 | SelectionKey key = channel.keyFor(selector); | 192 | void stop(TaskIdentifier tid) { |
212 | if (key == null || !key.isValid()) { | 193 | for (ChannelInterest ci : channelInterests) { |
213 | try { | 194 | SelectionKey key = ci.channel.keyFor(selector); |
214 | // tread safety ... avoid deadlock | 195 | if (key == null || !key.isValid()) { |
215 | selector.wakeup(); | 196 | logger.warn("missing selection key"); |
216 | key = channel.register(selector, interestOp, new TaskConfiguration[4]); | 197 | return; |
217 | } catch (ClosedChannelException e) { | ||
218 | throw new IOError(e); | ||
219 | } | 198 | } |
220 | } else { | 199 | @SuppressWarnings("unchecked") |
221 | if ((key.interestOps() & interestOp) != 0) { | 200 | LinkedList<TaskInterestOps> interestList = (LinkedList<TaskInterestOps>) key.attachment(); |
222 | throw new AssertionError("interest op registered twice"); | 201 | Iterator<TaskInterestOps> it = interestList.iterator(); |
202 | int remainingInterestOps = 0; | ||
203 | while (it.hasNext()) { | ||
204 | TaskInterestOps ops = it.next(); | ||
205 | if (ops.tid == tid) { | ||
206 | it.remove(); | ||
207 | } else { | ||
208 | remainingInterestOps |= ops.interestOps; | ||
209 | } | ||
223 | } | 210 | } |
224 | key.interestOps(key.interestOps() | interestOp); | 211 | key.interestOps(remainingInterestOps); |
225 | } | 212 | } |
213 | } | ||
214 | } | ||
226 | 215 | ||
227 | TaskConfiguration[] subscribers = (TaskConfiguration[]) key.attachment(); | 216 | /** |
228 | if (subscribers[eventType] != null) { | 217 | * A task is the basic unit of work that is managed by the scheduler. |
229 | throw new AssertionError("subscriber registered twice"); | 218 | */ |
230 | } | 219 | public static interface Task { |
231 | subscribers[eventType] = this; | 220 | public void run(RunContext ctx); |
221 | } | ||
232 | 222 | ||
233 | if (subscribers[EVENT_CONNECT] != null && subscribers[EVENT_READ] != null) { | 223 | /** |
234 | throw new AssertionError("OP_CONNECT and OP_READ are incompatible in java"); | 224 | * Representation of a task that has been scheduled, and can be canceled |
235 | } | 225 | * until the task has run. |
226 | */ | ||
227 | public static class TaskIdentifier implements Cancelable { | ||
228 | private boolean hasRun = false; | ||
229 | private boolean isCanceled = false; | ||
230 | private final Task task; | ||
231 | private final RunContext ctx = new RunContext(); | ||
232 | private final boolean lifeness; | ||
233 | private final Priority priority; | ||
234 | private final AbsoluteTime deadline; | ||
235 | private final Subscriptions subscriptions; | ||
236 | |||
237 | public TaskIdentifier(Task task, EnumSet<Reason> reasons) { | ||
238 | this.ctx.reasons = reasons; | ||
239 | this.task = task; | ||
240 | lifeness = true; | ||
241 | priority = Priority.DEFAULT; | ||
242 | deadline = null; | ||
243 | subscriptions = null; | ||
244 | } | ||
245 | |||
246 | public TaskIdentifier(TaskConfiguration tc) { | ||
247 | this.task = tc.task; | ||
248 | this.subscriptions = tc.subscriptions; | ||
249 | this.deadline = tc.deadline; | ||
250 | this.priority = tc.priority; | ||
251 | this.lifeness = tc.lifeness; | ||
236 | } | 252 | } |
237 | 253 | ||
254 | |||
238 | private void run() { | 255 | private void run() { |
239 | if (hasRun) { | 256 | if (hasRun) { |
240 | throw new AssertionError("same task ran twice"); | 257 | throw new AssertionError("same task ran twice"); |
@@ -242,14 +259,18 @@ public class Scheduler { | |||
242 | if (isCanceled) { | 259 | if (isCanceled) { |
243 | return; | 260 | return; |
244 | } | 261 | } |
245 | TaskConfiguration old = activeTask; | 262 | TaskIdentifier old = activeTask; |
246 | activeTask = this; | 263 | activeTask = this; |
247 | task.run(ctx); | 264 | task.run(ctx); |
248 | hasRun = true; | 265 | hasRun = true; |
249 | activeTask = old; | 266 | activeTask = old; |
250 | } | 267 | } |
251 | 268 | ||
269 | @Override | ||
252 | public void cancel() { | 270 | public void cancel() { |
271 | if (hasRun) { | ||
272 | throw new AssertionError("can't cancel task that already ran"); | ||
273 | } | ||
253 | if (isCanceled) { | 274 | if (isCanceled) { |
254 | throw new AssertionError("task canceled twice"); | 275 | throw new AssertionError("task canceled twice"); |
255 | } | 276 | } |
@@ -257,55 +278,59 @@ public class Scheduler { | |||
257 | pending.remove(this); | 278 | pending.remove(this); |
258 | } | 279 | } |
259 | 280 | ||
260 | public Cancelable schedule() { | 281 | public void deregister() { |
261 | lock.lock(); | 282 | if (subscriptions != null) { |
262 | if (this.deadline == null) | 283 | subscriptions.stop(this); |
263 | throw new AssertionError("a task without deadline may not be scheduled"); | ||
264 | if (priority == null) { | ||
265 | if (activeTask != null) { | ||
266 | priority = activeTask.priority; | ||
267 | } else { | ||
268 | priority = Priority.DEFAULT; | ||
269 | } | ||
270 | } | 284 | } |
271 | pending.add(this); | ||
272 | lock.unlock(); | ||
273 | return this; | ||
274 | } | 285 | } |
286 | } | ||
275 | 287 | ||
276 | private void deregister() { | 288 | /** |
277 | if (eventChannels == null) { | 289 | * A TaskConfiguration contains all information to schedule a task. |
278 | return; | 290 | */ |
279 | } | 291 | public static class TaskConfiguration { |
280 | lock.lock(); | 292 | private final Task task; |
281 | for (int i = 0; i < eventChannels.size(); ++i) { | 293 | private boolean lifeness = true; |
282 | SelectionKey key = eventChannels.get(i).keyFor(selector); | 294 | private Priority priority; |
283 | TaskConfiguration[] subscribers = (TaskConfiguration[]) key.attachment(); | 295 | private final AbsoluteTime deadline; |
284 | int interestOp = eventToInterestOp[eventTypes.get(i)]; | ||
285 | if (subscribers[eventTypes.get(i)] == null || (key.interestOps() | interestOp) == 0) { | ||
286 | throw new AssertionError("deregistering event that has not been registered"); | ||
287 | } | ||
288 | subscribers[eventTypes.get(i)] = null; | ||
289 | key.interestOps(key.interestOps() & (~interestOp)); | ||
290 | } | ||
291 | 296 | ||
292 | lock.unlock(); | 297 | private Subscriptions subscriptions; |
293 | } | ||
294 | 298 | ||
295 | public void selectRead(SelectableChannel channel) { | 299 | /** |
296 | addChannelEvent(channel, EVENT_READ); | 300 | * Create a TaskIdentifier. |
301 | * | ||
302 | * @param delay when will the task be run? | ||
303 | * may be null to indicate that this task may not be run | ||
304 | * (but only queued directly) | ||
305 | * @param task task to run with this TaskIdentifier | ||
306 | */ | ||
307 | TaskConfiguration(RelativeTime delay, Task task) { | ||
308 | if (delay == null) | ||
309 | throw new AssertionError("task delay may not be 'null'"); | ||
310 | this.task = task; | ||
311 | this.deadline = delay.toAbsolute(); | ||
297 | } | 312 | } |
298 | 313 | ||
299 | public void selectWrite(SelectableChannel channel) { | 314 | public TaskIdentifier schedule() { |
300 | addChannelEvent(channel, EVENT_WRITE); | ||
301 | } | ||
302 | 315 | ||
303 | public void selectConnect(SelectableChannel channel) { | 316 | if (priority == null) { |
304 | addChannelEvent(channel, EVENT_CONNECT); | 317 | if (activeTask != null) { |
318 | priority = activeTask.priority; | ||
319 | } else { | ||
320 | priority = Priority.DEFAULT; | ||
321 | } | ||
322 | } | ||
323 | TaskIdentifier tid = new TaskIdentifier(this); | ||
324 | if (subscriptions != null) | ||
325 | subscriptions.apply(tid); | ||
326 | pending.add(tid); | ||
327 | return tid; | ||
305 | } | 328 | } |
306 | 329 | ||
307 | public void selectAccept(SelectableChannel channel) { | 330 | public void addSelectEvent(SelectableChannel channel, int event) { |
308 | addChannelEvent(channel, EVENT_ACCEPT); | 331 | if (subscriptions == null) |
332 | subscriptions = new Subscriptions(); | ||
333 | subscriptions.add(channel, event); | ||
309 | } | 334 | } |
310 | } | 335 | } |
311 | 336 | ||
@@ -314,14 +339,7 @@ public class Scheduler { | |||
314 | * the same priority. | 339 | * the same priority. |
315 | */ | 340 | */ |
316 | public static void addContinuation(Task task, EnumSet<Reason> reasons) { | 341 | public static void addContinuation(Task task, EnumSet<Reason> reasons) { |
317 | lock.lock(); | 342 | queueReady(new TaskIdentifier(task, reasons)); |
318 | TaskConfiguration t = new TaskConfiguration(null, task); | ||
319 | t.ctx.reasons = reasons; | ||
320 | t.priority = Priority.DEFAULT; | ||
321 | queueReady(t); | ||
322 | logger.debug("about to wake up"); | ||
323 | lock.unlock(); | ||
324 | selector.wakeup(); | ||
325 | } | 343 | } |
326 | 344 | ||
327 | /** | 345 | /** |
@@ -343,26 +361,41 @@ public class Scheduler { | |||
343 | * @param task the task to run after delay | 361 | * @param task the task to run after delay |
344 | * @return the TaskIdentifier, can be used to cancel the task until it has been executed. | 362 | * @return the TaskIdentifier, can be used to cancel the task until it has been executed. |
345 | */ | 363 | */ |
346 | public static TaskConfiguration addDelayed(RelativeTime delay, Task task) { | 364 | public static TaskIdentifier addDelayed(RelativeTime delay, Task task) { |
347 | TaskConfiguration tid = new TaskConfiguration(delay, task); | 365 | TaskConfiguration tid = new TaskConfiguration(delay, task); |
348 | tid.schedule(); | 366 | return tid.schedule(); |
349 | return tid; | ||
350 | } | 367 | } |
351 | 368 | ||
352 | public static TaskConfiguration addRead(RelativeTime timeout, | 369 | /** |
353 | SelectableChannel chan, Task task) { | 370 | * Add a task to run after the specified delay, or after the given channel |
371 | * is ready to read, whichever occurs first. | ||
372 | * | ||
373 | * @param timeout time to wait until running the task | ||
374 | * @param chan chennel of interest | ||
375 | * @param task task to run | ||
376 | * @return task identifier | ||
377 | */ | ||
378 | public static TaskIdentifier addRead(RelativeTime timeout, | ||
379 | SelectableChannel chan, Task task) { | ||
354 | TaskConfiguration tid = new TaskConfiguration(timeout, task); | 380 | TaskConfiguration tid = new TaskConfiguration(timeout, task); |
355 | tid.addChannelEvent(chan, EVENT_READ); | 381 | tid.addSelectEvent(chan, SelectionKey.OP_READ); |
356 | tid.schedule(); | 382 | return tid.schedule(); |
357 | return tid; | ||
358 | } | 383 | } |
359 | 384 | ||
360 | public static TaskConfiguration addWrite(RelativeTime timeout, | 385 | /** |
361 | SelectableChannel chan, Task task) { | 386 | * Add a task to run after the specified delay, or after the given channel |
387 | * is ready to write, whichever occurs first. | ||
388 | * | ||
389 | * @param timeout to wait until running the task | ||
390 | * @param chan channel of interest | ||
391 | * @param task task to run | ||
392 | * @return task identifier | ||
393 | */ | ||
394 | public static TaskIdentifier addWrite(RelativeTime timeout, | ||
395 | SelectableChannel chan, Task task) { | ||
362 | TaskConfiguration tid = new TaskConfiguration(timeout, task); | 396 | TaskConfiguration tid = new TaskConfiguration(timeout, task); |
363 | tid.addChannelEvent(chan, EVENT_WRITE); | 397 | tid.addSelectEvent(chan, SelectionKey.OP_WRITE); |
364 | tid.schedule(); | 398 | return tid.schedule(); |
365 | return tid; | ||
366 | } | 399 | } |
367 | 400 | ||
368 | /** | 401 | /** |
@@ -372,14 +405,11 @@ public class Scheduler { | |||
372 | * @return true to continue the main loop, false to exit | 405 | * @return true to continue the main loop, false to exit |
373 | */ | 406 | */ |
374 | private static boolean checkLiveness() { | 407 | private static boolean checkLiveness() { |
375 | lock.lock(); | ||
376 | if (readyCount > 0) { | 408 | if (readyCount > 0) { |
377 | lock.unlock(); | ||
378 | return true; | 409 | return true; |
379 | } | 410 | } |
380 | for (TaskConfiguration t : pending) { | 411 | for (TaskIdentifier t : pending) { |
381 | if (t.lifeness) { | 412 | if (t.lifeness) { |
382 | lock.unlock(); | ||
383 | return true; | 413 | return true; |
384 | } | 414 | } |
385 | } | 415 | } |
@@ -387,30 +417,23 @@ public class Scheduler { | |||
387 | if (!pending.isEmpty()) { | 417 | if (!pending.isEmpty()) { |
388 | logger.debug("tasks pending but not alive -- disconnect"); | 418 | logger.debug("tasks pending but not alive -- disconnect"); |
389 | shutdown(); | 419 | shutdown(); |
390 | lock.unlock(); | ||
391 | return true; | 420 | return true; |
392 | } | 421 | } |
393 | |||
394 | lock.unlock(); | ||
395 | return false; | 422 | return false; |
396 | } | 423 | } |
397 | 424 | ||
398 | |||
399 | /** | 425 | /** |
400 | * Queue a Task for execution. | 426 | * Queue a Task for execution. |
401 | * | 427 | * |
402 | * @param tid TaskIdentifier of the ready task | 428 | * @param tid TaskIdentifier of the ready task |
403 | */ | 429 | */ |
404 | private static void queueReady(TaskConfiguration tid) { | 430 | private static void queueReady(TaskIdentifier tid) { |
405 | lock.lock(); | ||
406 | int idx = tid.priority.ordinal(); | 431 | int idx = tid.priority.ordinal(); |
407 | readyLists[idx].add(tid); | 432 | readyLists[idx].add(tid); |
408 | readyCount++; | 433 | readyCount++; |
409 | pending.remove(tid); | 434 | pending.remove(tid); |
410 | lock.unlock(); | ||
411 | } | 435 | } |
412 | 436 | ||
413 | |||
414 | /** | 437 | /** |
415 | * Queue all tasks with expired timeout. | 438 | * Queue all tasks with expired timeout. |
416 | * | 439 | * |
@@ -418,11 +441,10 @@ public class Scheduler { | |||
418 | */ | 441 | */ |
419 | private static RelativeTime handleTimeouts() { | 442 | private static RelativeTime handleTimeouts() { |
420 | RelativeTime timeout = RelativeTime.FOREVER; | 443 | RelativeTime timeout = RelativeTime.FOREVER; |
421 | lock.lock(); | ||
422 | 444 | ||
423 | // check if any timeouts occurred | 445 | // check if any timeouts occurred |
424 | while (true) { | 446 | while (true) { |
425 | TaskConfiguration t = pending.peek(); | 447 | TaskIdentifier t = pending.peek(); |
426 | if (t == null) { | 448 | if (t == null) { |
427 | break; | 449 | break; |
428 | } | 450 | } |
@@ -436,25 +458,19 @@ public class Scheduler { | |||
436 | break; | 458 | break; |
437 | } | 459 | } |
438 | } | 460 | } |
439 | lock.unlock(); | ||
440 | return timeout; | 461 | return timeout; |
441 | } | 462 | } |
442 | 463 | ||
443 | /** | 464 | |
444 | * If there is a subscribing task for the given event type, add it to the set of executable tasks. | 465 | private static void addReasonsFromInterestOp(EnumSet<Reason> reasons, int interestOps) { |
445 | * | 466 | if ((interestOps & SelectionKey.OP_READ) != 0) |
446 | * @param executableTasks set of executable tasks | 467 | reasons.add(Reason.READ_READY); |
447 | * @param subscribers subscriber set, one subscriber for each event type | 468 | if ((interestOps & SelectionKey.OP_WRITE) != 0) |
448 | * @param eventType event type we are interested in | 469 | reasons.add(Reason.WRITE_READY); |
449 | */ | 470 | if ((interestOps & SelectionKey.OP_CONNECT) != 0) |
450 | private static void addSubscriberTask(Collection<TaskConfiguration> executableTasks, | 471 | reasons.add(Reason.CONNECT_READY); |
451 | TaskConfiguration[] subscribers, int eventType) { | 472 | if ((interestOps & SelectionKey.OP_ACCEPT) != 0) |
452 | TaskConfiguration tc = subscribers[eventType]; | 473 | reasons.add(Reason.ACCEPT_READY); |
453 | if (tc == null) { | ||
454 | return; | ||
455 | } | ||
456 | executableTasks.add(tc); | ||
457 | tc.ctx.reasons.add(eventToReason[eventType]); | ||
458 | } | 474 | } |
459 | 475 | ||
460 | /** | 476 | /** |
@@ -463,10 +479,8 @@ public class Scheduler { | |||
463 | * @param timeout timeout for select | 479 | * @param timeout timeout for select |
464 | */ | 480 | */ |
465 | private static void handleSelect(RelativeTime timeout) { | 481 | private static void handleSelect(RelativeTime timeout) { |
466 | if (!lock.isHeldByCurrentThread()) | 482 | // gnunet-java uses microseconds, but the select api uses milliseconds |
467 | throw new AssertionError(); | ||
468 | long timeout_ms = timeout.getMicroseconds() / 1000; | 483 | long timeout_ms = timeout.getMicroseconds() / 1000; |
469 | lock.unlock(); | ||
470 | try { | 484 | try { |
471 | // selector.select(0) would block indefinitely (counter-intuitive, java's fault) | 485 | // selector.select(0) would block indefinitely (counter-intuitive, java's fault) |
472 | if (timeout_ms == 0) { | 486 | if (timeout_ms == 0) { |
@@ -480,39 +494,29 @@ public class Scheduler { | |||
480 | } | 494 | } |
481 | } catch (IOException e) { | 495 | } catch (IOException e) { |
482 | throw new IOError(e); | 496 | throw new IOError(e); |
483 | } finally { | ||
484 | lock.lock(); | ||
485 | } | 497 | } |
486 | 498 | ||
487 | logger.debug("select over"); | 499 | logger.debug("select over"); |
488 | 500 | ||
489 | // we use a set so that we don't execute any task twice | 501 | // we use a set so that we don't execute any task twice |
490 | Collection<TaskConfiguration> executableTasks = new HashSet<TaskConfiguration>(); | 502 | Collection<TaskIdentifier> executableTasks = new HashSet<TaskIdentifier>(); |
491 | for (SelectionKey sk : selector.selectedKeys()) { | 503 | for (SelectionKey sk : selector.selectedKeys()) { |
492 | TaskConfiguration[] subscribers = (TaskConfiguration[]) sk.attachment(); | 504 | @SuppressWarnings("unchecked") |
493 | 505 | LinkedList<TaskInterestOps> subscribers = (LinkedList<TaskInterestOps>) sk.attachment(); | |
494 | if (sk.isReadable()) { | 506 | for (TaskInterestOps ops : subscribers) { |
495 | addSubscriberTask(executableTasks, subscribers, EVENT_READ); | 507 | if ((sk.readyOps() & ops.interestOps) != 0) { |
496 | } | 508 | executableTasks.add(ops.tid); |
497 | if (sk.isWritable()) { | 509 | addReasonsFromInterestOp(ops.tid.ctx.reasons, sk.readyOps() & ops.interestOps); |
498 | addSubscriberTask(executableTasks, subscribers, EVENT_WRITE); | 510 | } |
499 | } | ||
500 | if (sk.isAcceptable()) { | ||
501 | addSubscriberTask(executableTasks, subscribers, EVENT_ACCEPT); | ||
502 | } | ||
503 | if (sk.isConnectable()) { | ||
504 | addSubscriberTask(executableTasks, subscribers, EVENT_CONNECT); | ||
505 | } | 511 | } |
506 | |||
507 | } | 512 | } |
508 | for (TaskConfiguration tt : executableTasks) { | 513 | for (TaskIdentifier tt : executableTasks) { |
509 | // cancel subscriptions to other events, we can execute now! | 514 | // cancel subscriptions to other events, we can execute now! |
510 | tt.deregister(); | 515 | tt.deregister(); |
511 | queueReady(tt); | 516 | queueReady(tt); |
512 | } | 517 | } |
513 | } | 518 | } |
514 | 519 | ||
515 | |||
516 | /** | 520 | /** |
517 | * Initialize and run scheduler. This function will return when all tasks | 521 | * Initialize and run scheduler. This function will return when all tasks |
518 | * have completed. | 522 | * have completed. |
@@ -529,10 +533,10 @@ public class Scheduler { | |||
529 | */ | 533 | */ |
530 | public static void run(Task initialTask) { | 534 | public static void run(Task initialTask) { |
531 | logger.debug("running scheduler"); | 535 | logger.debug("running scheduler"); |
532 | if (scheduler_running) { | 536 | if (schedulerRunning) { |
533 | throw new AssertionError("Scheduler already running"); | 537 | throw new AssertionError("Scheduler already running"); |
534 | } | 538 | } |
535 | scheduler_running = true; | 539 | schedulerRunning = true; |
536 | try { | 540 | try { |
537 | run_unchecked(initialTask); | 541 | run_unchecked(initialTask); |
538 | } finally { | 542 | } finally { |
@@ -543,7 +547,6 @@ public class Scheduler { | |||
543 | } | 547 | } |
544 | } | 548 | } |
545 | 549 | ||
546 | |||
547 | /** | 550 | /** |
548 | * Initialize and run scheduler. This function will return when all tasks | 551 | * Initialize and run scheduler. This function will return when all tasks |
549 | * have completed. Don't check if the scheduler is already running or not. | 552 | * have completed. Don't check if the scheduler is already running or not. |
@@ -556,12 +559,7 @@ public class Scheduler { | |||
556 | } | 559 | } |
557 | 560 | ||
558 | // the gnunet main loop | 561 | // the gnunet main loop |
559 | while (true) { | 562 | while (checkLiveness()) { |
560 | lock.lock(); | ||
561 | if (checkLiveness() == false) { | ||
562 | lock.unlock(); | ||
563 | break; | ||
564 | } | ||
565 | RelativeTime nextTimeout = handleTimeouts(); | 563 | RelativeTime nextTimeout = handleTimeouts(); |
566 | if (nextTimeout.getMicroseconds() < 0) { | 564 | if (nextTimeout.getMicroseconds() < 0) { |
567 | logger.warn("negative timeout for select"); | 565 | logger.warn("negative timeout for select"); |
@@ -569,7 +567,6 @@ public class Scheduler { | |||
569 | 567 | ||
570 | // don't select if there are no tasks; we are done! | 568 | // don't select if there are no tasks; we are done! |
571 | if (readyCount == 0 && pending.isEmpty()) { | 569 | if (readyCount == 0 && pending.isEmpty()) { |
572 | lock.unlock(); | ||
573 | return; | 570 | return; |
574 | } | 571 | } |
575 | 572 | ||
@@ -580,7 +577,6 @@ public class Scheduler { | |||
580 | handleSelect(nextTimeout); | 577 | handleSelect(nextTimeout); |
581 | } | 578 | } |
582 | runReady(); | 579 | runReady(); |
583 | lock.unlock(); | ||
584 | } | 580 | } |
585 | 581 | ||
586 | if (readyCount != 0) { | 582 | if (readyCount != 0) { |
@@ -618,9 +614,9 @@ public class Scheduler { | |||
618 | // start executing from the highest priority down to 0 | 614 | // start executing from the highest priority down to 0 |
619 | for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) { | 615 | for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) { |
620 | // execute all tasks with priority p | 616 | // execute all tasks with priority p |
621 | LinkedList<TaskConfiguration> queue = readyLists[p]; | 617 | LinkedList<TaskIdentifier> queue = readyLists[p]; |
622 | while (!queue.isEmpty()) { | 618 | while (!queue.isEmpty()) { |
623 | TaskConfiguration tid = queue.removeFirst(); | 619 | TaskIdentifier tid = queue.removeFirst(); |
624 | readyCount--; | 620 | readyCount--; |
625 | tid.run(); | 621 | tid.run(); |
626 | } | 622 | } |
@@ -637,13 +633,27 @@ public class Scheduler { | |||
637 | */ | 633 | */ |
638 | public static void shutdown() { | 634 | public static void shutdown() { |
639 | // queueReady() while iterating would yield concurrent modification exn otherwise | 635 | // queueReady() while iterating would yield concurrent modification exn otherwise |
640 | for (TaskConfiguration tid : new ArrayList<TaskConfiguration>(pending)) { | 636 | for (TaskIdentifier tid : new ArrayList<TaskIdentifier>(pending)) { |
641 | tid.ctx.reasons.add(Reason.SHUTDOWN); | 637 | tid.ctx.reasons.add(Reason.SHUTDOWN); |
642 | queueReady(tid); | 638 | queueReady(tid); |
643 | } | 639 | } |
644 | pending.clear(); | 640 | pending.clear(); |
645 | } | 641 | } |
646 | 642 | ||
643 | /** | ||
644 | * Reset the scheduler forcefully. | ||
645 | * Intended to be used internally in the Scheduler, as well as in test teardown. | ||
646 | */ | ||
647 | public static void forceReset() { | ||
648 | schedulerRunning = false; | ||
649 | readyCount = 0; | ||
650 | activeTask = null; | ||
651 | for (int i = 0; i < Priority.numberOfPriorities; ++i) { | ||
652 | readyLists[i] = Lists.newLinkedList(); | ||
653 | } | ||
654 | pending.clear(); | ||
655 | } | ||
656 | |||
647 | 657 | ||
648 | /** | 658 | /** |
649 | * A handle to a file system object that can be selected on. | 659 | * A handle to a file system object that can be selected on. |