aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/testbed/Controller.java
blob: e855033849626d276064ae0e2232d0b8223683bd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
/*
     This file is part of GNUnet.
     Copyright (C) 2009,2013 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 2, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING.  If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
 */
package org.gnunet.testbed;

import org.gnunet.mq.Envelope;
import org.gnunet.requests.MatchingRequestContainer;
import org.gnunet.requests.Request;
import org.gnunet.requests.RequestIdentifier;
import org.gnunet.testbed.messages.*;
import org.gnunet.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handle to interact with a GNUnet testbed controller.  Each
 * controller has at least one master handle which is created when the
 * controller is created; this master handle interacts with the
 * controller process, destroying it destroys the controller (by
 * closing stdin of the controller process).  Additionally,
 * controllers can interact with each other (in a P2P fashion); those
 * links are established via TCP/IP on the controller's service port.
 */
public class Controller {
    private static final Logger logger = LoggerFactory
        .getLogger(Controller.class);

    /**
     * Integer codes for the event types used by the testbed messages; the
     * message handlers compare the event type of incoming messages against
     * these values.
     */
    public static class EventTypes {
        /** A peer has been started. */
        public static final int PEER_START = 0;

        /** A peer has been stopped. */
        public static final int PEER_STOP = 1;

        /** A connection between two peers has been established. */
        public static final int PEERS_CONNECT = 2;

        /** A connection between two peers has been torn down. */
        public static final int PEERS_DISCONNECT = 3;

        /** An operation has finished. */
        public static final int OPERATION_FINISHED = 4;
    }

    /**
     * Client connecting to the testbed service.
     */
    private Client client;

    /**
     * Host that this controller runs on.
     */
    private Host host;

    private int operationCounter = 1;
    private int peerCounter;

    /**
     * Request queue (akin to operation queue(s) in the GNUnet C implementation).
     */
    private MatchingRequestContainer<Long,OperationRequest> requests;

    /**
     * Base class of all requests that are matched with their response through
     * a 64-bit operation id.
     */
    abstract class OperationRequest extends Request {
        /**
         * Id of this operation: the id of the controller's host in the upper
         * 32 bits and the controller's operation counter in the lower 32 bits.
         */
        protected final long operationId;

        /**
         * Assigns the next operation id and advances the controller's
         * operation counter.
         */
        public OperationRequest() {
            // Treat the counter as an unsigned 32-bit value.  A plain (long) cast
            // sign-extends, so once the int counter wraps to a negative value the
            // OR would set all upper bits and destroy the host id part of the id.
            final long counterBits = operationCounter++ & 0xFFFFFFFFL;
            operationId = (((long) host.id) << 32) | counterBits;
        }
    }

    /**
     * Operation request whose successful completion is reported through an
     * {@link OperationCompletionCallback}.
     */
    abstract class GenericOperationRequest extends OperationRequest {
        /** Callback invoked by {@link #onSuccess()}. */
        protected final OperationCompletionCallback cb;

        /**
         * @param cb callback to invoke when the operation succeeded
         */
        public GenericOperationRequest(OperationCompletionCallback cb) {
            super();
            this.cb = cb;
        }

        /**
         * Tell the callback that the operation has completed.
         */
        void onSuccess() {
            this.cb.onCompletion();
        }
    }

    /**
     * Request to create a peer on a host from a template configuration.
     * The peer id is allocated from the controller's peer counter when the
     * request object is constructed.
     */
    class PeerCreateRequest extends OperationRequest {
        /** Host whose id is sent as the target of the peer creation. */
        final Host host;
        /** Configuration that is compressed and sent along with the request. */
        final Configuration cfg;
        /** Callback that receives the created peer handle. */
        final PeerCreateCallback cb;
        /** Id chosen for the new peer. */
        final int peerId;

        /**
         * @param host host to create the peer on
         * @param cfg template configuration for the peer
         * @param cb callback to call once the peer has been created
         */
        public PeerCreateRequest(Host host, Configuration cfg, PeerCreateCallback cb) {
            this.host = host;
            this.cfg = cfg;
            this.cb = cb;
            this.peerId = peerCounter++;
        }

