aboutsummaryrefslogtreecommitdiff
path: root/src/org/gnunet/util/Server.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/org/gnunet/util/Server.java')
-rw-r--r--src/org/gnunet/util/Server.java365
1 files changed, 294 insertions, 71 deletions
diff --git a/src/org/gnunet/util/Server.java b/src/org/gnunet/util/Server.java
index f2db539..3d65754 100644
--- a/src/org/gnunet/util/Server.java
+++ b/src/org/gnunet/util/Server.java
@@ -20,6 +20,7 @@
20 20
21package org.gnunet.util; 21package org.gnunet.util;
22 22
23import org.gnunet.construct.Construct;
23import org.grothoff.Runabout; 24import org.grothoff.Runabout;
24import org.slf4j.Logger; 25import org.slf4j.Logger;
25import org.slf4j.LoggerFactory; 26import org.slf4j.LoggerFactory;
@@ -29,47 +30,138 @@ import java.net.SocketAddress;
29import java.nio.channels.ServerSocketChannel; 30import java.nio.channels.ServerSocketChannel;
30import java.nio.channels.SocketChannel; 31import java.nio.channels.SocketChannel;
31import java.util.ArrayList; 32import java.util.ArrayList;
33import java.util.Collections;
32import java.util.LinkedList; 34import java.util.LinkedList;
33import java.util.List; 35import java.util.List;
34 36
37/**
38 * A server allows to wait for incoming connections from clients and respectively communicate with those clients.
39 */
35public class Server { 40public class Server {
36 private static final Logger logger = LoggerFactory 41 private static final Logger logger = LoggerFactory
37 .getLogger(Server.class); 42 .getLogger(Server.class);
38 43
44 /**
45 * Default idle timeout for new clients.
46 */
39 private final RelativeTime idleTimeout; 47 private final RelativeTime idleTimeout;
48
49 /**
50 * If true, disconnect a client when it sends a message we do not expect to receive. Otherwise, the unexpected
51 * message will just be discarded.
52 */
40 private final boolean requireFound; 53 private final boolean requireFound;
41 private List<ServerSocketChannel> listenSockets;
42 private List<ClientHandle> clients = new LinkedList<ClientHandle>();
43 54
44 private MessageRunabout receivedMessagehandler; 55 /**
56 * The sockets this server accepts new connections on.
57 */
58 private List<ServerSocketChannel> listenSockets = new ArrayList<ServerSocketChannel>();
59
60 /**
61 * The list of all clients connected to this server.
62 */
63 private List<ClientHandle> clientHandles = new LinkedList<ClientHandle>();
64
65 /**
66 * The runabout that receives received messages, as well as information about the sender of the last
67 * received message.
68 */
69 private MessageRunabout receivedMessageHandler;
70
71 /**
72 * Whenever a client is disconnected all disconnect handlers are informed.
73 */
45 private List<DisconnectHandler> disconnectHandlers = new LinkedList<DisconnectHandler>(); 74 private List<DisconnectHandler> disconnectHandlers = new LinkedList<DisconnectHandler>();
46 private ArrayList<Class> expectedMessages;
47 75
48 private boolean shutdownRequested; 76 /**
77 * Classes of the messages we expect to receive. If a received message is not in this list, the client
78 * will be disconnected, otherwise the message is just ignored.
79 */
80 private List<Class> expectedMessages = Collections.emptyList();
81
82 /**
83 * If true, shut down as soon as all non-monitor clients have finished, and do not allow new connections
84 * to be made to this server.
85 */
86 private boolean inSoftShutdown;
87
88 /**
89 * Task that is executed as soon as a connection is ready to be accepted.
90 */
49 private Cancelable acceptTask; 91 private Cancelable acceptTask;
50 92
93 /**
94 * True if we are destroyed, or in the process of being destroyed with no way back.
95 */
96 private boolean destroyed;
97
98
99 /**
100 * Interface implemented by disconnect handlers, whose onDisconnect method is called whenever a client
101 * is disconnected from the server.
102 */
51 public interface DisconnectHandler { 103 public interface DisconnectHandler {
104 /**
105 * Called whenever a client is disconnected from the server.
106 *
107 * @param clientHandle the handle for the client that was disconnected
108 */
52 void onDisconnect(ClientHandle clientHandle); 109 void onDisconnect(ClientHandle clientHandle);
53 } 110 }
54 111
55 112 /**
113 * A handle to a (remote) client connected to this server.
114 * <p/>
115 * Every client handle keeps a reference count..
116 * Whenever a part of the programs saves a client handle for further interaction with it, keep() should be called.
117 * This prevents the server from disconnecting the client when it is idle.
118 * Once this interaction is over, drop() will decrement the reference count and eventually disconnect the client
119 * after being idle for long enough.
120 */
56 public class ClientHandle { 121 public class ClientHandle {
57 private RelativeTime clientTimeout; 122 /**
123 * The underlying connection to the client-
124 */
58 private Connection connection; 125 private Connection connection;
59 126
127 /**
128 * When do we disconnect the client after it has been idle?
129 */
130 private RelativeTime clientTimeout;
131
132 /**
133 * When referenceCount==0, the server is allowed to drop the client after a timeout.
134 */
60 private int referenceCount = 0; 135 private int referenceCount = 0;
61 private Connection.ReceiveHandle currentReceive;
62 136
137 /**
138 * Handle for canceling the receive process of this client, null if no receive is currently going on.
139 */
140 private Cancelable currentReceive;
141
142 /**
143 * Set to true if the connection to this client should not prevent the server from shutting down.
144 */
63 private boolean isMonitor; 145 private boolean isMonitor;
64 146
65 private ClientHandle(SocketChannel accept) { 147 /**
66 connection = new Connection(accept); 148 * Iff true, disconnect the client as soon as possible.
149 * Disconnecting may sometimes not be possible immediately, for example when the reference count is not zero.
150 */
151 private boolean disconnectRequested;
152
153 /**
154 * Create a client handle.
155 *
156 * @param sock
157 */
158 private ClientHandle(SocketChannel sock) {
159 connection = new Connection(sock);
67 clientTimeout = idleTimeout; 160 clientTimeout = idleTimeout;
68 // start receiving 161 // start receiving
69 receiveDone(true); 162 receiveDone(true);
70 } 163 }
71 164
72
73 /** 165 /**
74 * Notify us when the server has enough space to transmit 166 * Notify us when the server has enough space to transmit
75 * a message of the given size to the given client. 167 * a message of the given size to the given client.
@@ -81,36 +173,63 @@ public class Server {
81 * @return a handle to cancel the notification 173 * @return a handle to cancel the notification
82 */ 174 */
83 public Cancelable notifyTransmitReady(int size, RelativeTime timeout, MessageTransmitter transmitter) { 175 public Cancelable notifyTransmitReady(int size, RelativeTime timeout, MessageTransmitter transmitter) {
84 return connection.notifyTransmitReady(0, timeout, transmitter); 176 return connection.notifyTransmitReady(size, timeout, transmitter);
177 }
178
179 /**
180 * Convenience method for sending messages.
181 *
182 * @param timeout when should we give up sending the message, and call cont.cont(false)
183 * @param message the message to send
184 * @param cont called when the message has been sent successfully or on error
185 * @return a handle to cancel sending the message
186 */
187 public Cancelable transmitWhenReady(final RelativeTime timeout, final GnunetMessage.Body message, final Continuation cont) {
188 return notifyTransmitReady(0, timeout, new MessageTransmitter() {
189 @Override
190 public void transmit(Connection.MessageSink sink) {
191 sink.send(message);
192 if (cont != null) {
193 cont.cont(true);
194 }
195 }
196
197 @Override
198 public void handleError() {
199 if (cont != null) {
200 cont.cont(false);
201 }
202 }
203 });
85 } 204 }
86 205
87 /** 206 /**
88 * Resume receiving from this client, we are done processing the 207 * Resume receiving from this client, we are done processing the
89 * current request. This function must be called from within each 208 * current request. This function must be called from within each
90 * message handler (or its respective continuations). 209 * message handler (or its respective continuations).
91 * <p/> 210 * <p/>
92 * The server does not automatically continue to receive messages to 211 * The server does not automatically continue to receive messages to
93 * support flow control. 212 * support flow control.
94 * 213 *
95 * @param keepClient false if connection to the client should be closed 214 * @param stayConnected false if connection to the client should be closed
96 */ 215 */
97 public void receiveDone(boolean keepClient) { 216 public void receiveDone(boolean stayConnected) {
98 if (keepClient) { 217 if (stayConnected) {
99 currentReceive = connection.receive(RelativeTime.FOREVER, new MessageReceiver() { 218 currentReceive = connection.receive(RelativeTime.FOREVER, new MessageReceiver() {
100 @Override 219 @Override
101 public void process(GnunetMessage.Body msg) { 220 public void process(GnunetMessage.Body msg) {
102 if (msg instanceof UnknownMessageBody) { 221 if ((msg instanceof UnknownMessageBody) || !expectedMessages.contains(msg.getClass())) {
103 if (requireFound) { 222 if (requireFound) {
104 logger.info("disconnecting client sending unknown message"); 223 logger.info("disconnecting client sending unknown message");
105 disconnect(); 224 disconnect();
106 } 225 }
107 // otherwise, just ignore it 226 // otherwise, just ignore it
108 } 227 }
109 if (receivedMessagehandler == null) { 228 if (receivedMessageHandler == null) {
110 throw new AssertionError("received message, but no handler installed"); 229 throw new AssertionError("received message, but no handler installed");
111 } 230 }
112 receivedMessagehandler.setSender(ClientHandle.this); 231 receivedMessageHandler.setSender(ClientHandle.this);
113 receivedMessagehandler.visitAppropriate(msg); 232 receivedMessageHandler.visitAppropriate(msg);
114 } 233 }
115 234
116 @Override 235 @Override
@@ -120,7 +239,12 @@ public class Server {
120 } 239 }
121 }); 240 });
122 } else { 241 } else {
123 disconnect(); 242 if (referenceCount > 0) {
243 this.disconnectRequested = true;
244 } else {
245 System.out.println("disconnecting " + this.isMonitor);
246 disconnect();
247 }
124 } 248 }
125 249
126 } 250 }
@@ -129,48 +253,86 @@ public class Server {
129 * Change the idle timeout of this particular client. 253 * Change the idle timeout of this particular client.
130 */ 254 */
131 public void setTimeout(RelativeTime newTimeout) { 255 public void setTimeout(RelativeTime newTimeout) {
256 this.clientTimeout = newTimeout;
132 } 257 }
133 258
259 /**
260 * Ask the server to disconnect from the given client.
261 * <p/>
262 * The client will be disconnected from the server, no matter what the current reference count is.
263 */
134 public void disconnect() { 264 public void disconnect() {
135 connection.disconnect(); 265 connection.disconnect();
136 Server.this.clients.remove(this); 266 // if we are in the process of destruction, to not remove, the destruction function will do this,
267 // removing the client handle while in destruction would yield a concurrent modification exception
268 if (!destroyed) {
269 Server.this.clientHandles.remove(this);
270 }
137 for (DisconnectHandler dh : disconnectHandlers) { 271 for (DisconnectHandler dh : disconnectHandlers) {
138 dh.onDisconnect(this); 272 dh.onDisconnect(this);
139 } 273 }
274 Server.this.testForSoftShutdown();
140 } 275 }
141 276
142 /** 277 /**
143 * Disable the warning the server issues if a message is not acknowledged 278 * Prevent the client from being disconnected.
144 * in a timely fashion. Use this call if a client is intentionally delayed 279 * For every keep, there should be an additional matching drop.
145 * for a while. Only applies to the current message.
146 */ 280 */
147 public void disableReceiveDoneWarning() {
148 // todo
149 }
150
151 public void keep() { 281 public void keep() {
152 referenceCount++; 282 referenceCount++;
153 } 283 }
154 284
285
286 /**
287 * Allow to disconnect this client, if not prevented by previous calls to keep.
288 * <p/>
289 * A call to drop should be executed for every call to keep.
290 * After drop() has been executed for every matching keep(), the next call to drop()
291 * allows the server to disconnect the client after a timeout.
292 */
155 public void drop() { 293 public void drop() {
294 assert referenceCount > 0;
156 referenceCount--; 295 referenceCount--;
157 if (referenceCount == 0 && shutdownRequested) { 296 if (referenceCount == 0 && disconnectRequested) {
158 disconnect(); 297 disconnect();
159 } 298 }
160 } 299 }
161 300
301
302 /**
303 * Set the 'monitor' flag on this client. Clients which have been
304 * marked as 'monitors' won't prevent the server from shutting down
305 * once 'GNUNET_SERVER_stop_listening' has been invoked. The idea is
306 * that for "normal" clients we likely want to allow them to process
307 * their requests; however, monitor-clients are likely to 'never'
308 * disconnect during shutdown and thus will not be considered when
309 * determining if the server should continue to exist after
310 * 'GNUNET_SERVER_destroy' has been called.
311 */
162 public void markMonitor() { 312 public void markMonitor() {
163 this.isMonitor = true; 313 this.isMonitor = true;
164 } 314 }
315
316 public boolean isMonitor() {
317 return isMonitor;
318 }
165 } 319 }
166 320
167 321
168 abstract static class MessageRunabout extends Runabout { 322 /**
323 * All handlers for receiving messages from clients have to inherit this class.
324 * <p/>
325 * MessageRunabout is a standard runabout with the added possibility of getting the sender of the message.
326 * This is necessary as the runabout's visit methods can have only one parameter.
327 */
328 public abstract static class MessageRunabout extends Runabout {
169 private ClientHandle currentSender; 329 private ClientHandle currentSender;
170 330
171 /** 331 /**
172 * Allows implementors of MessageRunabout to get the Client that sent the message 332 * Allows implementors of MessageRunabout to get the Client that sent the message
173 * currently visited. 333 * currently visited.
334 * <p/>
335 * The return value of getSender() is only valid while executing a visit method.
174 * 336 *
175 * @return handle of the client whose message is currently being visited 337 * @return handle of the client whose message is currently being visited
176 */ 338 */
@@ -178,39 +340,16 @@ public class Server {
178 return currentSender; 340 return currentSender;
179 } 341 }
180 342
343 /**
344 * Private method used to set the sender for the getSender() method.
345 *
346 * @param clientHandle the client handle to set as the sender
347 */
181 private void setSender(ClientHandle clientHandle) { 348 private void setSender(ClientHandle clientHandle) {
182 currentSender = clientHandle; 349 currentSender = clientHandle;
183 } 350 }
184 } 351 }
185 352
186
187 private void doAccept(final ServerSocketChannel srv) {
188 Scheduler.TaskConfiguration b = new Scheduler.TaskConfiguration(RelativeTime.FOREVER,
189 new Scheduler.Task() {
190 @Override
191 public void run(Scheduler.RunContext ctx) {
192 acceptTask = null;
193 try {
194 SocketChannel cli = srv.accept();
195
196 if (cli != null) {
197 logger.debug("client connected");
198 cli.configureBlocking(false);
199 ClientHandle clientHandle = new ClientHandle(cli);
200 clients.add(clientHandle);
201 }
202
203 } catch (IOException e) {
204 throw new RuntimeException("accept failed", e);
205 }
206 doAccept(srv);
207 }
208 });
209 b.selectAccept(srv);
210 acceptTask = b.schedule();
211 }
212
213
214 /** 353 /**
215 * Create a server listening on all specified addresses. 354 * Create a server listening on all specified addresses.
216 * 355 *
@@ -221,7 +360,6 @@ public class Server {
221 public Server(List<SocketAddress> addresses, RelativeTime idleTimeout, boolean requireFound) { 360 public Server(List<SocketAddress> addresses, RelativeTime idleTimeout, boolean requireFound) {
222 this.idleTimeout = idleTimeout; 361 this.idleTimeout = idleTimeout;
223 this.requireFound = requireFound; 362 this.requireFound = requireFound;
224 listenSockets = new ArrayList<ServerSocketChannel>(addresses.size());
225 try { 363 try {
226 for (SocketAddress addr : addresses) { 364 for (SocketAddress addr : addresses) {
227 ServerSocketChannel socket = ServerSocketChannel.open(); 365 ServerSocketChannel socket = ServerSocketChannel.open();
@@ -229,20 +367,53 @@ public class Server {
229 socket.socket().bind(addr); 367 socket.socket().bind(addr);
230 logger.debug("socket listening on {}", addr.toString()); 368 logger.debug("socket listening on {}", addr.toString());
231 listenSockets.add(socket); 369 listenSockets.add(socket);
232 doAccept(socket); 370 addAcceptSocket(socket);
233 } 371 }
234 } catch (IOException e) { 372 } catch (IOException e) {
235 throw new RuntimeException("could not bind"); 373 throw new RuntimeException("could not bind", e);
236 } 374 }
237 } 375 }
238 376
377 /**
378 * Create a server, not listening on any sockets yet for new connections.
379 *
380 * @param idleTimeout time after a client will be disconnected if idle
381 * @param requireFound allow unknown messages to be received without disconnecting the client in response
382 */
239 public Server(RelativeTime idleTimeout, boolean requireFound) { 383 public Server(RelativeTime idleTimeout, boolean requireFound) {
240 this.idleTimeout = idleTimeout; 384 this.idleTimeout = idleTimeout;
241 this.requireFound = requireFound; 385 this.requireFound = requireFound;
242 } 386 }
243 387
244 public void addAcceptSocket(ServerSocketChannel sock) { 388 /**
245 doAccept(sock); 389 * Accept new connections from the given server socket.
390 *
391 * @param sock the new socket to accept connections from
392 */
393 public final void addAcceptSocket(final ServerSocketChannel sock) {
394 Scheduler.TaskConfiguration b = new Scheduler.TaskConfiguration(RelativeTime.FOREVER,
395 new Scheduler.Task() {
396 @Override
397 public void run(Scheduler.RunContext ctx) {
398 acceptTask = null;
399 try {
400 SocketChannel cli = sock.accept();
401
402 if (cli != null) {
403 logger.debug("client connected");
404 cli.configureBlocking(false);
405 ClientHandle clientHandle = new ClientHandle(cli);
406 clientHandles.add(clientHandle);
407 }
408
409 } catch (IOException e) {
410 throw new RuntimeException("accept failed", e);
411 }
412 addAcceptSocket(sock);
413 }
414 });
415 b.selectAccept(sock);
416 acceptTask = b.schedule();
246 } 417 }
247 418
248 /** 419 /**
@@ -253,10 +424,18 @@ public class Server {
253 * @param msgRunabout handler 424 * @param msgRunabout handler
254 */ 425 */
255 public void setHandler(MessageRunabout msgRunabout) { 426 public void setHandler(MessageRunabout msgRunabout) {
256 receivedMessagehandler = msgRunabout; 427 receivedMessageHandler = msgRunabout;
257 expectedMessages = RunaboutUtil.getRunaboutVisitees(msgRunabout); 428 expectedMessages = RunaboutUtil.getRunaboutVisitees(msgRunabout);
258 } 429 }
259 430
431 /**
432 * Ask the server to notify us whenever a client disconnects.
433 * This handler is called whenever the actual network connection
434 * is closed; the reference count may be zero or larger than zero
435 * at this point. Note that the disconnect handler is also called when
436 *
437 * @param disconnectHandler handler to call on disconnect
438 */
260 public Cancelable notifyDisconnect(final DisconnectHandler disconnectHandler) { 439 public Cancelable notifyDisconnect(final DisconnectHandler disconnectHandler) {
261 this.disconnectHandlers.add(disconnectHandler); 440 this.disconnectHandlers.add(disconnectHandler);
262 return new Cancelable() { 441 return new Cancelable() {
@@ -268,19 +447,63 @@ public class Server {
268 } 447 }
269 448
270 /** 449 /**
271 * Stop the listen socket and get ready to shutdown the server 450 * Stop the listen socket destroy the server as soon as only monitor clients are left.
272 * once only 'monitor' clients are left.
273 */ 451 */
274 public void stopListening() { 452 public void stopListening() {
275 shutdownRequested = true; 453 inSoftShutdown = true;
276 // todo: shut down if only monitor clients left 454 if (acceptTask != null) {
455 acceptTask.cancel();
456 acceptTask = null;
457 }
458 testForSoftShutdown();
277 } 459 }
278 460
461 /**
462 * Disconnect all clients forcefully from the server and stop listening.
463 * <p/>
464 * No methods should be called on a server and its client handles after destroy() has been called.
465 */
279 public void destroy() { 466 public void destroy() {
280 for (ClientHandle h : new ArrayList<ClientHandle>(clients)) { 467 if (destroyed) {
468 return;
469 }
470 destroyed = true;
471 for (ClientHandle h : clientHandles) {
281 h.disconnect(); 472 h.disconnect();
282 } 473 }
283 acceptTask.cancel(); 474 clientHandles.clear();
475 if (acceptTask != null) {
476 acceptTask.cancel();
477 acceptTask = null;
478 }
479 for (ServerSocketChannel ssc : listenSockets) {
480 try {
481 ssc.close();
482 } catch (IOException e) {
483 logger.error("closing listen socket failed", e);
484 }
485 }
284 } 486 }
285 487
488 /**
489 * Test if we should destroy outselves.
490 */
491 private void testForSoftShutdown() {
492 // do this so we don't have many recursive calls to testForSoftShutdown when shutting down
493 if (destroyed) {
494 return;
495 }
496 if (inSoftShutdown) {
497 System.out.println(""+clientHandles.size());
498 boolean done = true;
499 for (ClientHandle clientHandle : this.clientHandles) {
500 if (!clientHandle.isMonitor) {
501 done = false;
502 }
503 }
504 if (done) {
505 destroy();
506 }
507 }
508 }
286} 509}