aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/requests/SequentialRequestContainer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/gnunet/requests/SequentialRequestContainer.java')
-rw-r--r--src/main/java/org/gnunet/requests/SequentialRequestContainer.java86
1 files changed, 86 insertions, 0 deletions
diff --git a/src/main/java/org/gnunet/requests/SequentialRequestContainer.java b/src/main/java/org/gnunet/requests/SequentialRequestContainer.java
new file mode 100644
index 0000000..037055f
--- /dev/null
+++ b/src/main/java/org/gnunet/requests/SequentialRequestContainer.java
@@ -0,0 +1,86 @@
1package org.gnunet.requests;
2
3import org.gnunet.mq.Envelope;
4import org.gnunet.mq.MessageQueue;
5import org.gnunet.mq.NotifySentHandler;
6import org.gnunet.util.Cancelable;
7
8import java.util.LinkedList;
9
10/**
11 * Container for requests that are responded to in sequential order.
12 */
13public class SequentialRequestContainer<T extends RequestContainer.Request> extends RequestContainer {
14 private LinkedList<T> requests = new LinkedList<T>();
15 private MessageQueue mq;
16
17 int requestsActive = 0;
18
19 public SequentialRequestContainer(MessageQueue mq) {
20 this.mq = mq;
21 }
22
23 public T getRequest() {
24 return requests.getFirst();
25 }
26
27 public void next() {
28 if (requestsActive == 0 || requests.isEmpty())
29 throw new AssertionError();
30 requestsActive--;
31 requests.removeFirst();
32 if (requestsActive == 0 && !requests.isEmpty()) {
33 Request r = requests.getFirst();
34 setRequestTransmitting(r, true);
35 Envelope ev = r.assembleRequest();
36 setRequestTransmissionCancel(r, ev);
37 mq.send(r.assembleRequest());
38 requestsActive++;
39 }
40 }
41
42 public Cancelable addRequest(final T request) {
43 requests.addLast(request);
44 if (overlap || requestsActive == 0) {
45 requestsActive++;
46 setRequestTransmitting(request, true);
47 Envelope ev = request.assembleRequest();
48 ev.notifySent(new NotifySentHandler() {
49 @Override
50 public void onSent() {
51 setRequestTransmitting(request, false);
52 }
53 });
54 setRequestTransmissionCancel(request, ev);
55 mq.send(request.assembleRequest());
56 }
57 return new Cancelable() {
58 @Override
59 public void cancel() {
60 setRequestCancelled(request);
61 if (isRequestTransmitting(request)) {
62 cancelRequestTransmission(request);
63 } else {
64 request.cancel();
65 }
66 }
67 };
68 }
69
70 @Override
71 public void restart() {
72 LinkedList<T> requestsOld = requests;
73 requests = new LinkedList<T>();
74 for (T r : requestsOld) {
75 if (!isRequestCancelled(r)) {
76 setRequestTransmitting(r, false);
77 addRequest(r);
78 }
79 }
80 }
81
82
83 public Iterable<T> iter() {
84 return requests;
85 }
86}