/*
     This file is part of GNUnet.
     (C) 2011, 2012 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 3, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING. If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
 */

package org.gnunet.core;

import com.google.common.collect.Maps;
import org.gnunet.construct.Construct;
import org.gnunet.construct.MessageLoader;
import org.gnunet.core.messages.*;
import org.gnunet.mq.Envelope;
import org.gnunet.requests.MatchingRequestContainer;
import org.gnunet.requests.Request;
import org.gnunet.requests.RequestIdentifier;
import org.gnunet.util.*;
import org.grothoff.Runabout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;

/**
 * API for the GNUnet core service.
 *
 * Sends messages to connected peers.
 */
public class Core {
    /**
     * Logger for org.gnunet.Core.
     */
    private static final Logger logger = LoggerFactory
            .getLogger(Core.class);

    /**
     * Client for connecting to the core service.
     */
    private final Client client;

    /**
     * Callback invoked when the init reply arrives;
     * set to null once it has been called for the first time.
     */
    private InitCallback initCallback;

    /**
     * Callback for outbound header notifications. null if not interested.
     */
    private HeaderNotify notifyOutboundHeaders;

    /**
     * Callback for inbound header notifications. null if not interested.
     */
    private HeaderNotify notifyInboundHeaders;

    /**
     * Callback for outbound message notifications. null if not interested.
     */
    private MessageNotify notifyOutboundMessages;

    /**
     * Callback for inbound message notifications. null if not interested.
     */
    private MessageNotify notifyInboundMessages;

    /**
     * Callback for connect events. null if not interested.
     */
    private ConnectHandler connectHandler;

    /**
     * Callback for disconnect events. null if not interested.
     */
    private DisconnectHandler disconnectHandler;

    /**
     * Messages we are interested in.
     * Per default we are interested in all messages => specific interest set is empty.
     */
    private int[] interested = new int[0];

    /**
     * Handler for the messages we are interested in.
     */
    private Runabout messageHandler;

    /**
     * Peers that we were notified about being connected to them.
     * Every connected peer is mapped to the next unused request ID for that peer,
     * which is handed out (and incremented) by notifyTransmitReady.
     */
    private final HashMap<PeerIdentity, Integer> connectedPeers = Maps.newHashMap();

    /**
     * Request container for notify transmit requests.
     */
    // NOTE(review): the generic parameters of MatchingRequestContainer are not visible
    // from this file, so the container is used as a raw type here -- confirm and
    // parameterize (presumably <RequestIdentification, NotifyTransmitReadyRequest>).
    private final MatchingRequestContainer ntrRequests;

    /**
     * Request asking the core service to tell us when a message of the given
     * size may be transmitted to the target peer.
     */
    public static class NotifyTransmitReadyRequest extends Request {
        private final int size;
        final public PeerIdentity target;
        final public long priority;
        /**
         * Identifier of this request; assigned by notifyTransmitReady after construction.
         */
        public int smrId;
        final public MessageTransmitter transmitter;
        final public AbsoluteTime deadline;

        /**
         * Create a request; the relative timeout is converted to an absolute deadline here.
         *
         * @param priority how important is the message?
         * @param size the size of the message we want to transmit
         * @param target the identity of the receiver
         * @param timeout how long can the message wait?
         * @param transmitter called once the core service is ready to send the message
         */
        public NotifyTransmitReadyRequest(int priority, int size, PeerIdentity target,
                                          RelativeTime timeout, MessageTransmitter transmitter) {
            this.deadline = timeout.toAbsolute();
            this.priority = priority;
            this.size = size;
            this.target = target;
            this.transmitter = transmitter;
        }

        /**
         * Build the SendMessageRequest that announces this request to the service.
         *
         * @return envelope wrapping the SendMessageRequest
         */
        @Override
        public Envelope assembleRequest() {
            SendMessageRequest m = new SendMessageRequest();
            m.peer = target;
            m.smrId = smrId;
            m.priority = priority;
            m.size = size;
            m.deadline = deadline.asMessage();
            return new Envelope(m);
        }

        public void onCancel() {
            // do nothing
        }
    }

    /**
     * Receiver for all messages coming from the core service; dispatches on the message type.
     */
    public final class CoreReceiver extends RunaboutMessageReceiver {
        /**
         * The service answered our init message: remember our own identity and
         * call the init callback (at most once).
         */
        public void visit(InitReplyMessage m) {
            PeerIdentity myIdentity = m.myIdentity;
            connectedPeers.put(myIdentity, 1);
            if (initCallback != null) {
                initCallback.onInit(m.myIdentity);
                initCallback = null;
            }
        }

