/*
This file is part of GNUnet.
(C) 2011, 2012 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 3, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*/
package org.gnunet.core;
import com.google.common.collect.Maps;
import org.gnunet.construct.Construct;
import org.gnunet.construct.MessageLoader;
import org.gnunet.core.messages.*;
import org.gnunet.mq.Envelope;
import org.gnunet.requests.MatchingRequestContainer;
import org.gnunet.requests.Request;
import org.gnunet.requests.RequestIdentifier;
import org.gnunet.util.*;
import org.grothoff.Runabout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
/**
* API for the GNUnet core service.
*
* Sends messages to connected peers.
*/
public class Core {
/**
* Logger for org.gnunet.Core.
*/
private static final Logger logger = LoggerFactory
.getLogger(Core.class);
/**
* Client for connecting to the core service
*/
private final Client client;
/*
* set to null once connected for the first time
*/
private InitCallback initCallback;
/*
* Callback for traffic notifications. null if not interested.
*/
private HeaderNotify notifyOutboundHeaders;
/*
* Callback for traffic notifications. null if not interested.
*/
private HeaderNotify notifyInboundHeaders;
/*
* Callback for traffic notifications. null if not interested.
*/
private MessageNotify notifyOutboundMessages;
/*
* Callback for traffic notifications. null if not interested.
*/
private MessageNotify notifyInboundMessages;
/*
* Callbacks for connect events
*/
private ConnectHandler connectHandler;
/**
* Callback for disconnect events.
*/
private DisconnectHandler disconnectHandler;
/**
* Messages we are interested in.
* Per default we are interested in all messages => specific interest set is empty.
*/
private int[] interested = new int[0];
/**
* Handler for the messages we are interested in.
*/
private Runabout messageHandler;
/**
* Peers that we were notified about being connected to them.
* Every connected peer is mapped to a generator for unique getRequestIdentifier IDs.
*/
private HashMap connectedPeers = Maps.newHashMap();
/**
* Request container for notify transmit requests.
*/
private MatchingRequestContainer ntrRequests;
public static class NotifyTransmitReadyRequest extends Request {
private final int size;
final public PeerIdentity target;
final public long priority;
public int smrId;
final public MessageTransmitter transmitter;
final public AbsoluteTime deadline;
public NotifyTransmitReadyRequest(int priority, int size, PeerIdentity target,
RelativeTime timeout, MessageTransmitter transmitter) {
this.deadline = timeout.toAbsolute();
this.priority = priority;
this.size = size;
this.target = target;
this.transmitter = transmitter;
}
@Override
public Envelope assembleRequest() {
SendMessageRequest m = new SendMessageRequest();
m.peer = target;
m.smrId = smrId;
m.priority = priority;
m.size = size;
m.deadline = deadline.asMessage();
return new Envelope(m);
}
public void onCancel() {
// do nothing
}
}
public final class CoreReceiver extends RunaboutMessageReceiver {
public void visit(InitReplyMessage m) {
PeerIdentity myIdentity = m.myIdentity;
connectedPeers.put(myIdentity, 1);
if (initCallback != null) {
initCallback.onInit(m.myIdentity);
initCallback = null;
}
}
public void visit(ConnectNotifyMessage m) {
if (connectHandler != null) {
connectHandler.onConnect(m.peer);
}
}
public void visit(DisconnectNotifyMessage m) {
if (disconnectHandler != null) {
disconnectHandler.onDisconnect(m.peer);
}
}
public void visit(NotifyInboundTrafficMessage m) {
boolean found = false;
if (notifyInboundHeaders != null) {
notifyInboundHeaders.notify(m.payloadHeader);
}
if (notifyInboundMessages != null) {
// todo: call corresponding notify on notifyInboundMessages
}
for (int i : interested) {
if (i == m.payloadHeader.messageType) {
found = true;
break;
}
}
if (found) {
Class bodyClass = MessageLoader.getUnionClass(GnunetMessage.Body.class, m.payloadHeader.messageType);
@SuppressWarnings("unchecked")
GnunetMessage.Body b = (GnunetMessage.Body) Construct.parseAs(m.payloadBody, bodyClass);
messageHandler.visitAppropriate(b);
}
}
public void visit(NotifyOutboundTrafficMessage m) {
if (notifyOutboundHeaders != null) {
notifyOutboundHeaders.notify(m.payloadHeader);
}
if (notifyOutboundMessages != null) {
// todo
}
}
public void visit(SendMessageReady m) {
logger.debug("got SendMessageReady");
RequestIdentification rid = new RequestIdentification(m.smrId, m.peer);
RequestIdentifier reqId = ntrRequests.getRequestIdentifier(rid);
NotifyTransmitReadyRequest req = reqId.getRequest();
final SendMessage sm = new SendMessage();
sm.cork = 0;
sm.peer = req.target;
sm.priority = req.priority;
sm.deadline = req.deadline.asMessage();
req.transmitter.transmit(new Connection.MessageSink() {
boolean sent;
@Override
public void send(GnunetMessage.Body m) {
if (sent) {
throw new AssertionError("sending multiple messages not supported");
}
sm.payloadMessage = GnunetMessage.fromBody(m);
sent = true;
}
});
if (sm.payloadMessage == null)
throw new AssertionError();
client.send(sm);
}
@Override
public void visitDefault(Object o) {
logger.warn("received unexpected message from core: {}", o.getClass());
}
@Override
public void handleError() {
logger.warn("Error receiving from the transport service.");
if (disconnectHandler != null) {
for (PeerIdentity e : connectedPeers.keySet()) {
disconnectHandler.onDisconnect(e);
}
}
connectedPeers.clear();
}
}
/**
* Establish a connection to the core service.
*
* @param cfg configuration to use
*/
public Core(Configuration cfg) {
client = new Client("core", cfg);
client.installReceiver(new CoreReceiver());
ntrRequests = new MatchingRequestContainer(client);
}
/**
* Send to the service which messages are we interested in.
*
* @param initCallback called after the init message has been sent
*/
public void init(InitCallback initCallback) {
this.initCallback = initCallback;
InitMessage initMessage = new InitMessage();
initMessage.interested = interested;
initMessage.options = 0;
for (int i : interested) {
logger.debug("we are interested in " + i);
}
client.sendPreferred(initMessage);
}
/**
* Ask the core to call "notify" once it is ready to transmit the
* given number of bytes to the specified "target". Must only be
* called after a connection to the respective peer has been
* established (and the client has been informed about this).
*
* @param priority how important is the message?
* @param maxdelay how long can the message wait?
* @param target the identity of the receiver
* @param size the size of the message we want to transmit
* @param transmitter called once the core service is ready to send message
* @return a handle to onCancel the notification
*/
public Cancelable notifyTransmitReady(int priority, RelativeTime maxdelay,
PeerIdentity target, int size, final MessageTransmitter transmitter) {
if (!connectedPeers.containsKey(target)) {
throw new AssertionError("notifyTransmitReady called for unconnected peer");
}
int id = connectedPeers.get(target);
connectedPeers.put(target, id+1);
NotifyTransmitReadyRequest notifyRequest = new NotifyTransmitReadyRequest(priority, size, target,
maxdelay, transmitter);
notifyRequest.smrId = id;
RequestIdentification rid = new RequestIdentification(notifyRequest.smrId, target);
return ntrRequests.addRequest(rid, notifyRequest);
}
/**
* Helper function to retrieve the peer identity with the given configuration via CORE.
* Should not be used unless there is no other means to obtain the peer identity.
*
* @param cfg configuration to use
* @param cont continuation, called with the peer identity once available
*/
public static void withPeerIdentity(Configuration cfg, final PeerIdentityContinuation cont) {
final Core core = new Core(cfg);
core.init(new InitCallback() {
@Override
public void onInit(PeerIdentity myIdentity) {
core.disconnect();
cont.cont(myIdentity);
}
});
}
/**
* Observe outgoing message headers from core.
*
* @param h callback
*/
public void observeOutboundHeaders(HeaderNotify h) {
this.notifyOutboundHeaders = h;
}
/**
* Observe inbound headers from core.
*
* @param h callback
*/
public void observeInboundHeaders(HeaderNotify h) {
this.notifyInboundHeaders = h;
}
/**
* Observe inbound messages from core.
*
* @param h callback
*/
public void observeInboundMessages(MessageNotify h) {
this.notifyInboundMessages = h;
}
/**
* Observe outbound messages from core.
*
* @param h callback
*/
public void observeOutboundMessages(MessageNotify h) {
this.notifyOutboundMessages = h;
}
/**
* Observe core connections
*
* @param connectHandler callback
*/
public void observeConnect(ConnectHandler connectHandler) {
this.connectHandler = connectHandler;
}
/**
* Observe core disconnections.
*
* @param disconnectHandler callback
*/
public void observeDisconnect(DisconnectHandler disconnectHandler) {
this.disconnectHandler = disconnectHandler;
}
/**
* Handle all incoming messages with the specified runabout.
* Has to be called before init, as the service has to know which messages we
* are interested in.
*
* @param runabout the runabout that will handle received messages
*/
public void setMessageHandler(Runabout runabout) {
if (messageHandler != null) {
throw new AssertionError("Core can have only on message handler");
}
if (client.isConnected()) {
// todo: shouldn't we just reconnect?
throw new AssertionError("can set message handler only if not yet connected");
}
messageHandler = runabout;
interested = RunaboutUtil.getRunaboutMessageTypes(runabout);
}
/**
* Disconnect from the core service. This function can only
* be called *after* all pending notifyTransmitReady
* requests have been explicitly cancelled.
*/
public void disconnect() {
client.disconnect();
}
}