aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/util/Server.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/gnunet/util/Server.java')
-rw-r--r--src/main/java/org/gnunet/util/Server.java509
1 files changed, 509 insertions, 0 deletions
diff --git a/src/main/java/org/gnunet/util/Server.java b/src/main/java/org/gnunet/util/Server.java
new file mode 100644
index 0000000..8a86b45
--- /dev/null
+++ b/src/main/java/org/gnunet/util/Server.java
@@ -0,0 +1,509 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21package org.gnunet.util;
22
23import org.gnunet.construct.Construct;
24import org.grothoff.Runabout;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
28import java.io.IOException;
29import java.net.SocketAddress;
30import java.nio.channels.ServerSocketChannel;
31import java.nio.channels.SocketChannel;
32import java.util.ArrayList;
33import java.util.Collections;
34import java.util.LinkedList;
35import java.util.List;
36
37/**
38 * A server allows to wait for incoming connections from clients and respectively communicate with those clients.
39 */
40public class Server {
41 private static final Logger logger = LoggerFactory
42 .getLogger(Server.class);
43
44 /**
45 * Default idle timeout for new clients.
46 */
47 private final RelativeTime idleTimeout;
48
49 /**
50 * If true, disconnect a client when it sends a message we do not expect to receive. Otherwise, the unexpected
51 * message will just be discarded.
52 */
53 private final boolean requireFound;
54
55 /**
56 * The sockets this server accepts new connections on.
57 */
58 private List<ServerSocketChannel> listenSockets = new ArrayList<ServerSocketChannel>();
59
60 /**
61 * The list of all clients connected to this server.
62 */
63 private List<ClientHandle> clientHandles = new LinkedList<ClientHandle>();
64
65 /**
66 * The runabout that receives received messages, as well as information about the sender of the last
67 * received message.
68 */
69 private MessageRunabout receivedMessageHandler;
70
71 /**
72 * Whenever a client is disconnected all disconnect handlers are informed.
73 */
74 private List<DisconnectHandler> disconnectHandlers = new LinkedList<DisconnectHandler>();
75
76 /**
77 * Classes of the messages we expect to receive. If a received message is not in this list, the client
78 * will be disconnected, otherwise the message is just ignored.
79 */
80 private List<Class> expectedMessages = Collections.emptyList();
81
82 /**
83 * If true, shut down as soon as all non-monitor clients have finished, and do not allow new connections
84 * to be made to this server.
85 */
86 private boolean inSoftShutdown;
87
88 /**
89 * Task that is executed as soon as a connection is ready to be accepted.
90 */
91 private Cancelable acceptTask;
92
93 /**
94 * True if we are destroyed, or in the process of being destroyed with no way back.
95 */
96 private boolean destroyed;
97
98
99 /**
100 * Interface implemented by disconnect handlers, whose onDisconnect method is called whenever a client
101 * is disconnected from the server.
102 */
103 public interface DisconnectHandler {
104 /**
105 * Called whenever a client is disconnected from the server.
106 *
107 * @param clientHandle the handle for the client that was disconnected
108 */
109 void onDisconnect(ClientHandle clientHandle);
110 }
111
112 /**
113 * A handle to a (remote) client connected to this server.
114 * <p/>
115 * Every client handle keeps a reference count..
116 * Whenever a part of the programs saves a client handle for further interaction with it, keep() should be called.
117 * This prevents the server from disconnecting the client when it is idle.
118 * Once this interaction is over, drop() will decrement the reference count and eventually disconnect the client
119 * after being idle for long enough.
120 */
121 public class ClientHandle {
122 /**
123 * The underlying connection to the client-
124 */
125 private Connection connection;
126
127 /**
128 * When referenceCount==0, the server is allowed to drop the client after a timeout.
129 */
130 private int referenceCount = 0;
131
132 /**
133 * Handle for canceling the receive process of this client, null if no receive is currently going on.
134 */
135 private Cancelable currentReceive;
136
137 /**
138 * Set to true if the connection to this client should not prevent the server from shutting down.
139 */
140 private boolean isMonitor;
141
142 /**
143 * Iff true, disconnect the client as soon as possible.
144 * Disconnecting may sometimes not be possible immediately, for example when the reference count is not zero.
145 */
146 private boolean disconnectRequested;
147
148 /**
149 * Create a client handle.
150 *
151 * @param sock
152 */
153 private ClientHandle(SocketChannel sock) {
154 connection = new Connection(sock);
155 // start receiving
156 receiveDone(true);
157 }
158
159 /**
160 * Notify us when the server has enough space to transmit
161 * a message of the given size to the given client.
162 *
163 * @param size requested amount of buffer space
164 * @param timeout after how long should we give up (and call
165 * notify with buf NULL and size 0)?
166 * @param transmitter callback
167 * @return a handle to cancel the notification
168 */
169 public Cancelable notifyTransmitReady(int size, RelativeTime timeout, MessageTransmitter transmitter) {
170 return connection.notifyTransmitReady(size, timeout, transmitter);
171 }
172
173 /**
174 * Convenience method for sending messages.
175 *
176 * @param timeout when should we give up sending the message, and call cont.cont(false)
177 * @param message the message to send
178 * @param cont called when the message has been sent successfully or on error
179 * @return a handle to cancel sending the message
180 */
181 public Cancelable transmitWhenReady(final RelativeTime timeout, final GnunetMessage.Body message, final Continuation cont) {
182 return notifyTransmitReady(0, timeout, new MessageTransmitter() {
183 @Override
184 public void transmit(Connection.MessageSink sink) {
185 sink.send(message);
186 if (cont != null) {
187 cont.cont(true);
188 }
189 }
190
191 @Override
192 public void handleError() {
193 if (cont != null) {
194 cont.cont(false);
195 }
196 }
197 });
198 }
199
200 /**
201 * Resume receiving from this client, we are done processing the
202 * current request. This function must be called from within each
203 * message handler (or its respective continuations).
204 * <p/>
205 * The server does not automatically continue to receive messages to
206 * support flow control.
207 *
208 * @param stayConnected false if connection to the client should be closed
209 */
210 public void receiveDone(boolean stayConnected) {
211 if (currentReceive != null) {
212 throw new AssertionError("receiveDone() called, but still waiting for message");
213 }
214 if (stayConnected) {
215 currentReceive = connection.receive(RelativeTime.FOREVER, new MessageReceiver() {
216 @Override
217 public void process(GnunetMessage.Body msg) {
218 currentReceive = null;
219 if ((msg instanceof UnknownMessageBody) || !expectedMessages.contains(msg.getClass())) {
220 if (requireFound) {
221 logger.info("disconnecting client sending unknown message");
222 disconnect();
223 }
224 // otherwise, just ignore it
225 }
226 if (receivedMessageHandler == null) {
227 throw new AssertionError("received message, but no handler installed");
228 }
229 receivedMessageHandler.setSender(ClientHandle.this);
230 receivedMessageHandler.visitAppropriate(msg);
231 }
232
233 @Override
234 public void handleError() {
235 logger.warn("error receiving from client");
236 disconnect();
237 }
238 });
239 } else {
240 if (referenceCount > 0) {
241 this.disconnectRequested = true;
242 } else {
243 System.out.println("disconnecting " + this.isMonitor);
244 disconnect();
245 }
246 }
247
248 }
249
250 /**
251 * Ask the server to disconnect from the given client.
252 * <p/>
253 * The client will be disconnected from the server, no matter what the current reference count is.
254 */
255 public void disconnect() {
256 connection.disconnect();
257 // if we are in the process of destruction, to not remove, the destruction function will do this,
258 // removing the client handle while in destruction would yield a concurrent modification exception
259 if (!destroyed) {
260 Server.this.clientHandles.remove(this);
261 }
262 for (DisconnectHandler dh : disconnectHandlers) {
263 dh.onDisconnect(this);
264 }
265 Server.this.testForSoftShutdown();
266 }
267
268 /**
269 * Prevent the client from being disconnected.
270 * For every keep, there should be an additional matching drop.
271 */
272 public void keep() {
273 referenceCount++;
274 }
275
276
277 /**
278 * Allow to disconnect this client, if not prevented by previous calls to keep.
279 * <p/>
280 * A call to drop should be executed for every call to keep.
281 * After drop() has been executed for every matching keep(), the next call to drop()
282 * allows the server to disconnect the client after a timeout.
283 */
284 public void drop() {
285 assert referenceCount > 0;
286 referenceCount--;
287 if (referenceCount == 0 && disconnectRequested) {
288 disconnect();
289 }
290 }
291
292
293 /**
294 * Set the 'monitor' flag on this client. Clients which have been
295 * marked as 'monitors' won't prevent the server from shutting down
296 * once 'GNUNET_SERVER_stop_listening' has been invoked. The idea is
297 * that for "normal" clients we likely want to allow them to process
298 * their requests; however, monitor-clients are likely to 'never'
299 * disconnect during shutdown and thus will not be considered when
300 * determining if the server should continue to exist after
301 * 'GNUNET_SERVER_destroy' has been called.
302 */
303 public void markMonitor() {
304 this.isMonitor = true;
305 }
306
307 public boolean isMonitor() {
308 return isMonitor;
309 }
310 }
311
312
313 /**
314 * All handlers for receiving messages from clients have to inherit this class.
315 * <p/>
316 * MessageRunabout is a standard runabout with the added possibility of getting the sender of the message.
317 * This is necessary as the runabout's visit methods can have only one parameter.
318 */
319 public abstract static class MessageRunabout extends Runabout {
320 private ClientHandle currentSender;
321
322 /**
323 * Allows implementors of MessageRunabout to get the Client that sent the message
324 * currently visited.
325 * <p/>
326 * The return value of getSender() is only valid while executing a visit method.
327 *
328 * @return handle of the client whose message is currently being visited
329 */
330 public final ClientHandle getSender() {
331 return currentSender;
332 }
333
334 /**
335 * Private method used to set the sender for the getSender() method.
336 *
337 * @param clientHandle the client handle to set as the sender
338 */
339 private void setSender(ClientHandle clientHandle) {
340 currentSender = clientHandle;
341 }
342 }
343
344 /**
345 * Create a server listening on all specified addresses.
346 *
347 * @param addresses addresses to bind on
348 * @param idleTimeout time after a client will be disconnected if idle
349 * @param requireFound allow unknown messages to be received without disconnecting the client in response
350 */
351 public Server(List<SocketAddress> addresses, RelativeTime idleTimeout, boolean requireFound) {
352 this.idleTimeout = idleTimeout;
353 this.requireFound = requireFound;
354 try {
355 for (SocketAddress addr : addresses) {
356 ServerSocketChannel socket = ServerSocketChannel.open();
357 socket.configureBlocking(false);
358 socket.socket().bind(addr);
359 logger.debug("socket listening on {}", addr.toString());
360 listenSockets.add(socket);
361 addAcceptSocket(socket);
362 }
363 } catch (IOException e) {
364 throw new RuntimeException("could not bind", e);
365 }
366 }
367
368 /**
369 * Create a server, not listening on any sockets yet for new connections.
370 *
371 * @param idleTimeout time after a client will be disconnected if idle
372 * @param requireFound allow unknown messages to be received without disconnecting the client in response
373 */
374 public Server(RelativeTime idleTimeout, boolean requireFound) {
375 this.idleTimeout = idleTimeout;
376 this.requireFound = requireFound;
377 }
378
379 /**
380 * Accept new connections from the given server socket.
381 *
382 * @param sock the new socket to accept connections from
383 */
384 public final void addAcceptSocket(final ServerSocketChannel sock) {
385 Scheduler.TaskConfiguration b = new Scheduler.TaskConfiguration(RelativeTime.FOREVER,
386 new Scheduler.Task() {
387 @Override
388 public void run(Scheduler.RunContext ctx) {
389 acceptTask = null;
390 try {
391 SocketChannel cli = sock.accept();
392
393 if (cli != null) {
394 logger.debug("client connected");
395 cli.configureBlocking(false);
396 ClientHandle clientHandle = new ClientHandle(cli);
397 clientHandles.add(clientHandle);
398 }
399
400 } catch (IOException e) {
401 throw new RuntimeException("accept failed", e);
402 }
403 addAcceptSocket(sock);
404 }
405 });
406 b.selectAccept(sock);
407 acceptTask = b.schedule();
408 }
409
410 /**
411 * Pass messages that the runabout can handle to it.
412 * There can only be one runabout per message type.
413 * (Discrepancy with the C-API, could be changed in the future)
414 *
415 * @param msgRunabout handler
416 */
417 public void setHandler(MessageRunabout msgRunabout) {
418 receivedMessageHandler = msgRunabout;
419 expectedMessages = RunaboutUtil.getRunaboutVisitees(msgRunabout);
420 }
421
422 /**
423 * Ask the server to notify us whenever a client disconnects.
424 * This handler is called whenever the actual network connection
425 * is closed; the reference count may be zero or larger than zero
426 * at this point. Note that the disconnect handler is also called when
427 *
428 * @param disconnectHandler handler to call on disconnect
429 */
430 public Cancelable notifyDisconnect(final DisconnectHandler disconnectHandler) {
431 this.disconnectHandlers.add(disconnectHandler);
432 return new Cancelable() {
433 @Override
434 public void cancel() {
435 Server.this.disconnectHandlers.remove(disconnectHandler);
436 }
437 };
438 }
439
440 /**
441 * Stop the listen socket destroy the server as soon as only monitor clients are left.
442 */
443 public void stopListening() {
444 inSoftShutdown = true;
445 if (acceptTask != null) {
446 acceptTask.cancel();
447 acceptTask = null;
448 }
449 testForSoftShutdown();
450 }
451
452 /**
453 * Disconnect all clients forcefully from the server and stop listening.
454 * <p/>
455 * No methods should be called on a server and its client handles after destroy() has been called.
456 */
457 public void destroy() {
458 if (destroyed) {
459 return;
460 }
461 destroyed = true;
462 for (ClientHandle h : clientHandles) {
463 h.disconnect();
464 }
465 clientHandles.clear();
466 if (acceptTask != null) {
467 acceptTask.cancel();
468 acceptTask = null;
469 }
470 for (ServerSocketChannel ssc : listenSockets) {
471 try {
472 ssc.close();
473 } catch (IOException e) {
474 logger.error("closing listen socket failed", e);
475 }
476 }
477 }
478
479 /**
480 * Test if we should destroy outselves.
481 */
482 private void testForSoftShutdown() {
483 // do this so we don't have many recursive calls to testForSoftShutdown when shutting down
484 if (destroyed) {
485 return;
486 }
487 if (inSoftShutdown) {
488 System.out.println(""+clientHandles.size());
489 boolean done = true;
490 for (ClientHandle clientHandle : this.clientHandles) {
491 if (!clientHandle.isMonitor) {
492 done = false;
493 }
494 }
495 if (done) {
496 destroy();
497 }
498 }
499 }
500
501 @Override
502 protected void finalize() throws Throwable {
503 super.finalize();
504 if (!destroyed) {
505 logger.warn("Server instance not destroyed, but finalizer called");
506 }
507 destroy();
508 }
509}