aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/consensus/Consensus.java
blob: 322599ccb0210d537eadf64d2e6bbcd2cd6802f2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package org.gnunet.consensus;

import org.gnunet.mq.Envelope;
import org.gnunet.mq.MessageQueue;
import org.gnunet.mq.NotifySentHandler;
import org.gnunet.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Multi-peer set reconciliation.
 */
public class Consensus {
    /**
     * Class logger.
     */
    private static final Logger logger = LoggerFactory
            .getLogger(Consensus.class);

    /**
     * Callback for new elements arriving from the service.
     * Also used to notify of consensus failure.
     */
    private final NewElementCallback newElementCallback;

    /**
     * Client connected to the consensus service.
     */
    private Client client;

    /**
     * Called when conclude has finished.
     */
    private ConcludeCallback concludeCallback;

    /**
     * Message dispatch for messages from the consensus service.
     */
    private class ConsensusMessageReceiver extends RunaboutMessageReceiver {
        public void visit(ConcludeDoneMessage m) {
            if (null == concludeCallback)
            {
                logger.error("unexpected conclude done message");
                return;
            }
            concludeCallback.onConcludeDone();
        }

        public void visit(NewElementMessage m) {
            ConsensusElement element = new ConsensusElement();
            element.element_type = m.element_type;
            element.data = m.element_data;
            newElementCallback.onNewElement(element);
        }

        @Override
        public void handleError() {
            newElementCallback.onNewElement(null);
        }
    }

    /**
     * Create a consensus session.  The set being reconciled is initially
     * empty.  Only reconcile with other peers after
     * GNUNET_CONSENSUS_reconcile has been called.
     *
     * @param num_peers number of peers in the session
     * @param peers array of peers participating in this consensus session
     *              Inclusion of the local peer is optional.
     * @param sessionId session identifier
     *                   Allows a group of peers to have more than consensus session.
     * @param newElementCallback callback, called when a new element is added to the set by
     *                    another peer
     */
    public Consensus(Configuration cfg, int num_peers, PeerIdentity[] peers, HashCode sessionId,
                     NewElementCallback newElementCallback) {
        client = new Client("consensus", cfg);
        client.installReceiver(new ConsensusMessageReceiver());
        this.newElementCallback = newElementCallback;
    }

    /**
     * Insert an element into the consensus set.
     *
     * @param element element to insert in the consnesus
     * @param idc called when the element has been sent to the service
     */
    public void insertElement (ConsensusElement element, final InsertDoneCallback idc) {
        InsertElementMessage m = new InsertElementMessage();
        m.element_data = element.data;
        m.element_type = element.element_type;
        Envelope ev = new Envelope(m);
        ev.notifySent(new NotifySentHandler() {
            @Override
            public void onSent() {
                idc.onInsertDone();
            }
        });
        client.send(ev);
    }

    /**
     * We are done with inserting new elements into the consensus;
     * try to conclude the consensus within a given time window.
     * After conclude has been called, no further elements may be
     * inserted by the client.
     * @param concludeCallback called when the consensus has concluded
     */
    public void conclude(ConcludeCallback concludeCallback) {
        if (null == concludeCallback)
            throw new AssertionError("conclude with empty callback");
        if (null != this.concludeCallback)
            throw new AssertionError("called conclude twice");
        this.concludeCallback = concludeCallback;
        ConcludeMessage m = new ConcludeMessage();
        client.send(m);
    }

    /**
     * Destroy a consensus handle.
     * Free all state associated with
     * it, no longer call any of the callbacks.
     */
    public void destroy() {
        client.disconnect();
        client = null;
    }
}