        /**
         * A peer connected: record it so that notifyTransmitReady accepts it as a
         * target, then inform the connect handler.
         */
        public void visit(ConnectNotifyMessage m) {
            // Do not reset the ID counter of a peer we already know about,
            // otherwise request IDs that are still in use could be handed out again.
            if (!connectedPeers.containsKey(m.peer)) {
                connectedPeers.put(m.peer, 1);
            }
            if (connectHandler != null) {
                connectHandler.onConnect(m.peer);
            }
        }

        /**
         * A peer disconnected: forget it, then inform the disconnect handler.
         */
        public void visit(DisconnectNotifyMessage m) {
            connectedPeers.remove(m.peer);
            if (disconnectHandler != null) {
                disconnectHandler.onDisconnect(m.peer);
            }
        }

        /**
         * Inbound traffic: notify the header observer and, if the payload type is one
         * we registered interest in, parse the payload and hand it to the message handler.
         */
        public void visit(NotifyInboundTrafficMessage m) {
            if (notifyInboundHeaders != null) {
                notifyInboundHeaders.notify(m.payloadHeader);
            }
            if (notifyInboundMessages != null) {
                // todo: call corresponding notify on notifyInboundMessages
            }

            boolean found = false;
            for (int i : interested) {
                if (i == m.payloadHeader.messageType) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                return;
            }

            Class bodyClass = MessageLoader.getUnionClass(GnunetMessage.Body.class,
                    m.payloadHeader.messageType);
            @SuppressWarnings("unchecked")
            GnunetMessage.Body b = (GnunetMessage.Body) Construct.parseAs(m.payloadBody, bodyClass);
            messageHandler.visitAppropriate(b);
        }

        /**
         * Outbound traffic: notify the header observer.
         */
        public void visit(NotifyOutboundTrafficMessage m) {
            if (notifyOutboundHeaders != null) {
                notifyOutboundHeaders.notify(m.payloadHeader);
            }
            if (notifyOutboundMessages != null) {
                // todo
            }
        }

        /**
         * The service is ready for one of our notify-transmit-ready requests:
         * let the request's transmitter produce exactly one message and send it.
         */
        public void visit(SendMessageReady m) {
            logger.debug("got SendMessageReady");
            RequestIdentification rid = new RequestIdentification(m.smrId, m.peer);
            RequestIdentifier reqId = ntrRequests.getRequestIdentifier(rid);
            // NOTE(review): assumes the container yields null for an unknown (e.g. already
            // cancelled) request -- confirm against MatchingRequestContainer.
            if (reqId == null) {
                logger.warn("got SendMessageReady for unknown request {}", m.smrId);
                return;
            }
            // Explicit cast because the container is used as a raw type.
            NotifyTransmitReadyRequest req = (NotifyTransmitReadyRequest) reqId.getRequest();

            final SendMessage sm = new SendMessage();
            sm.cork = 0;
            sm.peer = req.target;
            sm.priority = req.priority;
            sm.deadline = req.deadline.asMessage();

            req.transmitter.transmit(new Connection.MessageSink() {
                boolean sent;

                @Override
                public void send(GnunetMessage.Body m) {
                    if (sent) {
                        throw new AssertionError("sending multiple messages not supported");
                    }
                    sm.payloadMessage = GnunetMessage.fromBody(m);
                    sent = true;
                }
            });

            if (sm.payloadMessage == null) {
                throw new AssertionError("transmitter did not send a message");
            }
            // NOTE(review): the served request is not removed from ntrRequests here;
            // verify whether the container expects an explicit removal.
            client.send(sm);
        }

        @Override
        public void visitDefault(Object o) {
            logger.warn("received unexpected message from core: {}", o.getClass());
        }

        /**
         * Receiving failed: report every peer we know about as disconnected
         * and forget all of them.
         */
        @Override
        public void handleError() {
            logger.warn("Error receiving from the core service.");
            if (disconnectHandler != null) {
                for (PeerIdentity e : connectedPeers.keySet()) {
                    disconnectHandler.onDisconnect(e);
                }
            }
            connectedPeers.clear();
        }
    }

    /**
     * Establish a connection to the core service.
     *
     * @param cfg configuration to use
     */
    public Core(Configuration cfg) {
        client = new Client("core", cfg);
        client.installReceiver(new CoreReceiver());
        ntrRequests = new MatchingRequestContainer(client);
    }

