/*
 This file is part of GNUnet.
 Copyright (C) 2009 Christian Grothoff (and other contributing authors)

 GNUnet is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published
 by the Free Software Foundation; either version 2, or (at your
 option) any later version.

 GNUnet is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with GNUnet; see the file COPYING.  If not, write to the
 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 Boston, MA 02111-1307, USA.
 */

package org.gnunet.util;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

/**
 * Schedule computations using continuation passing style.
 * <p>
 * All static operations use, per default, the scheduler instance that belongs
 * to the calling thread.
 *
 * @author Florian Dold
 */
public class Scheduler {
    private static final Logger logger = LoggerFactory
            .getLogger(Scheduler.class);

    /**
     * One scheduler instance per thread, created lazily on first access.
     */
    static ThreadLocal<SchedulerInstance> threadScheduler = new ThreadLocal<SchedulerInstance>() {
        @Override
        protected SchedulerInstance initialValue() {
            return new SchedulerInstance();
        }
    };

    /**
     * State and main loop of a single scheduler.
     */
    private static class SchedulerInstance {
        /**
         * Task that we are currently executing, or null if no task is currently running.
         */
        TaskIdentifier activeTask = null;

        /**
         * Number of tasks in the ready lists, that is, number of tasks that are ready to run
         * (all prerequisites are fulfilled).
         */
        int readyCount = 0;

        /**
         * Selector, used to check channels for readiness.
         */
        Selector selector = null;

        /**
         * For every priority, there is a list of tasks that are definitely ready to run.
         * Indexed by {@code Priority.ordinal()}.
         */
        @SuppressWarnings("unchecked")
        final LinkedList<TaskIdentifier>[] readyLists = new LinkedList[Priority.numberOfPriorities];

        /**
         * true iff the scheduler is currently running.
         */
        boolean schedulerRunning = false;

        /**
         * Pending tasks are waiting for an event. Each pending task has a (possibly infinitely long)
         * deadline after which the task is executed regardless of the prerequisites.
         * The queue is ordered by deadline, earliest first.
         */
        final Queue<TaskIdentifier> pending = new PriorityQueue<TaskIdentifier>(5,
                new Comparator<TaskIdentifier>() {
                    @Override
                    public int compare(TaskIdentifier a, TaskIdentifier b) {
                        return a.deadline.compareTo(b.deadline);
                    }
                });

        /**
         * Create a scheduler with empty ready lists and a freshly opened selector.
         * If the selector cannot be opened, the error is logged and the JVM exits.
         */
        public SchedulerInstance() {
            for (int i = 0; i < Priority.numberOfPriorities; ++i) {
                readyLists[i] = new LinkedList<TaskIdentifier>();
            }
            try {
                selector = SelectorProvider.provider().openSelector();
            } catch (final IOException e) {
                // what to do here?
                logger.error("fatal: cannot create selector", e);
                System.exit(-1);
            }
        }

        /**
         * Check if the system is still alive. Trigger shutdown if we have pending tasks, but
         * none of them give us lifeness.
         *
         * @return true to continue the main loop, false to exit
         */
        private boolean checkLiveness() {
            if (readyCount > 0) {
                return true;
            }
            for (TaskIdentifier t : pending) {
                if (t.lifeness) {
                    return true;
                }
            }
            // trigger shutdown if we still have pending tasks, but none of them has lifeness
            if (!pending.isEmpty()) {
                logger.debug("tasks pending but not alive -- disconnect");
                shutdown();
                return true;
            }
            return false;
        }

        /**
         * Queue a task for execution: append it to the ready list of its priority
         * and remove it from the pending queue.
         *
         * @param tid TaskIdentifier of the ready task
         */
        private void queueReady(TaskIdentifier tid) {
            int idx = tid.priority.ordinal();
            readyLists[idx].add(tid);
            readyCount++;
            pending.remove(tid);
        }

        /**
         * Append a task to the ready list of {@link Priority#DEFAULT}, bypassing the pending queue.
         *
         * @param task    the task to run
         * @param reasons the reasons handed to the task in its run context
         */
        void addContinuation(Task task, EnumSet<Reason> reasons) {
            readyLists[Priority.DEFAULT.ordinal()].add(new TaskIdentifier(this, task, reasons));
            readyCount += 1;
        }