        /**
         * Build the peer create message, including the compressed configuration.
         *
         * @return envelope containing the peer create message
         */
        @Override
        public Envelope assembleRequest() {
            CompressedConfig ccfg = new CompressedConfig(cfg);
            PeerCreateMessage m = new PeerCreateMessage();
            m.hostId = host.id;
            m.operationId = operationId;
            m.peerId = peerId;
            m.compressedConfig = ccfg.compressedData;
            m.configSize = ccfg.getUncompressedSize();
            // Diagnostics go through the class logger instead of stdout.
            logger.debug("create request with opid {}", m.operationId);
            return new Envelope(m);
        }
    }

    /**
     * Request to destroy a peer.  Completion is reported through the
     * callback held by {@link GenericOperationRequest}.
     */
    class PeerDestroyRequest extends GenericOperationRequest {
        /** Id of the peer to destroy. */
        final int peerId;

        /**
         * @param p peer to destroy
         * @param cb callback to call once the operation completed
         */
        public PeerDestroyRequest(Peer p, OperationCompletionCallback cb) {
            super(cb);
            peerId = p.peerId;
        }

        /**
         * Build the peer destroy message.
         *
         * @return envelope containing the peer destroy message
         */
        @Override
        public Envelope assembleRequest() {
            PeerDestroyMessage m = new PeerDestroyMessage();
            m.operationId = operationId;
            m.peerId = peerId;
            // Diagnostics go through the class logger instead of stdout.
            logger.debug("destroy request with opid {}", m.operationId);
            return new Envelope(m);
        }
    }

    /**
     * Request for the information of one peer; the result is delivered to a
     * {@link PeerInformationCallback}.
     */
    class PeerInformationRequest extends OperationRequest {
        /** Id of the peer that is queried. */
        final int peerId;
        /** Callback that receives the peer's information. */
        final PeerInformationCallback cb;

        /**
         * @param peer peer to query
         * @param cb callback to call with the result
         */
        public PeerInformationRequest(Peer peer, PeerInformationCallback cb) {
            this.peerId = peer.peerId;
            this.cb = cb;
        }

        /**
         * Build the message asking for the peer's information.
         *
         * @return envelope containing the query message
         */
        @Override
        public Envelope assembleRequest() {
            final PeerGetInformationMessage query = new PeerGetInformationMessage();
            query.peerId = this.peerId;
            query.operationId = this.operationId;
            return new Envelope(query);
        }
    }

    /**
     * Request to replace the configuration of a peer.  Completion is reported
     * through the callback held by {@link GenericOperationRequest}.
     */
    class PeerUpdateConfigurationRequest extends GenericOperationRequest {
        /** Id of the peer to reconfigure. */
        final int peerId;
        /** New configuration; compressed when the request is assembled. */
        final Configuration cfg;

        /**
         * @param peer peer to reconfigure
         * @param cb callback to call once the operation completed
         * @param cfg new configuration for the peer
         */
        public PeerUpdateConfigurationRequest(Peer peer, OperationCompletionCallback cb, Configuration cfg) {
            super(cb);
            peerId = peer.peerId;
            this.cfg = cfg;
        }

        /**
         * Build the reconfigure message, including the compressed configuration.
         *
         * @return envelope containing the reconfigure message
         */
        @Override
        public Envelope assembleRequest() {
            PeerReconfigureMessage m = new PeerReconfigureMessage();
            m.operationId = operationId;
            m.peerId = peerId;
            CompressedConfig ccfg = new CompressedConfig(cfg);
            m.uncompressedConfigSize = ccfg.getUncompressedSize();
            m.compressedConfig = ccfg.compressedData;
            // Diagnostics go through the class logger instead of stdout.
            logger.debug("compressed config size {}", m.compressedConfig.length);
            return new Envelope(m);
        }
    }

    /**
     * Request to manage a single service of a peer.  This request type
     * carries no completion callback.
     */
    class PeerManageServiceRequest extends OperationRequest {
        /** Peer whose service is managed. */
        final Peer peer;
        /** Value sent as the 'start' flag of the message. */
        final boolean start;
        /** Name of the service to manage. */
        final String serviceName;

        /**
         * @param peer peer whose service is managed
         * @param serviceName name of the service
         * @param start value for the 'start' flag of the message
         */
        public PeerManageServiceRequest(Peer peer, String serviceName, boolean start) {
            this.peer = peer;
            this.serviceName = serviceName;
            this.start = start;
        }