    /**
     * Send to the service which messages we are interested in.
     *
     * @param initCallback called once the service has replied to the init message
     */
    public void init(InitCallback initCallback) {
        this.initCallback = initCallback;

        InitMessage initMessage = new InitMessage();
        initMessage.interested = interested;
        initMessage.options = 0;

        for (int i : interested) {
            logger.debug("we are interested in {}", i);
        }

        client.sendPreferred(initMessage);
    }

    /**
     * Ask the core to call "notify" once it is ready to transmit the
     * given number of bytes to the specified "target". Must only be
     * called after a connection to the respective peer has been
     * established (and the client has been informed about this).
     *
     * @param priority how important is the message?
     * @param maxdelay how long can the message wait?
     * @param target the identity of the receiver
     * @param size the size of the message we want to transmit
     * @param transmitter called once the core service is ready to send message
     * @return a handle to cancel the notification
     * @throws AssertionError if we were not notified about a connection to the target
     */
    public Cancelable notifyTransmitReady(int priority, RelativeTime maxdelay, PeerIdentity target,
                                          int size, final MessageTransmitter transmitter) {
        Integer id = connectedPeers.get(target);
        if (id == null) {
            throw new AssertionError("notifyTransmitReady called for unconnected peer");
        }
        // reserve this ID; the next request to the same peer gets the following one
        connectedPeers.put(target, id + 1);

        NotifyTransmitReadyRequest notifyRequest =
                new NotifyTransmitReadyRequest(priority, size, target, maxdelay, transmitter);
        notifyRequest.smrId = id;

        RequestIdentification rid = new RequestIdentification(notifyRequest.smrId, target);
        return ntrRequests.addRequest(rid, notifyRequest);
    }

    /**
     * Helper function to retrieve the peer identity with the given configuration via CORE.
     * Should not be used unless there is no other means to obtain the peer identity.
     *
     * @param cfg configuration to use
     * @param cont continuation, called with the peer identity once available
     */
    public static void withPeerIdentity(Configuration cfg, final PeerIdentityContinuation cont) {
        final Core core = new Core(cfg);
        core.init(new InitCallback() {
            @Override
            public void onInit(PeerIdentity myIdentity) {
                // the temporary connection is only needed for the init reply
                core.disconnect();
                cont.cont(myIdentity);
            }
        });
    }

    /**
     * Observe outgoing message headers from core.
     *
     * @param h callback
     */
    public void observeOutboundHeaders(HeaderNotify h) {
        this.notifyOutboundHeaders = h;
    }

    /**
     * Observe inbound headers from core.
     *
     * @param h callback
     */
    public void observeInboundHeaders(HeaderNotify h) {
        this.notifyInboundHeaders = h;
    }

    /**
     * Observe inbound messages from core.
     *
     * @param h callback
     */
    public void observeInboundMessages(MessageNotify h) {
        this.notifyInboundMessages = h;
    }

    /**
     * Observe outbound messages from core.
     *
     * @param h callback
     */
    public void observeOutboundMessages(MessageNotify h) {
        this.notifyOutboundMessages = h;
    }

    /**
     * Observe core connections.
     *
     * @param connectHandler callback
     */
    public void observeConnect(ConnectHandler connectHandler) {
        this.connectHandler = connectHandler;
    }

    /**
     * Observe core disconnections.
     *
     * @param disconnectHandler callback
     */
    public void observeDisconnect(DisconnectHandler disconnectHandler) {
        this.disconnectHandler = disconnectHandler;
    }

    /**
     * Handle all incoming messages with the specified runabout.
     * Has to be called before init, as the service has to know which messages we
     * are interested in.
     *
     * @param runabout the runabout that will handle received messages
     * @throws AssertionError if a handler is already set or the client is already connected
     */
    public void setMessageHandler(Runabout runabout) {
        if (messageHandler != null) {
            throw new AssertionError("Core can have only one message handler");
        }
        if (client.isConnected()) {
            // todo: shouldn't we just reconnect?
            throw new AssertionError("can set message handler only if not yet connected");
        }
        messageHandler = runabout;
        interested = RunaboutUtil.getRunaboutMessageTypes(runabout);
    }

    /**
     * Disconnect from the core service. This function can only
     * be called *after* all pending notifyTransmitReady
     * requests have been explicitly cancelled.
     */
    public void disconnect() {
        client.disconnect();
    }
}