/*
 This file is part of GNUnet.
 (C) 2011, 2012 Christian Grothoff (and other contributing authors)

 GNUnet is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published
 by the Free Software Foundation; either version 3, or (at your
 option) any later version.

 GNUnet is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with GNUnet; see the file COPYING. If not, write to the
 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 Boston, MA 02111-1307, USA.
 */

package org.gnunet.nse;

import org.gnunet.construct.Double;
import org.gnunet.construct.*;
import org.gnunet.construct.ProtocolViolationException;
import org.gnunet.util.*;
import org.gnunet.util.getopt.Argument;
import org.gnunet.util.getopt.ArgumentAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashSet;

/**
 * An API for the network size estimation service.
 * <p>
 * Clients register a {@link Subscriber} via {@link #subscribe(Subscriber)}; every
 * {@link UpdateMessage} received from the service is forwarded to all registered
 * subscribers until {@link #disconnect()} is called.
 *
 * @author Florian Dold
 */
public class NetworkSizeEstimation {
    private static final Logger logger = LoggerFactory
            .getLogger(NetworkSizeEstimation.class);

    /** Subscribers that are notified about every update received from the service. */
    private final Collection<Subscriber> subscribers = new HashSet<Subscriber>(1);

    /** Set once by {@link #disconnect()}; never reset afterwards. */
    private boolean disconnected = false;

    /** Client used for all communication with the "nse" service. */
    private final Client client;

    /**
     * Message sent to the service to request updates (see {@link NSE_Transmitter}).
     */
    @UnionCase(321)
    public static class StartMessage implements GnunetMessage.Body {
    }

    /**
     * Message received from the service; its timestamp, size estimate and standard
     * deviation are handed to every subscriber.
     */
    @SuppressWarnings("InstanceVariableMayNotBeInitialized")
    @UnionCase(323)
    public static class UpdateMessage implements GnunetMessage.Body {
        @UInt32
        public int reserved;
        @NestedMessage
        public AbsoluteTimeMessage timestamp;
        @Double
        public double sizeEstimate;
        @Double
        public double stdDeviation;
    }

    /**
     * Receives messages from the service and dispatches them to the subscribers.
     */
    private class NSE_Receiver implements MessageReceiver {
        /**
         * Forward an update to all subscribers and, unless we have been disconnected,
         * wait for the next message.
         *
         * @param msg the message received from the service
         * @throws ProtocolViolationException if the message is not an {@link UpdateMessage}
         */
        @Override
        public void process(GnunetMessage.Body msg) {
            if (!(msg instanceof UpdateMessage)) {
                throw new ProtocolViolationException("got unexcpected message");
            }
            UpdateMessage uMsg = (UpdateMessage) msg;

            // Iterate over a snapshot: a callback may cancel a subscription or add a
            // new one, and mutating the live set while iterating over it would throw
            // a ConcurrentModificationException.
            Subscriber[] snapshot = subscribers.toArray(new Subscriber[subscribers.size()]);
            for (Subscriber s : snapshot) {
                // Skip subscribers removed by an earlier callback of this round
                // (via Subscription.cancel() or disconnect()).
                if (!subscribers.contains(s)) {
                    continue;
                }
                s.update(AbsoluteTime.fromNetwork(uMsg.timestamp), uMsg.sizeEstimate, uMsg.stdDeviation);
            }

            if (!disconnected) {
                client.receive(RelativeTime.FOREVER, this);
            }
        }

        /**
         * Reconnect and re-request updates, unless the user asked us to disconnect.
         */
        @Override
        public void handleError() {
            if (disconnected) {
                // A deliberate disconnect must not be undone by a reconnect.
                return;
            }
            logger.warn("NSE connection lost - trying to reconnect");
            client.reconnect();
            requestUpdate();
        }
    }

    /**
     * Sends the {@link StartMessage} and then starts receiving with a fresh
     * {@link NSE_Receiver}.
     */
    private class NSE_Transmitter implements MessageTransmitter {
        @Override
        public void transmit(Connection.MessageSink sink) {
            StartMessage m = new StartMessage();
            sink.send(m);
            client.receive(RelativeTime.FOREVER, new NSE_Receiver());
        }

        /**
         * Reconnect, unless the user asked us to disconnect.
         */
        @Override
        public void handleError() {
            if (disconnected) {
                // A deliberate disconnect must not be undone by a reconnect.
                return;
            }
            client.reconnect();
        }
    }

    /**
     * A handle for a subscription to the network size estimation service, may be used to cancel the
     * subscription.
     */
    public class Subscription implements Cancelable {
        private final Subscriber sub;

        private Subscription(Subscriber sub) {
            this.sub = sub;
        }

        /**
         * Cancel the subscription. The subscriber receives no further updates;
         * cancelling more than once has no additional effect.
         */
        @Override
        public void cancel() {
            subscribers.remove(sub);
        }
    }

    /**
     * A Subscriber receives updates from the service.
     */
    public interface Subscriber {
        /**
         * Called for every update received from the service.
         *
         * @param timestamp timestamp carried by the update message
         * @param estimate  size estimate carried by the update message
         * @param deviation standard deviation carried by the update message
         */
        public void update(AbsoluteTime timestamp, double estimate, double deviation);
    }

    /**
     * Subscribe for updates from the service.
     * <p>
     * NOTE(review): every call requests a new transmission of the start message and
     * thereby registers another receiver on the client; confirm that the client
     * tolerates this when more than one subscriber is registered.
     *
     * @param s callback for updates, must not be null
     * @return a subscription handle that may be used to cancel the subscription
     * @throws IllegalArgumentException if {@code s} is null
     */
    public Cancelable subscribe(Subscriber s) {
        if (s == null) {
            // Fail here instead of with a NullPointerException on the first update.
            throw new IllegalArgumentException("subscriber must not be null");
        }
        subscribers.add(s);
        requestUpdate();
        return new Subscription(s);
    }

    /**
     * Create a connection to the network size estimation service.
     *
     * @param cfg the configuration to use for connecting with the service
     */
    public NetworkSizeEstimation(Configuration cfg) {
        client = new Client("nse", cfg);
    }

    /**
     * Ask the client to transmit the start message as soon as it is ready to do so.
     */
    private void requestUpdate() {
        client.notifyTransmitReady(RelativeTime.FOREVER, true, 0, new NSE_Transmitter());
    }

    /**
     * Cancel all subscriptions and disconnect from the service.
     */
    public void disconnect() {
        disconnected = true;
        subscribers.clear();
        client.disconnect();
    }

    /**
     * Command line entry point: prints the estimates received from the service and,
     * unless the "continuous" option is set, disconnects after the first one.
     *
     * @param args command line arguments, passed on to {@link Program}
     */
    public static void main(String[] args) {
        new Program(args) {
            @Argument(action = ArgumentAction.SET,
                    shortname = "w",
                    longname = "continuous",
                    description = "don't exit after the first estimation response")
            boolean cont = false;

            public void run() {
                final NetworkSizeEstimation svc = new NetworkSizeEstimation(cfg);
                Subscriber subscriber = new Subscriber() {
                    @Override
                    public void update(AbsoluteTime timestamp, double estimate, double deviation) {
                        System.out.println("est:" + estimate + " dev: " + deviation + " t: " + timestamp);
                        if (!cont) {
                            svc.disconnect();
                        }
                    }
                };
                svc.subscribe(subscriber);
            }
        }.start();
    }
}