aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/org/gnunet/peerstore/Peerstore.java
blob: 6af904d49f161c4fe69b1ffdfcb40cd36fbbe819 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
/*
 This file is part of GNUnet.
 (C) 2011, 2012 Christian Grothoff (and other contributing authors)

 GNUnet is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published
 by the Free Software Foundation; either version 3, or (at your
 option) any later version.

 GNUnet is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with GNUnet; see the file COPYING.  If not, write to the
 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 Boston, MA 02111-1307, USA.
 */

package org.gnunet.peerstore;

import org.gnunet.peerstore.messages.IterateEndMessage;
import org.gnunet.peerstore.messages.IterateRecordMessage;
import org.gnunet.peerstore.messages.StoreMessage;
import org.gnunet.peerstore.messages.WatchRecordMessage;
import org.gnunet.requests.MatchingRequestContainer;
import org.gnunet.requests.RequestIdentifier;
import org.gnunet.requests.SequentialRequestContainer;
import org.gnunet.requests.TimeoutHandler;
import org.gnunet.util.*;

/**
 * API for the GNUnet peerstore service.
 */
public class Peerstore {

	/**
	 * Client connecting us to the statistics service.
	 */
	private Client client;

	/**
	 * All request to the service for iterating records.
	 */
	private final SequentialRequestContainer<IterateRequest> iterateRequests;

	/**
	 * All requests to the service for watching a value.
	 */
	private final MatchingRequestContainer<HashCode, WatchRequest> watchRequests;

	/**
	 * Are we currently destroying the client?
	 */
	private boolean destroyRequested;

	/**
	 * Messages from the statistics service are dispatched to an instance of
	 * this class.
	 */
	public class PeerstoreMessageReceiver extends RunaboutMessageReceiver {

		public void visit(IterateRecordMessage m) {
			RequestIdentifier<IterateRequest> req = iterateRequests
					.getRequestIdentifier();
			if (null != req)
				req.getRequest().receiver.onReceive(m.sub_system, m.peer,
						m.key, m.value, new AbsoluteTime(m.expiry.value));
		}

		public void visit(IterateEndMessage m) {
			RequestIdentifier<IterateRequest> req = iterateRequests
					.getRequestIdentifier();
			if (null != req) {
				req.retire();
				req.getRequest().receiver.onDone();
			}
		}

		public void visit(WatchRecordMessage m) {
			RequestIdentifier<WatchRequest> ri = watchRequests
					.getRequestIdentifier(WatchRequest.getHash(m.sub_system,
							m.peer, m.key));
			WatchRequest r = ri.getRequest();
			if (null != r) {
				r.watcher.onReceive(m.sub_system, m.peer, m.key, m.value,
						new AbsoluteTime(m.expiry.value));
			}
		}

		@Override
		public void handleError() {
			if (null == client)
				throw new AssertionError();
			if (!destroyRequested) {
				client.reconnect();
				iterateRequests.restart();
				watchRequests.restart();
			}
		}

	}

	/**
	 * Create a connection to the peerstore service.
	 * 
	 * @param cfg
	 *            configuration to use
	 */
	public Peerstore(Configuration cfg) {
		client = new Client("peerstore", cfg);
		client.installReceiver(new PeerstoreMessageReceiver());
		iterateRequests = new SequentialRequestContainer<IterateRequest>(client);
		watchRequests = new MatchingRequestContainer<HashCode, WatchRequest>(
				client);
	}

	/**
	 * Store a new entry in the PEERSTORE. Note that stored entries can be lost
	 * in some cases such as power failure.
	 * 
	 * @param sub_system
	 *            name of the sub system
	 * @param peer
	 *            Peer Identity
	 * @param key
	 *            entry key
	 * @param value
	 *            entry value BLOB
	 * @param expiry
	 *            absolute time after which the entry is (possibly) deleted
	 * @param options
	 *            options specific to the storage operation
	 */
	public void store(final String sub_system, final PeerIdentity peer,
			final String key, final byte[] value, AbsoluteTime expiry,
			StoreOption options) {
		if (destroyRequested || client == null)
			throw new AssertionError("already destroyed");
		StoreMessage sm = new StoreMessage();
		sm.peer_set = 1;
		sm.peer = peer;
		sm.sub_system_size = sub_system.length() + 1;
		sm.key_size = key.length() + 1;
		sm.value_size = value.length;
		sm.expiry = new AbsoluteTimeMessage(expiry);
		sm.options = options.val;
		sm.sub_system = sub_system;
		sm.key = key;
		sm.value = value;
		client.send(sm);
	}

	/**
	 * Iterate over records matching supplied key information
	 * 
	 * @param sub_system
	 *            name of sub system
	 * @param peer
	 *            Peer identity (can be NULL)
	 * @param key
	 *            entry key string (can be NULL)
	 * @param timeout
	 *            time after which the iterate request is canceled
	 * @param receiver
	 *            Object that will receive request results
	 * @return Handle to iteration request
	 */
	public Cancelable iterate(final String sub_system, final PeerIdentity peer,
			final String key, final RelativeTime timeout,
			final PeerstoreReceiver receiver) {
		if (destroyRequested || client == null)
			throw new AssertionError("already destroyed");
		RequestIdentifier<IterateRequest> identifier = iterateRequests
				.addRequest(new IterateRequest(sub_system, peer, key, receiver));
		identifier.setTimeout(timeout, new TimeoutHandler() {

			@Override
			public void onTimeout() {
				receiver.onTimeout();

			}
		});
		return identifier;
	}

	/**
	 * Request watching a given key User will be notified with any new values
	 * added to key
	 * 
	 * @param sub_system
	 *            name of sub system
	 * @param peer
	 *            Peer identity
	 * @param key
	 *            entry key string
	 * @param watcher
	 *            Receiver for watch records
	 * @return Handle to watch request
	 */
	public Cancelable watch(final String sub_system, final PeerIdentity peer,
			final String key, PeerstoreWatcher watcher) {
		if (destroyRequested || null == client)
			throw new AssertionError("already destroyed");
		WatchRequest r = new WatchRequest(sub_system, peer, key, watcher);
		return watchRequests.addRequest(r.hash, r);
	}

	/**
	 * Destroy handle to the peerstore service.
	 */
	public void destroy() { // TODO: imeplement syncfirst
		if (destroyRequested)
			throw new AssertionError("already destroyed");
		destroyRequested = true;
		client.disconnect();
		client = null;
	}
}