        /**
         * Queue all tasks with expired timeout.
         *
         * @return the minimum time to wait until the next timeout expiry,
         *         or {@code RelativeTime.FOREVER} if no task is pending
         */
        private RelativeTime handleTimeouts() {
            RelativeTime timeout = RelativeTime.FOREVER;
            // 'pending' is ordered by deadline, so we can stop at the first unexpired task
            while (true) {
                TaskIdentifier t = pending.peek();
                if (t == null) {
                    break;
                }
                RelativeTime remaining = t.deadline.getRemaining();
                if (remaining.getMicroseconds() <= 0) {
                    t.deregister();
                    t.ctx.reasons = EnumSet.of(Reason.TIMEOUT);
                    queueReady(t);
                } else {
                    timeout = remaining;
                    break;
                }
            }
            return timeout;
        }

        /**
         * Select on channels and queue tasks that become executable.
         *
         * @param timeout timeout for select
         */
        private void handleSelect(RelativeTime timeout) {
            // gnunet-java uses microseconds, but the select api uses milliseconds
            long timeoutMs = timeout.getMicroseconds() / 1000;
            try {
                if (timeout.isForever()) {
                    // fixme: we should only do this if we are sure there are tasks that select on something
                    logger.debug("selecting, timeout=forever");
                    // select(0) blocks indefinitely
                    selector.select(0);
                } else if (timeoutMs <= 0) {
                    // selector.select(0) would block indefinitely (counter-intuitive, java's fault),
                    // and a negative argument is rejected, so just poll
                    selector.selectNow();
                } else {
                    selector.select(timeoutMs);
                }
            } catch (IOException e) {
                throw new IOError(e);
            }

            logger.debug("select over");

            // we use a set so that we don't execute any task twice
            Collection<TaskIdentifier> executableTasks = new LinkedHashSet<TaskIdentifier>();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            for (SelectionKey sk : selectedKeys) {
                if (!sk.isValid()) {
                    // readyOps() throws on a cancelled key
                    continue;
                }
                @SuppressWarnings("unchecked")
                LinkedList<TaskInterestOps> subscribers = (LinkedList<TaskInterestOps>) sk.attachment();
                for (TaskInterestOps ops : subscribers) {
                    int matchingOps = sk.readyOps() & ops.interestOps;
                    if (matchingOps != 0) {
                        executableTasks.add(ops.tid);
                        addReasonsFromInterestOp(ops.tid.ctx.reasons, matchingOps);
                    }
                }
            }
            // The selector never removes keys from the selected-key set by itself;
            // without this, readiness from earlier rounds would be seen again.
            selectedKeys.clear();

            for (TaskIdentifier tt : executableTasks) {
                // cancel subscriptions to other events, we can execute now!
                tt.deregister();
                queueReady(tt);
            }
        }

        /**
         * Initialize and run scheduler. This function will return when all tasks
         * have completed.
         */
        public void run() {
            run(null);
        }

        /**
         * Initialize and run scheduler. This function will return when all tasks
         * have completed.
         *
         * @param initialTask the initial task to run immediately, may be null
         * @throws AssertionError if this scheduler is already running
         */
        public void run(Task initialTask) {
            logger.debug("running scheduler");
            if (schedulerRunning) {
                throw new AssertionError("Scheduler already running");
            }
            schedulerRunning = true;
            try {
                runUnchecked(initialTask);
            } finally {
                logger.debug("cleaning up after scheduler ran");
                // ensure that after run returns, the scheduler is in its initial state,
                // even though there was an exception (e.g. after a test case that expects an exception)
                forceReset();
            }
        }

