aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-07-24 10:50:01 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-07-24 10:50:01 +0000
commite4a01152171e4186e4718c4c9c92842fccf6d8a1 (patch)
treef17311a274a80efb31a693dfcfaf3a431378bb13 /src
parentcf715527f54ad4e0628088b2cc63fcb8bb35533d (diff)
downloadgnunet-java-e4a01152171e4186e4718c4c9c92842fccf6d8a1.tar.gz
gnunet-java-e4a01152171e4186e4718c4c9c92842fccf6d8a1.zip
- mq for java
- started with consensus - started with new mesh
Diffstat (limited to 'src')
-rw-r--r--src/org/gnunet/consensus/ConcludeCallback.java5
-rw-r--r--src/org/gnunet/consensus/ConcludeDoneMessage.java12
-rw-r--r--src/org/gnunet/consensus/ConcludeMessage.java19
-rw-r--r--src/org/gnunet/consensus/Consensus.java120
-rw-r--r--src/org/gnunet/consensus/ConsensusElement.java16
-rw-r--r--src/org/gnunet/consensus/InsertDoneCallback.java5
-rw-r--r--src/org/gnunet/consensus/InsertElementMessage.java23
-rw-r--r--src/org/gnunet/consensus/NewElementCallback.java5
-rw-r--r--src/org/gnunet/consensus/NewElementMessage.java20
-rw-r--r--src/org/gnunet/construct/MsgMap.txt2
-rw-r--r--src/org/gnunet/mesh/ClientConnectMessage.java14
-rw-r--r--src/org/gnunet/mesh/ConnectPeerByTypeMessage.java18
-rw-r--r--src/org/gnunet/mesh/DataMessage.java (renamed from src/org/gnunet/mesh/UnicastMessage.java)10
-rw-r--r--src/org/gnunet/mesh/Mesh.java478
-rw-r--r--src/org/gnunet/mesh/MulticastMessage.java27
-rw-r--r--src/org/gnunet/mesh/OriginMessage.java27
-rw-r--r--src/org/gnunet/mesh/PeerAddMessage.java23
-rw-r--r--src/org/gnunet/mesh/PeerDeleteMessage.java23
-rw-r--r--src/org/gnunet/mesh/TunnelCreateMessage.java17
-rw-r--r--src/org/gnunet/mq/ClientMessageQueue.java48
-rw-r--r--src/org/gnunet/mq/Envelope.java34
-rw-r--r--src/org/gnunet/mq/MessageQueue.java41
-rw-r--r--src/org/gnunet/mq/NotifySentHandler.java6
-rw-r--r--src/org/gnunet/requests/RequestQueue.java1
-rw-r--r--src/org/gnunet/transport/RequestConnectMessage.java1
-rw-r--r--src/org/gnunet/transport/Transport.java15
26 files changed, 525 insertions, 485 deletions
diff --git a/src/org/gnunet/consensus/ConcludeCallback.java b/src/org/gnunet/consensus/ConcludeCallback.java
new file mode 100644
index 0000000..7660c49
--- /dev/null
+++ b/src/org/gnunet/consensus/ConcludeCallback.java
@@ -0,0 +1,5 @@
1package org.gnunet.consensus;
2
3public interface ConcludeCallback {
4 void onConcludeDone();
5}
diff --git a/src/org/gnunet/consensus/ConcludeDoneMessage.java b/src/org/gnunet/consensus/ConcludeDoneMessage.java
new file mode 100644
index 0000000..3b016cf
--- /dev/null
+++ b/src/org/gnunet/consensus/ConcludeDoneMessage.java
@@ -0,0 +1,12 @@
1package org.gnunet.consensus;
2
3
4import org.gnunet.construct.UnionCase;
5
6/**
7 * Notify the client that conclude has finished.
8 * Direction: service -> client
9 */
10@UnionCase(525)
11public class ConcludeDoneMessage {
12}
diff --git a/src/org/gnunet/consensus/ConcludeMessage.java b/src/org/gnunet/consensus/ConcludeMessage.java
new file mode 100644
index 0000000..7b43928
--- /dev/null
+++ b/src/org/gnunet/consensus/ConcludeMessage.java
@@ -0,0 +1,19 @@
1package org.gnunet.consensus;
2
3import org.gnunet.construct.FillWith;
4import org.gnunet.construct.UInt16;
5import org.gnunet.construct.UInt8;
6import org.gnunet.construct.UnionCase;
7import org.gnunet.util.GnunetMessage;
8
9/**
10 * Notify the client of a new element.
11 *
12 * Direction: service -> client
13 *
14 * @author Florian Dold
15 */
16@UnionCase(524)
17public class ConcludeMessage implements GnunetMessage.Body {
18 /* empty body */
19} \ No newline at end of file
diff --git a/src/org/gnunet/consensus/Consensus.java b/src/org/gnunet/consensus/Consensus.java
new file mode 100644
index 0000000..e6fab01
--- /dev/null
+++ b/src/org/gnunet/consensus/Consensus.java
@@ -0,0 +1,120 @@
1package org.gnunet.consensus;
2
3import org.gnunet.mq.ClientMessageQueue;
4import org.gnunet.mq.Envelope;
5import org.gnunet.mq.MessageQueue;
6import org.gnunet.mq.NotifySentHandler;
7import org.gnunet.util.*;
8import org.slf4j.Logger;
9import org.slf4j.LoggerFactory;
10
11/**
12 * Multi-peer set reconciliation.
13 */
14public class Consensus {
15 /**
16 * Class logger.
17 */
18 private static final Logger logger = LoggerFactory
19 .getLogger(Consensus.class);
20
21 /**
22 * Callback for new elements arriving from the service.
23 * Also used to notify of consensus failure.
24 */
25 private final NewElementCallback newElementCallback;
26
27 /**
28 * Client connected to the consensus service.
29 */
30 private Client client;
31
32 /**
33 * Message queue for 'client'.
34 */
35 private MessageQueue client_mq;
36
37 /**
38 * Called when conclude has finished.
39 */
40 private ConcludeCallback concludeCallback;
41
42 /**
43 * Message dispatch for messages from the consensus service.
44 */
45 private class ConsensusMessageReceiver extends RunaboutMessageReceiver {
46 public void visit(ConcludeDoneMessage m) {
47 if (null == concludeCallback)
48 {
49 logger.error("unexpected conclude done message");
50 return;
51 }
52 concludeCallback.onConcludeDone();
53 }
54
55 public void visit(NewElementMessage m) {
56 ConsensusElement element = new ConsensusElement();
57 element.element_type = m.element_type;
58 element.data = m.element_data;
59 newElementCallback.onNewElement(element);
60 }
61
62 @Override
63 public void handleError() {
64 newElementCallback.onNewElement(null);
65 }
66 }
67
68 /**
69 * Create a consensus session. The set being reconciled is initially
70 * empty. Only reconcile with other peers after
71 * GNUNET_CONSENSUS_reconcile has been called.
72 *
73 * @param num_peers number of peers in the session
74 * @param peers array of peers participating in this consensus session
75 * Inclusion of the local peer is optional.
76 * @param sessionId session identifier
77 * Allows a group of peers to have more than consensus session.
78 * @param newElementCallback callback, called when a new element is added to the set by
79 * another peer
80 */
81 public Consensus(Configuration cfg, int num_peers, PeerIdentity[] peers, HashCode sessionId,
82 NewElementCallback newElementCallback) {
83 client = new Client("consensus", cfg);
84 client_mq = new ClientMessageQueue(client, new ConsensusMessageReceiver());
85 this.newElementCallback = newElementCallback;
86 }
87
88 public void insertElement (ConsensusElement element, final InsertDoneCallback idc) {
89 InsertElementMessage m = new InsertElementMessage();
90 m.element_data = element.data;
91 m.element_type = element.element_type;
92 Envelope ev = new Envelope(m);
93 ev.notifySent(new NotifySentHandler() {
94 @Override
95 public void onSent() {
96 idc.onInsertDone();
97 }
98 });
99 client_mq.send(ev);
100 }
101
102 public void conclude(ConcludeCallback concludeCallback) {
103 if (null == concludeCallback)
104 throw new AssertionError("conclude with empty callback");
105 if (null != this.concludeCallback)
106 throw new AssertionError("called conclude twice");
107 this.concludeCallback = concludeCallback;
108 }
109
110 /**
111 * Destroy a consensus handle (free all state associated with
112 * it, no longer call any of the callbacks).
113 */
114 public void destroy() {
115 client_mq.destroy();
116 client_mq = null;
117 client.disconnect();
118 client = null;
119 }
120}
diff --git a/src/org/gnunet/consensus/ConsensusElement.java b/src/org/gnunet/consensus/ConsensusElement.java
new file mode 100644
index 0000000..846e72a
--- /dev/null
+++ b/src/org/gnunet/consensus/ConsensusElement.java
@@ -0,0 +1,16 @@
1package org.gnunet.consensus;
2
3
4public class ConsensusElement {
5 /**
6 * Type of the element.
7 * 0 <= element_type <= 2^16
8 */
9 int element_type;
10
11 /**
12 * Data for the element.
13 * 0 <= data.length <= 2^16
14 */
15 byte[] data;
16}
diff --git a/src/org/gnunet/consensus/InsertDoneCallback.java b/src/org/gnunet/consensus/InsertDoneCallback.java
new file mode 100644
index 0000000..f0e86ca
--- /dev/null
+++ b/src/org/gnunet/consensus/InsertDoneCallback.java
@@ -0,0 +1,5 @@
1package org.gnunet.consensus;
2
3public interface InsertDoneCallback {
4 void onInsertDone();
5}
diff --git a/src/org/gnunet/consensus/InsertElementMessage.java b/src/org/gnunet/consensus/InsertElementMessage.java
new file mode 100644
index 0000000..fd0ff67
--- /dev/null
+++ b/src/org/gnunet/consensus/InsertElementMessage.java
@@ -0,0 +1,23 @@
1package org.gnunet.consensus;
2
3import org.gnunet.construct.FillWith;
4import org.gnunet.construct.UInt16;
5import org.gnunet.construct.UInt8;
6import org.gnunet.construct.UnionCase;
7import org.gnunet.util.GnunetMessage;
8
9/**
10 * Send an element to the service, insert it into the consensus set.
11 *
12 * Direction: client -> service
13 *
14 * @author Florian Dold
15 */
16@UnionCase(521)
17public class InsertElementMessage implements GnunetMessage.Body {
18 @UInt16
19 public int element_type;
20 @FillWith
21 @UInt8
22 public byte[] element_data;
23} \ No newline at end of file
diff --git a/src/org/gnunet/consensus/NewElementCallback.java b/src/org/gnunet/consensus/NewElementCallback.java
new file mode 100644
index 0000000..4b07a71
--- /dev/null
+++ b/src/org/gnunet/consensus/NewElementCallback.java
@@ -0,0 +1,5 @@
1package org.gnunet.consensus;
2
3public interface NewElementCallback {
4 void onNewElement(ConsensusElement element);
5}
diff --git a/src/org/gnunet/consensus/NewElementMessage.java b/src/org/gnunet/consensus/NewElementMessage.java
new file mode 100644
index 0000000..deb3634
--- /dev/null
+++ b/src/org/gnunet/consensus/NewElementMessage.java
@@ -0,0 +1,20 @@
1package org.gnunet.consensus;
2
3import org.gnunet.construct.*;
4import org.gnunet.util.GnunetMessage;
5
6/**
7 * Notify the client of a new element.
8 *
9 * Direction: service -> client
10 *
11 * @author Florian Dold
12 */
13@UnionCase(523)
14public class NewElementMessage implements GnunetMessage.Body {
15 @UInt16
16 public int element_type;
17 @FillWith
18 @UInt8
19 public byte[] element_data;
20} \ No newline at end of file
diff --git a/src/org/gnunet/construct/MsgMap.txt b/src/org/gnunet/construct/MsgMap.txt
index 719e92a..30e7077 100644
--- a/src/org/gnunet/construct/MsgMap.txt
+++ b/src/org/gnunet/construct/MsgMap.txt
@@ -25,7 +25,7 @@ org.gnunet.util.GnunetMessage$Body|153=org.gnunet.dht.MonitorStartStop
25org.gnunet.util.GnunetMessage$Body|155=org.gnunet.dht.ClientPutConfirmationMessage 25org.gnunet.util.GnunetMessage$Body|155=org.gnunet.dht.ClientPutConfirmationMessage
26org.gnunet.util.GnunetMessage$Body|262=org.gnunet.mesh.OriginMessage 26org.gnunet.util.GnunetMessage$Body|262=org.gnunet.mesh.OriginMessage
27org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.UpdateMessage 27org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.UpdateMessage
28org.gnunet.util.GnunetMessage$Body|260=org.gnunet.mesh.UnicastMessage 28org.gnunet.util.GnunetMessage$Body|260=org.gnunet.mesh.DataMessage
29org.gnunet.util.GnunetMessage$Body|261=org.gnunet.mesh.MulticastMessage 29org.gnunet.util.GnunetMessage$Body|261=org.gnunet.mesh.MulticastMessage
30org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.StartMessage 30org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.StartMessage
31org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.ClientGetStopMessage 31org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.ClientGetStopMessage
diff --git a/src/org/gnunet/mesh/ClientConnectMessage.java b/src/org/gnunet/mesh/ClientConnectMessage.java
index 57edede..1a56ebb 100644
--- a/src/org/gnunet/mesh/ClientConnectMessage.java
+++ b/src/org/gnunet/mesh/ClientConnectMessage.java
@@ -12,18 +12,6 @@ import org.gnunet.util.GnunetMessage;
12 */ 12 */
13@UnionCase(272) 13@UnionCase(272)
14public class ClientConnectMessage implements GnunetMessage.Body { 14public class ClientConnectMessage implements GnunetMessage.Body {
15 @UInt16 15 @IntegerFill(signed = false, bitSize = 32)
16 public int applications_length;
17 @UInt16
18 public int types_length;
19 /**
20 * List of applications that this client claims to provide.
21 */
22 @VariableSizeIntegerArray(lengthField = "applications_length", signed = false, bitSize = 32)
23 public int[] apps_list; 16 public int[] apps_list;
24 /**
25 * Message types that this client understands.
26 */
27 @VariableSizeIntegerArray(lengthField = "types_length", signed = false, bitSize = 16)
28 public int[] types_list;
29} 17}
diff --git a/src/org/gnunet/mesh/ConnectPeerByTypeMessage.java b/src/org/gnunet/mesh/ConnectPeerByTypeMessage.java
deleted file mode 100644
index fe8bfe0..0000000
--- a/src/org/gnunet/mesh/ConnectPeerByTypeMessage.java
+++ /dev/null
@@ -1,18 +0,0 @@
1package org.gnunet.mesh;
2
3import org.gnunet.construct.UInt32;
4import org.gnunet.construct.UnionCase;
5import org.gnunet.util.GnunetMessage;
6/**
7 * ...
8 *
9 * @author Florian Dold
10 */
11@UnionCase(277)
12public class ConnectPeerByTypeMessage implements GnunetMessage.Body {
13 @UInt32
14 public int tunnelId;
15
16 @UInt32
17 public int applicationType;
18}
diff --git a/src/org/gnunet/mesh/UnicastMessage.java b/src/org/gnunet/mesh/DataMessage.java
index 141d49a..92546c8 100644
--- a/src/org/gnunet/mesh/UnicastMessage.java
+++ b/src/org/gnunet/mesh/DataMessage.java
@@ -10,17 +10,9 @@ import org.gnunet.util.PeerIdentity;
10 * @author Florian Dold 10 * @author Florian Dold
11 */ 11 */
12@UnionCase(260) 12@UnionCase(260)
13public class UnicastMessage implements GnunetMessage.Body { 13public class DataMessage implements GnunetMessage.Body {
14 @UInt32 14 @UInt32
15 public int tid; 15 public int tid;
16 @UInt32
17 public int ttl;
18 @UInt32
19 public int pid;
20 @NestedMessage
21 public PeerIdentity oid;
22 @NestedMessage
23 public PeerIdentity destination;
24 @FillWith 16 @FillWith
25 @UInt8 17 @UInt8
26 public byte[] payload; 18 public byte[] payload;
diff --git a/src/org/gnunet/mesh/Mesh.java b/src/org/gnunet/mesh/Mesh.java
index 6f14451..091cb17 100644
--- a/src/org/gnunet/mesh/Mesh.java
+++ b/src/org/gnunet/mesh/Mesh.java
@@ -20,15 +20,17 @@
20 20
21package org.gnunet.mesh; 21package org.gnunet.mesh;
22 22
23import com.google.common.collect.Maps;
24import org.gnunet.construct.Construct; 23import org.gnunet.construct.Construct;
25import org.gnunet.requests.FixedMessageRequest; 24import org.gnunet.mq.ClientMessageQueue;
25import org.gnunet.mq.Envelope;
26import org.gnunet.mq.MessageQueue;
26import org.gnunet.requests.Request; 27import org.gnunet.requests.Request;
27import org.gnunet.requests.RequestQueue; 28import org.gnunet.requests.RequestQueue;
28import org.gnunet.util.*; 29import org.gnunet.util.*;
29import org.slf4j.Logger; 30import org.slf4j.Logger;
30import org.slf4j.LoggerFactory; 31import org.slf4j.LoggerFactory;
31 32
33import java.util.HashMap;
32import java.util.Map; 34import java.util.Map;
33 35
34/** 36/**
@@ -37,345 +39,188 @@ import java.util.Map;
37 * @author Florian Dold 39 * @author Florian Dold
38 */ 40 */
39public class Mesh { 41public class Mesh {
42 /**
43 * Class logger.
44 */
40 private static final Logger logger = LoggerFactory 45 private static final Logger logger = LoggerFactory
41 .getLogger(Mesh.class); 46 .getLogger(Mesh.class);
42 47
43 /** 48 /**
44 * How many messages can we send to the service until we have to wait for an ACK from it? 49 * For tunnels created by the client, the bit in this
50 * mask is always set.
45 */ 51 */
46 private static final int INITIAL_WINDOW_SIZE = 8; 52 private static final int TUNNEL_ID_CLI = 0x80000000;
53
47 /** 54 /**
48 * Requests queued to be sent to the mesh service. 55 * For tunnels created by the server, the bit in this
56 * mask is always set.
49 */ 57 */
50 private RequestQueue requestQueue; 58 private static final int TUNNEL_ID_SERV = 0xB0000000;
59
51 /** 60 /**
52 * Called whenever a tunnel was destroyed. 61 * Disable buffering on intermediate nodes (for minimum latency).
62 * Yes/No.
53 */ 63 */
54 private TunnelEndHandler tunnelEndHandler; 64 private static final int OPTION_NOBUFFER = 1;
55 private MeshRunabout messageReceiver;
56 private int[] applications;
57 private InboundTunnelHandler inboundTunnelHandler;
58 65
59 private final static int LOCAL_TUNNEL_ID_CLI = 0x80000000; 66 /**
60 private final static int LOCAL_TUNNEL_ID_SERV = 0xB0000000; 67 * Enable tunnel reliability, lost messages will be retransmitted.
68 * Yes/No.
69 */
70 private static final int OPTION_RELIABLE = 2;
61 71
62 private int nextTunnelId = LOCAL_TUNNEL_ID_CLI; 72 /**
73 * Client connected to the mesh service
74 */
75 private final Client client;
63 76
64 /** 77 /**
65 * Stores all tunnels created by this client, referenced by their local tunnel id. 78 * Message queue for the client.
66 */ 79 */
67 private Map<Integer, Tunnel> tunnelMap = Maps.newTreeMap(); 80 private final ClientMessageQueue client_mq;
68 81
82 /**
83 * Called whenever a tunnel was destroyed.
84 */
85 private TunnelEndHandler tunnelEndHandler;
69 86
70 public class OriginTunnel extends Tunnel { 87 /**
71 public DisconnectHandler disconnectHandler; 88 * Message handler for messages received through
72 public ConnectHandler connectHandler; 89 * a tunnel.
90 */
91 private MeshRunabout messageReceiver;
73 92
74 public void addPeer(PeerIdentity peerIdentity) { 93 /**
75 throw new UnsupportedOperationException("not implemented"); 94 * Ports that we listen on.
76 } 95 */
96 private int[] ports;
77 97
78 /** 98 /**
79 * Request that the given peer isn't added to this tunnel in calls to 99 * Handler for inbound tunnels.
80 * connect_by_* calls, (due to misbehaviour, bad performance, ...). 100 */
81 * 101 private InboundTunnelHandler inboundTunnelHandler;
82 * @param peerIdentity peer identity of the peer which should be blacklisted
83 * for the tunnel.
84 */
85 public void blacklist(PeerIdentity peerIdentity) {
86 throw new UnsupportedOperationException("not implemented");
87 }
88 102
89 /** 103 /**
90 * Request that the given peer isn't blacklisted anymore from this tunnel, 104 * Mapping from the tunnel's ID to the tunnel object.
91 * and therefore can be added in future calls to connect*. 105 */
92 * The peer must have been previously blacklisted for this tunnel. 106 private Map<Integer,Tunnel> tunnelMap = new HashMap<>();
93 *
94 * @param peerIdentity peer identity of the peer which shouldn't be blacklisted
95 * for the tunnel anymore.
96 */
97 public void unblacklist(PeerIdentity peerIdentity) {
98 throw new UnsupportedOperationException("not implemented");
99 }
100 107
101 /** 108 /**
102 * Request that the mesh should try to connect to a peer supporting the given 109 * Counter for generating fresh tunnel ID's
103 * message type. 110 * when creating new tunnels.
104 * 111 */
105 * @param appType application type that must be supported by the peer 112 int next_tid = 1;
106 * (MESH should discover peer in proximity handling this type)
107 */
108 public void requestConnectByType(int appType) {
109 ConnectPeerByTypeMessage m = new ConnectPeerByTypeMessage();
110 m.applicationType = appType;
111 m.tunnelId = tunnelId;
112 requestQueue.add(new FixedMessageRequest(m));
113 }
114 113
115 /** 114 /**
116 * Request that the mesh should try to connect to a peer matching the 115 * A tunnel to a remote peer.
117 * description given in the service string. 116 * @param <T> type of context data for the tunnel
118 * 117 */
119 * @param description string describing the destination node requirements 118 public class Tunnel<T> extends MessageQueue {
120 */ 119 private final int opt;
121 public void requestConnectByString(String description) { 120 public final PeerIdentity peer;
122 throw new UnsupportedOperationException("not implemented"); 121 public final int port;
123 } 122 protected int tunnelId;
123 private boolean receive_done_expected = false;
124 int ack_count = 0;
124 125
125 /** 126 /**
126 * Request that a peer should be added to the tunnel. The connect handler 127 * Create a new tunnel (we're initiator and will be allowed to add/remove peers
127 * will be called when the peer connects 128 * and to broadcast).
128 * 129 *
129 * @param peer peer to add 130 * @param context tunnel context
131 * @param peer peer identity the tunnel should go to
132 * @param port Port number.
133 * @param nobuffer Flag for disabling buffering on relay nodes.
134 * @param reliable Flag for end-to-end reliability.
130 */ 135 */
131 public void requestConnectAdd(PeerIdentity peer) { 136 public Tunnel(PeerIdentity peer, int port, boolean nobuffer, boolean reliable, T context)
132 throw new UnsupportedOperationException("not implemented"); 137 {
138 this(peer, 0, port, nobuffer, reliable);
139 TunnelCreateMessage tcm = new TunnelCreateMessage();
140 tcm.otherEnd = peer;
141 tcm.opt = opt;
142 tcm.port = port;
143 tcm.tunnel_id = tunnelId;
144 client_mq.send(tcm);
133 } 145 }
134 146
135 /** 147 /**
136 * Request that a peer should be removed from the tunnel. The existing 148 * Private tunnel constructor, for creating tunnel objects for
137 * disconnect handler will be called ONCE if we were connected. 149 * incoming tunnels.
138 * 150 *
139 * @param peer peer to remove 151 * @param peer
152 * @param tunnelId
153 * @param port
154 * @param nobuffer
155 * @param reliable
140 */ 156 */
141 public void requestConnectDel(PeerIdentity peer) { 157 private Tunnel(PeerIdentity peer, int tunnelId, int port, boolean nobuffer, boolean reliable) {
142 throw new UnsupportedOperationException("not implemented"); 158 int my_opt = 0;
143 } 159 if (reliable)
144 160 my_opt |= OPTION_RELIABLE;
145 private void registerWithService() { 161 if (nobuffer)
146 requestQueue.add(new TunnelCreateRequest(this)); 162 my_opt |= OPTION_NOBUFFER;
147 } 163 if (0 == tunnelId)
148 } 164 this.tunnelId = ((next_tid++) | TUNNEL_ID_CLI) & ~TUNNEL_ID_SERV;
149 165 else
150 public static class TunnelTransmitRequest extends Request { 166 this.tunnelId = tunnelId;
151 167 this.peer = peer;
152 public boolean doNotSend = false; 168 this.port = port;
153 169 this.opt = my_opt;
154 static class Sink implements Connection.MessageSink {
155 byte[] payload;
156
157 @Override
158 public void send(GnunetMessage.Body m) {
159 if (payload != null) {
160 throw new AssertionError("only one payload allowed per transmitter");
161 }
162 payload = Construct.toBinary(GnunetMessage.fromBody(m));
163 }
164 }
165
166 public PeerIdentity target;
167 public MessageTransmitter transmitter;
168 public Tunnel tunnel;
169
170 @Override
171 public void transmit(Connection.MessageSink sink) {
172 if (doNotSend) {
173 return;
174 }
175
176 Sink s = new Sink();
177 transmitter.transmit(s);
178
179 if (tunnel.tunnelId >= Mesh.LOCAL_TUNNEL_ID_SERV) {
180 // we are not the origin, thus can only send to origin
181 OriginMessage m = new OriginMessage();
182 m.tid = tunnel.tunnelId;
183 m.pid = tunnel.nextSentPacketId++;
184 m.payload = s.payload;
185 m.oid = new PeerIdentity();
186 m.sender = m.oid;
187 sink.send(m);
188 } else if (target == null) {
189 // multicast, we are origin
190 MulticastMessage m = new MulticastMessage();
191 m.tid = tunnel.tunnelId;
192 m.pid = tunnel.nextSentPacketId++;
193 m.payload = s.payload;
194 sink.send(m);
195 } else {
196 // unicast
197 System.out.println("sending unicast");
198 UnicastMessage m = new UnicastMessage();
199 m.destination = target;
200 m.oid = new PeerIdentity();
201 m.tid = tunnel.tunnelId;
202 m.pid = tunnel.nextSentPacketId++;
203 m.payload = s.payload;
204 sink.send(m);
205 }
206 } 170 }
207 }
208
209 public class Tunnel {
210 protected int tunnelId;
211
212 private int nextSentPacketId = 0;
213 private int maxSentPacketId = INITIAL_WINDOW_SIZE - 1;
214
215 private TunnelTransmitRequest waitingTunnelTransmitRequest;
216
217 /**
218 * Ask the mesh to call "notify" once it is ready to transmit the
219 * given number of bytes to the specified tunnel or target.
220 * Only one call can be active at any time, to issue another request,
221 * wait for the callback or cancel the current request.
222 *
223 * @param maxdelay how long can the message wait?
224 * @param target destination for the message
225 * NULL for multicast to all tunnel targets
226 * @param notify_size how many bytes of buffer space does notify want?
227 * @param transmitter handler to call when buffer space is available;
228 * will be called with NULL on timeout or if the overall queue
229 * for this peer is larger than queue_size and this is currently
230 * the message with the lowest priority
231 * @return non-NULL if the notify callback was queued,
232 * NULL if we can not even queue the request (insufficient
233 * memory); if NULL is returned, "notify" will NOT be called.
234 */
235 public Cancelable notifyTransmitReady(RelativeTime maxdelay, PeerIdentity target, int notify_size, MessageTransmitter transmitter) {
236 if (waitingTunnelTransmitRequest != null) {
237 throw new AssertionError();
238 }
239 171
240 final TunnelTransmitRequest request = new TunnelTransmitRequest(); 172 public void receiveDone() {
241 request.target = target; 173 if (!receive_done_expected)
242 request.transmitter = transmitter; 174 throw new AssertionError("unexpected call to receiveDone");
243 request.tunnel = this; 175 LocalAckMessage am = new LocalAckMessage();
244 request.setDeadline(maxdelay.toAbsolute()); 176 am.tid = tunnelId;
245 177 client_mq.send(am);
246 if (nextSentPacketId <= maxSentPacketId) { 178 receive_done_expected = false;
247
248 final Cancelable cancel = requestQueue.add(request);
249
250 return new Cancelable() {
251 @Override
252 public void cancel() {
253 cancel.cancel();
254 }
255 };
256 } else {
257 // we have to wait until we get a local ack from the service
258 waitingTunnelTransmitRequest = request;
259
260 return new Cancelable() {
261 @Override
262 public void cancel() {
263 request.doNotSend = true;
264 }
265 };
266 }
267 } 179 }
268 180
269 public void destroy() { 181 public void destroy() {
270 // todo 182 TunnelDestroyMessage m = new TunnelDestroyMessage();
271 } 183 m.tunnel_id = tunnelId;
272 184 client_mq.send(m);
273 private void onAckUpdated() {
274 if (waitingTunnelTransmitRequest == null) {
275 return;
276 }
277 requestQueue.add(waitingTunnelTransmitRequest);
278 waitingTunnelTransmitRequest = null;
279 }
280 }
281
282
283 /**
284 * A request to initialize the connection with the mesh service.
285 */
286 public class ClientConnectRequest extends Request {
287 @Override
288 public void transmit(Connection.MessageSink sink) {
289 System.out.println("transmit called " + this);
290
291 ClientConnectMessage ccm = new ClientConnectMessage();
292 ccm.applications_length = applications.length;
293 ccm.apps_list = applications;
294 int[] types;
295 if (messageReceiver != null) {
296 types = RunaboutUtil.getRunaboutMessageTypes(messageReceiver);
297 } else {
298 types = new int[0];
299 }
300 ccm.types_list = types;
301 ccm.types_length = types.length;
302
303 sink.send(ccm);
304 }
305 }
306
307 public static class TunnelCreateRequest extends Request {
308 public OriginTunnel tunnel;
309
310 public TunnelCreateRequest(OriginTunnel rootTunnel) {
311 tunnel = rootTunnel;
312 } 185 }
313 186
314 @Override 187 @Override
315 public void transmit(Connection.MessageSink sink) { 188 protected void sendImmediate(Envelope ev) {
316 TunnelCreateMessage tcm = new TunnelCreateMessage(); 189 if (ack_count <= 0)
317 tcm.tunnel_id = tunnel.tunnelId; 190 throw new AssertionError();
318 sink.send(tcm); 191 DataMessage m = new DataMessage();
192 m.payload = Construct.toBinary(GnunetMessage.fromBody(ev.message));
193 Envelope mesh_ev = new Envelope(m);
194 client_mq.send(mesh_ev);
195 ack_count -= 1;
319 } 196 }
320 } 197 }
321 198
322 199
323 private class MeshMessageReceiver extends RunaboutMessageReceiver { 200 private class MeshMessageReceiver extends RunaboutMessageReceiver {
324 public void visit(PeerAddMessage b) {
325 Tunnel r = tunnelMap.get(b.tunnelId);
326 if (r == null || !(r instanceof OriginTunnel)) {
327 logger.warn("server got confused with tunnel IDs on peer add, ignoring message");
328 return;
329 }
330 OriginTunnel ot = (OriginTunnel) r;
331 if (ot.connectHandler != null) {
332 ot.connectHandler.onConnect(ot, b.peer);
333 }
334 }
335
336 public void visit(PeerDeleteMessage b) {
337 Tunnel r = tunnelMap.get(b.tunnelId);
338 if (r == null || !(r instanceof OriginTunnel)) {
339 logger.warn("server got confused with tunnel IDs on peer delete, ignoring message");
340 return;
341 }
342 OriginTunnel ot = (OriginTunnel) r;
343 if (ot.disconnectHandler != null) {
344 ot.disconnectHandler.onDisconnect(b.peer);
345 }
346 }
347
348 public void visit(TunnelCreateMessage m) { 201 public void visit(TunnelCreateMessage m) {
349 Tunnel t = new Tunnel(); 202 Tunnel t = new Tunnel(m.otherEnd, m.tunnel_id, m.port,
350 t.tunnelId = m.tunnel_id; 203 (m.opt & OPTION_NOBUFFER) != 0, (m.opt & OPTION_NOBUFFER) != 0);
351 if (inboundTunnelHandler != null) { 204 if (inboundTunnelHandler != null) {
352 inboundTunnelHandler.onInboundTunnel(t, m.otherEnd); 205 inboundTunnelHandler.onInboundTunnel(t, m.otherEnd);
353 } 206 }
354 } 207 }
355 208
356 public void visit(UnicastMessage m) { 209 public void visit(DataMessage m) {
357 messageReceiver.setSender(m.oid); 210 Tunnel t = tunnelMap.get(m.tid);
358 messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body); 211 if (t != null)
359 } 212 {
360 213 if (t.receive_done_expected)
361 public void visit(MulticastMessage m) { 214 logger.warn("got unexpected message from service");
362 messageReceiver.setSender(m.oid); 215 t.receive_done_expected = true;
363 messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body); 216 messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body);
364 } 217 }
365
366 public void visit(OriginMessage m) {
367 messageReceiver.setSender(m.sender);
368 messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body);
369 } 218 }
370 219
371 public void visit(LocalAckMessage m) { 220 public void visit(LocalAckMessage m) {
372 Tunnel t = tunnelMap.get(m.tid); 221 Tunnel t = tunnelMap.get(m.tid);
373 if (t == null) { 222 if (t != null)
374 logger.warn("server got confused with tunnel IDs on ack, ignoring message"); 223 t.ack_count += 1;
375 return;
376 }
377 t.nextSentPacketId = m.maxPid;
378 t.onAckUpdated();
379 } 224 }
380 225
381 public void visit(TunnelDestroyMessage m) { 226 public void visit(TunnelDestroyMessage m) {
@@ -405,59 +250,28 @@ public class Mesh {
405 * is called on the tunnel 250 * is called on the tunnel
406 */ 251 */
407 public Mesh(Configuration cfg, InboundTunnelHandler inboundTunnelHandler, 252 public Mesh(Configuration cfg, InboundTunnelHandler inboundTunnelHandler,
408 TunnelEndHandler tunnelEndHandler, MeshRunabout messageReceiver, int... applications) { 253 TunnelEndHandler tunnelEndHandler, MeshRunabout messageReceiver, int... ports) {
409 this.tunnelEndHandler = tunnelEndHandler; 254 this.tunnelEndHandler = tunnelEndHandler;
410 this.messageReceiver = messageReceiver; 255 this.messageReceiver = messageReceiver;
411 this.applications = applications; 256 this.ports = ports;
412 this.inboundTunnelHandler = inboundTunnelHandler; 257 this.inboundTunnelHandler = inboundTunnelHandler;
413 258
414 Client client = new Client("mesh", cfg); 259 client = new Client("mesh", cfg);
415 requestQueue = new RequestQueue(client, new MeshMessageReceiver()); 260 client_mq = new ClientMessageQueue(client, new MeshMessageReceiver());
416 261 ClientConnectMessage ccm = new ClientConnectMessage();
417 requestQueue.add(new ClientConnectRequest()); 262 ccm.apps_list = ports;
418 } 263 client_mq.send(ccm);
419
420 /**
421 * Create a new tunnel (we're initiator and will be allowed to add/remove peers
422 * and to broadcast).
423 *
424 * @param connectHandler callback for when a new peer connects to the tunnel, either because the origin added him,
425 * or the client joined the tunnel
426 * @param disconnectHandler callback for when when a peer is disconnected
427 */
428 public OriginTunnel createTunnel(ConnectHandler connectHandler, DisconnectHandler disconnectHandler) {
429 OriginTunnel tunnel = new OriginTunnel();
430 tunnel.connectHandler = connectHandler;
431 tunnel.disconnectHandler = disconnectHandler;
432 tunnel.tunnelId = nextTunnelId++;
433 tunnelMap.put(tunnel.tunnelId, tunnel);
434 tunnel.registerWithService();
435 return tunnel;
436 }
437
438
439 /**
440 * Announce to ther peer the availability of services described by the regex,
441 * in order to be reachable to other peers via connect_by_string.
442 * <p/>
443 * Note that the first 8 characters are considered to be part of a prefix,
444 * (for instance 'gnunet://'). If you put a variable part in there (*, +. ()),
445 * all matching strings will be stored in the DHT.
446 *
447 * @param regex string with the regular expression describing local services.
448 */
449 public void announceRegex(String regex) {
450 throw new UnsupportedOperationException("not implemented");
451 } 264 }
452 265
453 266
454 /** 267 /**
455 * Disconnect from the mesh service. All tunnels will be destroyed. All tunnel 268 * Disconnect from the mesh service.
456 * disconnect callbacks will be called on any still connected peers, notifying 269 * All tunnels will be destroyed.
457 * about their disconnection. The registered inbound tunnel cleaner will be 270 * All tunnel disconnect callbacks will be called on any still connected peers, notifying
458 * called should any inbound tunnels still exist. 271 * about their disconnection.
459 */ 272 */
460 public void disconnect() { 273 public void disconnect() {
461 requestQueue.destroy(); 274 client_mq.destroy();
275 client.disconnect();
462 } 276 }
463} 277}
diff --git a/src/org/gnunet/mesh/MulticastMessage.java b/src/org/gnunet/mesh/MulticastMessage.java
deleted file mode 100644
index 2ca344d..0000000
--- a/src/org/gnunet/mesh/MulticastMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
1package org.gnunet.mesh;
2
3import org.gnunet.construct.*;
4import org.gnunet.util.GnunetMessage;
5import org.gnunet.util.PeerIdentity;
6
7/**
8 * ...
9 *
10 * @author Florian Dold
11 */
12@UnionCase(261)
13public class MulticastMessage implements GnunetMessage.Body {
14 /**
15 * Tunnel ID
16 */
17 @UInt32
18 public int tid;
19 @UInt32
20 public int ttl;
21 @UInt32
22 public int pid;
23 @NestedMessage
24 public PeerIdentity oid;
25 @FillWith @UInt8
26 public byte[] payload;
27}
diff --git a/src/org/gnunet/mesh/OriginMessage.java b/src/org/gnunet/mesh/OriginMessage.java
deleted file mode 100644
index 143eaeb..0000000
--- a/src/org/gnunet/mesh/OriginMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
1package org.gnunet.mesh;
2
3import org.gnunet.construct.*;
4import org.gnunet.util.GnunetMessage;
5import org.gnunet.util.PeerIdentity;
6
7/**
8 * ...
9 *
10 * @author Florian Dold
11 */
12@UnionCase(262)
13public class OriginMessage implements GnunetMessage.Body {
14 @UInt32
15 public int tid;
16 @UInt32
17 public int ttl;
18 @UInt32
19 public int pid;
20 @NestedMessage
21 public PeerIdentity oid;
22 @NestedMessage
23 public PeerIdentity sender;
24 @FillWith
25 @UInt8
26 public byte[] payload;
27}
diff --git a/src/org/gnunet/mesh/PeerAddMessage.java b/src/org/gnunet/mesh/PeerAddMessage.java
deleted file mode 100644
index 099165c..0000000
--- a/src/org/gnunet/mesh/PeerAddMessage.java
+++ /dev/null
@@ -1,23 +0,0 @@
1package org.gnunet.mesh;
2
3import org.gnunet.construct.NestedMessage;
4import org.gnunet.construct.UInt32;
5import org.gnunet.construct.UnionCase;
6import org.gnunet.util.GnunetMessage;
7import org.gnunet.util.PeerIdentity;
8
9/**
10 * Message used for two things (bad!)
11 * (1) client->server: request that a certain peer is added to a tunnel
12 * (2) server->client: notify the client that a new peer has joined the tunnel
13 *
14 * @author Florian Dold
15 */
16@UnionCase(275)
17public class PeerAddMessage implements GnunetMessage.Body {
18 @UInt32
19 public int tunnelId;
20
21 @NestedMessage
22 public PeerIdentity peer;
23}
diff --git a/src/org/gnunet/mesh/PeerDeleteMessage.java b/src/org/gnunet/mesh/PeerDeleteMessage.java
deleted file mode 100644
index 045e565..0000000
--- a/src/org/gnunet/mesh/PeerDeleteMessage.java
+++ /dev/null
@@ -1,23 +0,0 @@
1package org.gnunet.mesh;
2
3import org.gnunet.construct.NestedMessage;
4import org.gnunet.construct.UInt32;
5import org.gnunet.construct.UnionCase;
6import org.gnunet.util.GnunetMessage;
7import org.gnunet.util.PeerIdentity;
8
9/**
10 * Message used for two things (bad!)
11 * (1) client->server: request that a certain peer is added to a tunnel
12 * (2) server->client: notify the client that a new peer has joined the tunnel
13 *
14 * @author Florian Dold
15 */
16@UnionCase(276)
17public class PeerDeleteMessage implements GnunetMessage.Body {
18 @UInt32
19 public int tunnelId;
20
21 @NestedMessage
22 public PeerIdentity peer;
23}
diff --git a/src/org/gnunet/mesh/TunnelCreateMessage.java b/src/org/gnunet/mesh/TunnelCreateMessage.java
index 7b63bc6..eaa4d6c 100644
--- a/src/org/gnunet/mesh/TunnelCreateMessage.java
+++ b/src/org/gnunet/mesh/TunnelCreateMessage.java
@@ -7,11 +7,7 @@ import org.gnunet.util.GnunetMessage;
7import org.gnunet.util.PeerIdentity; 7import org.gnunet.util.PeerIdentity;
8 8
9/** 9/**
10 * Message used to 10 * FIXME
11 * a) request the service to create a new tunnel with the given tunnel id
12 * b) notify the client of a newly created tunnel
13 *
14 * todo: this is bad design, split into two messages in the C code!
15 * 11 *
16 * @author Florian Dold 12 * @author Florian Dold
17 */ 13 */
@@ -20,9 +16,12 @@ public class TunnelCreateMessage implements GnunetMessage.Body {
20 @UInt32 16 @UInt32
21 public int tunnel_id; 17 public int tunnel_id;
22 18
23 /** 19 @NestedMessage(optional = false)
24 * Only present if sent from server to client (purpose b)
25 */
26 @NestedMessage(optional = true)
27 public PeerIdentity otherEnd; 20 public PeerIdentity otherEnd;
21
22 @UInt32
23 public int port;
24
25 @UInt32
26 public int opt;
28} 27}
diff --git a/src/org/gnunet/mq/ClientMessageQueue.java b/src/org/gnunet/mq/ClientMessageQueue.java
new file mode 100644
index 0000000..bfb0466
--- /dev/null
+++ b/src/org/gnunet/mq/ClientMessageQueue.java
@@ -0,0 +1,48 @@
1package org.gnunet.mq;
2
3
4import org.gnunet.construct.Construct;
5import org.gnunet.util.*;
6
7/**
8 * Message queue for org.util.Connection
9 */
10public class ClientMessageQueue extends MessageQueue {
11 private final Client client;
12 private final RunaboutMessageReceiver receiver;
13
14
15 public ClientMessageQueue(Client client, RunaboutMessageReceiver receiver) {
16 this.client = client;
17 this.receiver = receiver;
18 }
19
20
21 public ClientMessageQueue(Client client) {
22 this(client, null);
23 }
24
25
26 @Override
27 protected void sendImmediate(final Envelope ev) {
28 int size = Construct.getSize(ev.message);
29 client.notifyTransmitReady(RelativeTime.FOREVER, false, size, new MessageTransmitter() {
30 @Override
31 public void transmit(Connection.MessageSink sink) {
32 sink.send(ev.message);
33 reportMessageSent();
34 }
35
36 @Override
37 public void handleError() {
38 // FIXME
39 }
40 });
41 }
42
43
44 @Override
45 public void destroy() {
46
47 }
48}
diff --git a/src/org/gnunet/mq/Envelope.java b/src/org/gnunet/mq/Envelope.java
new file mode 100644
index 0000000..09c0c2c
--- /dev/null
+++ b/src/org/gnunet/mq/Envelope.java
@@ -0,0 +1,34 @@
1package org.gnunet.mq;
2
3import org.gnunet.util.GnunetMessage;
4
5/**
6 * Container for a message to be sent by a message queue.
7 */
8public class Envelope {
9 public final GnunetMessage.Body message;
10 private MessageQueue parent_queue;
11 private NotifySentHandler notify_sent_handler;
12
13 public Envelope(GnunetMessage.Body message) {
14 this.message = message;
15 }
16
17 public void notifySent(NotifySentHandler h) {
18 this.notify_sent_handler = h;
19 }
20
21 public void injectSent() {
22 if (notify_sent_handler != null)
23 notify_sent_handler.onSent();
24 }
25
26 public void cancel() {
27 // TODO
28 }
29
30 /* pkg-private */ void invokeSentNotification() {
31 if (null != notify_sent_handler)
32 notify_sent_handler.onSent();
33 }
34}
diff --git a/src/org/gnunet/mq/MessageQueue.java b/src/org/gnunet/mq/MessageQueue.java
new file mode 100644
index 0000000..de08edf
--- /dev/null
+++ b/src/org/gnunet/mq/MessageQueue.java
@@ -0,0 +1,41 @@
1package org.gnunet.mq;
2
3
4import org.gnunet.util.GnunetMessage;
5
6import java.util.LinkedList;
7
8/**
9 * General-purpose message queue
10 */
11public abstract class MessageQueue {
12 private LinkedList<Envelope> queued_envelopes = new LinkedList<>();
13 protected Envelope current_envelope;
14
15 protected abstract void sendImmediate(Envelope ev);
16
17 public void send(GnunetMessage.Body body) {
18 send(new Envelope(body));
19 }
20
21 public void send(Envelope ev) {
22 if (null == current_envelope) {
23 current_envelope = ev;
24 sendImmediate(current_envelope);
25 } else {
26 queued_envelopes.addLast(ev);
27 }
28 }
29
30 protected void reportMessageSent() {
31 if (null == current_envelope)
32 throw new AssertionError();
33 current_envelope.invokeSentNotification();
34 if (queued_envelopes.isEmpty())
35 return;
36 current_envelope = queued_envelopes.pop();
37 sendImmediate(current_envelope);
38 }
39
40 public abstract void destroy();
41}
diff --git a/src/org/gnunet/mq/NotifySentHandler.java b/src/org/gnunet/mq/NotifySentHandler.java
new file mode 100644
index 0000000..7ec13b2
--- /dev/null
+++ b/src/org/gnunet/mq/NotifySentHandler.java
@@ -0,0 +1,6 @@
1package org.gnunet.mq;
2
3
4public interface NotifySentHandler {
5 void onSent();
6}
diff --git a/src/org/gnunet/requests/RequestQueue.java b/src/org/gnunet/requests/RequestQueue.java
index e514303..6f7102d 100644
--- a/src/org/gnunet/requests/RequestQueue.java
+++ b/src/org/gnunet/requests/RequestQueue.java
@@ -28,7 +28,6 @@ import java.util.LinkedList;
28 * Generic queues for Requests to be sent to the service. 28 * Generic queues for Requests to be sent to the service.
29 */ 29 */
30public class RequestQueue { 30public class RequestQueue {
31 // todo: implement more efficiently (attributes instead of multiple queues)
32 31
33 /** 32 /**
34 * Requests to be transmitted to the service. 33 * Requests to be transmitted to the service.
diff --git a/src/org/gnunet/transport/RequestConnectMessage.java b/src/org/gnunet/transport/RequestConnectMessage.java
index 9bd7a5b..63bbc39 100644
--- a/src/org/gnunet/transport/RequestConnectMessage.java
+++ b/src/org/gnunet/transport/RequestConnectMessage.java
@@ -21,5 +21,4 @@ public class RequestConnectMessage implements GnunetMessage.Body {
21 */ 21 */
22 @NestedMessage 22 @NestedMessage
23 public PeerIdentity peer; 23 public PeerIdentity peer;
24
25} 24}
diff --git a/src/org/gnunet/transport/Transport.java b/src/org/gnunet/transport/Transport.java
index f79c340..5ccbc77 100644
--- a/src/org/gnunet/transport/Transport.java
+++ b/src/org/gnunet/transport/Transport.java
@@ -1,6 +1,7 @@
1package org.gnunet.transport; 1package org.gnunet.transport;
2 2
3import org.gnunet.hello.HelloMessage; 3import org.gnunet.hello.HelloMessage;
4import org.gnunet.mq.Envelope;
4import org.gnunet.util.*; 5import org.gnunet.util.*;
5 6
6/** 7/**
@@ -25,7 +26,19 @@ public class Transport {
25 * NULL on failure (cb will not be called) 26 * NULL on failure (cb will not be called)
26 */ 27 */
27 Cancelable tryConnect(PeerIdentity target, TryConnectCallback cb) { 28 Cancelable tryConnect(PeerIdentity target, TryConnectCallback cb) {
28 throw new UnsupportedOperationException(); 29 RequestConnectMessage m = new RequestConnectMessage();
30 m.peer = target;
31 m.reserved = 0;
32 final Envelope ev = new Envelope(m);
33 ev.notifySent(null /* FIXME */);
34 //client_mq.send(ev);
35
36 return new Cancelable() {
37 @Override
38 public void cancel() {
39 ev.cancel();
40 }
41 };
29 } 42 }
30 43
31 44