/* This file is part of GNUnet. Copyright (C) 2009,2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ package org.gnunet.testbed; import org.gnunet.mq.Envelope; import org.gnunet.requests.MatchingRequestContainer; import org.gnunet.requests.Request; import org.gnunet.requests.RequestIdentifier; import org.gnunet.testbed.messages.*; import org.gnunet.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Handle to interact with a GNUnet testbed controller. Each * controller has at least one master handle which is created when the * controller is created; this master handle interacts with the * controller process, destroying it destroys the controller (by * closing stdin of the controller process). Additionally, * controllers can interact with each other (in a P2P fashion); those * links are established via TCP/IP on the controller's service port. */ public class Controller { private static final Logger logger = LoggerFactory .getLogger(Controller.class); public static class EventTypes { /** * A peer has been started. */ public static final int PEER_START = 0; /** * A peer has been stopped. */ public static final int PEER_STOP = 1; /** * A connection between two peers has been established. */ public static final int PEERS_CONNECT = 2; /** * A connection between two peers has been torn down. */ public static final int PEERS_DISCONNECT = 3; /** * An operation has finished. */ public static final int OPERATION_FINISHED = 4; } /** * Client connecting to the testbed service. */ private Client client; /** * Host that this controller runs on. */ private Host host; private int operationCounter = 1; private int peerCounter; /** * Request queue (akin to operation queue(s) in the GNUnet C implementation). */ private MatchingRequestContainer requests; abstract class OperationRequest extends Request { protected final long operationId; public OperationRequest() { operationId = (((long) host.id) << 32) | (long) operationCounter++; } } abstract class GenericOperationRequest extends OperationRequest { final protected OperationCompletionCallback cb; public GenericOperationRequest(OperationCompletionCallback cb) { this.cb = cb; } void onSuccess() { cb.onCompletion(); } } class PeerCreateRequest extends OperationRequest { final Host host; final Configuration cfg; final PeerCreateCallback cb; final int peerId; public PeerCreateRequest(Host host, Configuration cfg, PeerCreateCallback cb) { this.host = host; this.cfg = cfg; this.cb = cb; this.peerId = peerCounter++; } @Override public Envelope assembleRequest() { CompressedConfig ccfg = new CompressedConfig(cfg); PeerCreateMessage m = new PeerCreateMessage(); m.hostId = host.id; m.operationId = operationId; m.peerId = peerId; m.compressedConfig = ccfg.compressedData; m.configSize = ccfg.getUncompressedSize(); System.out.println("create getRequestIdentifier with opid " + m.operationId); return new Envelope(m); } } class PeerDestroyRequest extends GenericOperationRequest { final int peerId; public PeerDestroyRequest(Peer p, OperationCompletionCallback cb) { super(cb); peerId = p.peerId; } @Override public Envelope assembleRequest() { PeerDestroyMessage m = new PeerDestroyMessage(); m.operationId = operationId; m.peerId = peerId; System.out.println("destroy getRequestIdentifier with opid " + m.operationId); return new Envelope(m); } } class PeerInformationRequest extends OperationRequest { final int peerId; final PeerInformationCallback cb; public PeerInformationRequest(Peer peer, PeerInformationCallback cb) { peerId = peer.peerId; this.cb = cb; } @Override public Envelope assembleRequest() { PeerGetInformationMessage m = new PeerGetInformationMessage(); m.operationId = operationId; m.peerId = peerId; return new Envelope(m); } } class PeerUpdateConfigurationRequest extends GenericOperationRequest { final int peerId; final Configuration cfg; public PeerUpdateConfigurationRequest(Peer peer, OperationCompletionCallback cb, Configuration cfg) { super(cb); peerId = peer.peerId; this.cfg = cfg; } @Override public Envelope assembleRequest() { PeerReconfigureMessage m = new PeerReconfigureMessage(); m.operationId = operationId; m.peerId = peerId; CompressedConfig ccfg = new CompressedConfig(cfg); m.uncompressedConfigSize = ccfg.getUncompressedSize(); m.compressedConfig = ccfg.compressedData; System.out.println("compressed config size " +m.compressedConfig.length); return new Envelope(m); } } class PeerManageServiceRequest extends OperationRequest { final Peer peer; final boolean start; final String serviceName; public PeerManageServiceRequest(Peer peer, String serviceName, boolean start) { this.peer = peer; this.start = start; this.serviceName = serviceName; } @Override public Envelope assembleRequest() { ManagePeerServiceMessage m = new ManagePeerServiceMessage(); m.operationId = operationId; m.peerId = peer.peerId; m.serviceName = serviceName; m.start = start; return new Envelope(m); } } class PeerStartRequest extends OperationRequest { final Peer peer; final PeerChurnCallback peerChurnCallback; public PeerStartRequest(Peer peer, PeerChurnCallback peerChurnCallback) { this.peer = peer; this.peerChurnCallback = peerChurnCallback; } @Override public Envelope assembleRequest() { PeerStartMessage m = new PeerStartMessage(); m.operationId = operationId; m.peerId = peer.peerId; return new Envelope(m); } } class PeerStopRequest extends OperationRequest { final Peer peer; final PeerChurnCallback peerChurnCallback; public PeerStopRequest(Peer peer, PeerChurnCallback peerChurnCallback) { this.peer = peer; this.peerChurnCallback = peerChurnCallback; } @Override public Envelope assembleRequest() { PeerStopMessage m = new PeerStopMessage(); m.operationId = operationId; m.peerId = peer.peerId; return new Envelope(m); } } class PeerConnectOverlayRequest extends OperationRequest { final Peer peer1; final Peer peer2; final OperationCompletionCallback cb; public PeerConnectOverlayRequest(Peer peer1, Peer peer2, OperationCompletionCallback cb) { this.peer1 = peer1; this.peer2 = peer2; this.cb = cb; } @Override public Envelope assembleRequest() { OverlayConnectMessage m = new OverlayConnectMessage(); m.operationId = operationId; m.peer1 = peer1.peerId; m.peer2 = peer2.peerId; m.hostOfPeer2 = peer2.getHost().id; return new Envelope(m); } } public class ControllerMessageReceiver extends RunaboutMessageReceiver { public void visit(PeerEventMessage m) { RequestIdentifier rId = requests.getRequestIdentifier(m.operationId); OperationRequest r = rId.getRequest(); if (null == r) { logger.error("no matching peer event getRequestIdentifier for op id %s", m.operationId); return; } if (r instanceof PeerStartRequest && m.eventType == EventTypes.PEER_START) { PeerStartRequest psr = (PeerStartRequest) r; psr.peerChurnCallback.onChurnSuccess(); } else if (r instanceof PeerStopRequest && m.eventType == EventTypes.PEER_STOP) { PeerStopRequest psr = (PeerStopRequest) r; psr.peerChurnCallback.onChurnSuccess(); } else { logger.error("unexpected peer event message, event type %s and getRequestIdentifier %s", m.eventType, r); } } public void visit(PeerCreateSuccessMessage m) { RequestIdentifier rId = requests.getRequestIdentifier(m.operationId); OperationRequest r = rId.getRequest(); if (!(r instanceof PeerCreateRequest)) { logger.warn("response to peer create getRequestIdentifier does not match"); return; } PeerCreateRequest pcr = (PeerCreateRequest) r; Peer p = new Peer(pcr.peerId); pcr.cb.onPeerCreated(p); } public void visit(PeerInformationMessage m) { RequestIdentifier rId = requests.getRequestIdentifier(m.operationId); OperationRequest r = rId.getRequest(); if (null == r) { logger.error("unexpected peer information message (opid={})", m.operationId); return; } if (!(r instanceof PeerInformationRequest)) { logger.warn("response to peer create getRequestIdentifier does not match"); return; } PeerInformationRequest pir = (PeerInformationRequest) r; CompressedConfig ccfg = new CompressedConfig(m.compressedConfig); pir.cb.onSuccess(m.peerIdentity, ccfg.decompress()); } public void visit(GenericOperationSuccessMessage m) { RequestIdentifier rId = requests.getRequestIdentifier(m.operationId); OperationRequest r = rId.getRequest(); if (null == r) { logger.error("unexpected generic success message (opid={})", m.operationId); return; } if (!(r instanceof GenericOperationRequest)) { logger.error(String.format( "got GenericOperationSuccessMessage as response to getRequestIdentifier '%s', opid %s; event type %s", r.getClass(), m.operationId, m.eventType)); return; } GenericOperationRequest gr = (GenericOperationRequest) r; gr.onSuccess(); } public void visit(ConnectionEventMessage m) { RequestIdentifier rId = requests.getRequestIdentifier(m.operationId); OperationRequest r = rId.getRequest(); if (null == r) { logger.error("unexpected connection event message (opid={})", m.operationId); return; } if (!(r instanceof PeerConnectOverlayRequest)) { logger.error("unexpected connection event message for operation {}", r.getClass()); return; } PeerConnectOverlayRequest cr = (PeerConnectOverlayRequest) r; cr.cb.onCompletion(); } public void visit(OperationFailEventMessage m) { logger.error("operation failed: " + m.errorMessage); } @Override public void handleError() { throw new AssertionError(); } } /** * Connect to a controller process. The configuration to use for the connection * is retreived from the given host where a controller is started using * GNUNET_TESTBED_controller_start(). * * @param host host to run the controller on; This should be the same host if * the controller was previously started with * GNUNET_TESTBED_controller_start() */ public Controller(Host host) { this.host = host; client = new Client("testbed", host.cfg); client.installReceiver(new ControllerMessageReceiver()); requests = new MatchingRequestContainer(client); ControllerInitMessage m = new ControllerInitMessage(); // we are interested in all events m.eventMask = 1 | 2 | 4 | 8 | 16; m.controlerHostname = (host.hostname == null) ? "127.0.0.1" : host.hostname; m.hostId = host.id; client.send(m); } /** * Create the given peer at the specified host using the given * controller. If the given controller is not running on the target * host, it should find or create a controller at the target host and * delegate creating the peer. Explicit delegation paths can be setup * using 'Controller.link'. If no explicit delegation * path exists, a direct link with a subordinate controller is setup * for the first delegated peer to a particular host; the subordinate * controller is then destroyed once the last peer that was delegated * to the remote host is stopped. * * Creating the peer only creates the handle to manipulate and further * configure the peer; use "Peer.start" and * "Peer.stop" to actually start/stop the peer's * processes. * * Note that the given configuration will be adjusted by the * controller to avoid port/path conflicts with other peers. * The "final" configuration can be obtained using * 'Peer.getInformation'. * * @param host host to run the peer on; cannot be NULL * @param cfg Template configuration to use for the peer. * @param cb the callback to call when the peer has been created * @return the operation handle */ public Cancelable createPeer(Host host, Configuration cfg, PeerCreateCallback cb) { PeerCreateRequest r = new PeerCreateRequest(host, cfg, cb); return requests.addRequest(r.operationId, r); } /** * Stop the given controller (also will terminate all peers and * controllers dependent on this controller). This function * blocks until the testbed has been fully terminated (!). */ public void disconnect () { client.disconnect(); } /** * Create a link from slave controller to delegated controller. Whenever the * master controller is asked to start a peer at the delegated controller the * getRequestIdentifier will be routed towards slave controller (if a route exists). The * slave controller will then route it to the delegated controller. The * configuration of the delegated controller is given and is used to either * create the delegated controller or to connect to an existing controller. Note * that while starting the delegated controller the configuration will be * modified to accommodate available free ports. the 'is_subordinate' specifies * if the given delegated controller should be started and managed by the slave * controller, or if the delegated controller already has a master and the slave * controller connects to it as a non master controller. The success or failure * of this operation will be signalled through the * GNUNET_TESTBED_ControllerCallback() with an event of type * GNUNET_TESTBED_ET_OPERATION_FINISHED * * @param delegated_host requests to which host should be delegated; cannot be NULL * @param slave_host which host is used to run the slave controller; use NULL to * make the master controller connect to the delegated host * @param is_subordinate GNUNET_YES if the controller at delegated_host should * be started by the slave controller; GNUNET_NO if the slave * controller has to connect to the already started delegated * controller via TCP/IP * @return the operation handle */ public Cancelable link(Host delegated_host, Host slave_host, boolean is_subordinate) { // low priority throw new UnsupportedOperationException("not yet implemented"); } /** * Register a host with the controller. This makes the controller aware of the * host. A host should be registered at the controller before starting a * sub-controller on that host using GNUNET_TESTBED_controller_link(). * * @param host the host to register * @param cc the completion callback to call to inform the status of * registration. After calling this callback the registration handle * will be invalid. Cannot be NULL * @return handle to the host registration which can be used to onCancel the * registration; NULL if another registration handle is present and * is not cancelled */ Cancelable registerHost(Host host, HostRegistrationCompletion cc) { throw new UnsupportedOperationException("not implemented"); } /** * Opaque handle to a peer controlled by the testbed framework. A peer runs * at a particular host. */ public class Peer { final private int peerId; /** * Private constructor for the peer, creates the peer with the given id, * and, implicitly the containing controller. * * @param peerId id for the peer */ private Peer(int peerId) { this.peerId = peerId; } /** * Start this peer * * @param peerChurnCallback completion callback * @return handle to onCancel the operation */ public Cancelable start(PeerChurnCallback peerChurnCallback) { PeerStartRequest r = new PeerStartRequest(this, peerChurnCallback); return requests.addRequest(r.operationId, r); } /** * Stop this peer * * @param peerChurnCallback completion callback * @return handle to onCancel the operation */ public Cancelable stop(PeerChurnCallback peerChurnCallback) { PeerStopRequest r = new PeerStopRequest(this, peerChurnCallback); return requests.addRequest(r.operationId, r); } public Cancelable requestInformation(PeerInformationCallback cb) { PeerInformationRequest r = new PeerInformationRequest(this, cb); return requests.addRequest(r.operationId, r); } /* * Change peer configuration. Must only be called while the * peer is stopped. Ports and paths cannot be changed this * way. */ public Cancelable updateConfiguration(Configuration cfg, OperationCompletionCallback cb) { PeerUpdateConfigurationRequest r = new PeerUpdateConfigurationRequest(this, cb, cfg); return requests.addRequest(r.operationId, r); } /* * Change peer configuration. Must only be called while the * peer is stopped. Ports and paths cannot be changed this * way. */ public Cancelable destroy(OperationCompletionCallback cb) { PeerDestroyRequest r = new PeerDestroyRequest(this, cb); return requests.addRequest(r.operationId, r); } public Cancelable manageService(String serviceName, boolean start, OperationCompletionCallback cb) { PeerManageServiceRequest r = new PeerManageServiceRequest(this, serviceName, start); return requests.addRequest(r.operationId, r); } /** * Both peers must have been started before calling this function. * This function then obtains a HELLO from this peer, gives it to 'otherPeer' * and asks 'otherPeer' to connect to this peer. * * @param otherPeer peer to connect this peer to * @param cb callback object to signal completion or failure * @return token to onCancel the getRequestIdentifier * */ public Cancelable connectOverlay(Peer otherPeer, OperationCompletionCallback cb) { PeerConnectOverlayRequest r = new PeerConnectOverlayRequest(this, otherPeer, cb); return requests.addRequest(r.operationId, r); } /** * Connect to a service offered by the given peer. Will ensure that * the getRequestIdentifier is queued to not overwhelm our ability to create and * maintain connections with other systems. The actual service * handle is then returned via the 'op_result' member in the event * callback. The 'ca' callback is used to create the connection * when the time is right; the 'da' callback will be used to * destroy the connection (upon 'GNUNET_TESTBED_operation_done'). * 'GNUNET_TESTBED_operation_done' can be used to abort this * operation until the event callback has been called. * * @param serviceName name of the service to connect to * @param serviceAdapter callback object for connection establishment and tear-down. * @return handle for the operation */ public Cancelable getServiceConnection(String serviceName, final ServiceAdapter serviceAdapter) { return requestInformation(new PeerInformationCallback() { @Override public void onSuccess(PeerIdentity peerIdentity, Configuration configuration) { serviceAdapter.onConnect(configuration); } }); } /** * Get the host this peer is running on. * * @return the host this peer is running on */ public Host getHost() { return Controller.this.host; } } }