diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-07-24 10:50:01 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-07-24 10:50:01 +0000 |
commit | e4a01152171e4186e4718c4c9c92842fccf6d8a1 (patch) | |
tree | f17311a274a80efb31a693dfcfaf3a431378bb13 /src | |
parent | cf715527f54ad4e0628088b2cc63fcb8bb35533d (diff) | |
download | gnunet-java-e4a01152171e4186e4718c4c9c92842fccf6d8a1.tar.gz gnunet-java-e4a01152171e4186e4718c4c9c92842fccf6d8a1.zip |
- mq for java
- started with consensus
- started with new mesh
Diffstat (limited to 'src')
26 files changed, 525 insertions, 485 deletions
diff --git a/src/org/gnunet/consensus/ConcludeCallback.java b/src/org/gnunet/consensus/ConcludeCallback.java new file mode 100644 index 0000000..7660c49 --- /dev/null +++ b/src/org/gnunet/consensus/ConcludeCallback.java | |||
@@ -0,0 +1,5 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | public interface ConcludeCallback { | ||
4 | void onConcludeDone(); | ||
5 | } | ||
diff --git a/src/org/gnunet/consensus/ConcludeDoneMessage.java b/src/org/gnunet/consensus/ConcludeDoneMessage.java new file mode 100644 index 0000000..3b016cf --- /dev/null +++ b/src/org/gnunet/consensus/ConcludeDoneMessage.java | |||
@@ -0,0 +1,12 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | |||
4 | import org.gnunet.construct.UnionCase; | ||
5 | |||
6 | /** | ||
7 | * Notify the client that conclude has finished. | ||
8 | * Direction: service -> client | ||
9 | */ | ||
10 | @UnionCase(525) | ||
11 | public class ConcludeDoneMessage { | ||
12 | } | ||
diff --git a/src/org/gnunet/consensus/ConcludeMessage.java b/src/org/gnunet/consensus/ConcludeMessage.java new file mode 100644 index 0000000..7b43928 --- /dev/null +++ b/src/org/gnunet/consensus/ConcludeMessage.java | |||
@@ -0,0 +1,19 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | import org.gnunet.construct.FillWith; | ||
4 | import org.gnunet.construct.UInt16; | ||
5 | import org.gnunet.construct.UInt8; | ||
6 | import org.gnunet.construct.UnionCase; | ||
7 | import org.gnunet.util.GnunetMessage; | ||
8 | |||
9 | /** | ||
10 | * Notify the client of a new element. | ||
11 | * | ||
12 | * Direction: service -> client | ||
13 | * | ||
14 | * @author Florian Dold | ||
15 | */ | ||
16 | @UnionCase(524) | ||
17 | public class ConcludeMessage implements GnunetMessage.Body { | ||
18 | /* empty body */ | ||
19 | } \ No newline at end of file | ||
diff --git a/src/org/gnunet/consensus/Consensus.java b/src/org/gnunet/consensus/Consensus.java new file mode 100644 index 0000000..e6fab01 --- /dev/null +++ b/src/org/gnunet/consensus/Consensus.java | |||
@@ -0,0 +1,120 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | import org.gnunet.mq.ClientMessageQueue; | ||
4 | import org.gnunet.mq.Envelope; | ||
5 | import org.gnunet.mq.MessageQueue; | ||
6 | import org.gnunet.mq.NotifySentHandler; | ||
7 | import org.gnunet.util.*; | ||
8 | import org.slf4j.Logger; | ||
9 | import org.slf4j.LoggerFactory; | ||
10 | |||
11 | /** | ||
12 | * Multi-peer set reconciliation. | ||
13 | */ | ||
14 | public class Consensus { | ||
15 | /** | ||
16 | * Class logger. | ||
17 | */ | ||
18 | private static final Logger logger = LoggerFactory | ||
19 | .getLogger(Consensus.class); | ||
20 | |||
21 | /** | ||
22 | * Callback for new elements arriving from the service. | ||
23 | * Also used to notify of consensus failure. | ||
24 | */ | ||
25 | private final NewElementCallback newElementCallback; | ||
26 | |||
27 | /** | ||
28 | * Client connected to the consensus service. | ||
29 | */ | ||
30 | private Client client; | ||
31 | |||
32 | /** | ||
33 | * Message queue for 'client'. | ||
34 | */ | ||
35 | private MessageQueue client_mq; | ||
36 | |||
37 | /** | ||
38 | * Called when conclude has finished. | ||
39 | */ | ||
40 | private ConcludeCallback concludeCallback; | ||
41 | |||
42 | /** | ||
43 | * Message dispatch for messages from the consensus service. | ||
44 | */ | ||
45 | private class ConsensusMessageReceiver extends RunaboutMessageReceiver { | ||
46 | public void visit(ConcludeDoneMessage m) { | ||
47 | if (null == concludeCallback) | ||
48 | { | ||
49 | logger.error("unexpected conclude done message"); | ||
50 | return; | ||
51 | } | ||
52 | concludeCallback.onConcludeDone(); | ||
53 | } | ||
54 | |||
55 | public void visit(NewElementMessage m) { | ||
56 | ConsensusElement element = new ConsensusElement(); | ||
57 | element.element_type = m.element_type; | ||
58 | element.data = m.element_data; | ||
59 | newElementCallback.onNewElement(element); | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public void handleError() { | ||
64 | newElementCallback.onNewElement(null); | ||
65 | } | ||
66 | } | ||
67 | |||
68 | /** | ||
69 | * Create a consensus session. The set being reconciled is initially | ||
70 | * empty. Only reconcile with other peers after | ||
71 | * GNUNET_CONSENSUS_reconcile has been called. | ||
72 | * | ||
73 | * @param num_peers number of peers in the session | ||
74 | * @param peers array of peers participating in this consensus session | ||
75 | * Inclusion of the local peer is optional. | ||
76 | * @param sessionId session identifier | ||
77 | * Allows a group of peers to have more than consensus session. | ||
78 | * @param newElementCallback callback, called when a new element is added to the set by | ||
79 | * another peer | ||
80 | */ | ||
81 | public Consensus(Configuration cfg, int num_peers, PeerIdentity[] peers, HashCode sessionId, | ||
82 | NewElementCallback newElementCallback) { | ||
83 | client = new Client("consensus", cfg); | ||
84 | client_mq = new ClientMessageQueue(client, new ConsensusMessageReceiver()); | ||
85 | this.newElementCallback = newElementCallback; | ||
86 | } | ||
87 | |||
88 | public void insertElement (ConsensusElement element, final InsertDoneCallback idc) { | ||
89 | InsertElementMessage m = new InsertElementMessage(); | ||
90 | m.element_data = element.data; | ||
91 | m.element_type = element.element_type; | ||
92 | Envelope ev = new Envelope(m); | ||
93 | ev.notifySent(new NotifySentHandler() { | ||
94 | @Override | ||
95 | public void onSent() { | ||
96 | idc.onInsertDone(); | ||
97 | } | ||
98 | }); | ||
99 | client_mq.send(ev); | ||
100 | } | ||
101 | |||
102 | public void conclude(ConcludeCallback concludeCallback) { | ||
103 | if (null == concludeCallback) | ||
104 | throw new AssertionError("conclude with empty callback"); | ||
105 | if (null != this.concludeCallback) | ||
106 | throw new AssertionError("called conclude twice"); | ||
107 | this.concludeCallback = concludeCallback; | ||
108 | } | ||
109 | |||
110 | /** | ||
111 | * Destroy a consensus handle (free all state associated with | ||
112 | * it, no longer call any of the callbacks). | ||
113 | */ | ||
114 | public void destroy() { | ||
115 | client_mq.destroy(); | ||
116 | client_mq = null; | ||
117 | client.disconnect(); | ||
118 | client = null; | ||
119 | } | ||
120 | } | ||
diff --git a/src/org/gnunet/consensus/ConsensusElement.java b/src/org/gnunet/consensus/ConsensusElement.java new file mode 100644 index 0000000..846e72a --- /dev/null +++ b/src/org/gnunet/consensus/ConsensusElement.java | |||
@@ -0,0 +1,16 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | |||
4 | public class ConsensusElement { | ||
5 | /** | ||
6 | * Type of the element. | ||
7 | * 0 <= element_type <= 2^16 | ||
8 | */ | ||
9 | int element_type; | ||
10 | |||
11 | /** | ||
12 | * Data for the element. | ||
13 | * 0 <= data.length <= 2^16 | ||
14 | */ | ||
15 | byte[] data; | ||
16 | } | ||
diff --git a/src/org/gnunet/consensus/InsertDoneCallback.java b/src/org/gnunet/consensus/InsertDoneCallback.java new file mode 100644 index 0000000..f0e86ca --- /dev/null +++ b/src/org/gnunet/consensus/InsertDoneCallback.java | |||
@@ -0,0 +1,5 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | public interface InsertDoneCallback { | ||
4 | void onInsertDone(); | ||
5 | } | ||
diff --git a/src/org/gnunet/consensus/InsertElementMessage.java b/src/org/gnunet/consensus/InsertElementMessage.java new file mode 100644 index 0000000..fd0ff67 --- /dev/null +++ b/src/org/gnunet/consensus/InsertElementMessage.java | |||
@@ -0,0 +1,23 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | import org.gnunet.construct.FillWith; | ||
4 | import org.gnunet.construct.UInt16; | ||
5 | import org.gnunet.construct.UInt8; | ||
6 | import org.gnunet.construct.UnionCase; | ||
7 | import org.gnunet.util.GnunetMessage; | ||
8 | |||
9 | /** | ||
10 | * Send an element to the service, insert it into the consensus set. | ||
11 | * | ||
12 | * Direction: client -> service | ||
13 | * | ||
14 | * @author Florian Dold | ||
15 | */ | ||
16 | @UnionCase(521) | ||
17 | public class InsertElementMessage implements GnunetMessage.Body { | ||
18 | @UInt16 | ||
19 | public int element_type; | ||
20 | @FillWith | ||
21 | @UInt8 | ||
22 | public byte[] element_data; | ||
23 | } \ No newline at end of file | ||
diff --git a/src/org/gnunet/consensus/NewElementCallback.java b/src/org/gnunet/consensus/NewElementCallback.java new file mode 100644 index 0000000..4b07a71 --- /dev/null +++ b/src/org/gnunet/consensus/NewElementCallback.java | |||
@@ -0,0 +1,5 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | public interface NewElementCallback { | ||
4 | void onNewElement(ConsensusElement element); | ||
5 | } | ||
diff --git a/src/org/gnunet/consensus/NewElementMessage.java b/src/org/gnunet/consensus/NewElementMessage.java new file mode 100644 index 0000000..deb3634 --- /dev/null +++ b/src/org/gnunet/consensus/NewElementMessage.java | |||
@@ -0,0 +1,20 @@ | |||
1 | package org.gnunet.consensus; | ||
2 | |||
3 | import org.gnunet.construct.*; | ||
4 | import org.gnunet.util.GnunetMessage; | ||
5 | |||
6 | /** | ||
7 | * Notify the client of a new element. | ||
8 | * | ||
9 | * Direction: service -> client | ||
10 | * | ||
11 | * @author Florian Dold | ||
12 | */ | ||
13 | @UnionCase(523) | ||
14 | public class NewElementMessage implements GnunetMessage.Body { | ||
15 | @UInt16 | ||
16 | public int element_type; | ||
17 | @FillWith | ||
18 | @UInt8 | ||
19 | public byte[] element_data; | ||
20 | } \ No newline at end of file | ||
diff --git a/src/org/gnunet/construct/MsgMap.txt b/src/org/gnunet/construct/MsgMap.txt index 719e92a..30e7077 100644 --- a/src/org/gnunet/construct/MsgMap.txt +++ b/src/org/gnunet/construct/MsgMap.txt | |||
@@ -25,7 +25,7 @@ org.gnunet.util.GnunetMessage$Body|153=org.gnunet.dht.MonitorStartStop | |||
25 | org.gnunet.util.GnunetMessage$Body|155=org.gnunet.dht.ClientPutConfirmationMessage | 25 | org.gnunet.util.GnunetMessage$Body|155=org.gnunet.dht.ClientPutConfirmationMessage |
26 | org.gnunet.util.GnunetMessage$Body|262=org.gnunet.mesh.OriginMessage | 26 | org.gnunet.util.GnunetMessage$Body|262=org.gnunet.mesh.OriginMessage |
27 | org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.UpdateMessage | 27 | org.gnunet.util.GnunetMessage$Body|323=org.gnunet.nse.UpdateMessage |
28 | org.gnunet.util.GnunetMessage$Body|260=org.gnunet.mesh.UnicastMessage | 28 | org.gnunet.util.GnunetMessage$Body|260=org.gnunet.mesh.DataMessage |
29 | org.gnunet.util.GnunetMessage$Body|261=org.gnunet.mesh.MulticastMessage | 29 | org.gnunet.util.GnunetMessage$Body|261=org.gnunet.mesh.MulticastMessage |
30 | org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.StartMessage | 30 | org.gnunet.util.GnunetMessage$Body|321=org.gnunet.nse.StartMessage |
31 | org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.ClientGetStopMessage | 31 | org.gnunet.util.GnunetMessage$Body|144=org.gnunet.dht.ClientGetStopMessage |
diff --git a/src/org/gnunet/mesh/ClientConnectMessage.java b/src/org/gnunet/mesh/ClientConnectMessage.java index 57edede..1a56ebb 100644 --- a/src/org/gnunet/mesh/ClientConnectMessage.java +++ b/src/org/gnunet/mesh/ClientConnectMessage.java | |||
@@ -12,18 +12,6 @@ import org.gnunet.util.GnunetMessage; | |||
12 | */ | 12 | */ |
13 | @UnionCase(272) | 13 | @UnionCase(272) |
14 | public class ClientConnectMessage implements GnunetMessage.Body { | 14 | public class ClientConnectMessage implements GnunetMessage.Body { |
15 | @UInt16 | 15 | @IntegerFill(signed = false, bitSize = 32) |
16 | public int applications_length; | ||
17 | @UInt16 | ||
18 | public int types_length; | ||
19 | /** | ||
20 | * List of applications that this client claims to provide. | ||
21 | */ | ||
22 | @VariableSizeIntegerArray(lengthField = "applications_length", signed = false, bitSize = 32) | ||
23 | public int[] apps_list; | 16 | public int[] apps_list; |
24 | /** | ||
25 | * Message types that this client understands. | ||
26 | */ | ||
27 | @VariableSizeIntegerArray(lengthField = "types_length", signed = false, bitSize = 16) | ||
28 | public int[] types_list; | ||
29 | } | 17 | } |
diff --git a/src/org/gnunet/mesh/ConnectPeerByTypeMessage.java b/src/org/gnunet/mesh/ConnectPeerByTypeMessage.java deleted file mode 100644 index fe8bfe0..0000000 --- a/src/org/gnunet/mesh/ConnectPeerByTypeMessage.java +++ /dev/null | |||
@@ -1,18 +0,0 @@ | |||
1 | package org.gnunet.mesh; | ||
2 | |||
3 | import org.gnunet.construct.UInt32; | ||
4 | import org.gnunet.construct.UnionCase; | ||
5 | import org.gnunet.util.GnunetMessage; | ||
6 | /** | ||
7 | * ... | ||
8 | * | ||
9 | * @author Florian Dold | ||
10 | */ | ||
11 | @UnionCase(277) | ||
12 | public class ConnectPeerByTypeMessage implements GnunetMessage.Body { | ||
13 | @UInt32 | ||
14 | public int tunnelId; | ||
15 | |||
16 | @UInt32 | ||
17 | public int applicationType; | ||
18 | } | ||
diff --git a/src/org/gnunet/mesh/UnicastMessage.java b/src/org/gnunet/mesh/DataMessage.java index 141d49a..92546c8 100644 --- a/src/org/gnunet/mesh/UnicastMessage.java +++ b/src/org/gnunet/mesh/DataMessage.java | |||
@@ -10,17 +10,9 @@ import org.gnunet.util.PeerIdentity; | |||
10 | * @author Florian Dold | 10 | * @author Florian Dold |
11 | */ | 11 | */ |
12 | @UnionCase(260) | 12 | @UnionCase(260) |
13 | public class UnicastMessage implements GnunetMessage.Body { | 13 | public class DataMessage implements GnunetMessage.Body { |
14 | @UInt32 | 14 | @UInt32 |
15 | public int tid; | 15 | public int tid; |
16 | @UInt32 | ||
17 | public int ttl; | ||
18 | @UInt32 | ||
19 | public int pid; | ||
20 | @NestedMessage | ||
21 | public PeerIdentity oid; | ||
22 | @NestedMessage | ||
23 | public PeerIdentity destination; | ||
24 | @FillWith | 16 | @FillWith |
25 | @UInt8 | 17 | @UInt8 |
26 | public byte[] payload; | 18 | public byte[] payload; |
diff --git a/src/org/gnunet/mesh/Mesh.java b/src/org/gnunet/mesh/Mesh.java index 6f14451..091cb17 100644 --- a/src/org/gnunet/mesh/Mesh.java +++ b/src/org/gnunet/mesh/Mesh.java | |||
@@ -20,15 +20,17 @@ | |||
20 | 20 | ||
21 | package org.gnunet.mesh; | 21 | package org.gnunet.mesh; |
22 | 22 | ||
23 | import com.google.common.collect.Maps; | ||
24 | import org.gnunet.construct.Construct; | 23 | import org.gnunet.construct.Construct; |
25 | import org.gnunet.requests.FixedMessageRequest; | 24 | import org.gnunet.mq.ClientMessageQueue; |
25 | import org.gnunet.mq.Envelope; | ||
26 | import org.gnunet.mq.MessageQueue; | ||
26 | import org.gnunet.requests.Request; | 27 | import org.gnunet.requests.Request; |
27 | import org.gnunet.requests.RequestQueue; | 28 | import org.gnunet.requests.RequestQueue; |
28 | import org.gnunet.util.*; | 29 | import org.gnunet.util.*; |
29 | import org.slf4j.Logger; | 30 | import org.slf4j.Logger; |
30 | import org.slf4j.LoggerFactory; | 31 | import org.slf4j.LoggerFactory; |
31 | 32 | ||
33 | import java.util.HashMap; | ||
32 | import java.util.Map; | 34 | import java.util.Map; |
33 | 35 | ||
34 | /** | 36 | /** |
@@ -37,345 +39,188 @@ import java.util.Map; | |||
37 | * @author Florian Dold | 39 | * @author Florian Dold |
38 | */ | 40 | */ |
39 | public class Mesh { | 41 | public class Mesh { |
42 | /** | ||
43 | * Class logger. | ||
44 | */ | ||
40 | private static final Logger logger = LoggerFactory | 45 | private static final Logger logger = LoggerFactory |
41 | .getLogger(Mesh.class); | 46 | .getLogger(Mesh.class); |
42 | 47 | ||
43 | /** | 48 | /** |
44 | * How many messages can we send to the service until we have to wait for an ACK from it? | 49 | * For tunnels created by the client, the bit in this |
50 | * mask is always set. | ||
45 | */ | 51 | */ |
46 | private static final int INITIAL_WINDOW_SIZE = 8; | 52 | private static final int TUNNEL_ID_CLI = 0x80000000; |
53 | |||
47 | /** | 54 | /** |
48 | * Requests queued to be sent to the mesh service. | 55 | * For tunnels created by the server, the bit in this |
56 | * mask is always set. | ||
49 | */ | 57 | */ |
50 | private RequestQueue requestQueue; | 58 | private static final int TUNNEL_ID_SERV = 0xB0000000; |
59 | |||
51 | /** | 60 | /** |
52 | * Called whenever a tunnel was destroyed. | 61 | * Disable buffering on intermediate nodes (for minimum latency). |
62 | * Yes/No. | ||
53 | */ | 63 | */ |
54 | private TunnelEndHandler tunnelEndHandler; | 64 | private static final int OPTION_NOBUFFER = 1; |
55 | private MeshRunabout messageReceiver; | ||
56 | private int[] applications; | ||
57 | private InboundTunnelHandler inboundTunnelHandler; | ||
58 | 65 | ||
59 | private final static int LOCAL_TUNNEL_ID_CLI = 0x80000000; | 66 | /** |
60 | private final static int LOCAL_TUNNEL_ID_SERV = 0xB0000000; | 67 | * Enable tunnel reliability, lost messages will be retransmitted. |
68 | * Yes/No. | ||
69 | */ | ||
70 | private static final int OPTION_RELIABLE = 2; | ||
61 | 71 | ||
62 | private int nextTunnelId = LOCAL_TUNNEL_ID_CLI; | 72 | /** |
73 | * Client connected to the mesh service | ||
74 | */ | ||
75 | private final Client client; | ||
63 | 76 | ||
64 | /** | 77 | /** |
65 | * Stores all tunnels created by this client, referenced by their local tunnel id. | 78 | * Message queue for the client. |
66 | */ | 79 | */ |
67 | private Map<Integer, Tunnel> tunnelMap = Maps.newTreeMap(); | 80 | private final ClientMessageQueue client_mq; |
68 | 81 | ||
82 | /** | ||
83 | * Called whenever a tunnel was destroyed. | ||
84 | */ | ||
85 | private TunnelEndHandler tunnelEndHandler; | ||
69 | 86 | ||
70 | public class OriginTunnel extends Tunnel { | 87 | /** |
71 | public DisconnectHandler disconnectHandler; | 88 | * Message handler for messages received through |
72 | public ConnectHandler connectHandler; | 89 | * a tunnel. |
90 | */ | ||
91 | private MeshRunabout messageReceiver; | ||
73 | 92 | ||
74 | public void addPeer(PeerIdentity peerIdentity) { | 93 | /** |
75 | throw new UnsupportedOperationException("not implemented"); | 94 | * Ports that we listen on. |
76 | } | 95 | */ |
96 | private int[] ports; | ||
77 | 97 | ||
78 | /** | 98 | /** |
79 | * Request that the given peer isn't added to this tunnel in calls to | 99 | * Handler for inbound tunnels. |
80 | * connect_by_* calls, (due to misbehaviour, bad performance, ...). | 100 | */ |
81 | * | 101 | private InboundTunnelHandler inboundTunnelHandler; |
82 | * @param peerIdentity peer identity of the peer which should be blacklisted | ||
83 | * for the tunnel. | ||
84 | */ | ||
85 | public void blacklist(PeerIdentity peerIdentity) { | ||
86 | throw new UnsupportedOperationException("not implemented"); | ||
87 | } | ||
88 | 102 | ||
89 | /** | 103 | /** |
90 | * Request that the given peer isn't blacklisted anymore from this tunnel, | 104 | * Mapping from the tunnel's ID to the tunnel object. |
91 | * and therefore can be added in future calls to connect*. | 105 | */ |
92 | * The peer must have been previously blacklisted for this tunnel. | 106 | private Map<Integer,Tunnel> tunnelMap = new HashMap<>(); |
93 | * | ||
94 | * @param peerIdentity peer identity of the peer which shouldn't be blacklisted | ||
95 | * for the tunnel anymore. | ||
96 | */ | ||
97 | public void unblacklist(PeerIdentity peerIdentity) { | ||
98 | throw new UnsupportedOperationException("not implemented"); | ||
99 | } | ||
100 | 107 | ||
101 | /** | 108 | /** |
102 | * Request that the mesh should try to connect to a peer supporting the given | 109 | * Counter for generating fresh tunnel ID's |
103 | * message type. | 110 | * when creating new tunnels. |
104 | * | 111 | */ |
105 | * @param appType application type that must be supported by the peer | 112 | int next_tid = 1; |
106 | * (MESH should discover peer in proximity handling this type) | ||
107 | */ | ||
108 | public void requestConnectByType(int appType) { | ||
109 | ConnectPeerByTypeMessage m = new ConnectPeerByTypeMessage(); | ||
110 | m.applicationType = appType; | ||
111 | m.tunnelId = tunnelId; | ||
112 | requestQueue.add(new FixedMessageRequest(m)); | ||
113 | } | ||
114 | 113 | ||
115 | /** | 114 | /** |
116 | * Request that the mesh should try to connect to a peer matching the | 115 | * A tunnel to a remote peer. |
117 | * description given in the service string. | 116 | * @param <T> type of context data for the tunnel |
118 | * | 117 | */ |
119 | * @param description string describing the destination node requirements | 118 | public class Tunnel<T> extends MessageQueue { |
120 | */ | 119 | private final int opt; |
121 | public void requestConnectByString(String description) { | 120 | public final PeerIdentity peer; |
122 | throw new UnsupportedOperationException("not implemented"); | 121 | public final int port; |
123 | } | 122 | protected int tunnelId; |
123 | private boolean receive_done_expected = false; | ||
124 | int ack_count = 0; | ||
124 | 125 | ||
125 | /** | 126 | /** |
126 | * Request that a peer should be added to the tunnel. The connect handler | 127 | * Create a new tunnel (we're initiator and will be allowed to add/remove peers |
127 | * will be called when the peer connects | 128 | * and to broadcast). |
128 | * | 129 | * |
129 | * @param peer peer to add | 130 | * @param context tunnel context |
131 | * @param peer peer identity the tunnel should go to | ||
132 | * @param port Port number. | ||
133 | * @param nobuffer Flag for disabling buffering on relay nodes. | ||
134 | * @param reliable Flag for end-to-end reliability. | ||
130 | */ | 135 | */ |
131 | public void requestConnectAdd(PeerIdentity peer) { | 136 | public Tunnel(PeerIdentity peer, int port, boolean nobuffer, boolean reliable, T context) |
132 | throw new UnsupportedOperationException("not implemented"); | 137 | { |
138 | this(peer, 0, port, nobuffer, reliable); | ||
139 | TunnelCreateMessage tcm = new TunnelCreateMessage(); | ||
140 | tcm.otherEnd = peer; | ||
141 | tcm.opt = opt; | ||
142 | tcm.port = port; | ||
143 | tcm.tunnel_id = tunnelId; | ||
144 | client_mq.send(tcm); | ||
133 | } | 145 | } |
134 | 146 | ||
135 | /** | 147 | /** |
136 | * Request that a peer should be removed from the tunnel. The existing | 148 | * Private tunnel constructor, for creating tunnel objects for |
137 | * disconnect handler will be called ONCE if we were connected. | 149 | * incoming tunnels. |
138 | * | 150 | * |
139 | * @param peer peer to remove | 151 | * @param peer |
152 | * @param tunnelId | ||
153 | * @param port | ||
154 | * @param nobuffer | ||
155 | * @param reliable | ||
140 | */ | 156 | */ |
141 | public void requestConnectDel(PeerIdentity peer) { | 157 | private Tunnel(PeerIdentity peer, int tunnelId, int port, boolean nobuffer, boolean reliable) { |
142 | throw new UnsupportedOperationException("not implemented"); | 158 | int my_opt = 0; |
143 | } | 159 | if (reliable) |
144 | 160 | my_opt |= OPTION_RELIABLE; | |
145 | private void registerWithService() { | 161 | if (nobuffer) |
146 | requestQueue.add(new TunnelCreateRequest(this)); | 162 | my_opt |= OPTION_NOBUFFER; |
147 | } | 163 | if (0 == tunnelId) |
148 | } | 164 | this.tunnelId = ((next_tid++) | TUNNEL_ID_CLI) & ~TUNNEL_ID_SERV; |
149 | 165 | else | |
150 | public static class TunnelTransmitRequest extends Request { | 166 | this.tunnelId = tunnelId; |
151 | 167 | this.peer = peer; | |
152 | public boolean doNotSend = false; | 168 | this.port = port; |
153 | 169 | this.opt = my_opt; | |
154 | static class Sink implements Connection.MessageSink { | ||
155 | byte[] payload; | ||
156 | |||
157 | @Override | ||
158 | public void send(GnunetMessage.Body m) { | ||
159 | if (payload != null) { | ||
160 | throw new AssertionError("only one payload allowed per transmitter"); | ||
161 | } | ||
162 | payload = Construct.toBinary(GnunetMessage.fromBody(m)); | ||
163 | } | ||
164 | } | ||
165 | |||
166 | public PeerIdentity target; | ||
167 | public MessageTransmitter transmitter; | ||
168 | public Tunnel tunnel; | ||
169 | |||
170 | @Override | ||
171 | public void transmit(Connection.MessageSink sink) { | ||
172 | if (doNotSend) { | ||
173 | return; | ||
174 | } | ||
175 | |||
176 | Sink s = new Sink(); | ||
177 | transmitter.transmit(s); | ||
178 | |||
179 | if (tunnel.tunnelId >= Mesh.LOCAL_TUNNEL_ID_SERV) { | ||
180 | // we are not the origin, thus can only send to origin | ||
181 | OriginMessage m = new OriginMessage(); | ||
182 | m.tid = tunnel.tunnelId; | ||
183 | m.pid = tunnel.nextSentPacketId++; | ||
184 | m.payload = s.payload; | ||
185 | m.oid = new PeerIdentity(); | ||
186 | m.sender = m.oid; | ||
187 | sink.send(m); | ||
188 | } else if (target == null) { | ||
189 | // multicast, we are origin | ||
190 | MulticastMessage m = new MulticastMessage(); | ||
191 | m.tid = tunnel.tunnelId; | ||
192 | m.pid = tunnel.nextSentPacketId++; | ||
193 | m.payload = s.payload; | ||
194 | sink.send(m); | ||
195 | } else { | ||
196 | // unicast | ||
197 | System.out.println("sending unicast"); | ||
198 | UnicastMessage m = new UnicastMessage(); | ||
199 | m.destination = target; | ||
200 | m.oid = new PeerIdentity(); | ||
201 | m.tid = tunnel.tunnelId; | ||
202 | m.pid = tunnel.nextSentPacketId++; | ||
203 | m.payload = s.payload; | ||
204 | sink.send(m); | ||
205 | } | ||
206 | } | 170 | } |
207 | } | ||
208 | |||
209 | public class Tunnel { | ||
210 | protected int tunnelId; | ||
211 | |||
212 | private int nextSentPacketId = 0; | ||
213 | private int maxSentPacketId = INITIAL_WINDOW_SIZE - 1; | ||
214 | |||
215 | private TunnelTransmitRequest waitingTunnelTransmitRequest; | ||
216 | |||
217 | /** | ||
218 | * Ask the mesh to call "notify" once it is ready to transmit the | ||
219 | * given number of bytes to the specified tunnel or target. | ||
220 | * Only one call can be active at any time, to issue another request, | ||
221 | * wait for the callback or cancel the current request. | ||
222 | * | ||
223 | * @param maxdelay how long can the message wait? | ||
224 | * @param target destination for the message | ||
225 | * NULL for multicast to all tunnel targets | ||
226 | * @param notify_size how many bytes of buffer space does notify want? | ||
227 | * @param transmitter handler to call when buffer space is available; | ||
228 | * will be called with NULL on timeout or if the overall queue | ||
229 | * for this peer is larger than queue_size and this is currently | ||
230 | * the message with the lowest priority | ||
231 | * @return non-NULL if the notify callback was queued, | ||
232 | * NULL if we can not even queue the request (insufficient | ||
233 | * memory); if NULL is returned, "notify" will NOT be called. | ||
234 | */ | ||
235 | public Cancelable notifyTransmitReady(RelativeTime maxdelay, PeerIdentity target, int notify_size, MessageTransmitter transmitter) { | ||
236 | if (waitingTunnelTransmitRequest != null) { | ||
237 | throw new AssertionError(); | ||
238 | } | ||
239 | 171 | ||
240 | final TunnelTransmitRequest request = new TunnelTransmitRequest(); | 172 | public void receiveDone() { |
241 | request.target = target; | 173 | if (!receive_done_expected) |
242 | request.transmitter = transmitter; | 174 | throw new AssertionError("unexpected call to receiveDone"); |
243 | request.tunnel = this; | 175 | LocalAckMessage am = new LocalAckMessage(); |
244 | request.setDeadline(maxdelay.toAbsolute()); | 176 | am.tid = tunnelId; |
245 | 177 | client_mq.send(am); | |
246 | if (nextSentPacketId <= maxSentPacketId) { | 178 | receive_done_expected = false; |
247 | |||
248 | final Cancelable cancel = requestQueue.add(request); | ||
249 | |||
250 | return new Cancelable() { | ||
251 | @Override | ||
252 | public void cancel() { | ||
253 | cancel.cancel(); | ||
254 | } | ||
255 | }; | ||
256 | } else { | ||
257 | // we have to wait until we get a local ack from the service | ||
258 | waitingTunnelTransmitRequest = request; | ||
259 | |||
260 | return new Cancelable() { | ||
261 | @Override | ||
262 | public void cancel() { | ||
263 | request.doNotSend = true; | ||
264 | } | ||
265 | }; | ||
266 | } | ||
267 | } | 179 | } |
268 | 180 | ||
269 | public void destroy() { | 181 | public void destroy() { |
270 | // todo | 182 | TunnelDestroyMessage m = new TunnelDestroyMessage(); |
271 | } | 183 | m.tunnel_id = tunnelId; |
272 | 184 | client_mq.send(m); | |
273 | private void onAckUpdated() { | ||
274 | if (waitingTunnelTransmitRequest == null) { | ||
275 | return; | ||
276 | } | ||
277 | requestQueue.add(waitingTunnelTransmitRequest); | ||
278 | waitingTunnelTransmitRequest = null; | ||
279 | } | ||
280 | } | ||
281 | |||
282 | |||
283 | /** | ||
284 | * A request to initialize the connection with the mesh service. | ||
285 | */ | ||
286 | public class ClientConnectRequest extends Request { | ||
287 | @Override | ||
288 | public void transmit(Connection.MessageSink sink) { | ||
289 | System.out.println("transmit called " + this); | ||
290 | |||
291 | ClientConnectMessage ccm = new ClientConnectMessage(); | ||
292 | ccm.applications_length = applications.length; | ||
293 | ccm.apps_list = applications; | ||
294 | int[] types; | ||
295 | if (messageReceiver != null) { | ||
296 | types = RunaboutUtil.getRunaboutMessageTypes(messageReceiver); | ||
297 | } else { | ||
298 | types = new int[0]; | ||
299 | } | ||
300 | ccm.types_list = types; | ||
301 | ccm.types_length = types.length; | ||
302 | |||
303 | sink.send(ccm); | ||
304 | } | ||
305 | } | ||
306 | |||
307 | public static class TunnelCreateRequest extends Request { | ||
308 | public OriginTunnel tunnel; | ||
309 | |||
310 | public TunnelCreateRequest(OriginTunnel rootTunnel) { | ||
311 | tunnel = rootTunnel; | ||
312 | } | 185 | } |
313 | 186 | ||
314 | @Override | 187 | @Override |
315 | public void transmit(Connection.MessageSink sink) { | 188 | protected void sendImmediate(Envelope ev) { |
316 | TunnelCreateMessage tcm = new TunnelCreateMessage(); | 189 | if (ack_count <= 0) |
317 | tcm.tunnel_id = tunnel.tunnelId; | 190 | throw new AssertionError(); |
318 | sink.send(tcm); | 191 | DataMessage m = new DataMessage(); |
192 | m.payload = Construct.toBinary(GnunetMessage.fromBody(ev.message)); | ||
193 | Envelope mesh_ev = new Envelope(m); | ||
194 | client_mq.send(mesh_ev); | ||
195 | ack_count -= 1; | ||
319 | } | 196 | } |
320 | } | 197 | } |
321 | 198 | ||
322 | 199 | ||
323 | private class MeshMessageReceiver extends RunaboutMessageReceiver { | 200 | private class MeshMessageReceiver extends RunaboutMessageReceiver { |
324 | public void visit(PeerAddMessage b) { | ||
325 | Tunnel r = tunnelMap.get(b.tunnelId); | ||
326 | if (r == null || !(r instanceof OriginTunnel)) { | ||
327 | logger.warn("server got confused with tunnel IDs on peer add, ignoring message"); | ||
328 | return; | ||
329 | } | ||
330 | OriginTunnel ot = (OriginTunnel) r; | ||
331 | if (ot.connectHandler != null) { | ||
332 | ot.connectHandler.onConnect(ot, b.peer); | ||
333 | } | ||
334 | } | ||
335 | |||
336 | public void visit(PeerDeleteMessage b) { | ||
337 | Tunnel r = tunnelMap.get(b.tunnelId); | ||
338 | if (r == null || !(r instanceof OriginTunnel)) { | ||
339 | logger.warn("server got confused with tunnel IDs on peer delete, ignoring message"); | ||
340 | return; | ||
341 | } | ||
342 | OriginTunnel ot = (OriginTunnel) r; | ||
343 | if (ot.disconnectHandler != null) { | ||
344 | ot.disconnectHandler.onDisconnect(b.peer); | ||
345 | } | ||
346 | } | ||
347 | |||
348 | public void visit(TunnelCreateMessage m) { | 201 | public void visit(TunnelCreateMessage m) { |
349 | Tunnel t = new Tunnel(); | 202 | Tunnel t = new Tunnel(m.otherEnd, m.tunnel_id, m.port, |
350 | t.tunnelId = m.tunnel_id; | 203 | (m.opt & OPTION_NOBUFFER) != 0, (m.opt & OPTION_NOBUFFER) != 0); |
351 | if (inboundTunnelHandler != null) { | 204 | if (inboundTunnelHandler != null) { |
352 | inboundTunnelHandler.onInboundTunnel(t, m.otherEnd); | 205 | inboundTunnelHandler.onInboundTunnel(t, m.otherEnd); |
353 | } | 206 | } |
354 | } | 207 | } |
355 | 208 | ||
356 | public void visit(UnicastMessage m) { | 209 | public void visit(DataMessage m) { |
357 | messageReceiver.setSender(m.oid); | 210 | Tunnel t = tunnelMap.get(m.tid); |
358 | messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body); | 211 | if (t != null) |
359 | } | 212 | { |
360 | 213 | if (t.receive_done_expected) | |
361 | public void visit(MulticastMessage m) { | 214 | logger.warn("got unexpected message from service"); |
362 | messageReceiver.setSender(m.oid); | 215 | t.receive_done_expected = true; |
363 | messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body); | 216 | messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body); |
364 | } | 217 | } |
365 | |||
366 | public void visit(OriginMessage m) { | ||
367 | messageReceiver.setSender(m.sender); | ||
368 | messageReceiver.visitAppropriate(Construct.parseAs(m.payload, GnunetMessage.class).body); | ||
369 | } | 218 | } |
370 | 219 | ||
371 | public void visit(LocalAckMessage m) { | 220 | public void visit(LocalAckMessage m) { |
372 | Tunnel t = tunnelMap.get(m.tid); | 221 | Tunnel t = tunnelMap.get(m.tid); |
373 | if (t == null) { | 222 | if (t != null) |
374 | logger.warn("server got confused with tunnel IDs on ack, ignoring message"); | 223 | t.ack_count += 1; |
375 | return; | ||
376 | } | ||
377 | t.nextSentPacketId = m.maxPid; | ||
378 | t.onAckUpdated(); | ||
379 | } | 224 | } |
380 | 225 | ||
381 | public void visit(TunnelDestroyMessage m) { | 226 | public void visit(TunnelDestroyMessage m) { |
@@ -405,59 +250,28 @@ public class Mesh { | |||
405 | * is called on the tunnel | 250 | * is called on the tunnel |
406 | */ | 251 | */ |
407 | public Mesh(Configuration cfg, InboundTunnelHandler inboundTunnelHandler, | 252 | public Mesh(Configuration cfg, InboundTunnelHandler inboundTunnelHandler, |
408 | TunnelEndHandler tunnelEndHandler, MeshRunabout messageReceiver, int... applications) { | 253 | TunnelEndHandler tunnelEndHandler, MeshRunabout messageReceiver, int... ports) { |
409 | this.tunnelEndHandler = tunnelEndHandler; | 254 | this.tunnelEndHandler = tunnelEndHandler; |
410 | this.messageReceiver = messageReceiver; | 255 | this.messageReceiver = messageReceiver; |
411 | this.applications = applications; | 256 | this.ports = ports; |
412 | this.inboundTunnelHandler = inboundTunnelHandler; | 257 | this.inboundTunnelHandler = inboundTunnelHandler; |
413 | 258 | ||
414 | Client client = new Client("mesh", cfg); | 259 | client = new Client("mesh", cfg); |
415 | requestQueue = new RequestQueue(client, new MeshMessageReceiver()); | 260 | client_mq = new ClientMessageQueue(client, new MeshMessageReceiver()); |
416 | 261 | ClientConnectMessage ccm = new ClientConnectMessage(); | |
417 | requestQueue.add(new ClientConnectRequest()); | 262 | ccm.apps_list = ports; |
418 | } | 263 | client_mq.send(ccm); |
419 | |||
420 | /** | ||
421 | * Create a new tunnel (we're initiator and will be allowed to add/remove peers | ||
422 | * and to broadcast). | ||
423 | * | ||
424 | * @param connectHandler callback for when a new peer connects to the tunnel, either because the origin added him, | ||
425 | * or the client joined the tunnel | ||
426 | * @param disconnectHandler callback for when when a peer is disconnected | ||
427 | */ | ||
428 | public OriginTunnel createTunnel(ConnectHandler connectHandler, DisconnectHandler disconnectHandler) { | ||
429 | OriginTunnel tunnel = new OriginTunnel(); | ||
430 | tunnel.connectHandler = connectHandler; | ||
431 | tunnel.disconnectHandler = disconnectHandler; | ||
432 | tunnel.tunnelId = nextTunnelId++; | ||
433 | tunnelMap.put(tunnel.tunnelId, tunnel); | ||
434 | tunnel.registerWithService(); | ||
435 | return tunnel; | ||
436 | } | ||
437 | |||
438 | |||
439 | /** | ||
440 | * Announce to ther peer the availability of services described by the regex, | ||
441 | * in order to be reachable to other peers via connect_by_string. | ||
442 | * <p/> | ||
443 | * Note that the first 8 characters are considered to be part of a prefix, | ||
444 | * (for instance 'gnunet://'). If you put a variable part in there (*, +. ()), | ||
445 | * all matching strings will be stored in the DHT. | ||
446 | * | ||
447 | * @param regex string with the regular expression describing local services. | ||
448 | */ | ||
449 | public void announceRegex(String regex) { | ||
450 | throw new UnsupportedOperationException("not implemented"); | ||
451 | } | 264 | } |
452 | 265 | ||
453 | 266 | ||
454 | /** | 267 | /** |
455 | * Disconnect from the mesh service. All tunnels will be destroyed. All tunnel | 268 | * Disconnect from the mesh service. |
456 | * disconnect callbacks will be called on any still connected peers, notifying | 269 | * All tunnels will be destroyed. |
457 | * about their disconnection. The registered inbound tunnel cleaner will be | 270 | * All tunnel disconnect callbacks will be called on any still connected peers, notifying |
458 | * called should any inbound tunnels still exist. | 271 | * about their disconnection. |
459 | */ | 272 | */ |
460 | public void disconnect() { | 273 | public void disconnect() { |
461 | requestQueue.destroy(); | 274 | client_mq.destroy(); |
275 | client.disconnect(); | ||
462 | } | 276 | } |
463 | } | 277 | } |
diff --git a/src/org/gnunet/mesh/MulticastMessage.java b/src/org/gnunet/mesh/MulticastMessage.java deleted file mode 100644 index 2ca344d..0000000 --- a/src/org/gnunet/mesh/MulticastMessage.java +++ /dev/null | |||
@@ -1,27 +0,0 @@ | |||
1 | package org.gnunet.mesh; | ||
2 | |||
3 | import org.gnunet.construct.*; | ||
4 | import org.gnunet.util.GnunetMessage; | ||
5 | import org.gnunet.util.PeerIdentity; | ||
6 | |||
7 | /** | ||
8 | * ... | ||
9 | * | ||
10 | * @author Florian Dold | ||
11 | */ | ||
12 | @UnionCase(261) | ||
13 | public class MulticastMessage implements GnunetMessage.Body { | ||
14 | /** | ||
15 | * Tunnel ID | ||
16 | */ | ||
17 | @UInt32 | ||
18 | public int tid; | ||
19 | @UInt32 | ||
20 | public int ttl; | ||
21 | @UInt32 | ||
22 | public int pid; | ||
23 | @NestedMessage | ||
24 | public PeerIdentity oid; | ||
25 | @FillWith @UInt8 | ||
26 | public byte[] payload; | ||
27 | } | ||
diff --git a/src/org/gnunet/mesh/OriginMessage.java b/src/org/gnunet/mesh/OriginMessage.java deleted file mode 100644 index 143eaeb..0000000 --- a/src/org/gnunet/mesh/OriginMessage.java +++ /dev/null | |||
@@ -1,27 +0,0 @@ | |||
1 | package org.gnunet.mesh; | ||
2 | |||
3 | import org.gnunet.construct.*; | ||
4 | import org.gnunet.util.GnunetMessage; | ||
5 | import org.gnunet.util.PeerIdentity; | ||
6 | |||
7 | /** | ||
8 | * ... | ||
9 | * | ||
10 | * @author Florian Dold | ||
11 | */ | ||
12 | @UnionCase(262) | ||
13 | public class OriginMessage implements GnunetMessage.Body { | ||
14 | @UInt32 | ||
15 | public int tid; | ||
16 | @UInt32 | ||
17 | public int ttl; | ||
18 | @UInt32 | ||
19 | public int pid; | ||
20 | @NestedMessage | ||
21 | public PeerIdentity oid; | ||
22 | @NestedMessage | ||
23 | public PeerIdentity sender; | ||
24 | @FillWith | ||
25 | @UInt8 | ||
26 | public byte[] payload; | ||
27 | } | ||
diff --git a/src/org/gnunet/mesh/PeerAddMessage.java b/src/org/gnunet/mesh/PeerAddMessage.java deleted file mode 100644 index 099165c..0000000 --- a/src/org/gnunet/mesh/PeerAddMessage.java +++ /dev/null | |||
@@ -1,23 +0,0 @@ | |||
1 | package org.gnunet.mesh; | ||
2 | |||
3 | import org.gnunet.construct.NestedMessage; | ||
4 | import org.gnunet.construct.UInt32; | ||
5 | import org.gnunet.construct.UnionCase; | ||
6 | import org.gnunet.util.GnunetMessage; | ||
7 | import org.gnunet.util.PeerIdentity; | ||
8 | |||
9 | /** | ||
10 | * Message used for two things (bad!) | ||
11 | * (1) client->server: request that a certain peer is added to a tunnel | ||
12 | * (2) server->client: notify the client that a new peer has joined the tunnel | ||
13 | * | ||
14 | * @author Florian Dold | ||
15 | */ | ||
16 | @UnionCase(275) | ||
17 | public class PeerAddMessage implements GnunetMessage.Body { | ||
18 | @UInt32 | ||
19 | public int tunnelId; | ||
20 | |||
21 | @NestedMessage | ||
22 | public PeerIdentity peer; | ||
23 | } | ||
diff --git a/src/org/gnunet/mesh/PeerDeleteMessage.java b/src/org/gnunet/mesh/PeerDeleteMessage.java deleted file mode 100644 index 045e565..0000000 --- a/src/org/gnunet/mesh/PeerDeleteMessage.java +++ /dev/null | |||
@@ -1,23 +0,0 @@ | |||
1 | package org.gnunet.mesh; | ||
2 | |||
3 | import org.gnunet.construct.NestedMessage; | ||
4 | import org.gnunet.construct.UInt32; | ||
5 | import org.gnunet.construct.UnionCase; | ||
6 | import org.gnunet.util.GnunetMessage; | ||
7 | import org.gnunet.util.PeerIdentity; | ||
8 | |||
9 | /** | ||
10 | * Message used for two things (bad!) | ||
11 | * (1) client->server: request that a certain peer is added to a tunnel | ||
12 | * (2) server->client: notify the client that a new peer has joined the tunnel | ||
13 | * | ||
14 | * @author Florian Dold | ||
15 | */ | ||
16 | @UnionCase(276) | ||
17 | public class PeerDeleteMessage implements GnunetMessage.Body { | ||
18 | @UInt32 | ||
19 | public int tunnelId; | ||
20 | |||
21 | @NestedMessage | ||
22 | public PeerIdentity peer; | ||
23 | } | ||
diff --git a/src/org/gnunet/mesh/TunnelCreateMessage.java b/src/org/gnunet/mesh/TunnelCreateMessage.java index 7b63bc6..eaa4d6c 100644 --- a/src/org/gnunet/mesh/TunnelCreateMessage.java +++ b/src/org/gnunet/mesh/TunnelCreateMessage.java | |||
@@ -7,11 +7,7 @@ import org.gnunet.util.GnunetMessage; | |||
7 | import org.gnunet.util.PeerIdentity; | 7 | import org.gnunet.util.PeerIdentity; |
8 | 8 | ||
9 | /** | 9 | /** |
10 | * Message used to | 10 | * FIXME |
11 | * a) request the service to create a new tunnel with the given tunnel id | ||
12 | * b) notify the client of a newly created tunnel | ||
13 | * | ||
14 | * todo: this is bad design, split into two messages in the C code! | ||
15 | * | 11 | * |
16 | * @author Florian Dold | 12 | * @author Florian Dold |
17 | */ | 13 | */ |
@@ -20,9 +16,12 @@ public class TunnelCreateMessage implements GnunetMessage.Body { | |||
20 | @UInt32 | 16 | @UInt32 |
21 | public int tunnel_id; | 17 | public int tunnel_id; |
22 | 18 | ||
23 | /** | 19 | @NestedMessage(optional = false) |
24 | * Only present if sent from server to client (purpose b) | ||
25 | */ | ||
26 | @NestedMessage(optional = true) | ||
27 | public PeerIdentity otherEnd; | 20 | public PeerIdentity otherEnd; |
21 | |||
22 | @UInt32 | ||
23 | public int port; | ||
24 | |||
25 | @UInt32 | ||
26 | public int opt; | ||
28 | } | 27 | } |
diff --git a/src/org/gnunet/mq/ClientMessageQueue.java b/src/org/gnunet/mq/ClientMessageQueue.java new file mode 100644 index 0000000..bfb0466 --- /dev/null +++ b/src/org/gnunet/mq/ClientMessageQueue.java | |||
@@ -0,0 +1,48 @@ | |||
1 | package org.gnunet.mq; | ||
2 | |||
3 | |||
4 | import org.gnunet.construct.Construct; | ||
5 | import org.gnunet.util.*; | ||
6 | |||
7 | /** | ||
8 | * Message queue for org.util.Connection | ||
9 | */ | ||
10 | public class ClientMessageQueue extends MessageQueue { | ||
11 | private final Client client; | ||
12 | private final RunaboutMessageReceiver receiver; | ||
13 | |||
14 | |||
15 | public ClientMessageQueue(Client client, RunaboutMessageReceiver receiver) { | ||
16 | this.client = client; | ||
17 | this.receiver = receiver; | ||
18 | } | ||
19 | |||
20 | |||
21 | public ClientMessageQueue(Client client) { | ||
22 | this(client, null); | ||
23 | } | ||
24 | |||
25 | |||
26 | @Override | ||
27 | protected void sendImmediate(final Envelope ev) { | ||
28 | int size = Construct.getSize(ev.message); | ||
29 | client.notifyTransmitReady(RelativeTime.FOREVER, false, size, new MessageTransmitter() { | ||
30 | @Override | ||
31 | public void transmit(Connection.MessageSink sink) { | ||
32 | sink.send(ev.message); | ||
33 | reportMessageSent(); | ||
34 | } | ||
35 | |||
36 | @Override | ||
37 | public void handleError() { | ||
38 | // FIXME | ||
39 | } | ||
40 | }); | ||
41 | } | ||
42 | |||
43 | |||
44 | @Override | ||
45 | public void destroy() { | ||
46 | |||
47 | } | ||
48 | } | ||
diff --git a/src/org/gnunet/mq/Envelope.java b/src/org/gnunet/mq/Envelope.java new file mode 100644 index 0000000..09c0c2c --- /dev/null +++ b/src/org/gnunet/mq/Envelope.java | |||
@@ -0,0 +1,34 @@ | |||
1 | package org.gnunet.mq; | ||
2 | |||
3 | import org.gnunet.util.GnunetMessage; | ||
4 | |||
5 | /** | ||
6 | * Container for a message to be sent by a message queue. | ||
7 | */ | ||
8 | public class Envelope { | ||
9 | public final GnunetMessage.Body message; | ||
10 | private MessageQueue parent_queue; | ||
11 | private NotifySentHandler notify_sent_handler; | ||
12 | |||
13 | public Envelope(GnunetMessage.Body message) { | ||
14 | this.message = message; | ||
15 | } | ||
16 | |||
17 | public void notifySent(NotifySentHandler h) { | ||
18 | this.notify_sent_handler = h; | ||
19 | } | ||
20 | |||
21 | public void injectSent() { | ||
22 | if (notify_sent_handler != null) | ||
23 | notify_sent_handler.onSent(); | ||
24 | } | ||
25 | |||
26 | public void cancel() { | ||
27 | // TODO | ||
28 | } | ||
29 | |||
30 | /* pkg-private */ void invokeSentNotification() { | ||
31 | if (null != notify_sent_handler) | ||
32 | notify_sent_handler.onSent(); | ||
33 | } | ||
34 | } | ||
diff --git a/src/org/gnunet/mq/MessageQueue.java b/src/org/gnunet/mq/MessageQueue.java new file mode 100644 index 0000000..de08edf --- /dev/null +++ b/src/org/gnunet/mq/MessageQueue.java | |||
@@ -0,0 +1,41 @@ | |||
1 | package org.gnunet.mq; | ||
2 | |||
3 | |||
4 | import org.gnunet.util.GnunetMessage; | ||
5 | |||
6 | import java.util.LinkedList; | ||
7 | |||
8 | /** | ||
9 | * General-purpose message queue | ||
10 | */ | ||
11 | public abstract class MessageQueue { | ||
12 | private LinkedList<Envelope> queued_envelopes = new LinkedList<>(); | ||
13 | protected Envelope current_envelope; | ||
14 | |||
15 | protected abstract void sendImmediate(Envelope ev); | ||
16 | |||
17 | public void send(GnunetMessage.Body body) { | ||
18 | send(new Envelope(body)); | ||
19 | } | ||
20 | |||
21 | public void send(Envelope ev) { | ||
22 | if (null == current_envelope) { | ||
23 | current_envelope = ev; | ||
24 | sendImmediate(current_envelope); | ||
25 | } else { | ||
26 | queued_envelopes.addLast(ev); | ||
27 | } | ||
28 | } | ||
29 | |||
30 | protected void reportMessageSent() { | ||
31 | if (null == current_envelope) | ||
32 | throw new AssertionError(); | ||
33 | current_envelope.invokeSentNotification(); | ||
34 | if (queued_envelopes.isEmpty()) | ||
35 | return; | ||
36 | current_envelope = queued_envelopes.pop(); | ||
37 | sendImmediate(current_envelope); | ||
38 | } | ||
39 | |||
40 | public abstract void destroy(); | ||
41 | } | ||
diff --git a/src/org/gnunet/mq/NotifySentHandler.java b/src/org/gnunet/mq/NotifySentHandler.java new file mode 100644 index 0000000..7ec13b2 --- /dev/null +++ b/src/org/gnunet/mq/NotifySentHandler.java | |||
@@ -0,0 +1,6 @@ | |||
1 | package org.gnunet.mq; | ||
2 | |||
3 | |||
4 | public interface NotifySentHandler { | ||
5 | void onSent(); | ||
6 | } | ||
diff --git a/src/org/gnunet/requests/RequestQueue.java b/src/org/gnunet/requests/RequestQueue.java index e514303..6f7102d 100644 --- a/src/org/gnunet/requests/RequestQueue.java +++ b/src/org/gnunet/requests/RequestQueue.java | |||
@@ -28,7 +28,6 @@ import java.util.LinkedList; | |||
28 | * Generic queues for Requests to be sent to the service. | 28 | * Generic queues for Requests to be sent to the service. |
29 | */ | 29 | */ |
30 | public class RequestQueue { | 30 | public class RequestQueue { |
31 | // todo: implement more efficiently (attributes instead of multiple queues) | ||
32 | 31 | ||
33 | /** | 32 | /** |
34 | * Requests to be transmitted to the service. | 33 | * Requests to be transmitted to the service. |
diff --git a/src/org/gnunet/transport/RequestConnectMessage.java b/src/org/gnunet/transport/RequestConnectMessage.java index 9bd7a5b..63bbc39 100644 --- a/src/org/gnunet/transport/RequestConnectMessage.java +++ b/src/org/gnunet/transport/RequestConnectMessage.java | |||
@@ -21,5 +21,4 @@ public class RequestConnectMessage implements GnunetMessage.Body { | |||
21 | */ | 21 | */ |
22 | @NestedMessage | 22 | @NestedMessage |
23 | public PeerIdentity peer; | 23 | public PeerIdentity peer; |
24 | |||
25 | } | 24 | } |
diff --git a/src/org/gnunet/transport/Transport.java b/src/org/gnunet/transport/Transport.java index f79c340..5ccbc77 100644 --- a/src/org/gnunet/transport/Transport.java +++ b/src/org/gnunet/transport/Transport.java | |||
@@ -1,6 +1,7 @@ | |||
1 | package org.gnunet.transport; | 1 | package org.gnunet.transport; |
2 | 2 | ||
3 | import org.gnunet.hello.HelloMessage; | 3 | import org.gnunet.hello.HelloMessage; |
4 | import org.gnunet.mq.Envelope; | ||
4 | import org.gnunet.util.*; | 5 | import org.gnunet.util.*; |
5 | 6 | ||
6 | /** | 7 | /** |
@@ -25,7 +26,19 @@ public class Transport { | |||
25 | * NULL on failure (cb will not be called) | 26 | * NULL on failure (cb will not be called) |
26 | */ | 27 | */ |
27 | Cancelable tryConnect(PeerIdentity target, TryConnectCallback cb) { | 28 | Cancelable tryConnect(PeerIdentity target, TryConnectCallback cb) { |
28 | throw new UnsupportedOperationException(); | 29 | RequestConnectMessage m = new RequestConnectMessage(); |
30 | m.peer = target; | ||
31 | m.reserved = 0; | ||
32 | final Envelope ev = new Envelope(m); | ||
33 | ev.notifySent(null /* FIXME */); | ||
34 | //client_mq.send(ev); | ||
35 | |||
36 | return new Cancelable() { | ||
37 | @Override | ||
38 | public void cancel() { | ||
39 | ev.cancel(); | ||
40 | } | ||
41 | }; | ||
29 | } | 42 | } |
30 | 43 | ||
31 | 44 | ||