/* This file is part of GNUnet. (C) 2011, 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ package org.gnunet.util; import org.gnunet.construct.Construct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOError; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.LinkedList; import java.util.List; /** * Integrates sockets with the gnunet-java message loop / the scheduler. */ public class Connection { private static final Logger logger = LoggerFactory .getLogger(Connection.class); /** * The underlying socket the client is using to talk with the service. */ private SocketChannel connectionChannel = null; /** * The list of all address probes. * Each address probe tries to connect via a different address. */ private List addressProbes = null; /** * The task that is currently used by the resolve mechanism. */ private Cancelable resolveHandle = null; /** * The task that is responsible for establishing the connection to the server. */ private Cancelable connectHandle = null; /** * The ReceiveTask responsible for receiving a whole message from the service * and calling the respective MessageReceiver. */ private Scheduler.TaskIdentifier receiveTaskId; private MessageReceiver currentReceiver; private AbsoluteTime receiveDeadline; /** * The handle for the current transmission. Writes data to the socket. */ private TransmitHelper currentTransmitHelper = null; /** * The handle for the next transmission. The next transmission will become the current * transmission once the current transmission has completed. * While nextTransmitHelper is not null, no new transmit requests may be scheduled. */ private TransmitHelper nextTransmitHelper = null; /** * Has the last call to the mst produced a message? */ private boolean processedMessage; /** * The transmitters passed to transmitReadyNotify(...) write to this buffer by calling * methods on the MessageSink passed to the Transmitter.transmit(MessageSink s) method. * Initially, this buffer has the size of the smallest possible messages, but grows when * transmitting larger messages. */ private ByteBuffer transmitBuffer = ByteBuffer.allocate(GnunetMessage.Header.SIZE); /** * Has the connection been disconnected? */ private boolean disconnected = false; /** * Timeout task for the connect notify. */ private Scheduler.TaskIdentifier notifyConnectedTimeout; /** * Continuation to call when connected. */ private Continuation notifyConnectedContinuation; /** * Message stream tokenizer for messages received from the connection. */ private MessageStreamTokenizer mst; /** * An address probe is a connection to a socket that may succeed or not. * The first address probe that succeeded is used for this connection. */ private static class AddressProbe { Cancelable connectTask; SocketChannel channel; public void cancel() { if (connectTask != null) { connectTask.cancel(); } if (channel != null) { try { channel.close(); } catch (IOException e) { // nothing we can do here } } } } /** * Represents a getRequestIdentifier for transmission. */ public interface TransmitHandle extends Cancelable { /** * Cancel a getRequestIdentifier for the transmit ready notification. * This does *not* onCancel a transmission that already has been started. */ public void cancel(); } /** * An interface that allows the Transmitter.transmit method to deliver their messages * to the client, which sends them to the service. */ public interface MessageSink { public void send(GnunetMessage.Body m); } /** * The ReceiveTask is responsible for receiving a whole * GnunetMessage and call the respective MessageReceiver with the message on success, * and null on failure or timeout. */ private class ReceiveTask implements Scheduler.Task { /** * The task object's work is over, either because it succeeded at its job, * or it has been canceled. */ public boolean done; private void error() { currentReceiver.handleError(); done = true; } @Override public void run(Scheduler.RunContext ctx) { if (currentReceiver == null) { throw new AssertionError(); } receiveTaskId = null; if (ctx.reasons.contains(Scheduler.Reason.TIMEOUT)) { error(); } else if (ctx.reasons.contains(Scheduler.Reason.READ_READY)) { logger.debug("ready to receive"); try { processedMessage = false; logger.debug("reading into mst ..."); int n = mst.readFrom(connectionChannel, true); logger.debug("read {} bytes into mst", n); if (processedMessage) { done = true; return; } if (-1 == n) { error(); return; } } catch (IOException e) { logger.debug("got IOException ({}, {})", e.getClass().getName(), e.getMessage()); error(); return; } if (receiveDeadline.isDue()) { error(); return; } receiveTaskId = Scheduler.addRead(receiveDeadline.getRemaining(), connectionChannel, this); } else if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) { done = true; } else { // XXX: what to do here? throw new RuntimeException("receive failed"); } } } private class TransmitHelper implements Scheduler.Task, MessageSink { private final MessageTransmitter transmitter; private Cancelable notifyTimeoutTask; private Cancelable transmitTask = null; public TransmitHelper(final MessageTransmitter transmitter, RelativeTime notifyTimeout) { this.transmitter = transmitter; Scheduler.TaskConfiguration tc = new Scheduler.TaskConfiguration(notifyTimeout, new Scheduler.Task() { @Override public void run(Scheduler.RunContext ctx) { transmitter.handleError(); } }); notifyTimeoutTask = tc.schedule(); } public void cancel() { if (transmitTask != null) { transmitTask.cancel(); transmitTask = null; } if (notifyTimeoutTask != null) { notifyTimeoutTask.cancel(); notifyTimeoutTask = null; } } @Override public void run(Scheduler.RunContext ctx) { this.transmitTask = null; if (connectionChannel == null) { logger.error("could not write to channel (null)"); return; } try { int n = connectionChannel.write(transmitBuffer); // logger.debug("connectionChannel has written " + n + " bytes to " + connectionChannel.socket().toString()); } catch (IOException e) { throw new IOError(e); } if (transmitBuffer.remaining() == 0) { logger.debug("transmit buffer fully sent"); if (nextTransmitHelper == null) { currentTransmitHelper = null; } else { currentTransmitHelper = nextTransmitHelper; // we need to to this so the transmit callback can do notifyTransmitReady TransmitHelper tmpTransmitHelper = nextTransmitHelper; nextTransmitHelper = null; tmpTransmitHelper.start(); } } else { schedule(); } } /** * called to notify when we are ready to put new messages in the transmit buffer */ public void start() { notifyTimeoutTask.cancel(); notifyTimeoutTask = null; transmitBuffer.clear(); transmitter.transmit(TransmitHelper.this); transmitBuffer.flip(); schedule(); } private void schedule() { if (disconnected) { return; } // timeout is forever, because there is no way to directly limit the transmission time // of a message, only the max. wait time before transmission. // onCancel must be called on the transmitTask if we disconnect Scheduler.TaskConfiguration tc = new Scheduler.TaskConfiguration(RelativeTime.FOREVER, this); tc.addSelectEvent(connectionChannel, SelectionKey.OP_WRITE); this.transmitTask = tc.schedule(); } @Override public void send(final GnunetMessage.Body m) { final GnunetMessage gm = new GnunetMessage(); gm.header = new GnunetMessage.Header(); gm.body = m; Construct.patch(gm); gm.header.messageSize = Construct.getSize(gm); byte[] b = Construct.toBinary(gm); if (b.length != gm.header.messageSize) { throw new AssertionError( String.format("tried to send message with binary size %s but size in header %s", b.length, gm.header.messageSize)); } logger.debug("sending message (size={},type={}) over {}", new String[] {String.valueOf(b.length), String.valueOf(gm.header.messageType), connectionChannel.socket().toString()}); if (transmitBuffer.remaining() < b.length) { ByteBuffer buf = ByteBuffer.allocate(b.length + transmitBuffer.capacity()); transmitBuffer.flip(); buf.put(transmitBuffer); transmitBuffer = buf; } transmitBuffer.put(b); } } private class ConnectionMstCallback implements MstCalllback { private void dispatch(GnunetMessage.Body mb) { if (processedMessage) { throw new AssertionError(); } if (null == currentReceiver) { throw new AssertionError(); } currentReceiver.process(mb); processedMessage = true; } @Override public void onUnknownMessage(UnknownMessageBody b) { dispatch(b); } @Override public void onKnownMessage(GnunetMessage msg) { dispatch(msg.body); } } /** * Create a connection to the given hostname/port. * * @param hostname name of the host to connect to * @param port port of the host to connect to */ public Connection(String hostname, int port) { mst = new MessageStreamTokenizer(new ConnectionMstCallback()); addressProbes = new LinkedList(); ConnectionResolveHandler addressHandler = new ConnectionResolveHandler(port); resolveHandle = Resolver.getInstance().resolveHostname(hostname, RelativeTime.FOREVER, addressHandler); } public Connection(SocketChannel sock) { mst = new MessageStreamTokenizer(new ConnectionMstCallback()); assert sock != null; this.connectionChannel = sock; } class ConnectionResolveHandler implements Resolver.AddressCallback { private final int port; public ConnectionResolveHandler(int port) { this.port = port; } @Override public void onAddress(InetAddress addr) { final SocketChannel channel = createChannel(); try { channel.connect(new InetSocketAddress(addr, port)); } catch (IOException e) { logger.error("could not connect to host"); return; } final AddressProbe addressProbe = new AddressProbe(); addressProbe.channel = channel; Scheduler.TaskConfiguration tc = new Scheduler.TaskConfiguration(RelativeTime.FOREVER, new Scheduler.Task() { @Override public void run(Scheduler.RunContext ctx) { addressProbe.connectTask = null; if (ctx.reasons.contains(Scheduler.Reason.SHUTDOWN)) { return; } Connection.this.finishConnect(addressProbe); } }); // our channel has already disconnected if (!channel.isOpen()) { return; } tc.addSelectEvent(channel, SelectionKey.OP_CONNECT); addressProbe.connectTask = tc.schedule(); } @Override public void onFinished() { resolveHandle = null; } @Override public void onTimeout() { // do nothing // todo: is this correct? } } /** * Actually connect the socket that select reported as ready to connect. * Discards all remaining address probes. */ private void finishConnect(AddressProbe probe) { // can happen if the addres probe task was already scheduled if (connectionChannel != null) { try { probe.channel.close(); } catch (IOException e) { logger.error("could not close channel", e); } return; } SocketChannel channel = probe.channel; boolean connected; try { connected = channel.finishConnect(); } catch (IOException e) { logger.debug("finishConnect() was not successful: {}", (Object) e); return; } if (!connected) { logger.error("socket reported OP_CONNECT but is not connected"); return; } for (AddressProbe addressProbe : addressProbes) { if (addressProbe != probe && addressProbe.connectTask != null) { addressProbe.connectTask.cancel(); try { addressProbe.channel.close(); } catch (IOException e) { logger.error("could not close channel", e); } } } addressProbes.clear(); connectionChannel = channel; if (currentTransmitHelper != null) { currentTransmitHelper.start(); } Continuation c = notifyConnectedContinuation; notifyConnectedContinuation = null; if (notifyConnectedTimeout != null) { notifyConnectedTimeout.cancel(); notifyConnectedTimeout = null; } if (c != null) { c.cont(true); } } /** * Open a channel for this connection in non-blocking mode */ private SocketChannel createChannel() { try { SocketChannel channel = SelectorProvider.provider().openSocketChannel(); channel.configureBlocking(false); return channel; } catch (IOException e) { // this is fatal, no retry necessary throw new IOError(e); } } public boolean isConnected() { return connectionChannel != null && connectionChannel.isConnected(); } /** * Receive one message from the network. * * @param timeout deadline after which receiver.onError() will be called * @param receiver MessageReceiver that is responsible for the received message */ public void receive(final RelativeTime timeout, final MessageReceiver receiver) { if (receiveTaskId != null) throw new AssertionError("already receiving"); if (!isConnected()) { throw new AssertionError("cannot receive if not connected"); } currentReceiver = receiver; receiveDeadline = timeout.toAbsolute(); // make sure that the receiver is never called directly Scheduler.add(new Scheduler.Task() { @Override public void run(Scheduler.RunContext ctx) { // full message still in buffer? processedMessage = false; if (mst.extractOne()) { logger.debug("full message was in buffer, not reading from socket"); if (!processedMessage) { throw new AssertionError(); } return; } // did we get disconnected in the mean time? if (connectionChannel == null) { return; } final ReceiveTask task = new ReceiveTask(); receiveTaskId = Scheduler.addRead(timeout, connectionChannel, task); } }); } /** * Call the transmitter once the we are ready to transmit data. * * @param size number of bytes to send * @param timeout after how long should we give up (and call transmitter.transmit(null)) * @param transmitter the MessageTransmitter object to call once the client is ready to transmit or * when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned. * @return a handle that can be used to onCancel the transmit getRequestIdentifier, null if getRequestIdentifier could be satisfied immediately */ public TransmitHandle notifyTransmitReady(int size, RelativeTime timeout, final MessageTransmitter transmitter) { if (disconnected) { throw new AssertionError("notifyTransmitReady called on a closed connection"); } if (nextTransmitHelper != null) { throw new AssertionError( "previous transmit getRequestIdentifier must have completed before calling notifyTransmitReady again"); } if (timeout.getMicroseconds() <= 0) { throw new AssertionError("notifyTransmitReady timeout must be positive"); } if (!isConnected()) { throw new AssertionError("notifyTransmitHandle can only be called once connected"); } final TransmitHelper transmit = new TransmitHelper(transmitter, timeout); if (currentTransmitHelper == null) { currentTransmitHelper = transmit; currentTransmitHelper.start(); return null; } nextTransmitHelper = transmit; return new TransmitHandle() { @Override public void cancel() { transmit.cancel(); } }; } /** * Call cont after establishing the connection or when the timeout has occured. * * @param timeout timeout * @param cont continuation to call * @return a handle to cancel the notification */ Cancelable notifyConnected(RelativeTime timeout, final Continuation cont) { if (notifyConnectedTimeout != null) { throw new AssertionError(); } this.notifyConnectedContinuation = cont; this.notifyConnectedTimeout = Scheduler.addDelayed(timeout, new Scheduler.Task() { @Override public void run(Scheduler.RunContext ctx) { Continuation c = notifyConnectedContinuation; notifyConnectedContinuation = null; Connection.this.notifyConnectedTimeout = null; if (c != null) { c.cont(false); } } }); return this.notifyConnectedTimeout; } /** * Disconnect. There must not be any pending transmit/receive requests. * Any buffered data scheduled for writing is discarded. */ public void disconnect() { if (disconnected) { logger.error("disconnect called twice"); } disconnected = true; if (notifyConnectedTimeout != null) { notifyConnectedTimeout.cancel(); notifyConnectedTimeout = null; } if (receiveTaskId != null) { receiveTaskId.cancel(); receiveTaskId = null; } if (currentTransmitHelper != null) { currentTransmitHelper.cancel(); currentTransmitHelper = null; } if (nextTransmitHelper != null) { nextTransmitHelper.cancel(); nextTransmitHelper = null; } if (resolveHandle != null) { resolveHandle.cancel(); resolveHandle = null; } if (connectHandle != null) { connectHandle.cancel(); connectHandle = null; } if (connectionChannel != null) { try { connectionChannel.close(); } catch (IOException e) { throw new IOError(e); } connectionChannel = null; } if (addressProbes != null) { for (AddressProbe ap : addressProbes) { ap.cancel(); } } } }