aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/peerstore/Peerstore.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/gnunet/peerstore/Peerstore.java')
-rw-r--r--src/main/java/org/gnunet/peerstore/Peerstore.java220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/main/java/org/gnunet/peerstore/Peerstore.java b/src/main/java/org/gnunet/peerstore/Peerstore.java
new file mode 100644
index 0000000..6af904d
--- /dev/null
+++ b/src/main/java/org/gnunet/peerstore/Peerstore.java
@@ -0,0 +1,220 @@
1/*
2 This file is part of GNUnet.
3 (C) 2011, 2012 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19 */
20
21package org.gnunet.peerstore;
22
23import org.gnunet.peerstore.messages.IterateEndMessage;
24import org.gnunet.peerstore.messages.IterateRecordMessage;
25import org.gnunet.peerstore.messages.StoreMessage;
26import org.gnunet.peerstore.messages.WatchRecordMessage;
27import org.gnunet.requests.MatchingRequestContainer;
28import org.gnunet.requests.RequestIdentifier;
29import org.gnunet.requests.SequentialRequestContainer;
30import org.gnunet.requests.TimeoutHandler;
31import org.gnunet.util.*;
32
33/**
34 * API for the GNUnet peerstore service.
35 */
36public class Peerstore {
37
38 /**
39 * Client connecting us to the statistics service.
40 */
41 private Client client;
42
43 /**
44 * All request to the service for iterating records.
45 */
46 private final SequentialRequestContainer<IterateRequest> iterateRequests;
47
48 /**
49 * All requests to the service for watching a value.
50 */
51 private final MatchingRequestContainer<HashCode, WatchRequest> watchRequests;
52
53 /**
54 * Are we currently destroying the client?
55 */
56 private boolean destroyRequested;
57
58 /**
59 * Messages from the statistics service are dispatched to an instance of
60 * this class.
61 */
62 public class PeerstoreMessageReceiver extends RunaboutMessageReceiver {
63
64 public void visit(IterateRecordMessage m) {
65 RequestIdentifier<IterateRequest> req = iterateRequests
66 .getRequestIdentifier();
67 if (null != req)
68 req.getRequest().receiver.onReceive(m.sub_system, m.peer,
69 m.key, m.value, new AbsoluteTime(m.expiry.value));
70 }
71
72 public void visit(IterateEndMessage m) {
73 RequestIdentifier<IterateRequest> req = iterateRequests
74 .getRequestIdentifier();
75 if (null != req) {
76 req.retire();
77 req.getRequest().receiver.onDone();
78 }
79 }
80
81 public void visit(WatchRecordMessage m) {
82 RequestIdentifier<WatchRequest> ri = watchRequests
83 .getRequestIdentifier(WatchRequest.getHash(m.sub_system,
84 m.peer, m.key));
85 WatchRequest r = ri.getRequest();
86 if (null != r) {
87 r.watcher.onReceive(m.sub_system, m.peer, m.key, m.value,
88 new AbsoluteTime(m.expiry.value));
89 }
90 }
91
92 @Override
93 public void handleError() {
94 if (null == client)
95 throw new AssertionError();
96 if (!destroyRequested) {
97 client.reconnect();
98 iterateRequests.restart();
99 watchRequests.restart();
100 }
101 }
102
103 }
104
105 /**
106 * Create a connection to the peerstore service.
107 *
108 * @param cfg
109 * configuration to use
110 */
111 public Peerstore(Configuration cfg) {
112 client = new Client("peerstore", cfg);
113 client.installReceiver(new PeerstoreMessageReceiver());
114 iterateRequests = new SequentialRequestContainer<IterateRequest>(client);
115 watchRequests = new MatchingRequestContainer<HashCode, WatchRequest>(
116 client);
117 }
118
119 /**
120 * Store a new entry in the PEERSTORE. Note that stored entries can be lost
121 * in some cases such as power failure.
122 *
123 * @param sub_system
124 * name of the sub system
125 * @param peer
126 * Peer Identity
127 * @param key
128 * entry key
129 * @param value
130 * entry value BLOB
131 * @param expiry
132 * absolute time after which the entry is (possibly) deleted
133 * @param options
134 * options specific to the storage operation
135 */
136 public void store(final String sub_system, final PeerIdentity peer,
137 final String key, final byte[] value, AbsoluteTime expiry,
138 StoreOption options) {
139 if (destroyRequested || client == null)
140 throw new AssertionError("already destroyed");
141 StoreMessage sm = new StoreMessage();
142 sm.peer_set = 1;
143 sm.peer = peer;
144 sm.sub_system_size = sub_system.length() + 1;
145 sm.key_size = key.length() + 1;
146 sm.value_size = value.length;
147 sm.expiry = new AbsoluteTimeMessage(expiry);
148 sm.options = options.val;
149 sm.sub_system = sub_system;
150 sm.key = key;
151 sm.value = value;
152 client.send(sm);
153 }
154
155 /**
156 * Iterate over records matching supplied key information
157 *
158 * @param sub_system
159 * name of sub system
160 * @param peer
161 * Peer identity (can be NULL)
162 * @param key
163 * entry key string (can be NULL)
164 * @param timeout
165 * time after which the iterate request is canceled
166 * @param receiver
167 * Object that will receive request results
168 * @return Handle to iteration request
169 */
170 public Cancelable iterate(final String sub_system, final PeerIdentity peer,
171 final String key, final RelativeTime timeout,
172 final PeerstoreReceiver receiver) {
173 if (destroyRequested || client == null)
174 throw new AssertionError("already destroyed");
175 RequestIdentifier<IterateRequest> identifier = iterateRequests
176 .addRequest(new IterateRequest(sub_system, peer, key, receiver));
177 identifier.setTimeout(timeout, new TimeoutHandler() {
178
179 @Override
180 public void onTimeout() {
181 receiver.onTimeout();
182
183 }
184 });
185 return identifier;
186 }
187
188 /**
189 * Request watching a given key User will be notified with any new values
190 * added to key
191 *
192 * @param sub_system
193 * name of sub system
194 * @param peer
195 * Peer identity
196 * @param key
197 * entry key string
198 * @param watcher
199 * Receiver for watch records
200 * @return Handle to watch request
201 */
202 public Cancelable watch(final String sub_system, final PeerIdentity peer,
203 final String key, PeerstoreWatcher watcher) {
204 if (destroyRequested || null == client)
205 throw new AssertionError("already destroyed");
206 WatchRequest r = new WatchRequest(sub_system, peer, key, watcher);
207 return watchRequests.addRequest(r.hash, r);
208 }
209
210 /**
211 * Destroy handle to the peerstore service.
212 */
213 public void destroy() { // TODO: imeplement syncfirst
214 if (destroyRequested)
215 throw new AssertionError("already destroyed");
216 destroyRequested = true;
217 client.disconnect();
218 client = null;
219 }
220}