        /**
         * Request the shutdown of the scheduler. Marks all currently pending tasks as
         * ready because of shutdown. This will cause all tasks to run (as soon as
         * possible, respecting priorities and prerequisite tasks). Note that tasks
         * scheduled AFTER this call may still be delayed arbitrarily.
         */
        public void shutdown() {
            // queueReady() while iterating would yield concurrent modification exn otherwise
            for (TaskIdentifier tid : new ArrayList<TaskIdentifier>(pending)) {
                tid.ctx.reasons.add(Reason.SHUTDOWN);
                // drop channel subscriptions, otherwise a later select could queue the task again
                tid.deregister();
                queueReady(tid);
            }
            pending.clear();
        }

        /**
         * Reset the scheduler forcefully: clears the running flag, the active task,
         * all ready lists and the pending queue.
         * Intended to be used internally in the Scheduler, as well as in test teardown.
         */
        public void forceReset() {
            schedulerRunning = false;
            readyCount = 0;
            activeTask = null;
            for (int i = 0; i < Priority.numberOfPriorities; ++i) {
                readyLists[i] = Lists.newLinkedList();
            }
            pending.clear();
        }

        /**
         * @return true iff at least one task is ready or pending
         */
        public boolean hasTasks() {
            return readyCount != 0 || !pending.isEmpty();
        }

        /**
         * Execute tasks until either
         * <ul>
         * <li>there are no ready tasks</li>
         * <li>there is a pending task (which may be of higher priority)</li>
         * </ul>
         */
        private void runReady() {
            do {
                if (readyCount == 0) {
                    return;
                }
                // start executing from the highest priority down to 0
                for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) {
                    // execute all tasks with priority p
                    LinkedList<TaskIdentifier> queue = readyLists[p];
                    while (!queue.isEmpty()) {
                        TaskIdentifier tid = queue.removeFirst();
                        readyCount--;
                        tid.run();
                    }
                }
            } while (pending.size() == 0);
        }