        /**
         * Build the manage-service message.
         *
         * @return envelope containing the manage-service message
         */
        @Override
        public Envelope assembleRequest() {
            final ManagePeerServiceMessage msg = new ManagePeerServiceMessage();
            msg.operationId = this.operationId;
            msg.peerId = this.peer.peerId;
            msg.start = this.start;
            msg.serviceName = this.serviceName;
            return new Envelope(msg);
        }
    }

    /**
     * Request to start a peer; success is signalled to a
     * {@link PeerChurnCallback}.
     */
    class PeerStartRequest extends OperationRequest {
        /** Peer to start. */
        final Peer peer;
        /** Callback to notify when the peer was started. */
        final PeerChurnCallback peerChurnCallback;

        /**
         * @param peer peer to start
         * @param peerChurnCallback callback to notify on success
         */
        public PeerStartRequest(Peer peer, PeerChurnCallback peerChurnCallback) {
            this.peerChurnCallback = peerChurnCallback;
            this.peer = peer;
        }

        /**
         * Build the peer start message.
         *
         * @return envelope containing the peer start message
         */
        @Override
        public Envelope assembleRequest() {
            final PeerStartMessage startMessage = new PeerStartMessage();
            startMessage.peerId = this.peer.peerId;
            startMessage.operationId = this.operationId;
            return new Envelope(startMessage);
        }
    }

    /**
     * Request to stop a peer; success is signalled to a
     * {@link PeerChurnCallback}.
     */
    class PeerStopRequest extends OperationRequest {
        /** Peer to stop. */
        final Peer peer;
        /** Callback to notify when the peer was stopped. */
        final PeerChurnCallback peerChurnCallback;

        /**
         * @param peer peer to stop
         * @param peerChurnCallback callback to notify on success
         */
        public PeerStopRequest(Peer peer, PeerChurnCallback peerChurnCallback) {
            this.peerChurnCallback = peerChurnCallback;
            this.peer = peer;
        }

        /**
         * Build the peer stop message.
         *
         * @return envelope containing the peer stop message
         */
        @Override
        public Envelope assembleRequest() {
            final PeerStopMessage stopMessage = new PeerStopMessage();
            stopMessage.peerId = this.peer.peerId;
            stopMessage.operationId = this.operationId;
            return new Envelope(stopMessage);
        }
    }

    /**
     * Request to connect two peers; completion is signalled to an
     * {@link OperationCompletionCallback}.
     */
    class PeerConnectOverlayRequest extends OperationRequest {
        /** First peer of the connection. */
        final Peer peer1;
        /** Second peer of the connection; the id of its host is sent as well. */
        final Peer peer2;
        /** Callback to notify when the connection event arrives. */
        final OperationCompletionCallback cb;

        /**
         * @param peer1 first peer
         * @param peer2 second peer
         * @param cb callback to notify on completion
         */
        public PeerConnectOverlayRequest(Peer peer1, Peer peer2, OperationCompletionCallback cb) {
            this.cb = cb;
            this.peer1 = peer1;
            this.peer2 = peer2;
        }

        /**
         * Build the overlay connect message.
         *
         * @return envelope containing the overlay connect message
         */
        @Override
        public Envelope assembleRequest() {
            final OverlayConnectMessage connect = new OverlayConnectMessage();
            connect.operationId = this.operationId;
            connect.peer1 = this.peer1.peerId;
            connect.peer2 = this.peer2.peerId;
            connect.hostOfPeer2 = this.peer2.getHost().id;
            return new Envelope(connect);
        }
    }


