aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/util/Client.java
blob: 6da685916b815eb781a66b9ce0bec3c39eafd2bc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
/*
     This file is part of GNUnet.
     Copyright (C) 2009 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 2, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING.  If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
 */
package org.gnunet.util;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.gnunet.mq.Envelope;
import org.gnunet.mq.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * A connection to a GNUnet service.
 *
 * Wraps a Connection, and is responsible for waiting until the underlying connection has been made
 * and allows reconnects.
 */
public class Client extends MessageQueue {
    private static final Logger logger = LoggerFactory
            .getLogger(Client.class);

    /**
     * Underlying connection to the service.
     * May be NULL if the client is currently not connected.
     */
    private Connection connection;

    /**
     * Host this client should be connected to.
     */
    private final String hostname;

    /**
     * Port of the host the client should connect to.
     */
    private final int port;

    /**
     * Initial value for connectBackoff.
     */
    private static final RelativeTime INITAL_BACKOFF = RelativeTime.MILLISECOND.multiply(5);

    /**
     * Maximum value for connectBackoff.
     */
    private static final RelativeTime MAX_BACKOFF = RelativeTime.SECOND.multiply(5);

    /**
     * The time to wait after an error occured while connecting.
     * Every time an error occurs while connecting, this value is doubled until its maximum
     * value (MAX_BACKOFF) has been reached. This strategy is called exponential backoff.
     */
    private RelativeTime connectBackoff = INITAL_BACKOFF;

    /**
     * True if we are waiting for the client to connect before we can ask it to do
     * notifyTransmitReady.
     */
    private boolean notifyTransmitReadyDelayed;

    /**
     * When notifyTransmitReadyDelayed is true, This can be used to onCancel the task
     * waiting for the connection to be established.
     */
    private Cancelable delayedNotifyTransmitHandle;

    /**
     * Currently installed persistent receiver.
     * Will receive all messages sent to the client.
     */
    private RunaboutMessageReceiver receiver;

    private boolean receiverActive;

    /**
     * Handle to onCancel the message currently submitted in the queue,
     */
    private Cancelable currentSubmit;
    private Scheduler.TaskIdentifier receiverLifenessTask;

    /**
     * Create a connection to a service.
     *
     * @param serviceName name of the service
     * @param cfg         configuration to use
     */
    public Client(String serviceName, Configuration cfg) {
        if (cfg == null) {
            throw new AssertionError("Configuration may not be null");
        }
        if (!cfg.haveValue(serviceName, "PORT")) {
            throw new Configuration.ConfigurationException(String.format("PORT of service '%s' not specified", serviceName));
        }
        if (!cfg.haveValue(serviceName, "HOSTNAME")) {
            throw new Configuration.ConfigurationException(String.format("HOSTNAME of service '%s' not specified", serviceName));
        }

        // get port of this service from the configuration
        Optional<Long> portOption = cfg.getValueNumber(serviceName, "PORT");
        port = portOption.get().intValue();
        // get the hostname from the configuration
        hostname = cfg.getValueString(serviceName, "HOSTNAME").get();
        if (hostname == null || hostname.isEmpty()) {
            throw new Configuration.ConfigurationException(String.format("hostname of service '%s' empty", serviceName));
        }
        reconnect();
        // we don't have to wait for any acks, but can send right away!
        reportReadyForSubmit();
    }

    /**
     * Create a connection to a service with the specified hostname and port.
     *
     * @param hostname hostname of the service
     * @param port port of the service
     */
    public Client(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
        reconnect();
        // we don't have to wait for any acks, but can send right away!
        reportReadyForSubmit();
    }


    /**
     * Receive one message from the service. Can only be called after sending a message to the server.
     *
     * @param timeout  deadline after which MessageReceiver.deadline will be called
     * @param receiver MessageReceiver that is responsible for the received message
     */
    public void receiveOne(RelativeTime timeout, MessageReceiver receiver) {
        connection.receive(timeout, receiver);
    }

