diff options
Diffstat (limited to 'src/main/java/org/gnunet/util/Server.java')
-rw-r--r-- | src/main/java/org/gnunet/util/Server.java | 509 |
1 files changed, 509 insertions, 0 deletions
diff --git a/src/main/java/org/gnunet/util/Server.java b/src/main/java/org/gnunet/util/Server.java new file mode 100644 index 0000000..8a86b45 --- /dev/null +++ b/src/main/java/org/gnunet/util/Server.java | |||
@@ -0,0 +1,509 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2011, 2012 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | package org.gnunet.util; | ||
22 | |||
23 | import org.gnunet.construct.Construct; | ||
24 | import org.grothoff.Runabout; | ||
25 | import org.slf4j.Logger; | ||
26 | import org.slf4j.LoggerFactory; | ||
27 | |||
28 | import java.io.IOException; | ||
29 | import java.net.SocketAddress; | ||
30 | import java.nio.channels.ServerSocketChannel; | ||
31 | import java.nio.channels.SocketChannel; | ||
32 | import java.util.ArrayList; | ||
33 | import java.util.Collections; | ||
34 | import java.util.LinkedList; | ||
35 | import java.util.List; | ||
36 | |||
37 | /** | ||
38 | * A server allows to wait for incoming connections from clients and respectively communicate with those clients. | ||
39 | */ | ||
40 | public class Server { | ||
41 | private static final Logger logger = LoggerFactory | ||
42 | .getLogger(Server.class); | ||
43 | |||
44 | /** | ||
45 | * Default idle timeout for new clients. | ||
46 | */ | ||
47 | private final RelativeTime idleTimeout; | ||
48 | |||
49 | /** | ||
50 | * If true, disconnect a client when it sends a message we do not expect to receive. Otherwise, the unexpected | ||
51 | * message will just be discarded. | ||
52 | */ | ||
53 | private final boolean requireFound; | ||
54 | |||
55 | /** | ||
56 | * The sockets this server accepts new connections on. | ||
57 | */ | ||
58 | private List<ServerSocketChannel> listenSockets = new ArrayList<ServerSocketChannel>(); | ||
59 | |||
60 | /** | ||
61 | * The list of all clients connected to this server. | ||
62 | */ | ||
63 | private List<ClientHandle> clientHandles = new LinkedList<ClientHandle>(); | ||
64 | |||
65 | /** | ||
66 | * The runabout that receives received messages, as well as information about the sender of the last | ||
67 | * received message. | ||
68 | */ | ||
69 | private MessageRunabout receivedMessageHandler; | ||
70 | |||
71 | /** | ||
72 | * Whenever a client is disconnected all disconnect handlers are informed. | ||
73 | */ | ||
74 | private List<DisconnectHandler> disconnectHandlers = new LinkedList<DisconnectHandler>(); | ||
75 | |||
76 | /** | ||
77 | * Classes of the messages we expect to receive. If a received message is not in this list, the client | ||
78 | * will be disconnected, otherwise the message is just ignored. | ||
79 | */ | ||
80 | private List<Class> expectedMessages = Collections.emptyList(); | ||
81 | |||
82 | /** | ||
83 | * If true, shut down as soon as all non-monitor clients have finished, and do not allow new connections | ||
84 | * to be made to this server. | ||
85 | */ | ||
86 | private boolean inSoftShutdown; | ||
87 | |||
88 | /** | ||
89 | * Task that is executed as soon as a connection is ready to be accepted. | ||
90 | */ | ||
91 | private Cancelable acceptTask; | ||
92 | |||
93 | /** | ||
94 | * True if we are destroyed, or in the process of being destroyed with no way back. | ||
95 | */ | ||
96 | private boolean destroyed; | ||
97 | |||
98 | |||
99 | /** | ||
100 | * Interface implemented by disconnect handlers, whose onDisconnect method is called whenever a client | ||
101 | * is disconnected from the server. | ||
102 | */ | ||
103 | public interface DisconnectHandler { | ||
104 | /** | ||
105 | * Called whenever a client is disconnected from the server. | ||
106 | * | ||
107 | * @param clientHandle the handle for the client that was disconnected | ||
108 | */ | ||
109 | void onDisconnect(ClientHandle clientHandle); | ||
110 | } | ||
111 | |||
112 | /** | ||
113 | * A handle to a (remote) client connected to this server. | ||
114 | * <p/> | ||
115 | * Every client handle keeps a reference count.. | ||
116 | * Whenever a part of the programs saves a client handle for further interaction with it, keep() should be called. | ||
117 | * This prevents the server from disconnecting the client when it is idle. | ||
118 | * Once this interaction is over, drop() will decrement the reference count and eventually disconnect the client | ||
119 | * after being idle for long enough. | ||
120 | */ | ||
121 | public class ClientHandle { | ||
122 | /** | ||
123 | * The underlying connection to the client- | ||
124 | */ | ||
125 | private Connection connection; | ||
126 | |||
127 | /** | ||
128 | * When referenceCount==0, the server is allowed to drop the client after a timeout. | ||
129 | */ | ||
130 | private int referenceCount = 0; | ||
131 | |||
132 | /** | ||
133 | * Handle for canceling the receive process of this client, null if no receive is currently going on. | ||
134 | */ | ||
135 | private Cancelable currentReceive; | ||
136 | |||
137 | /** | ||
138 | * Set to true if the connection to this client should not prevent the server from shutting down. | ||
139 | */ | ||
140 | private boolean isMonitor; | ||
141 | |||
142 | /** | ||
143 | * Iff true, disconnect the client as soon as possible. | ||
144 | * Disconnecting may sometimes not be possible immediately, for example when the reference count is not zero. | ||
145 | */ | ||
146 | private boolean disconnectRequested; | ||
147 | |||
148 | /** | ||
149 | * Create a client handle. | ||
150 | * | ||
151 | * @param sock | ||
152 | */ | ||
153 | private ClientHandle(SocketChannel sock) { | ||
154 | connection = new Connection(sock); | ||
155 | // start receiving | ||
156 | receiveDone(true); | ||
157 | } | ||
158 | |||
159 | /** | ||
160 | * Notify us when the server has enough space to transmit | ||
161 | * a message of the given size to the given client. | ||
162 | * | ||
163 | * @param size requested amount of buffer space | ||
164 | * @param timeout after how long should we give up (and call | ||
165 | * notify with buf NULL and size 0)? | ||
166 | * @param transmitter callback | ||
167 | * @return a handle to cancel the notification | ||
168 | */ | ||
169 | public Cancelable notifyTransmitReady(int size, RelativeTime timeout, MessageTransmitter transmitter) { | ||
170 | return connection.notifyTransmitReady(size, timeout, transmitter); | ||
171 | } | ||
172 | |||
173 | /** | ||
174 | * Convenience method for sending messages. | ||
175 | * | ||
176 | * @param timeout when should we give up sending the message, and call cont.cont(false) | ||
177 | * @param message the message to send | ||
178 | * @param cont called when the message has been sent successfully or on error | ||
179 | * @return a handle to cancel sending the message | ||
180 | */ | ||
181 | public Cancelable transmitWhenReady(final RelativeTime timeout, final GnunetMessage.Body message, final Continuation cont) { | ||
182 | return notifyTransmitReady(0, timeout, new MessageTransmitter() { | ||
183 | @Override | ||
184 | public void transmit(Connection.MessageSink sink) { | ||
185 | sink.send(message); | ||
186 | if (cont != null) { | ||
187 | cont.cont(true); | ||
188 | } | ||
189 | } | ||
190 | |||
191 | @Override | ||
192 | public void handleError() { | ||
193 | if (cont != null) { | ||
194 | cont.cont(false); | ||
195 | } | ||
196 | } | ||
197 | }); | ||
198 | } | ||
199 | |||
200 | /** | ||
201 | * Resume receiving from this client, we are done processing the | ||
202 | * current request. This function must be called from within each | ||
203 | * message handler (or its respective continuations). | ||
204 | * <p/> | ||
205 | * The server does not automatically continue to receive messages to | ||
206 | * support flow control. | ||
207 | * | ||
208 | * @param stayConnected false if connection to the client should be closed | ||
209 | */ | ||
210 | public void receiveDone(boolean stayConnected) { | ||
211 | if (currentReceive != null) { | ||
212 | throw new AssertionError("receiveDone() called, but still waiting for message"); | ||
213 | } | ||
214 | if (stayConnected) { | ||
215 | currentReceive = connection.receive(RelativeTime.FOREVER, new MessageReceiver() { | ||
216 | @Override | ||
217 | public void process(GnunetMessage.Body msg) { | ||
218 | currentReceive = null; | ||
219 | if ((msg instanceof UnknownMessageBody) || !expectedMessages.contains(msg.getClass())) { | ||
220 | if (requireFound) { | ||
221 | logger.info("disconnecting client sending unknown message"); | ||
222 | disconnect(); | ||
223 | } | ||
224 | // otherwise, just ignore it | ||
225 | } | ||
226 | if (receivedMessageHandler == null) { | ||
227 | throw new AssertionError("received message, but no handler installed"); | ||
228 | } | ||
229 | receivedMessageHandler.setSender(ClientHandle.this); | ||
230 | receivedMessageHandler.visitAppropriate(msg); | ||
231 | } | ||
232 | |||
233 | @Override | ||
234 | public void handleError() { | ||
235 | logger.warn("error receiving from client"); | ||
236 | disconnect(); | ||
237 | } | ||
238 | }); | ||
239 | } else { | ||
240 | if (referenceCount > 0) { | ||
241 | this.disconnectRequested = true; | ||
242 | } else { | ||
243 | System.out.println("disconnecting " + this.isMonitor); | ||
244 | disconnect(); | ||
245 | } | ||
246 | } | ||
247 | |||
248 | } | ||
249 | |||
250 | /** | ||
251 | * Ask the server to disconnect from the given client. | ||
252 | * <p/> | ||
253 | * The client will be disconnected from the server, no matter what the current reference count is. | ||
254 | */ | ||
255 | public void disconnect() { | ||
256 | connection.disconnect(); | ||
257 | // if we are in the process of destruction, to not remove, the destruction function will do this, | ||
258 | // removing the client handle while in destruction would yield a concurrent modification exception | ||
259 | if (!destroyed) { | ||
260 | Server.this.clientHandles.remove(this); | ||
261 | } | ||
262 | for (DisconnectHandler dh : disconnectHandlers) { | ||
263 | dh.onDisconnect(this); | ||
264 | } | ||
265 | Server.this.testForSoftShutdown(); | ||
266 | } | ||
267 | |||
268 | /** | ||
269 | * Prevent the client from being disconnected. | ||
270 | * For every keep, there should be an additional matching drop. | ||
271 | */ | ||
272 | public void keep() { | ||
273 | referenceCount++; | ||
274 | } | ||
275 | |||
276 | |||
277 | /** | ||
278 | * Allow to disconnect this client, if not prevented by previous calls to keep. | ||
279 | * <p/> | ||
280 | * A call to drop should be executed for every call to keep. | ||
281 | * After drop() has been executed for every matching keep(), the next call to drop() | ||
282 | * allows the server to disconnect the client after a timeout. | ||
283 | */ | ||
284 | public void drop() { | ||
285 | assert referenceCount > 0; | ||
286 | referenceCount--; | ||
287 | if (referenceCount == 0 && disconnectRequested) { | ||
288 | disconnect(); | ||
289 | } | ||
290 | } | ||
291 | |||
292 | |||
293 | /** | ||
294 | * Set the 'monitor' flag on this client. Clients which have been | ||
295 | * marked as 'monitors' won't prevent the server from shutting down | ||
296 | * once 'GNUNET_SERVER_stop_listening' has been invoked. The idea is | ||
297 | * that for "normal" clients we likely want to allow them to process | ||
298 | * their requests; however, monitor-clients are likely to 'never' | ||
299 | * disconnect during shutdown and thus will not be considered when | ||
300 | * determining if the server should continue to exist after | ||
301 | * 'GNUNET_SERVER_destroy' has been called. | ||
302 | */ | ||
303 | public void markMonitor() { | ||
304 | this.isMonitor = true; | ||
305 | } | ||
306 | |||
307 | public boolean isMonitor() { | ||
308 | return isMonitor; | ||
309 | } | ||
310 | } | ||
311 | |||
312 | |||
313 | /** | ||
314 | * All handlers for receiving messages from clients have to inherit this class. | ||
315 | * <p/> | ||
316 | * MessageRunabout is a standard runabout with the added possibility of getting the sender of the message. | ||
317 | * This is necessary as the runabout's visit methods can have only one parameter. | ||
318 | */ | ||
319 | public abstract static class MessageRunabout extends Runabout { | ||
320 | private ClientHandle currentSender; | ||
321 | |||
322 | /** | ||
323 | * Allows implementors of MessageRunabout to get the Client that sent the message | ||
324 | * currently visited. | ||
325 | * <p/> | ||
326 | * The return value of getSender() is only valid while executing a visit method. | ||
327 | * | ||
328 | * @return handle of the client whose message is currently being visited | ||
329 | */ | ||
330 | public final ClientHandle getSender() { | ||
331 | return currentSender; | ||
332 | } | ||
333 | |||
334 | /** | ||
335 | * Private method used to set the sender for the getSender() method. | ||
336 | * | ||
337 | * @param clientHandle the client handle to set as the sender | ||
338 | */ | ||
339 | private void setSender(ClientHandle clientHandle) { | ||
340 | currentSender = clientHandle; | ||
341 | } | ||
342 | } | ||
343 | |||
344 | /** | ||
345 | * Create a server listening on all specified addresses. | ||
346 | * | ||
347 | * @param addresses addresses to bind on | ||
348 | * @param idleTimeout time after a client will be disconnected if idle | ||
349 | * @param requireFound allow unknown messages to be received without disconnecting the client in response | ||
350 | */ | ||
351 | public Server(List<SocketAddress> addresses, RelativeTime idleTimeout, boolean requireFound) { | ||
352 | this.idleTimeout = idleTimeout; | ||
353 | this.requireFound = requireFound; | ||
354 | try { | ||
355 | for (SocketAddress addr : addresses) { | ||
356 | ServerSocketChannel socket = ServerSocketChannel.open(); | ||
357 | socket.configureBlocking(false); | ||
358 | socket.socket().bind(addr); | ||
359 | logger.debug("socket listening on {}", addr.toString()); | ||
360 | listenSockets.add(socket); | ||
361 | addAcceptSocket(socket); | ||
362 | } | ||
363 | } catch (IOException e) { | ||
364 | throw new RuntimeException("could not bind", e); | ||
365 | } | ||
366 | } | ||
367 | |||
368 | /** | ||
369 | * Create a server, not listening on any sockets yet for new connections. | ||
370 | * | ||
371 | * @param idleTimeout time after a client will be disconnected if idle | ||
372 | * @param requireFound allow unknown messages to be received without disconnecting the client in response | ||
373 | */ | ||
374 | public Server(RelativeTime idleTimeout, boolean requireFound) { | ||
375 | this.idleTimeout = idleTimeout; | ||
376 | this.requireFound = requireFound; | ||
377 | } | ||
378 | |||
379 | /** | ||
380 | * Accept new connections from the given server socket. | ||
381 | * | ||
382 | * @param sock the new socket to accept connections from | ||
383 | */ | ||
384 | public final void addAcceptSocket(final ServerSocketChannel sock) { | ||
385 | Scheduler.TaskConfiguration b = new Scheduler.TaskConfiguration(RelativeTime.FOREVER, | ||
386 | new Scheduler.Task() { | ||
387 | @Override | ||
388 | public void run(Scheduler.RunContext ctx) { | ||
389 | acceptTask = null; | ||
390 | try { | ||
391 | SocketChannel cli = sock.accept(); | ||
392 | |||
393 | if (cli != null) { | ||
394 | logger.debug("client connected"); | ||
395 | cli.configureBlocking(false); | ||
396 | ClientHandle clientHandle = new ClientHandle(cli); | ||
397 | clientHandles.add(clientHandle); | ||
398 | } | ||
399 | |||
400 | } catch (IOException e) { | ||
401 | throw new RuntimeException("accept failed", e); | ||
402 | } | ||
403 | addAcceptSocket(sock); | ||
404 | } | ||
405 | }); | ||
406 | b.selectAccept(sock); | ||
407 | acceptTask = b.schedule(); | ||
408 | } | ||
409 | |||
410 | /** | ||
411 | * Pass messages that the runabout can handle to it. | ||
412 | * There can only be one runabout per message type. | ||
413 | * (Discrepancy with the C-API, could be changed in the future) | ||
414 | * | ||
415 | * @param msgRunabout handler | ||
416 | */ | ||
417 | public void setHandler(MessageRunabout msgRunabout) { | ||
418 | receivedMessageHandler = msgRunabout; | ||
419 | expectedMessages = RunaboutUtil.getRunaboutVisitees(msgRunabout); | ||
420 | } | ||
421 | |||
422 | /** | ||
423 | * Ask the server to notify us whenever a client disconnects. | ||
424 | * This handler is called whenever the actual network connection | ||
425 | * is closed; the reference count may be zero or larger than zero | ||
426 | * at this point. Note that the disconnect handler is also called when | ||
427 | * | ||
428 | * @param disconnectHandler handler to call on disconnect | ||
429 | */ | ||
430 | public Cancelable notifyDisconnect(final DisconnectHandler disconnectHandler) { | ||
431 | this.disconnectHandlers.add(disconnectHandler); | ||
432 | return new Cancelable() { | ||
433 | @Override | ||
434 | public void cancel() { | ||
435 | Server.this.disconnectHandlers.remove(disconnectHandler); | ||
436 | } | ||
437 | }; | ||
438 | } | ||
439 | |||
440 | /** | ||
441 | * Stop the listen socket destroy the server as soon as only monitor clients are left. | ||
442 | */ | ||
443 | public void stopListening() { | ||
444 | inSoftShutdown = true; | ||
445 | if (acceptTask != null) { | ||
446 | acceptTask.cancel(); | ||
447 | acceptTask = null; | ||
448 | } | ||
449 | testForSoftShutdown(); | ||
450 | } | ||
451 | |||
452 | /** | ||
453 | * Disconnect all clients forcefully from the server and stop listening. | ||
454 | * <p/> | ||
455 | * No methods should be called on a server and its client handles after destroy() has been called. | ||
456 | */ | ||
457 | public void destroy() { | ||
458 | if (destroyed) { | ||
459 | return; | ||
460 | } | ||
461 | destroyed = true; | ||
462 | for (ClientHandle h : clientHandles) { | ||
463 | h.disconnect(); | ||
464 | } | ||
465 | clientHandles.clear(); | ||
466 | if (acceptTask != null) { | ||
467 | acceptTask.cancel(); | ||
468 | acceptTask = null; | ||
469 | } | ||
470 | for (ServerSocketChannel ssc : listenSockets) { | ||
471 | try { | ||
472 | ssc.close(); | ||
473 | } catch (IOException e) { | ||
474 | logger.error("closing listen socket failed", e); | ||
475 | } | ||
476 | } | ||
477 | } | ||
478 | |||
479 | /** | ||
480 | * Test if we should destroy outselves. | ||
481 | */ | ||
482 | private void testForSoftShutdown() { | ||
483 | // do this so we don't have many recursive calls to testForSoftShutdown when shutting down | ||
484 | if (destroyed) { | ||
485 | return; | ||
486 | } | ||
487 | if (inSoftShutdown) { | ||
488 | System.out.println(""+clientHandles.size()); | ||
489 | boolean done = true; | ||
490 | for (ClientHandle clientHandle : this.clientHandles) { | ||
491 | if (!clientHandle.isMonitor) { | ||
492 | done = false; | ||
493 | } | ||
494 | } | ||
495 | if (done) { | ||
496 | destroy(); | ||
497 | } | ||
498 | } | ||
499 | } | ||
500 | |||
501 | @Override | ||
502 | protected void finalize() throws Throwable { | ||
503 | super.finalize(); | ||
504 | if (!destroyed) { | ||
505 | logger.warn("Server instance not destroyed, but finalizer called"); | ||
506 | } | ||
507 | destroy(); | ||
508 | } | ||
509 | } | ||