    public class ControllerMessageReceiver extends RunaboutMessageReceiver {
        public void visit(PeerEventMessage m) {
            RequestIdentifier<OperationRequest> rId = requests.getRequestIdentifier(m.operationId);
            OperationRequest r = rId.getRequest();
            if (null == r) {
                logger.error("no matching peer event getRequestIdentifier for op id %s", m.operationId);
                return;
            }
            if (r instanceof PeerStartRequest && m.eventType == EventTypes.PEER_START) {
                PeerStartRequest psr = (PeerStartRequest) r;
                psr.peerChurnCallback.onChurnSuccess();
            } else if (r instanceof PeerStopRequest && m.eventType == EventTypes.PEER_STOP) {
                PeerStopRequest psr = (PeerStopRequest) r;
                psr.peerChurnCallback.onChurnSuccess();
            } else {
                logger.error("unexpected peer event message, event type %s and getRequestIdentifier %s",
                             m.eventType, r);
            }
        }
        public void visit(PeerCreateSuccessMessage m) {
            RequestIdentifier<OperationRequest> rId = requests.getRequestIdentifier(m.operationId);
            OperationRequest r = rId.getRequest();
            if (!(r instanceof PeerCreateRequest)) {
                logger.warn("response to peer create getRequestIdentifier does not match");
                return;
            }
            PeerCreateRequest pcr = (PeerCreateRequest) r;
            Peer p = new Peer(pcr.peerId);
            pcr.cb.onPeerCreated(p);
        }
        public void visit(PeerInformationMessage m) {
            RequestIdentifier<OperationRequest> rId = requests.getRequestIdentifier(m.operationId);
            OperationRequest r = rId.getRequest();
            if (null == r) {
                logger.error("unexpected peer information message (opid={})", m.operationId);
                return;
            }
            if (!(r instanceof PeerInformationRequest)) {
                logger.warn("response to peer create getRequestIdentifier does not match");
                return;
            }
            PeerInformationRequest pir = (PeerInformationRequest) r;
            CompressedConfig ccfg = new CompressedConfig(m.compressedConfig);
            pir.cb.onSuccess(m.peerIdentity, ccfg.decompress());
        }
        public void visit(GenericOperationSuccessMessage m) {
            RequestIdentifier<OperationRequest> rId = requests.getRequestIdentifier(m.operationId);
            OperationRequest r = rId.getRequest();
            if (null == r) {
                logger.error("unexpected generic success message (opid={})", m.operationId);
                return;
            }
            if (!(r instanceof GenericOperationRequest)) {
                logger.error(String.format(
                               "got GenericOperationSuccessMessage as response to getRequestIdentifier '%s', opid %s; event type %s",
                               r.getClass(), m.operationId, m.eventType));
                return;
            }
            GenericOperationRequest gr = (GenericOperationRequest) r;
            gr.onSuccess();
        }
        public void visit(ConnectionEventMessage m) {
            RequestIdentifier<OperationRequest> rId = requests.getRequestIdentifier(m.operationId);
            OperationRequest r = rId.getRequest();
            if (null == r) {
                logger.error("unexpected connection event message (opid={})", m.operationId);
                return;
            }
            if (!(r instanceof PeerConnectOverlayRequest)) {
                logger.error("unexpected connection event message for operation {}", r.getClass());
                return;
            }
            PeerConnectOverlayRequest cr = (PeerConnectOverlayRequest) r;
            cr.cb.onCompletion();

        }
        public void visit(OperationFailEventMessage m) {
            logger.error("operation failed: " + m.errorMessage);
        }
        @Override
        public void handleError() {
            throw new AssertionError();
        }
    }

    /**
     * Connect to a controller process.  A client for the "testbed" service is
     * created from the configuration of the given host, the message receiver
     * and the request container are set up, and a controller init message
     * carrying the host's id and hostname is sent.
     *
     * @param host host the controller runs on; its configuration is used for
     *          the connection
     */
    public Controller(Host host) {
        this.host = host;
        this.client = new Client("testbed", host.cfg);
        this.client.installReceiver(new ControllerMessageReceiver());
        this.requests = new MatchingRequestContainer<Long, OperationRequest>(this.client);

        final ControllerInitMessage init = new ControllerInitMessage();
        // we are interested in all events: bits 0 through 4 are set
        // (presumably bit i selects event type i -- TODO confirm against the protocol)
        init.eventMask = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
        // fall back to the loopback address when the host has no hostname
        if (host.hostname == null) {
            init.controlerHostname = "127.0.0.1";
        } else {
            init.controlerHostname = host.hostname;
        }
        init.hostId = host.id;
        this.client.send(init);
    }

    /**
     * Queue a request to create a peer on the given host, using the given
     * configuration as template.
     *
     * Creating the peer only yields the handle that is used to manipulate
     * the peer further; use {@code Peer.start} and {@code Peer.stop} to
     * start and stop it.
     *
     * NOTE(review): earlier documentation states that the controller adjusts
     * the configuration to avoid port/path conflicts and delegates creation
     * to a controller on the target host; neither is visible in this class.
     * The configuration in effect can be queried with
     * {@code Peer.requestInformation}.
     *
     * @param host host to run the peer on; must not be null
     * @param cfg template configuration to use for the peer
     * @param cb the callback to call when the peer has been created
     * @return the operation handle
     */
    public Cancelable createPeer(Host host, Configuration cfg, PeerCreateCallback cb) {
        final PeerCreateRequest createRequest = new PeerCreateRequest(host, cfg, cb);
        final long opId = createRequest.operationId;
        return requests.addRequest(opId, createRequest);
    }