        /**
         * Initialize and run scheduler. This function will return when all tasks
         * have completed. Don't check if the scheduler is already running or not.
         *
         * @param initialTask the initial task to run immediately, may be null
         */
        private void runUnchecked(Task initialTask) {
            if (initialTask != null) {
                addContinuation(initialTask, EnumSet.of(Reason.STARTUP));
            }

            // the gnunet main loop
            while (checkLiveness()) {
                RelativeTime nextTimeout = handleTimeouts();
                if (nextTimeout.getMicroseconds() < 0) {
                    logger.warn("negative timeout for select");
                }

                // don't select if there are no tasks; we are done!
                if (readyCount == 0 && pending.isEmpty()) {
                    return;
                }

                // don't block in select if we have tasks ready to run!
                if (readyCount > 0) {
                    handleSelect(RelativeTime.ZERO);
                } else {
                    handleSelect(nextTimeout);
                }

                runReady();
            }

            // sanity checks: nothing may be left over after the main loop ended
            if (readyCount != 0) {
                throw new AssertionError("tasks ready after scheduler ran (count)");
            }
            for (List<TaskIdentifier> readyList : readyLists) {
                if (!readyList.isEmpty()) {
                    throw new AssertionError("tasks ready after scheduler ran (list)");
                }
            }
            if (pending.size() != 0) {
                throw new AssertionError("pending tasks after scheduler ran");
            }
            if (activeTask != null) {
                throw new AssertionError("active task after scheduler ran");
            }
        }
    }

    /**
     * Priority for Tasks, in order of ascending priority.
     * When two tasks are ready, the one with the higher priority is executed first.
     */
    public enum Priority {
        IDLE, BACKGROUND, DEFAULT, HIGH, UI, URGENT, SHUTDOWN;

        /**
         * how many different priorities do we have?
         */
        private static final int numberOfPriorities = Priority.values().length;
    }

    /**
     * Reasons for executing a task.
     */
    public enum Reason {
        STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, ACCEPT_READY, CONNECT_READY
    }

    /**
     * The context of a task that is ready to run.
     */
    public static class RunContext {
        /**
         * The reason this task has been called by the scheduler.
         */
        public EnumSet<Reason> reasons = EnumSet.noneOf(Reason.class);
    }

    /**
     * Which operations is a task identifier interested in?
     */
    private static class TaskInterestOps {
        TaskIdentifier tid;
        int interestOps;
    }

    /**
     * Manage subscriptions for selection events on channels.
     * The attachment of every selection key registered here is the list of
     * {@link TaskInterestOps} of all tasks subscribed to that key's channel.
     */
    private static class Subscriptions {
        /**
         * Selector to use for subscription operations.
         */
        private final Selector selector;

        /**
         * The accumulated interest operations for one channel.
         */
        private static class ChannelInterest {
            SelectableChannel channel;
            int interestOps;
        }

        List<ChannelInterest> channelInterests = Lists.newLinkedList();

        public Subscriptions(Selector selector) {
            this.selector = selector;
        }

        /**
         * Record interest in the given operations on the given channel, merging
         * with interest already recorded for the same channel.
         *
         * @param channel     channel of interest
         * @param interestOps bit set of {@link SelectionKey} operations
         * @throws AssertionError if merging would combine OP_CONNECT and OP_READ on one channel
         */
        void add(SelectableChannel channel, int interestOps) {
            for (ChannelInterest ci : channelInterests) {
                if (ci.channel == channel) {
                    int merged = ci.interestOps | interestOps;
                    int conflicting = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
                    // only the combination of both operations is rejected
                    if ((merged & conflicting) == conflicting) {
                        throw new AssertionError("OP_CONNECT and OP_READ are incompatible in java");
                    }
                    ci.interestOps = merged;
                    return;
                }
            }
            ChannelInterest ci = new ChannelInterest();
            ci.channel = channel;
            ci.interestOps = interestOps;
            channelInterests.add(ci);
        }

        /**
         * Register all recorded channel interests with the selector on behalf of the given task.
         *
         * @param tid the task that should be queued when one of the channels is ready
         */
        void apply(TaskIdentifier tid) {
            for (ChannelInterest ci : channelInterests) {
                SelectionKey key = ci.channel.keyFor(selector);
                if (key == null || !key.isValid()) {
                    try {
                        key = ci.channel.register(selector, ci.interestOps);
                        key.attach(new LinkedList<TaskInterestOps>());
                    } catch (ClosedChannelException e) {
                        throw new IOError(e);
                    }
                } else {
                    key.interestOps(key.interestOps() | ci.interestOps);
                }
                @SuppressWarnings("unchecked")
                LinkedList<TaskInterestOps> opl = (LinkedList<TaskInterestOps>) key.attachment();
                TaskInterestOps tio = new TaskInterestOps();
                tio.tid = tid;
                tio.interestOps = ci.interestOps;
                opl.add(tio);
            }
        }

        /**
         * Remove the given task from the subscriber lists of all recorded channels and
         * shrink each key's interest set to what the remaining subscribers need.
         *
         * @param tid the task to unsubscribe
         */
        void stop(TaskIdentifier tid) {
            for (ChannelInterest ci : channelInterests) {
                SelectionKey key = ci.channel.keyFor(selector);
                if (key == null || !key.isValid()) {
                    logger.warn("missing selection key");
                    // the other channels still have to be unsubscribed
                    continue;
                }
                @SuppressWarnings("unchecked")
                LinkedList<TaskInterestOps> interestList = (LinkedList<TaskInterestOps>) key.attachment();
                Iterator<TaskInterestOps> it = interestList.iterator();
                int remainingInterestOps = 0;
                while (it.hasNext()) {
                    TaskInterestOps ops = it.next();
                    if (ops.tid == tid) {
                        it.remove();
                    } else {
                        remainingInterestOps |= ops.interestOps;
                    }
                }
                key.interestOps(remainingInterestOps);
            }
        }
    }

    /**
     * A task is the basic unit of work that is managed by the scheduler.
     */
    public static interface Task {
        public void run(RunContext ctx);
    }

    /**
     * Representation of a task that has been scheduled, and can be canceled
     * until the task has run.
     */
    public static class TaskIdentifier implements Cancelable {
        private boolean hasRun = false;
        private boolean isCanceled = false;
        private final Task task;
        private final RunContext ctx = new RunContext();
        private final boolean lifeness;
        private final Priority priority;
        private final AbsoluteTime deadline;
        private final Subscriptions subscriptions;
        private SchedulerInstance scheduler;

        /**
         * Create an identifier for a continuation: default priority, lifeness,
         * no deadline and no channel subscriptions.
         *
         * @param scheduler the scheduler that runs the task
         * @param task      the task to run
         * @param reasons   the reasons handed to the task in its run context
         */
        public TaskIdentifier(SchedulerInstance scheduler, Task task, EnumSet<Reason> reasons) {
            this.scheduler = scheduler;
            this.ctx.reasons = reasons;
            this.task = task;
            lifeness = true;
            priority = Priority.DEFAULT;
            deadline = null;
            subscriptions = null;
        }

        /**
         * Create an identifier from a task configuration.
         *
         * @param scheduler the scheduler that runs the task
         * @param tc        the configuration to copy task, deadline, priority,
         *                  lifeness and subscriptions from
         */
        public TaskIdentifier(SchedulerInstance scheduler, TaskConfiguration tc) {
            this.scheduler = scheduler;
            this.task = tc.task;
            this.subscriptions = tc.subscriptions;
            this.deadline = tc.deadline;
            this.priority = tc.priority;
            this.lifeness = tc.lifeness;
        }

        /**
         * Run the task, unless it has been canceled. While the task runs, it is
         * the scheduler's active task.
         *
         * @throws AssertionError if the task has already run
         */
        private void run() {
            if (hasRun) {
                throw new AssertionError("same task ran twice");
            }
            if (isCanceled) {
                return;
            }
            TaskIdentifier old = scheduler.activeTask;
            scheduler.activeTask = this;
            try {
                task.run(ctx);
                hasRun = true;
            } finally {
                // restore even if the task throws
                scheduler.activeTask = old;
            }
        }

        /**
         * Cancel the task: it is removed from the pending queue, unsubscribed from
         * its channels, and will not be run even if it is already in a ready list.
         *
         * @throws AssertionError if the task has already run or was already canceled
         */
        @Override
        public void cancel() {
            if (hasRun) {
                throw new AssertionError("can't onCancel task that already ran");
            }
            if (isCanceled) {
                throw new AssertionError("task canceled twice");
            }
            isCanceled = true;
            scheduler.pending.remove(this);
            // don't leave a stale subscriber behind in the selector
            deregister();
        }

        /**
         * Stop all channel subscriptions of this task, if it has any.
         */
        private void deregister() {
            if (subscriptions != null) {
                subscriptions.stop(this);
            }
        }
    }

    /**
     * A TaskConfiguration contains all information to schedule a task.
     */
    public static class TaskConfiguration {
        private final Task task;
        private boolean lifeness = true;
        private Priority priority;
        private final AbsoluteTime deadline;
        private Subscriptions subscriptions;
        private SchedulerInstance scheduler;

        /**
         * Create a TaskConfiguration.
         *
         * @param scheduler scheduler whose selector is used for select events
         * @param delay     when will the task be run? Must not be null.
         * @param task      task to run
         * @throws AssertionError if delay is null
         */
        TaskConfiguration(SchedulerInstance scheduler, RelativeTime delay, Task task) {
            if (delay == null) {
                throw new AssertionError("task delay may not be 'null'");
            }
            this.scheduler = scheduler;
            this.task = task;
            this.deadline = delay.toAbsolute();
        }

        /**
         * Create a TaskConfiguration for the thread-local scheduler.
         *
         * @param delay when will the task be run? Must not be null.
         * @param task  task to run
         */
        public TaskConfiguration(RelativeTime delay, Task task) {
            this(threadScheduler.get(), delay, task);
        }

        /**
         * Schedule the task with the thread-local scheduler.
         *
         * @return the identifier of the scheduled task
         */
        public TaskIdentifier schedule() {
            return schedule(Scheduler.threadScheduler.get());
        }

        /**
         * Schedule the task with the given scheduler. If no priority has been
         * determined yet, the task gets the priority of the scheduler's active
         * task, or {@link Priority#DEFAULT} if no task is active.
         *
         * @param scheduler the scheduler to add the task to
         * @return the identifier of the scheduled task
         */
        public TaskIdentifier schedule(SchedulerInstance scheduler) {
            if (priority == null) {
                if (scheduler.activeTask != null) {
                    priority = scheduler.activeTask.priority;
                } else {
                    priority = Priority.DEFAULT;
                }
            }
            TaskIdentifier tid = new TaskIdentifier(scheduler, this);
            if (subscriptions != null) {
                subscriptions.apply(tid);
            }
            scheduler.pending.add(tid);
            return tid;
        }

        /**
         * Make the task runnable as soon as the given event occurs on the channel.
         *
         * @param channel channel of interest, must not be null
         * @param event   bit set of {@link SelectionKey} operations
         * @throws AssertionError if channel is null
         */
        public void addSelectEvent(SelectableChannel channel, int event) {
            if (channel == null) {
                throw new AssertionError("channel may not be null");
            }
            if (subscriptions == null) {
                subscriptions = new Subscriptions(scheduler.selector);
            }
            subscriptions.add(channel, event);
        }

        /**
         * @param b whether the scheduled task keeps the scheduler alive while it is pending
         */
        public void setLifeness(boolean b) {
            this.lifeness = b;
        }
    }

    /**
     * Run the task regardless of any prerequisites. The task is appended to the
     * ready list of {@link Priority#DEFAULT} of the thread-local scheduler.
     *
     * @param task    the task to run
     * @param reasons the reasons handed to the task in its run context
     */
    public static void addContinuation(Task task, EnumSet<Reason> reasons) {
        threadScheduler.get().addContinuation(task, reasons);
    }

    /**
     * Schedule a new task to be run as soon as possible. The task will be run
     * with the priority of the calling task.
     *
     * @param task main function of the task
     * @return unique task identifier for the job only valid until "task" is
     *         started!
     */
    public static Cancelable add(Task task) {
        return addDelayed(RelativeTime.ZERO, task);
    }

    /**
     * Add a task to run after the specified delay.
     *
     * @param delay time to wait until running the task
     * @param task  the task to run after delay
     * @return the TaskIdentifier, can be used to cancel the task until it has been executed.
     */
    public static TaskIdentifier addDelayed(RelativeTime delay, Task task) {
        TaskConfiguration tid = new TaskConfiguration(delay, task);
        return tid.schedule(threadScheduler.get());
    }

    /**
     * Add a task to run after the specified delay, or after the given channel
     * is ready to read, whichever occurs first.
     *
     * @param timeout time to wait until running the task
     * @param chan    channel of interest
     * @param task    task to run
     * @return task identifier
     */
    public static TaskIdentifier addRead(RelativeTime timeout, SelectableChannel chan, Task task) {
        TaskConfiguration tid = new TaskConfiguration(timeout, task);
        tid.addSelectEvent(chan, SelectionKey.OP_READ);
        return tid.schedule(threadScheduler.get());
    }

    /**
     * Add a task to run after the specified delay, or after the given channel
     * is ready to write, whichever occurs first.
     *
     * @param timeout time to wait until running the task
     * @param chan    channel of interest
     * @param task    task to run
     * @return task identifier
     */
    public static TaskIdentifier addWrite(RelativeTime timeout, SelectableChannel chan, Task task) {
        TaskConfiguration tid = new TaskConfiguration(timeout, task);
        tid.addSelectEvent(chan, SelectionKey.OP_WRITE);
        return tid.schedule(threadScheduler.get());
    }

    /**
     * Initialize and run scheduler. This function will return when all tasks
     * have completed.
     */
    public static void run() {
        threadScheduler.get().run();
    }

    /**
     * Initialize and run scheduler. This function will return when all tasks
     * have completed.
     *
     * @param initialTask the initial task to run immediately
     */
    public static void run(Task initialTask) {
        threadScheduler.get().run(initialTask);
    }

    /**
     * @return true iff the thread-local scheduler has at least one ready or pending task
     */
    public static boolean hasTasks() {
        return threadScheduler.get().hasTasks();
    }

    /**
     * Reset the thread-local scheduler forcefully.
     */
    public static void forceReset() {
        // FIXME: this should reset all schedulers?
        threadScheduler.get().forceReset();
    }

    /**
     * Translate ready operations of a selection key into task reasons.
     *
     * @param reasons     set the reasons are added to
     * @param interestOps bit set of {@link SelectionKey} operations
     */
    private static void addReasonsFromInterestOp(EnumSet<Reason> reasons, int interestOps) {
        if ((interestOps & SelectionKey.OP_READ) != 0) {
            reasons.add(Reason.READ_READY);
        }
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            reasons.add(Reason.WRITE_READY);
        }
        if ((interestOps & SelectionKey.OP_CONNECT) != 0) {
            reasons.add(Reason.CONNECT_READY);
        }
        if ((interestOps & SelectionKey.OP_ACCEPT) != 0) {
            reasons.add(Reason.ACCEPT_READY);
        }
    }

    /**
     * A handle to a file system object that can be selected on.
     */
    public static class FilePipe {
        private FilePipeThread filePipeThread;

        private FilePipe(FilePipeThread filePipeThread) {
            this.filePipeThread = filePipeThread;
        }

        /**
         * @return the readable end of the pipe that the file's content is copied into
         */
        public Pipe.SourceChannel getSource() {
            return filePipeThread.pipe.source();
        }
    }

    /**
     * A thread that copies the content of a file into a pipe, so that the
     * pipe's source can be selected on.
     */
    private static class FilePipeThread extends Thread {
        public File file;
        public Pipe pipe;

        /**
         * Open the pipe for the given file. The file itself is only opened once
         * the thread runs.
         *
         * @param file the file to read from
         */
        FilePipeThread(File file) {
            this.file = file;
            try {
                pipe = SelectorProvider.provider().openPipe();
                pipe.source().configureBlocking(false);
                // The sink stays in blocking mode: a non-blocking write may accept
                // zero bytes when the pipe is full, which would silently drop data.
            } catch (IOException e) {
                throw new RuntimeException("selector provider has no pipes", e);
            }
        }

        /**
         * Copy the file into the pipe until end of file or an I/O error.
         * At end of file the sink is closed so the reader of the source sees the
         * end of the stream; on an I/O error the source is closed as well.
         */
        @Override
        public void run() {
            // has to be done in thread, blocks if file is a fifo
            FileChannel fileChannel;
            try {
                FileInputStream stream = new FileInputStream(file);
                fileChannel = stream.getChannel();
            } catch (FileNotFoundException e) {
                throw new IOError(e);
            }

            // we have such a small buffer so that the pipe will not buffer
            ByteBuffer buffer = ByteBuffer.allocate(1);
            boolean failed = false;
            try {
                while (true) {
                    buffer.clear();
                    if (fileChannel.read(buffer) < 0) {
                        // end of file, nothing more will ever arrive
                        break;
                    }
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        pipe.sink().write(buffer);
                    }
                }
            } catch (IOException e) {
                failed = true;
            } finally {
                closeQuietly(fileChannel);
                closeQuietly(pipe.sink());
                if (failed) {
                    closeQuietly(pipe.source());
                }
            }
        }

        /**
         * Close the given resource, ignoring failures.
         */
        private static void closeQuietly(Closeable closeable) {
            try {
                closeable.close();
            } catch (IOException ignored) {
                // nothing we can do here
            }
        }
    }

    /**
     * Start a daemon thread that copies the given file into a pipe.
     *
     * @param file the file to read from
     * @return a handle whose source channel can be selected on
     */
    public static FilePipe openFilePipe(File file) {
        FilePipeThread fpt = new FilePipeThread(file);
        fpt.setDaemon(true);
        fpt.start();
        return new FilePipe(fpt);
    }

    /**
     * Print the classes of all pending tasks of the thread-local scheduler to stderr.
     */
    public static void debugPrintPendingTasks() {
        System.err.println("pending tasks:");
        for (TaskIdentifier i : threadScheduler.get().pending) {
            System.err.println(i.task.getClass());
        }
    }
}