aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/mq/MessageQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/gnunet/mq/MessageQueue.java')
-rw-r--r--src/main/java/org/gnunet/mq/MessageQueue.java68
1 files changed, 42 insertions, 26 deletions
diff --git a/src/main/java/org/gnunet/mq/MessageQueue.java b/src/main/java/org/gnunet/mq/MessageQueue.java
index 4df3ae4..cab1003 100644
--- a/src/main/java/org/gnunet/mq/MessageQueue.java
+++ b/src/main/java/org/gnunet/mq/MessageQueue.java
@@ -11,10 +11,19 @@ import java.util.LinkedList;
11public abstract class MessageQueue { 11public abstract class MessageQueue {
12 private LinkedList<Envelope> queued_envelopes = new LinkedList<Envelope>(); 12 private LinkedList<Envelope> queued_envelopes = new LinkedList<Envelope>();
13 private LinkedList<Envelope> prefered_queued_envelopes = new LinkedList<Envelope>(); 13 private LinkedList<Envelope> prefered_queued_envelopes = new LinkedList<Envelope>();
14 protected Envelope current_envelope; 14 protected Envelope currentEnvelope;
15 private boolean readyForSubmit;
15 16
17 /**
18 * Submit a message. Once the message can't be canceled,
19 * reportMessageSent must be called.
20 * @param ev envelope to submit
21 */
16 protected abstract void submit(Envelope ev); 22 protected abstract void submit(Envelope ev);
17 23
24 /**
25 * Cancel a message that already has been submitted.
26 */
18 protected abstract void retract(); 27 protected abstract void retract();
19 28
20 public void send(GnunetMessage.Body body) { 29 public void send(GnunetMessage.Body body) {
@@ -25,6 +34,16 @@ public abstract class MessageQueue {
25 sendPrefered(new Envelope(body)); 34 sendPrefered(new Envelope(body));
26 } 35 }
27 36
37 public void send(Envelope ev) {
38 queued_envelopes.addLast(ev);
39 trySubmitNext();
40 }
41
42 public void sendPrefered(Envelope ev) {
43 prefered_queued_envelopes.addLast(ev);
44 trySubmitNext();
45 }
46
28 private Envelope pollNextEnvelope() { 47 private Envelope pollNextEnvelope() {
29 if (!prefered_queued_envelopes.isEmpty()) 48 if (!prefered_queued_envelopes.isEmpty())
30 return prefered_queued_envelopes.removeFirst(); 49 return prefered_queued_envelopes.removeFirst();
@@ -33,36 +52,33 @@ public abstract class MessageQueue {
33 return null; 52 return null;
34 } 53 }
35 54
36 public void send(Envelope ev) { 55 protected void trySubmitNext() {
37 if (null == current_envelope) { 56 if (currentEnvelope != null || !readyForSubmit) {
38 current_envelope = ev; 57 return;
39 submit(current_envelope); 58 }
40 } else { 59 Envelope ev = pollNextEnvelope();
41 queued_envelopes.addLast(ev); 60 if (ev == null) {
61 return;
42 } 62 }
63 currentEnvelope = ev;
64 readyForSubmit = false;
65 submit(currentEnvelope);
43 } 66 }
44 67
45 public void sendPrefered(Envelope ev) { 68 protected void reportReadyForSubmit() {
46 if (null == current_envelope) { 69 if (readyForSubmit) {
47 current_envelope = ev; 70 throw new AssertionError("message queue reported 'ready for submit' twice");
48 submit(current_envelope);
49 } else {
50 prefered_queued_envelopes.addLast(ev);
51 } 71 }
72 readyForSubmit = true;
73 trySubmitNext();
52 } 74 }
53 75
54 protected void reportMessageSent() { 76 protected void reportMessageSent() {
55 if (null == current_envelope) 77 if (null == currentEnvelope)
56 throw new AssertionError(); 78 throw new AssertionError();
57 current_envelope.invokeSentNotification(); 79 currentEnvelope.invokeSentNotification();
58 next(); 80 currentEnvelope = null;
59 } 81 trySubmitNext();
60
61 private void next() {
62 current_envelope = pollNextEnvelope();
63 if (current_envelope == null)
64 return;
65 submit(current_envelope);
66 } 82 }
67 83
68 /** 84 /**
@@ -71,11 +87,11 @@ public abstract class MessageQueue {
71 * @param ev the envelope to cancel 87 * @param ev the envelope to cancel
72 */ 88 */
73 /* pkg-private */ void cancelEnvelope(Envelope ev) { 89 /* pkg-private */ void cancelEnvelope(Envelope ev) {
74 if (null == current_envelope) 90 if (null == currentEnvelope)
75 throw new AssertionError(); 91 throw new AssertionError();
76 if (ev == current_envelope) { 92 if (ev == currentEnvelope) {
77 retract(); 93 retract();
78 next(); 94 trySubmitNext();
79 } else { 95 } else {
80 queued_envelopes.remove(ev); 96 queued_envelopes.remove(ev);
81 prefered_queued_envelopes.remove(ev); 97 prefered_queued_envelopes.remove(ev);