aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/mq/MessageQueue.java
blob: 4df3ae461c278ef4838ccb284cf7bd386436636d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package org.gnunet.mq;


import org.gnunet.util.GnunetMessage;

import java.util.LinkedList;

/**
 * General-purpose message queue.
 * <p>
 * At most one envelope is handed to the implementation at a time (the
 * "current" envelope). Further envelopes wait in one of two backlogs, a
 * preferred one and a regular one; whenever the current envelope is
 * reported as sent or is cancelled, the next envelope is taken from the
 * preferred backlog if it is non-empty, otherwise from the regular backlog.
 * <p>
 * This class performs no synchronization of its own.
 */
public abstract class MessageQueue {
    /** Regular backlog: envelopes waiting behind the current one, oldest first. */
    private final LinkedList<Envelope> queued_envelopes = new LinkedList<Envelope>();
    /** Preferred backlog: drained completely before the regular backlog is looked at. */
    private final LinkedList<Envelope> prefered_queued_envelopes = new LinkedList<Envelope>();
    /**
     * The envelope most recently passed to {@link #submit(Envelope)} that has
     * neither been reported as sent nor cancelled; {@code null} while the
     * queue is idle.
     */
    protected Envelope current_envelope;

    /**
     * Implementation hook, called with the envelope that has just become the
     * current envelope. Presumably the implementation starts transmitting it
     * and calls {@link #reportMessageSent()} once done (to be confirmed
     * against the concrete subclasses).
     *
     * @param ev the new current envelope
     */
    protected abstract void submit(Envelope ev);

    /**
     * Implementation hook, called when the current envelope is cancelled,
     * right before the queue moves on to the next envelope.
     */
    protected abstract void retract();

    /**
     * Wrap a message body in a new envelope and send it with regular priority.
     *
     * @param body the message body to send
     */
    public void send(GnunetMessage.Body body) {
        send(new Envelope(body));
    }

    /**
     * Wrap a message body in a new envelope and send it with preferred priority.
     *
     * @param body the message body to send
     */
    public void sendPrefered(GnunetMessage.Body body) {
        sendPrefered(new Envelope(body));
    }

    /**
     * Remove and return the next waiting envelope, giving precedence to the
     * preferred backlog.
     *
     * @return the next envelope, or {@code null} if both backlogs are empty
     */
    private Envelope pollNextEnvelope() {
        if (!prefered_queued_envelopes.isEmpty())
            return prefered_queued_envelopes.removeFirst();
        if (!queued_envelopes.isEmpty())
            return queued_envelopes.removeFirst();
        return null;
    }

    /**
     * Send an envelope with regular priority. If the queue is idle the
     * envelope is submitted immediately; otherwise it is appended to the
     * regular backlog.
     *
     * @param ev the envelope to send, must not be {@code null}
     * @throws NullPointerException if {@code ev} is {@code null}
     */
    public void send(Envelope ev) {
        submitOrEnqueue(ev, queued_envelopes);
    }

    /**
     * Send an envelope with preferred priority. If the queue is idle the
     * envelope is submitted immediately; otherwise it is appended to the
     * preferred backlog, which is drained before the regular one.
     *
     * @param ev the envelope to send, must not be {@code null}
     * @throws NullPointerException if {@code ev} is {@code null}
     */
    public void sendPrefered(Envelope ev) {
        submitOrEnqueue(ev, prefered_queued_envelopes);
    }

    /**
     * Shared body of {@link #send(Envelope)} and {@link #sendPrefered(Envelope)}:
     * submit the envelope right away when idle, else park it in the given backlog.
     *
     * @param ev      the envelope to send
     * @param backlog the backlog to append to when another envelope is current
     */
    private void submitOrEnqueue(Envelope ev, LinkedList<Envelope> backlog) {
        // A null entry would be indistinguishable from "backlog empty" in next()
        // and would strand every envelope queued behind it, so reject it here.
        if (null == ev)
            throw new NullPointerException("ev must not be null");
        if (null == current_envelope) {
            current_envelope = ev;
            submit(current_envelope);
        } else {
            backlog.addLast(ev);
        }
    }

    /**
     * To be called by the implementation once the current envelope has been
     * sent. Invokes the envelope's sent notification and then moves on to the
     * next waiting envelope, if any.
     *
     * @throws AssertionError if there is no current envelope
     */
    protected void reportMessageSent() {
        if (null == current_envelope)
            throw new AssertionError();
        // The notification runs while this envelope is still the current one.
        current_envelope.invokeSentNotification();
        next();
    }

    /**
     * Make the next waiting envelope current and submit it; leaves the queue
     * idle ({@code current_envelope == null}) when nothing is waiting.
     */
    private void next() {
        current_envelope = pollNextEnvelope();
        if (current_envelope == null)
            return;
        submit(current_envelope);
    }

    /**
     * Cancel sending an envelope. The envelope must be queued in this message queue.
     * <p>
     * If the envelope is the current one, the implementation is asked to
     * retract it and the next waiting envelope is submitted. Otherwise the
     * envelope is removed from the backlogs; an envelope found in neither
     * backlog is silently ignored.
     *
     * @param ev the envelope to cancel
     * @throws AssertionError if the queue is idle, in which case no envelope
     *                        can be queued in it
     */
    /* pkg-private */ void cancelEnvelope(Envelope ev) {
        if (null == current_envelope)
            throw new AssertionError();
        if (ev == current_envelope) {
            retract();
            next();
        } else {
            queued_envelopes.remove(ev);
            prefered_queued_envelopes.remove(ev);
        }
    }
}