    /**
     * Ask the client to call us once it is able to send a message.
     *
     *
     * @param timeout     after how long should we give up (and call transmitter.transmit(null))
     * @param autoRetry   if the connection to the service dies, should we
     *                    automatically reconnect and retry (within the deadline period)
     *                    or should we immediately fail in this case?  Pass true
     *                    if the caller does not care about temporary connection errors,
     *                    for example because the protocol is stateless
     * @param size        size of the message we want to transmit, can be an upper bound
     * @param transmitter the MessageTransmitter object to call once the client is ready to transmit or
     *                    when the timeout is over. Guaranteed to be called *after* notifyTransmitReady has returned.  @return a handle that can be used to onCancel the transmit getRequestIdentifier
     *
     * @return a handle to onCancel the notification
     */
    public Cancelable notifyTransmitReady(final RelativeTime timeout,
                                          final boolean autoRetry, int size, final MessageTransmitter transmitter) {
        if (notifyTransmitReadyDelayed) {
            throw new AssertionError("notifyTransmitReady called twice!");
        }
        if (connection == null) {
            throw new AssertionError("notifyTransmitReady called on disconnected client");
        }
        if (connection.isConnected()) {
            return connection.notifyTransmitReady(0, timeout, transmitter);
        } else {
            notifyTransmitReadyDelayed = true;
            final AbsoluteTime deadline = timeout.toAbsolute();
            delayedNotifyTransmitHandle = connection.notifyConnected(connectBackoff, new Continuation() {
                @Override
                public void cont(boolean success) {
                    delayedNotifyTransmitHandle = null;
                    if (success) {
                        activateReceiver();
                        notifyTransmitReadyDelayed = false;
                        delayedNotifyTransmitHandle = connection.notifyTransmitReady(0, timeout, new MessageTransmitter() {
                            @Override
                            public void transmit(Connection.MessageSink sink) {
                                delayedNotifyTransmitHandle = null;
                                transmitter.transmit(sink);
                            }

                            @Override
                            public void handleError() {
                                delayedNotifyTransmitHandle = null;
                                transmitter.handleError();
                            }
                        });
                    } else {
                        logger.debug("connect timed out, trying again");
                        if (deadline.isDue()) {
                            transmitter.handleError();
                        } else {
                            RelativeTime timeout = deadline.getRemaining();
                            connectBackoff = RelativeTime.min(timeout, RelativeTime.min(connectBackoff.multiply(2), MAX_BACKOFF));
                            reconnect();
                            delayedNotifyTransmitHandle = connection.notifyConnected(connectBackoff, this);
                        }
                    }
                }
            });
            return new Cancelable() {
                @Override
                public void cancel() {
                    if (delayedNotifyTransmitHandle != null) {
                        delayedNotifyTransmitHandle.cancel();
                    }
                }
            };
        }
    }

    /**
     * Convenience method for sending messages.
     *
     * @param timeout when should we give up sending the message, and call cont.cont(false)
     * @param message the message to send
     * @param cont called when the message has been sent successfully or on error
     * @return a handle to onCancel sending the message
     */
    public Cancelable transmitWhenReady(final RelativeTime timeout, final GnunetMessage.Body message, final Continuation cont) {
        return notifyTransmitReady(timeout, false, 0, new MessageTransmitter() {
            @Override
            public void transmit(Connection.MessageSink sink) {
                sink.send(message);
                if (cont != null) {
                    cont.cont(true);
                }
            }

            @Override
            public void handleError() {
                if (cont != null) {
                    cont.cont(false);
                }
            }
        });
    }

    /**
     * Convenience method for sending messages. Timeout defaults to FOREVER.
     *
     * @param message the message to send
     * @param cont called when the message has been sent successfully or on error
     * @return a handle to onCancel sending the message
     */
    public Cancelable transmitWhenReady(final GnunetMessage.Body message, final Continuation cont) {
        return transmitWhenReady(RelativeTime.FOREVER, message, cont);
    }

    public final void reconnect() {
        if (connection != null) {
            connection.disconnect();
        }
        connection = new Connection(hostname, port);
    }

    /**
     * Disconnect from the service. Cancels all pending receive/transmit requests.
     */
    public void disconnect() {
        connection.disconnect();
        connection = null;
        if (receiverLifenessTask != null) {
            receiverLifenessTask.cancel();
            receiverLifenessTask = null;
        }
        receiverActive = false;
    }

    public boolean isConnected() {
        return (connection != null) && connection.isConnected();
    }

    @Override
    protected void submit(Envelope ev) {
        currentSubmit = transmitWhenReady(RelativeTime.FOREVER, ev.message, new Continuation() {
            @Override
            public void cont(boolean success) {
                currentSubmit = null;
                reportMessageSent();
                reportReadyForSubmit();
            }
        });
    }

    @Override
    protected void retract() {
        if (currentSubmit == null)
            throw new AssertionError();
        currentSubmit.cancel();
        currentSubmit = null;
    }

    private void activateReceiver() {
        if (receiverActive || receiver == null)
            return;
        final MessageReceiver proxyReceiver = new MessageReceiver() {
            @Override
            public void process(GnunetMessage.Body msg) {
                Client.this.receiver.process(msg);
                if (connection != null && connection.isConnected())
                    connection.receive(RelativeTime.FOREVER, this);
                else
                    receiverActive = false;
            }

            @Override
            public void handleError() {
                Client.this.receiver.handleError();
                receiverActive = false;
            }
        };
        connection.receive(RelativeTime.FOREVER, proxyReceiver);
        receiverActive = true;
    }

    public void installReceiver(RunaboutMessageReceiver receiver) {
        Preconditions.checkState(this.receiver == null);
        this.receiverLifenessTask = Scheduler.addDelayed(RelativeTime.FOREVER, new Scheduler.Task() {
            @Override
            public void run(Scheduler.RunContext ctx) {
                // nothing to be done
            }
        });
        this.receiver = receiver;
        if (connection != null && connection.isConnected())
            activateReceiver();
    }
}