diff options
Diffstat (limited to 'src/org/gnunet/util/Client.java')
-rw-r--r-- | src/org/gnunet/util/Client.java | 111 |
1 files changed, 86 insertions, 25 deletions
diff --git a/src/org/gnunet/util/Client.java b/src/org/gnunet/util/Client.java index d2e1308..fbd94a6 100644 --- a/src/org/gnunet/util/Client.java +++ b/src/org/gnunet/util/Client.java | |||
@@ -22,6 +22,7 @@ package org.gnunet.util; | |||
22 | import org.slf4j.Logger; | 22 | import org.slf4j.Logger; |
23 | import org.slf4j.LoggerFactory; | 23 | import org.slf4j.LoggerFactory; |
24 | 24 | ||
25 | |||
25 | /** | 26 | /** |
26 | * A connection to a gnunet service. | 27 | * A connection to a gnunet service. |
27 | * | 28 | * |
@@ -69,6 +70,12 @@ public class Client { | |||
69 | private boolean notifyTransmitReadyDelayed; | 70 | private boolean notifyTransmitReadyDelayed; |
70 | 71 | ||
71 | /** | 72 | /** |
73 | * When notifyTransmitReadyDelayed is true, this can be used to cancel the task | ||
74 | * waiting for the connection to be established. | ||
75 | */ | ||
76 | private Cancelable delayedNotifyTransmitHandle; | ||
77 | |||
78 | /** | ||
72 | * Create a connection to a service. | 79 | * Create a connection to a service. |
73 | * | 80 | * |
74 | * @param serviceName name of the service | 81 | * @param serviceName name of the service |
@@ -111,16 +118,6 @@ public class Client { | |||
111 | return connection.receive(timeout, receiver); | 118 | return connection.receive(timeout, receiver); |
112 | } | 119 | } |
113 | 120 | ||
114 | |||
115 | private static class DelayedTransmitHandle implements Cancelable { | ||
116 | Cancelable realTransmitHandle; | ||
117 | Cancelable timeoutHandle; | ||
118 | @Override | ||
119 | public void cancel() { | ||
120 | throw new UnsupportedOperationException(); | ||
121 | } | ||
122 | } | ||
123 | |||
124 | /** | 121 | /** |
125 | * Ask the client to call us once it is able to send a message. | 122 | * Ask the client to call us once it is able to send a message. |
126 | * | 123 | * |
@@ -132,39 +129,103 @@ public class Client { | |||
132 | * if the caller does not care about temporary connection errors, | 129 | * if the caller does not care about temporary connection errors, |
133 | * for example because the protocol is stateless | 130 | * for example because the protocol is stateless |
134 | * @param size size of the message we want to transmit, can be an upper bound | 131 | * @param size size of the message we want to transmit, can be an upper bound |
135 | *@param transmitter the MessageTransmitter object to call once the client is ready to transmit or | 132 | * @param transmitter the MessageTransmitter object to call once the client is ready to transmit or |
136 | * when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned. @return a handle that can be used to cancel the transmit request | 133 | * when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned. @return a handle that can be used to cancel the transmit request |
134 | * | ||
135 | * @return a handle to cancel the notification | ||
137 | */ | 136 | */ |
138 | public Cancelable notifyTransmitReady(final RelativeTime timeout, | 137 | public Cancelable notifyTransmitReady(final RelativeTime timeout, |
139 | final boolean autoRetry, int size, final MessageTransmitter transmitter) { | 138 | final boolean autoRetry, int size, final MessageTransmitter transmitter) { |
139 | if (notifyTransmitReadyDelayed) { | ||
140 | throw new AssertionError("notifyTransmitReady called twice!"); | ||
141 | } | ||
140 | if (connection.isConnected()) { | 142 | if (connection.isConnected()) { |
141 | return connection.notifyTransmitReady(0, timeout, transmitter); | 143 | return connection.notifyTransmitReady(0, timeout, transmitter); |
142 | } else { | 144 | } else { |
143 | notifyTransmitReadyDelayed = true; | 145 | notifyTransmitReadyDelayed = true; |
144 | final DelayedTransmitHandle delayedTransmitHandle = new DelayedTransmitHandle(); | 146 | final AbsoluteTime deadline = timeout.toAbsolute(); |
145 | delayedTransmitHandle.timeoutHandle = Scheduler.addDelayed(connectBackoff, new Scheduler.Task() { | 147 | delayedNotifyTransmitHandle = connection.notifyConnected(connectBackoff, new Continuation() { |
146 | @Override | 148 | @Override |
147 | public void run(Scheduler.RunContext ctx) { | 149 | public void cont(boolean success) { |
148 | if (connection == null) { | 150 | delayedNotifyTransmitHandle = null; |
149 | return; | 151 | if (success) { |
150 | } | ||
151 | if (connection.isConnected()) { | ||
152 | notifyTransmitReadyDelayed = false; | 152 | notifyTransmitReadyDelayed = false; |
153 | connection.notifyTransmitReady(0, timeout, transmitter); | 153 | delayedNotifyTransmitHandle = connection.notifyTransmitReady(0, timeout, new MessageTransmitter() { |
154 | @Override | ||
155 | public void transmit(Connection.MessageSink sink) { | ||
156 | delayedNotifyTransmitHandle = null; | ||
157 | transmitter.transmit(sink); | ||
158 | } | ||
159 | |||
160 | @Override | ||
161 | public void handleError() { | ||
162 | delayedNotifyTransmitHandle = null; | ||
163 | transmitter.handleError(); | ||
164 | } | ||
165 | }); | ||
154 | } else { | 166 | } else { |
155 | logger.debug("still not connected, retrying in {}ms", connectBackoff.getMilliseconds()); | 167 | if (deadline.isDue()) { |
156 | reconnect(); | 168 | transmitter.handleError(); |
157 | connectBackoff = RelativeTime.min(connectBackoff.multiply(2), MAX_BACKOFF); | 169 | } else { |
158 | Scheduler.addDelayed(connectBackoff, this); | 170 | RelativeTime timeout = deadline.getRemaining(); |
171 | connectBackoff = RelativeTime.min(timeout, RelativeTime.min(connectBackoff.multiply(2), MAX_BACKOFF)); | ||
172 | reconnect(); | ||
173 | delayedNotifyTransmitHandle = connection.notifyConnected(connectBackoff, this); | ||
174 | } | ||
159 | } | 175 | } |
160 | } | 176 | } |
161 | }); | 177 | }); |
162 | return delayedTransmitHandle; | 178 | return new Cancelable() { |
179 | @Override | ||
180 | public void cancel() { | ||
181 | if (delayedNotifyTransmitHandle != null) { | ||
182 | delayedNotifyTransmitHandle.cancel(); | ||
183 | } | ||
184 | } | ||
185 | }; | ||
163 | } | 186 | } |
164 | } | 187 | } |
165 | 188 | ||
189 | /** | ||
190 | * Convenience method for sending messages. | ||
191 | * | ||
192 | * @param timeout when should we give up sending the message, and call cont.cont(false) | ||
193 | * @param message the message to send | ||
194 | * @param cont called when the message has been sent successfully or on error | ||
195 | * @return a handle to cancel sending the message | ||
196 | */ | ||
197 | public Cancelable transmitWhenReady(final RelativeTime timeout, final GnunetMessage.Body message, final Continuation cont) { | ||
198 | return notifyTransmitReady(timeout, false, 0, new MessageTransmitter() { | ||
199 | @Override | ||
200 | public void transmit(Connection.MessageSink sink) { | ||
201 | sink.send(message); | ||
202 | if (cont != null) { | ||
203 | cont.cont(true); | ||
204 | } | ||
205 | } | ||
206 | |||
207 | @Override | ||
208 | public void handleError() { | ||
209 | if (cont != null) { | ||
210 | cont.cont(false); | ||
211 | } | ||
212 | } | ||
213 | }); | ||
214 | } | ||
215 | |||
216 | /** | ||
217 | * Convenience method for sending messages. Timeout defaults to FOREVER. | ||
218 | * | ||
219 | * @param message the message to send | ||
220 | * @param cont called when the message has been sent successfully or on error | ||
221 | * @return a handle to cancel sending the message | ||
222 | */ | ||
223 | public Cancelable transmitWhenReady(final GnunetMessage.Body message, final Continuation cont) { | ||
224 | return transmitWhenReady(RelativeTime.FOREVER, message, cont); | ||
225 | } | ||
226 | |||
166 | 227 | ||
167 | public void reconnect() { | 228 | public final void reconnect() { |
168 | if (connection != null) { | 229 | if (connection != null) { |
169 | connection.disconnect(); | 230 | connection.disconnect(); |
170 | } | 231 | } |