diff options
Diffstat (limited to 'src/main/java/org/gnunet/mq/MessageQueue.java')
-rw-r--r-- | src/main/java/org/gnunet/mq/MessageQueue.java | 68 |
1 files changed, 42 insertions, 26 deletions
diff --git a/src/main/java/org/gnunet/mq/MessageQueue.java b/src/main/java/org/gnunet/mq/MessageQueue.java index 4df3ae4..cab1003 100644 --- a/src/main/java/org/gnunet/mq/MessageQueue.java +++ b/src/main/java/org/gnunet/mq/MessageQueue.java | |||
@@ -11,10 +11,19 @@ import java.util.LinkedList; | |||
11 | public abstract class MessageQueue { | 11 | public abstract class MessageQueue { |
12 | private LinkedList<Envelope> queued_envelopes = new LinkedList<Envelope>(); | 12 | private LinkedList<Envelope> queued_envelopes = new LinkedList<Envelope>(); |
13 | private LinkedList<Envelope> prefered_queued_envelopes = new LinkedList<Envelope>(); | 13 | private LinkedList<Envelope> prefered_queued_envelopes = new LinkedList<Envelope>(); |
14 | protected Envelope current_envelope; | 14 | protected Envelope currentEnvelope; |
15 | private boolean readyForSubmit; | ||
15 | 16 | ||
17 | /** | ||
18 | * Submit a message. Once the message can't be canceled, | ||
19 | * reportMessageSent must be called. | ||
20 | * @param ev envelope to submit | ||
21 | */ | ||
16 | protected abstract void submit(Envelope ev); | 22 | protected abstract void submit(Envelope ev); |
17 | 23 | ||
24 | /** | ||
25 | * Cancel a message that already has been submitted. | ||
26 | */ | ||
18 | protected abstract void retract(); | 27 | protected abstract void retract(); |
19 | 28 | ||
20 | public void send(GnunetMessage.Body body) { | 29 | public void send(GnunetMessage.Body body) { |
@@ -25,6 +34,16 @@ public abstract class MessageQueue { | |||
25 | sendPrefered(new Envelope(body)); | 34 | sendPrefered(new Envelope(body)); |
26 | } | 35 | } |
27 | 36 | ||
37 | public void send(Envelope ev) { | ||
38 | queued_envelopes.addLast(ev); | ||
39 | trySubmitNext(); | ||
40 | } | ||
41 | |||
42 | public void sendPrefered(Envelope ev) { | ||
43 | prefered_queued_envelopes.addLast(ev); | ||
44 | trySubmitNext(); | ||
45 | } | ||
46 | |||
28 | private Envelope pollNextEnvelope() { | 47 | private Envelope pollNextEnvelope() { |
29 | if (!prefered_queued_envelopes.isEmpty()) | 48 | if (!prefered_queued_envelopes.isEmpty()) |
30 | return prefered_queued_envelopes.removeFirst(); | 49 | return prefered_queued_envelopes.removeFirst(); |
@@ -33,36 +52,33 @@ public abstract class MessageQueue { | |||
33 | return null; | 52 | return null; |
34 | } | 53 | } |
35 | 54 | ||
36 | public void send(Envelope ev) { | 55 | protected void trySubmitNext() { |
37 | if (null == current_envelope) { | 56 | if (currentEnvelope != null || !readyForSubmit) { |
38 | current_envelope = ev; | 57 | return; |
39 | submit(current_envelope); | 58 | } |
40 | } else { | 59 | Envelope ev = pollNextEnvelope(); |
41 | queued_envelopes.addLast(ev); | 60 | if (ev == null) { |
61 | return; | ||
42 | } | 62 | } |
63 | currentEnvelope = ev; | ||
64 | readyForSubmit = false; | ||
65 | submit(currentEnvelope); | ||
43 | } | 66 | } |
44 | 67 | ||
45 | public void sendPrefered(Envelope ev) { | 68 | protected void reportReadyForSubmit() { |
46 | if (null == current_envelope) { | 69 | if (readyForSubmit) { |
47 | current_envelope = ev; | 70 | throw new AssertionError("message queue reported 'ready for submit' twice"); |
48 | submit(current_envelope); | ||
49 | } else { | ||
50 | prefered_queued_envelopes.addLast(ev); | ||
51 | } | 71 | } |
72 | readyForSubmit = true; | ||
73 | trySubmitNext(); | ||
52 | } | 74 | } |
53 | 75 | ||
54 | protected void reportMessageSent() { | 76 | protected void reportMessageSent() { |
55 | if (null == current_envelope) | 77 | if (null == currentEnvelope) |
56 | throw new AssertionError(); | 78 | throw new AssertionError(); |
57 | current_envelope.invokeSentNotification(); | 79 | currentEnvelope.invokeSentNotification(); |
58 | next(); | 80 | currentEnvelope = null; |
59 | } | 81 | trySubmitNext(); |
60 | |||
61 | private void next() { | ||
62 | current_envelope = pollNextEnvelope(); | ||
63 | if (current_envelope == null) | ||
64 | return; | ||
65 | submit(current_envelope); | ||
66 | } | 82 | } |
67 | 83 | ||
68 | /** | 84 | /** |
@@ -71,11 +87,11 @@ public abstract class MessageQueue { | |||
71 | * @param ev the envelope to cancel | 87 | * @param ev the envelope to cancel |
72 | */ | 88 | */ |
73 | /* pkg-private */ void cancelEnvelope(Envelope ev) { | 89 | /* pkg-private */ void cancelEnvelope(Envelope ev) { |
74 | if (null == current_envelope) | 90 | if (null == currentEnvelope) |
75 | throw new AssertionError(); | 91 | throw new AssertionError(); |
76 | if (ev == current_envelope) { | 92 | if (ev == currentEnvelope) { |
77 | retract(); | 93 | retract(); |
78 | next(); | 94 | trySubmitNext(); |
79 | } else { | 95 | } else { |
80 | queued_envelopes.remove(ev); | 96 | queued_envelopes.remove(ev); |
81 | prefered_queued_envelopes.remove(ev); | 97 | prefered_queued_envelopes.remove(ev); |