aboutsummaryrefslogtreecommitdiff
path: root/src/org/gnunet/util/Client.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/org/gnunet/util/Client.java')
-rw-r--r--src/org/gnunet/util/Client.java111
1 files changed, 86 insertions, 25 deletions
diff --git a/src/org/gnunet/util/Client.java b/src/org/gnunet/util/Client.java
index d2e1308..fbd94a6 100644
--- a/src/org/gnunet/util/Client.java
+++ b/src/org/gnunet/util/Client.java
@@ -22,6 +22,7 @@ package org.gnunet.util;
22import org.slf4j.Logger; 22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory; 23import org.slf4j.LoggerFactory;
24 24
25
25/** 26/**
26 * A connection to a gnunet service. 27 * A connection to a gnunet service.
27 * 28 *
@@ -69,6 +70,12 @@ public class Client {
69 private boolean notifyTransmitReadyDelayed; 70 private boolean notifyTransmitReadyDelayed;
70 71
71 /** 72 /**
73 * When notifyTransmitReadyDelayed is true, this can be used to cancel the task
74 * waiting for the connection to be established.
75 */
76 private Cancelable delayedNotifyTransmitHandle;
77
78 /**
72 * Create a connection to a service. 79 * Create a connection to a service.
73 * 80 *
74 * @param serviceName name of the service 81 * @param serviceName name of the service
@@ -111,16 +118,6 @@ public class Client {
111 return connection.receive(timeout, receiver); 118 return connection.receive(timeout, receiver);
112 } 119 }
113 120
114
115 private static class DelayedTransmitHandle implements Cancelable {
116 Cancelable realTransmitHandle;
117 Cancelable timeoutHandle;
118 @Override
119 public void cancel() {
120 throw new UnsupportedOperationException();
121 }
122 }
123
124 /** 121 /**
125 * Ask the client to call us once it is able to send a message. 122 * Ask the client to call us once it is able to send a message.
126 * 123 *
@@ -132,39 +129,103 @@ public class Client {
132 * if the caller does not care about temporary connection errors, 129 * if the caller does not care about temporary connection errors,
133 * for example because the protocol is stateless 130 * for example because the protocol is stateless
134 * @param size size of the message we want to transmit, can be an upper bound 131 * @param size size of the message we want to transmit, can be an upper bound
135 *@param transmitter the MessageTransmitter object to call once the client is ready to transmit or 132 * @param transmitter the MessageTransmitter object to call once the client is ready to transmit or
136 * when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned. @return a handle that can be used to cancel the transmit request 133 * when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned. @return a handle that can be used to cancel the transmit request
134 *
135 * @return a handle to cancel the notification
137 */ 136 */
138 public Cancelable notifyTransmitReady(final RelativeTime timeout, 137 public Cancelable notifyTransmitReady(final RelativeTime timeout,
139 final boolean autoRetry, int size, final MessageTransmitter transmitter) { 138 final boolean autoRetry, int size, final MessageTransmitter transmitter) {
139 if (notifyTransmitReadyDelayed) {
140 throw new AssertionError("notifyTransmitReady called twice!");
141 }
140 if (connection.isConnected()) { 142 if (connection.isConnected()) {
141 return connection.notifyTransmitReady(0, timeout, transmitter); 143 return connection.notifyTransmitReady(0, timeout, transmitter);
142 } else { 144 } else {
143 notifyTransmitReadyDelayed = true; 145 notifyTransmitReadyDelayed = true;
144 final DelayedTransmitHandle delayedTransmitHandle = new DelayedTransmitHandle(); 146 final AbsoluteTime deadline = timeout.toAbsolute();
145 delayedTransmitHandle.timeoutHandle = Scheduler.addDelayed(connectBackoff, new Scheduler.Task() { 147 delayedNotifyTransmitHandle = connection.notifyConnected(connectBackoff, new Continuation() {
146 @Override 148 @Override
147 public void run(Scheduler.RunContext ctx) { 149 public void cont(boolean success) {
148 if (connection == null) { 150 delayedNotifyTransmitHandle = null;
149 return; 151 if (success) {
150 }
151 if (connection.isConnected()) {
152 notifyTransmitReadyDelayed = false; 152 notifyTransmitReadyDelayed = false;
153 connection.notifyTransmitReady(0, timeout, transmitter); 153 delayedNotifyTransmitHandle = connection.notifyTransmitReady(0, timeout, new MessageTransmitter() {
154 @Override
155 public void transmit(Connection.MessageSink sink) {
156 delayedNotifyTransmitHandle = null;
157 transmitter.transmit(sink);
158 }
159
160 @Override
161 public void handleError() {
162 delayedNotifyTransmitHandle = null;
163 transmitter.handleError();
164 }
165 });
154 } else { 166 } else {
155 logger.debug("still not connected, retrying in {}ms", connectBackoff.getMilliseconds()); 167 if (deadline.isDue()) {
156 reconnect(); 168 transmitter.handleError();
157 connectBackoff = RelativeTime.min(connectBackoff.multiply(2), MAX_BACKOFF); 169 } else {
158 Scheduler.addDelayed(connectBackoff, this); 170 RelativeTime timeout = deadline.getRemaining();
171 connectBackoff = RelativeTime.min(timeout, RelativeTime.min(connectBackoff.multiply(2), MAX_BACKOFF));
172 reconnect();
173 delayedNotifyTransmitHandle = connection.notifyConnected(connectBackoff, this);
174 }
159 } 175 }
160 } 176 }
161 }); 177 });
162 return delayedTransmitHandle; 178 return new Cancelable() {
179 @Override
180 public void cancel() {
181 if (delayedNotifyTransmitHandle != null) {
182 delayedNotifyTransmitHandle.cancel();
183 }
184 }
185 };
163 } 186 }
164 } 187 }
165 188
189 /**
190 * Convenience method for sending messages.
191 *
192 * @param timeout when should we give up sending the message, and call cont.cont(false)
193 * @param message the message to send
194 * @param cont called when the message has been sent successfully or on error
195 * @return a handle to cancel sending the message
196 */
197 public Cancelable transmitWhenReady(final RelativeTime timeout, final GnunetMessage.Body message, final Continuation cont) {
198 return notifyTransmitReady(timeout, false, 0, new MessageTransmitter() {
199 @Override
200 public void transmit(Connection.MessageSink sink) {
201 sink.send(message);
202 if (cont != null) {
203 cont.cont(true);
204 }
205 }
206
207 @Override
208 public void handleError() {
209 if (cont != null) {
210 cont.cont(false);
211 }
212 }
213 });
214 }
215
216 /**
217 * Convenience method for sending messages. Timeout defaults to FOREVER.
218 *
219 * @param message the message to send
220 * @param cont called when the message has been sent successfully or on error
221 * @return a handle to cancel sending the message
222 */
223 public Cancelable transmitWhenReady(final GnunetMessage.Body message, final Continuation cont) {
224 return transmitWhenReady(RelativeTime.FOREVER, message, cont);
225 }
226
166 227
167 public void reconnect() { 228 public final void reconnect() {
168 if (connection != null) { 229 if (connection != null) {
169 connection.disconnect(); 230 connection.disconnect();
170 } 231 }