    /**
     * Disconnect from the testbed service by disconnecting the underlying
     * client.
     *
     * NOTE(review): earlier documentation stated that this stops the
     * controller, terminates all dependent peers and controllers, and blocks
     * until the testbed is fully terminated; the visible code only calls
     * {@code client.disconnect()} -- confirm what the service does on
     * disconnect.
     */
    public void disconnect () {
        client.disconnect();
    }


    /**
     * Create a link from a slave controller to a delegated controller.
     *
     * Not implemented yet: this method always throws
     * {@link UnsupportedOperationException}.
     *
     * Intended behavior: whenever the master controller is asked to start a
     * peer at the delegated controller, the request is routed towards the
     * slave controller (if a route exists), which then routes it to the
     * delegated controller.  The configuration of the delegated controller is
     * used either to create the delegated controller or to connect to an
     * existing one; while starting the delegated controller the configuration
     * is modified to accommodate available free ports.  'is_subordinate'
     * specifies whether the delegated controller should be started and managed
     * by the slave controller, or whether it already has a master and the
     * slave controller connects to it as a non-master controller.  In the C
     * API, success or failure of this operation is signalled through
     * GNUNET_TESTBED_ControllerCallback() with an event of type
     * GNUNET_TESTBED_ET_OPERATION_FINISHED.
     *
     * @param delegated_host host to which requests should be delegated; must not be null
     * @param slave_host host used to run the slave controller; use null to
     *          make the master controller connect to the delegated host
     * @param is_subordinate true if the controller at delegated_host should
     *          be started by the slave controller; false if the slave
     *          controller has to connect to the already started delegated
     *          controller via TCP/IP
     * @return the operation handle
     * @throws UnsupportedOperationException always
     */
    public Cancelable link(Host delegated_host, Host slave_host, boolean is_subordinate) {
        // low priority, not implemented so far
        throw new UnsupportedOperationException("not yet implemented");
    }


    /**
     * Register a host with the controller.
     *
     * Not implemented yet: this method always throws
     * {@link UnsupportedOperationException}.
     *
     * Intended behavior: registration makes the controller aware of the host.
     * A host should be registered at the controller before a sub-controller
     * is started on that host via {@code link}.
     *
     * @param host the host to register
     * @param cc the completion callback to call to report the status of the
     *          registration; after this callback has been called the
     *          registration handle is invalid; must not be null
     * @return handle to the host registration which can be used to cancel the
     *           registration; null if another registration handle is present
     *           and has not been cancelled
     * @throws UnsupportedOperationException always
     */
    Cancelable registerHost(Host host, HostRegistrationCompletion cc) {
        throw new UnsupportedOperationException("not implemented");
    }


    /**
     * Opaque handle to a peer controlled by the testbed framework.  A peer runs
     * at a particular host.
     */
    public class Peer {
        final private int peerId;

        /**
         * Private constructor for the peer; creates the handle for the peer
         * with the given id.  As an inner class instance the peer is
         * implicitly bound to the containing controller.
         *
         * @param peerId id for the peer
         */
        private Peer(int peerId) {
            this.peerId = peerId;
        }

        /**
         * Queue a request to start this peer.
         *
         * @param peerChurnCallback callback notified once the peer was started
         * @return handle to cancel the operation
         */
        public Cancelable start(PeerChurnCallback peerChurnCallback) {
            final PeerStartRequest startRequest = new PeerStartRequest(this, peerChurnCallback);
            final long opId = startRequest.operationId;
            return requests.addRequest(opId, startRequest);
        }

        /**
         * Queue a request to stop this peer.
         *
         * @param peerChurnCallback callback notified once the peer was stopped
         * @return handle to cancel the operation
         */
        public Cancelable stop(PeerChurnCallback peerChurnCallback) {
            final PeerStopRequest stopRequest = new PeerStopRequest(this, peerChurnCallback);
            final long opId = stopRequest.operationId;
            return requests.addRequest(opId, stopRequest);
        }

        /**
         * Queue a request for this peer's information (peer identity and
         * configuration).
         *
         * @param cb callback that receives the information
         * @return handle to cancel the operation
         */
        public Cancelable requestInformation(PeerInformationCallback cb) {
            final PeerInformationRequest infoRequest = new PeerInformationRequest(this, cb);
            final long opId = infoRequest.operationId;
            return requests.addRequest(opId, infoRequest);
        }

        /**
         * Queue a request to change this peer's configuration.  Must only be
         * called while the peer is stopped.  Ports and paths cannot be
         * changed this way.
         *
         * @param cfg the new configuration
         * @param cb callback notified once the operation completed
         * @return handle to cancel the operation
         */
        public Cancelable updateConfiguration(Configuration cfg, OperationCompletionCallback cb) {
            final PeerUpdateConfigurationRequest updateRequest =
                    new PeerUpdateConfigurationRequest(this, cb, cfg);
            final long opId = updateRequest.operationId;
            return requests.addRequest(opId, updateRequest);
        }

        /**
         * Queue a request to destroy this peer.
         *
         * @param cb callback notified once the operation completed
         * @return handle to cancel the operation
         */
        public Cancelable destroy(OperationCompletionCallback cb) {
            final PeerDestroyRequest destroyRequest = new PeerDestroyRequest(this, cb);
            final long opId = destroyRequest.operationId;
            return requests.addRequest(opId, destroyRequest);
        }

        /**
         * Queue a request to manage (presumably start or stop, depending on
         * 'start') a service of this peer.
         *
         * @param serviceName name of the service to manage
         * @param start value of the 'start' flag sent to the controller
         * @param cb callback notified once the operation completed
         * @return handle to cancel the operation
         */
        public Cancelable manageService(final String serviceName, final boolean start, OperationCompletionCallback cb) {
            // The request must carry cb and be a GenericOperationRequest:
            // PeerManageServiceRequest has no callback, so cb used to be dropped
            // and a generic success response was rejected as unexpected.
            // NOTE(review): assumes the service answers manage-service requests
            // with a generic operation success message -- confirm.
            final GenericOperationRequest r = new GenericOperationRequest(cb) {
                @Override
                public Envelope assembleRequest() {
                    ManagePeerServiceMessage m = new ManagePeerServiceMessage();
                    m.operationId = operationId;
                    m.peerId = Peer.this.peerId;
                    m.serviceName = serviceName;
                    m.start = start;
                    return new Envelope(m);
                }
            };
            return requests.addRequest(r.operationId, r);
        }

        /**
         * Queue a request to connect this peer with another peer.
         *
         * According to the original documentation both peers must have been
         * started before this is called; a HELLO is then obtained from this
         * peer and given to 'otherPeer', which is asked to connect to this
         * peer.
         *
         * @param otherPeer peer to connect this peer to
         * @param cb callback object to signal completion
         * @return token to cancel the request
         */
        public Cancelable connectOverlay(Peer otherPeer, OperationCompletionCallback cb) {
            final PeerConnectOverlayRequest connectRequest =
                    new PeerConnectOverlayRequest(this, otherPeer, cb);
            final long opId = connectRequest.operationId;
            return requests.addRequest(opId, connectRequest);
        }


        /**
         * Prepare a connection to a service offered by this peer.  The peer's
         * information is requested, and once it arrives the peer's
         * configuration is handed to {@code serviceAdapter.onConnect}.
         *
         * The service name is currently not used by this method.
         *
         * @param serviceName name of the service to connect to
         * @param serviceAdapter callback object that is given the peer's configuration
         * @return handle for the operation
         */
        public Cancelable getServiceConnection(String serviceName, final ServiceAdapter serviceAdapter) {
            // Forward only the configuration; the peer identity is not needed here.
            final PeerInformationCallback forwardConfiguration = new PeerInformationCallback() {
                @Override
                public void onSuccess(PeerIdentity peerIdentity, Configuration configuration) {
                    serviceAdapter.onConnect(configuration);
                }
            };
            return requestInformation(forwardConfiguration);
        }

        /**
         * Get the host this peer is running on.
         *
         * NOTE(review): this always returns the host of the containing
         * controller, not the host that was passed when the peer was
         * created -- confirm this is intended for peers on other hosts.
         *
         * @return the host of the containing controller
         */
        public Host getHost() {
            return Controller.this.host;
        }
    }
}