aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2010-01-29 19:21:41 +0000
committerChristian Grothoff <christian@grothoff.org>2010-01-29 19:21:41 +0000
commit258bd33b0a8e26200d8bf36d8e65524a1069790d (patch)
treeb45169bffadac84468c33d1fd696ff81ee84a3b3 /src/fs
parent65f29cbee10775c4ec627f01c115d1bebea88642 (diff)
downloadgnunet-258bd33b0a8e26200d8bf36d8e65524a1069790d.tar.gz
gnunet-258bd33b0a8e26200d8bf36d8e65524a1069790d.zip
finally compiles again
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/Makefile.am3
-rw-r--r--src/fs/fs.h49
-rw-r--r--src/fs/gnunet-service-fs.c3322
-rw-r--r--src/fs/gnunet-service-fs_drq.c416
-rw-r--r--src/fs/gnunet-service-fs_drq.h137
-rw-r--r--src/fs/gnunet-service-fs_indexing.c37
-rw-r--r--src/fs/gnunet-service-fs_indexing.h12
7 files changed, 2038 insertions, 1938 deletions
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index 70897c5ef..a43c8340d 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -85,7 +85,8 @@ gnunet_search_LDADD = \
85 85
86gnunet_service_fs_SOURCES = \ 86gnunet_service_fs_SOURCES = \
87 gnunet-service-fs.c \ 87 gnunet-service-fs.c \
88 gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h 88 gnunet-service-fs_drq.c gnunet-service-fs_drq.h \
89 gnunet-service-fs_indexing.c gnunet-service-fs_indexing.h
89gnunet_service_fs_LDADD = \ 90gnunet_service_fs_LDADD = \
90 $(top_builddir)/src/fs/libgnunetfs.la \ 91 $(top_builddir)/src/fs/libgnunetfs.la \
91 $(top_builddir)/src/datastore/libgnunetdatastore.la \ 92 $(top_builddir)/src/datastore/libgnunetdatastore.la \
diff --git a/src/fs/fs.h b/src/fs/fs.h
index ae2e67654..d19f27b6f 100644
--- a/src/fs/fs.h
+++ b/src/fs/fs.h
@@ -1132,28 +1132,23 @@ struct SBlock
1132 1132
1133 1133
1134/** 1134/**
1135 * Message sent from a GNUnet (fs) publishing 1135 * Message sent from a GNUnet (fs) publishing activity to the
1136 * activity to the gnunet-fs-service to 1136 * gnunet-fs-service to initiate indexing of a file. The service is
1137 * initiate indexing of a file. The service 1137 * supposed to check if the specified file is available and has the
1138 * is supposed to check if the specified file 1138 * same cryptographic hash. It should then respond with either a
1139 * is available and has the same cryptographic 1139 * confirmation or a denial.
1140 * hash. It should then respond with either
1141 * a confirmation or a denial.
1142 * 1140 *
1143 * On OSes where this works, it is considered 1141 * On OSes where this works, it is considered acceptable if the
1144 * acceptable if the service only checks that 1142 * service only checks that the path, device and inode match (it can
1145 * the path, device and inode match (it can 1143 * then be assumed that the hash will also match without actually
1146 * then be assumed that the hash will also match 1144 * computing it; this is an optimization that should be safe given
1147 * without actually computing it; this is an 1145 * that the client is not our adversary).
1148 * optimization that should be safe given that
1149 * the client is not our adversary).
1150 */ 1146 */
1151struct IndexStartMessage 1147struct IndexStartMessage
1152{ 1148{
1153 1149
1154 /** 1150 /**
1155 * Message type will be 1151 * Message type will be GNUNET_MESSAGE_TYPE_FS_INDEX_START.
1156 * GNUNET_MESSAGE_TYPE_FS_INDEX_START.
1157 */ 1152 */
1158 struct GNUNET_MessageHeader header; 1153 struct GNUNET_MessageHeader header;
1159 1154
@@ -1216,12 +1211,10 @@ struct IndexInfoMessage
1216 1211
1217 1212
1218/** 1213/**
1219 * Message sent from a GNUnet (fs) unindexing 1214 * Message sent from a GNUnet (fs) unindexing activity to the
1220 * activity to the gnunet-fs-service to 1215 * gnunet-service-fs to indicate that a file will be unindexed. The
1221 * indicate that a file will be unindexed. The service 1216 * service is supposed to remove the file from the list of indexed
1222 * is supposed to remove the file from the 1217 * files and response with a confirmation message (even if the file
1223 * list of indexed files and response with
1224 * a confirmation message (even if the file
1225 * was already not on the list). 1218 * was already not on the list).
1226 */ 1219 */
1227struct UnindexMessage 1220struct UnindexMessage
@@ -1247,9 +1240,8 @@ struct UnindexMessage
1247 1240
1248 1241
1249/** 1242/**
1250 * Message sent from a GNUnet (fs) search 1243 * Message sent from a GNUnet (fs) search activity to the
1251 * activity to the gnunet-fs-service to 1244 * gnunet-service-fs to start a search.
1252 * start a search.
1253 */ 1245 */
1254struct SearchMessage 1246struct SearchMessage
1255{ 1247{
@@ -1308,10 +1300,9 @@ struct SearchMessage
1308 1300
1309 1301
1310/** 1302/**
1311 * Response from FS service with a result for 1303 * Response from FS service with a result for a previous FS search.
1312 * a previous FS search. Note that queries 1304 * Note that queries for DBLOCKS and IBLOCKS that have received a
1313 * for DBLOCKS and IBLOCKS that have received 1305 * single response are considered done.
1314 * a single response are considered done.
1315 */ 1306 */
1316struct ContentMessage 1307struct ContentMessage
1317{ 1308{
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 57b6dd421..740f63624 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 (C) 2009 Christian Grothoff (and other contributing authors) 3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -20,341 +20,363 @@
20 20
21/** 21/**
22 * @file fs/gnunet-service-fs.c 22 * @file fs/gnunet-service-fs.c
23 * @brief program that provides the file-sharing service 23 * @brief gnunet anonymity protocol implementation
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * 25 *
26 * TODO: 26 * TODO:
27 * - fix gazillion of minor FIXME's 27 * - forward_request_task (P2P forwarding!)
28 * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the 28 * - track stats for hot-path routing
29 * core to transmit to another peer; need to make sure this is bounded overall... 29 * - implement hot-path routing decision procedure
30 * - randomly delay processing for improved anonymity (can wait) 30 * - detect duplicate requests (P2P and CS)
31 * - content migration (put in local DS) (can wait) 31 * - implement: bound_priority, test_load_too_high, validate_skblock
32 * - handle some special cases when forwarding replies based on tracked requests (can wait) 32 * - add content migration support (store locally)
33 * - tracking of success correlations for hot-path routing (can wait) 33 * - add random delay
34 * - various load-based actions (can wait) 34 * - statistics
35 * - validation of KSBLOCKS (can wait) 35 *
36 * - remove on-demand blocks if they keep failing (can wait)
37 * - check that we decrement PIDs always where necessary (can wait)
38 * - find out how to use core-pulling instead of pushing (at least for some cases)
39 */ 36 */
40#include "platform.h" 37#include "platform.h"
41#include <float.h> 38#include <float.h>
39#include "gnunet_constants.h"
42#include "gnunet_core_service.h" 40#include "gnunet_core_service.h"
43#include "gnunet_datastore_service.h" 41#include "gnunet_datastore_service.h"
44#include "gnunet_peer_lib.h" 42#include "gnunet_peer_lib.h"
45#include "gnunet_protocols.h" 43#include "gnunet_protocols.h"
46#include "gnunet_signatures.h" 44#include "gnunet_signatures.h"
47#include "gnunet_util_lib.h" 45#include "gnunet_util_lib.h"
46#include "gnunet-service-fs_drq.h"
48#include "gnunet-service-fs_indexing.h" 47#include "gnunet-service-fs_indexing.h"
49#include "fs.h" 48#include "fs.h"
50 49
51#define DEBUG_FS GNUNET_NO 50/**
51 * Maximum number of outgoing messages we queue per peer.
52 * FIXME: set to a tiny value for testing; make configurable.
53 */
54#define MAX_QUEUE_PER_PEER 2
55
56
57
58/**
59 * Maximum number of requests (from other peers) that we're
60 * willing to have pending at any given point in time.
61 * FIXME: set from configuration (and 32 is a tiny value for testing only).
62 */
63static uint64_t max_pending_requests = 32;
64
65
66/**
67 * Information we keep for each pending reply. The
68 * actual message follows at the end of this struct.
69 */
70struct PendingMessage;
71
52 72
53/** 73/**
54 * Signature of a function that is called whenever a datastore 74 * Function called upon completion of a transmission.
55 * request can be processed (or an entry put on the queue times out).
56 * 75 *
57 * @param cls closure 76 * @param cls closure
58 * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout 77 * @param pid ID of receiving peer, 0 on transmission error
59 */ 78 */
60typedef void (*RequestFunction)(void *cls, 79typedef void (*TransmissionContinuation)(void * cls,
61 int ok); 80 GNUNET_PEER_Id tpid);
62 81
63 82
64/** 83/**
65 * Doubly-linked list of our requests for the datastore. 84 * Information we keep for each pending reply. The
85 * actual message follows at the end of this struct.
66 */ 86 */
67struct DatastoreRequestQueue 87struct PendingMessage
68{ 88{
69
70 /** 89 /**
71 * This is a doubly-linked list. 90 * This is a doubly-linked list of messages to the same peer.
72 */ 91 */
73 struct DatastoreRequestQueue *next; 92 struct PendingMessage *next;
74 93
75 /** 94 /**
76 * This is a doubly-linked list. 95 * This is a doubly-linked list of messages to the same peer.
77 */ 96 */
78 struct DatastoreRequestQueue *prev; 97 struct PendingMessage *prev;
79 98
80 /** 99 /**
81 * Function to call (will issue the request). 100 * Entry in pending message list for this pending message.
101 */
102 struct PendingMessageList *pml;
103
104 /**
105 * Function to call immediately once we have transmitted this
106 * message.
82 */ 107 */
83 RequestFunction req; 108 TransmissionContinuation cont;
84 109
85 /** 110 /**
86 * Closure for req. 111 * Closure for cont.
87 */ 112 */
88 void *req_cls; 113 void *cont_cls;
89 114
90 /** 115 /**
91 * When should this request time-out because we don't care anymore? 116 * Size of the reply; actual reply message follows
117 * at the end of this struct.
92 */ 118 */
93 struct GNUNET_TIME_Absolute timeout; 119 size_t msize;
94 120
95 /** 121 /**
96 * ID of task used for signaling timeout. 122 * How important is this message for us?
97 */ 123 */
98 GNUNET_SCHEDULER_TaskIdentifier task; 124 uint32_t priority;
99 125
100}; 126};
101 127
102 128
103/** 129/**
104 * Closure for processing START_SEARCH messages from a client. 130 * Information about a peer that we are connected to.
131 * We track data that is useful for determining which
132 * peers should receive our requests. We also keep
133 * a list of messages to transmit to this peer.
105 */ 134 */
106struct LocalGetContext 135struct ConnectedPeer
107{ 136{
108 137
109 /** 138 /**
110 * This is a doubly-linked list. 139 * List of the last clients for which this peer successfully
140 * answered a query.
111 */ 141 */
112 struct LocalGetContext *next; 142 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
113 143
114 /** 144 /**
115 * This is a doubly-linked list. 145 * List of the last PIDs for which
146 * this peer successfully answered a query;
147 * We use 0 to indicate no successful reply.
116 */ 148 */
117 struct LocalGetContext *prev; 149 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
118 150
119 /** 151 /**
120 * Client that initiated the search. 152 * Average delay between sending the peer a request and
121 */ 153 * getting a reply (only calculated over the requests for
122 struct GNUNET_SERVER_Client *client; 154 * which we actually got a reply). Calculated
155 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
156 */
157 struct GNUNET_TIME_Relative avg_delay;
123 158
124 /** 159 /**
125 * Array of results that we've already received 160 * Handle for an active request for transmission to this
126 * (can be NULL). 161 * peer, or NULL.
127 */ 162 */
128 GNUNET_HashCode *results; 163 struct GNUNET_CORE_TransmitHandle *cth;
129 164
130 /** 165 /**
131 * Bloomfilter over all results (for fast query construction); 166 * Messages (replies, queries, content migration) we would like to
132 * NULL if we don't have any results. 167 * send to this peer in the near future. Sorted by priority, head.
133 *
134 * FIXME: this member is not used, is that OK? If so, it should
135 * be removed!
136 */ 168 */
137 struct GNUNET_CONTAINER_BloomFilter *results_bf; 169 struct PendingMessage *pending_messages_head;
138 170
139 /** 171 /**
140 * DS request associated with this operation. 172 * Messages (replies, queries, content migration) we would like to
173 * send to this peer in the near future. Sorted by priority, tail.
141 */ 174 */
142 struct DatastoreRequestQueue *req; 175 struct PendingMessage *pending_messages_tail;
143 176
144 /** 177 /**
145 * Current result message to transmit to client (or NULL). 178 * Average priority of successful replies. Calculated
146 */ 179 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
147 struct ContentMessage *result;
148
149 /**
150 * Type of the content that we're looking for.
151 * 0 for any.
152 */ 180 */
153 uint32_t type; 181 double avg_priority;
154 182
155 /** 183 /**
156 * Desired anonymity level. 184 * Increase in traffic preference still to be submitted
185 * to the core service for this peer. FIXME: double or 'uint64_t'?
157 */ 186 */
158 uint32_t anonymity_level; 187 double inc_preference;
159 188
160 /** 189 /**
161 * Number of results actually stored in the results array. 190 * The peer's identity.
162 */
163 unsigned int results_used;
164
165 /**
166 * Size of the results array in memory.
167 */
168 unsigned int results_size;
169
170 /**
171 * Size (in bytes) of the 'results_bf' bloomfilter.
172 *
173 * FIXME: this member is not used, is that OK? If so, it should
174 * be removed!
175 */ 191 */
176 size_t results_bf_size; 192 GNUNET_PEER_Id pid;
177 193
178 /** 194 /**
179 * If the request is for a DBLOCK or IBLOCK, this is the identity of 195 * Size of the linked list of 'pending_messages'.
180 * the peer that is known to have a response. Set to all-zeros if
181 * such a target is not known (note that even if OUR anonymity
182 * level is >0 we may happen to know the responder's identity;
183 * nevertheless, we should probably not use it for a DHT-lookup
184 * or similar blunt actions in order to avoid exposing ourselves).
185 */ 196 */
186 struct GNUNET_PeerIdentity target; 197 unsigned int pending_requests;
187 198
188 /** 199 /**
189 * If the request is for an SBLOCK, this is the identity of the 200 * Which offset in "last_p2p_replies" will be updated next?
190 * pseudonym to which the SBLOCK belongs. 201 * (we go round-robin).
191 */ 202 */
192 GNUNET_HashCode namespace; 203 unsigned int last_p2p_replies_woff;
193 204
194 /** 205 /**
195 * Hash of the keyword (aka query) for KBLOCKs; Hash of 206 * Which offset in "last_client_replies" will be updated next?
196 * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query) 207 * (we go round-robin).
197 * and hash of the identifier XORed with the target for
198 * SBLOCKS (aka query).
199 */ 208 */
200 GNUNET_HashCode query; 209 unsigned int last_client_replies_woff;
201 210
202}; 211};
203 212
204 213
205/** 214/**
206 * Possible routing policies for an FS-GET request. 215 * Information we keep for each pending request. We should try to
216 * keep this struct as small as possible since its memory consumption
217 * is key to how many requests we can have pending at once.
207 */ 218 */
208enum RoutingPolicy 219struct PendingRequest;
209 {
210 /**
211 * Simply drop the request.
212 */
213 ROUTING_POLICY_NONE = 0,
214
215 /**
216 * Answer it if we can from local datastore.
217 */
218 ROUTING_POLICY_ANSWER = 1,
219
220 /**
221 * Forward the request to other peers (if possible).
222 */
223 ROUTING_POLICY_FORWARD = 2,
224
225 /**
226 * Forward to other peers, and ask them to route
227 * the response via ourselves.
228 */
229 ROUTING_POLICY_INDIRECT = 6,
230
231 /**
232 * Do everything we could possibly do (that would
233 * make sense).
234 */
235 ROUTING_POLICY_ALL = 7
236 };
237 220
238 221
239/** 222/**
240 * Internal context we use for our initial processing 223 * Doubly-linked list of requests we are performing
241 * of a GET request. 224 * on behalf of the same client.
242 */ 225 */
243struct ProcessGetContext 226struct ClientRequestList
244{ 227{
228
245 /** 229 /**
246 * The search query (used for datastore lookup). 230 * This is a doubly-linked list.
247 */ 231 */
248 GNUNET_HashCode query; 232 struct ClientRequestList *next;
249 233
250 /** 234 /**
251 * Which peer we should forward the response to. 235 * This is a doubly-linked list.
252 */ 236 */
253 struct GNUNET_PeerIdentity reply_to; 237 struct ClientRequestList *prev;
254 238
255 /** 239 /**
256 * Namespace for the result (only set for SKS requests) 240 * Request this entry represents.
257 */ 241 */
258 GNUNET_HashCode namespace; 242 struct PendingRequest *req;
259 243
260 /** 244 /**
261 * Peer that we should forward the query to if possible 245 * Client list this request belongs to.
262 * (since that peer likely has the content).
263 */ 246 */
264 struct GNUNET_PeerIdentity prime_target; 247 struct ClientList *client_list;
265 248
249};
250
251
252/**
253 * Replies to be transmitted to the client. The actual
254 * response message is allocated after this struct.
255 */
256struct ClientResponseMessage
257{
266 /** 258 /**
267 * When did we receive this request? 259 * This is a doubly-linked list.
268 */ 260 */
269 struct GNUNET_TIME_Absolute start_time; 261 struct ClientResponseMessage *next;
270 262
271 /** 263 /**
272 * Our entry in the DRQ (non-NULL while we wait for our 264 * This is a doubly-linked list.
273 * turn to interact with the local database).
274 */ 265 */
275 struct DatastoreRequestQueue *drq; 266 struct ClientResponseMessage *prev;
276 267
277 /** 268 /**
278 * Filter used to eliminate duplicate results. Can be NULL if we 269 * Client list entry this response belongs to.
279 * are not yet filtering any results.
280 */ 270 */
281 struct GNUNET_CONTAINER_BloomFilter *bf; 271 struct ClientList *client_list;
282 272
283 /** 273 /**
284 * Bitmap describing which of the optional 274 * Number of bytes in the response.
285 * hash codes / peer identities were given to us.
286 */ 275 */
287 uint32_t bm; 276 size_t msize;
277};
278
288 279
280/**
281 * Linked list of clients we are performing requests
282 * for right now.
283 */
284struct ClientList
285{
289 /** 286 /**
290 * Desired block type. 287 * This is a linked list.
291 */ 288 */
292 uint32_t type; 289 struct ClientList *next;
293 290
294 /** 291 /**
295 * Priority of the request. 292 * ID of a client making a request, NULL if this entry is for a
293 * peer.
296 */ 294 */
297 uint32_t priority; 295 struct GNUNET_SERVER_Client *client;
298 296
299 /** 297 /**
300 * Size of the 'bf' (in bytes). 298 * Head of list of requests performed on behalf
299 * of this client right now.
301 */ 300 */
302 size_t bf_size; 301 struct ClientRequestList *rl_head;
303 302
304 /** 303 /**
305 * In what ways are we going to process 304 * Tail of list of requests performed on behalf
306 * the request? 305 * of this client right now.
307 */ 306 */
308 enum RoutingPolicy policy; 307 struct ClientRequestList *rl_tail;
309 308
310 /** 309 /**
311 * Time-to-live for the request (value 310 * Head of linked list of responses.
312 * we use).
313 */ 311 */
314 int32_t ttl; 312 struct ClientResponseMessage *res_head;
315 313
316 /** 314 /**
317 * Number to mingle hashes for bloom-filter 315 * Tail of linked list of responses.
318 * tests with.
319 */ 316 */
320 int32_t mingle; 317 struct ClientResponseMessage *res_tail;
321 318
322 /** 319 /**
323 * Number of results that were found so far. 320 * Context for sending replies.
324 */ 321 */
325 unsigned int results_found; 322 struct GNUNET_CONNECTION_TransmitHandle *th;
323
326}; 324};
327 325
328 326
329/** 327/**
330 * Information we keep for each pending reply. The 328 * Hash map entry of requests we are performing
331 * actual message follows at the end of this struct. 329 * on behalf of the same peer.
332 */ 330 */
333struct PendingMessage 331struct PeerRequestEntry
334{ 332{
335 /**
336 * This is a linked list.
337 */
338 struct PendingMessage *next;
339 333
340 /** 334 /**
341 * Size of the reply; actual reply message follows 335 * Request this entry represents.
342 * at the end of this struct.
343 */ 336 */
344 size_t msize; 337 struct PendingRequest *req;
345 338
346 /** 339 /**
347 * How important is this message for us? 340 * Entry of peer responsible for this entry.
348 */ 341 */
349 uint32_t priority; 342 struct ConnectedPeer *cp;
350 343
351}; 344};
352 345
353 346
354/** 347/**
355 * All requests from a client are kept in a doubly-linked list. 348 * Doubly-linked list of messages we are performing
349 * due to a pending request.
356 */ 350 */
357struct ClientRequestList; 351struct PendingMessageList
352{
353
354 /**
355 * This is a doubly-linked list of messages on behalf of the same request.
356 */
357 struct PendingMessageList *next;
358
359 /**
360 * This is a doubly-linked list of messages on behalf of the same request.
361 */
362 struct PendingMessageList *prev;
363
364 /**
365 * Message this entry represents.
366 */
367 struct PendingMessage *pm;
368
369 /**
370 * Request this entry belongs to.
371 */
372 struct PendingRequest *req;
373
374 /**
375 * Peer this message is targeted for.
376 */
377 struct ConnectedPeer *target;
378
379};
358 380
359 381
360/** 382/**
@@ -366,23 +388,23 @@ struct PendingRequest
366{ 388{
367 389
368 /** 390 /**
369 * ID of a client making a request, NULL if this entry is for a 391 * If this request was made by a client, this is our entry in the
370 * peer. 392 * client request list; otherwise NULL.
371 */ 393 */
372 struct GNUNET_SERVER_Client *client; 394 struct ClientRequestList *client_request_list;
373 395
374 /** 396 /**
375 * If this request was made by a client, 397 * If this request was made by a peer, this is our entry in the
376 * this is our entry in the client request 398 * per-peer multi-hash map; otherwise NULL.
377 * list; otherwise NULL.
378 */ 399 */
379 struct ClientRequestList *crl_entry; 400 struct PeerRequestEntry *pht_entry;
380 401
381 /** 402 /**
382 * If this is a namespace query, pointer to the hash of the public 403 * If this is a namespace query, pointer to the hash of the public
383 * key of the namespace; otherwise NULL. 404 * key of the namespace; otherwise NULL. Pointer will be to the
405 * end of this struct (so no need to free it).
384 */ 406 */
385 GNUNET_HashCode *namespace; 407 const GNUNET_HashCode *namespace;
386 408
387 /** 409 /**
388 * Bloomfilter we use to filter out replies that we don't care about 410 * Bloomfilter we use to filter out replies that we don't care about
@@ -396,36 +418,29 @@ struct PendingRequest
396 struct GNUNET_CORE_InformationRequestContext *irc; 418 struct GNUNET_CORE_InformationRequestContext *irc;
397 419
398 /** 420 /**
399 * Handle for an active request for transmission to this peer, or 421 * Hash code of all replies that we have seen so far (only valid
400 * NULL. Only used for replies that we are trying to send to a peer 422 * if client is not NULL since we only track replies like this for
401 * that we are not yet connected to. 423 * our own clients).
402 */
403 struct GNUNET_CORE_TransmitHandle *cth;
404
405 /**
406 * Replies that we have received but were unable to forward yet
407 * (typically non-null only if we have a pending transmission
408 * request with the client or the respective peer).
409 */ 424 */
410 struct PendingMessage *replies_pending; 425 GNUNET_HashCode *replies_seen;
411 426
412 /** 427 /**
413 * Pending transmission request for the target client (for processing of 428 * Node in the heap representing this entry; NULL
414 * 'replies_pending'). 429 * if we have no heap node.
415 */ 430 */
416 struct GNUNET_CONNECTION_TransmitHandle *th; 431 struct GNUNET_CONTAINER_HeapNode *hnode;
417 432
418 /** 433 /**
419 * Hash code of all replies that we have seen so far (only valid 434 * Head of list of messages being performed on behalf of this
420 * if client is not NULL since we only track replies like this for 435 * request.
421 * our own clients).
422 */ 436 */
423 GNUNET_HashCode *replies_seen; 437 struct PendingMessageList *pending_head;
424 438
425 /** 439 /**
426 * Node in the heap representing this entry. 440 * Tail of list of messages being performed on behalf of this
441 * request.
427 */ 442 */
428 struct GNUNET_CONTAINER_HeapNode *hnode; 443 struct PendingMessageList *pending_tail;
429 444
430 /** 445 /**
431 * When did we first see this request (form this peer), or, if our 446 * When did we first see this request (form this peer), or, if our
@@ -445,12 +460,6 @@ struct PendingRequest
445 GNUNET_SCHEDULER_TaskIdentifier task; 460 GNUNET_SCHEDULER_TaskIdentifier task;
446 461
447 /** 462 /**
448 * (Interned) Peer identifier (only valid if "client" is NULL)
449 * that identifies a peer that gave us this request.
450 */
451 GNUNET_PEER_Id source_pid;
452
453 /**
454 * (Interned) Peer identifier that identifies a preferred target 463 * (Interned) Peer identifier that identifies a preferred target
455 * for requests. 464 * for requests.
456 */ 465 */
@@ -461,6 +470,12 @@ struct PendingRequest
461 * received our query for this content. 470 * received our query for this content.
462 */ 471 */
463 GNUNET_PEER_Id *used_pids; 472 GNUNET_PEER_Id *used_pids;
473
474 /**
475 * Our entry in the DRQ (non-NULL while we wait for our
476 * turn to interact with the local database).
477 */
478 struct DatastoreRequestQueue *drq;
464 479
465 /** 480 /**
466 * Size of the 'bf' (in bytes). 481 * Size of the 'bf' (in bytes).
@@ -483,6 +498,11 @@ struct PendingRequest
483 unsigned int used_pids_size; 498 unsigned int used_pids_size;
484 499
485 /** 500 /**
501 * Number of results found for this request.
502 */
503 unsigned int results_found;
504
505 /**
486 * How many entries in "replies_seen" are actually valid? 506 * How many entries in "replies_seen" are actually valid?
487 */ 507 */
488 unsigned int replies_seen_off; 508 unsigned int replies_seen_off;
@@ -527,422 +547,529 @@ struct PendingRequest
527 547
528 548
529/** 549/**
530 * All requests from a client are kept in a doubly-linked list. 550 * Our scheduler.
531 */ 551 */
532struct ClientRequestList 552static struct GNUNET_SCHEDULER_Handle *sched;
533{
534 /**
535 * This is a doubly-linked list.
536 */
537 struct ClientRequestList *next;
538
539 /**
540 * This is a doubly-linked list.
541 */
542 struct ClientRequestList *prev;
543
544 /**
545 * A request from this client.
546 */
547 struct PendingRequest *req;
548
549 /**
550 * Client list with the head and tail of this DLL.
551 */
552 struct ClientList *cl;
553};
554
555 553
556/** 554/**
557 * Linked list of all clients that we are currently processing 555 * Our configuration.
558 * requests for.
559 */ 556 */
560struct ClientList 557const struct GNUNET_CONFIGURATION_Handle *cfg;
561{
562
563 /**
564 * This is a linked list.
565 */
566 struct ClientList *next;
567
568 /**
569 * What client is this entry for?
570 */
571 struct GNUNET_SERVER_Client* client;
572
573 /**
574 * Head of the DLL of requests from this client.
575 */
576 struct ClientRequestList *head;
577
578 /**
579 * Tail of the DLL of requests from this client.
580 */
581 struct ClientRequestList *tail;
582
583};
584
585 558
586/** 559/**
587 * Closure for "process_reply" function. 560 * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
588 */ 561 */
589struct ProcessReplyClosure 562static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
590{
591 /**
592 * The data for the reply.
593 */
594 const void *data;
595
596 /**
597 * When the reply expires.
598 */
599 struct GNUNET_TIME_Absolute expiration;
600
601 /**
602 * Size of data.
603 */
604 size_t size;
605
606 /**
607 * Namespace that this reply belongs to
608 * (if it is of type SBLOCK).
609 */
610 GNUNET_HashCode namespace;
611
612 /**
613 * Type of the block.
614 */
615 uint32_t type;
616
617 /**
618 * How much was this reply worth to us?
619 */
620 uint32_t priority;
621};
622
623 563
624/** 564/**
625 * Information about a peer that we are connected to. 565 * Map of peer identifiers to "struct PendingRequest" (for that peer).
626 * We track data that is useful for determining which
627 * peers should receive our requests.
628 */ 566 */
629struct ConnectedPeer 567static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
630{
631
632 /**
633 * List of the last clients for which this peer
634 * successfully answered a query.
635 */
636 struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
637
638 /**
639 * List of the last PIDs for which
640 * this peer successfully answered a query;
641 * We use 0 to indicate no successful reply.
642 */
643 GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
644
645 /**
646 * Average delay between sending the peer a request and
647 * getting a reply (only calculated over the requests for
648 * which we actually got a reply). Calculated
649 * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
650 */
651 struct GNUNET_TIME_Relative avg_delay;
652
653 /**
654 * Handle for an active request for transmission to this
655 * peer, or NULL.
656 */
657 struct GNUNET_CORE_TransmitHandle *cth;
658
659 /**
660 * Messages (replies, queries, content migration) we would like to
661 * send to this peer in the near future. Sorted by priority.
662 */
663 struct PendingMessage *pending_messages;
664
665 /**
666 * Average priority of successful replies. Calculated
667 * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
668 */
669 double avg_priority;
670
671 /**
672 * The peer's identity.
673 */
674 GNUNET_PEER_Id pid;
675
676 /**
677 * Number of requests we have currently pending with this peer (that
678 * is, requests that were transmitted so recently that we would not
679 * retransmit them right now).
680 */
681 unsigned int pending_requests;
682
683 /**
684 * Which offset in "last_p2p_replies" will be updated next?
685 * (we go round-robin).
686 */
687 unsigned int last_p2p_replies_woff;
688
689 /**
690 * Which offset in "last_client_replies" will be updated next?
691 * (we go round-robin).
692 */
693 unsigned int last_client_replies_woff;
694
695};
696
697 568
698/** 569/**
699 * Our connection to the datastore. 570 * Map of query identifiers to "struct PendingRequest" (for that query).
700 */ 571 */
701static struct GNUNET_DATASTORE_Handle *dsh; 572static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
702 573
703/** 574/**
704 * Our scheduler. 575 * Heap with the request that will expire next at the top. Contains
576 * pointers of type "struct PendingRequest*"; these will *also* be
577 * aliased from the "requests_by_peer" data structures and the
578 * "requests_by_query" table. Note that requests from our clients
579 * don't expire and are thus NOT in the "requests_by_expiration"
580 * (or the "requests_by_peer" tables).
705 */ 581 */
706static struct GNUNET_SCHEDULER_Handle *sched; 582static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
707 583
708/** 584/**
709 * Our configuration. 585 * Linked list of clients we are currently processing requests for.
710 */ 586 */
711const struct GNUNET_CONFIGURATION_Handle *cfg; 587struct ClientList *client_list;
712 588
713/** 589/**
714 * Handle to the core service (NULL until we've connected to it). 590 * Pointer to handle to the core service (points to NULL until we've
591 * connected to it).
715 */ 592 */
716struct GNUNET_CORE_Handle *core; 593struct GNUNET_CORE_Handle *core;
717 594
718/**
719 * Head of doubly-linked LGC list.
720 */
721static struct LocalGetContext *lgc_head;
722 595
723/** 596/* ******************* clean up functions ************************ */
724 * Tail of doubly-linked LGC list.
725 */
726static struct LocalGetContext *lgc_tail;
727 597
728/**
729 * Head of request queue for the datastore, sorted by timeout.
730 */
731static struct DatastoreRequestQueue *drq_head;
732 598
733/** 599/**
734 * Tail of request queue for the datastore. 600 * We're done with a particular message list entry.
601 * Free all associated resources.
602 *
603 * @param pml entry to destroy
735 */ 604 */
736static struct DatastoreRequestQueue *drq_tail; 605static void
606destroy_pending_message_list_entry (struct PendingMessageList *pml)
607{
608 GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
609 pml->req->pending_tail,
610 pml);
611 GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
612 pml->target->pending_messages_tail,
613 pml->pm);
614 pml->target->pending_requests--;
615 GNUNET_free (pml->pm);
616 GNUNET_free (pml);
617}
737 618
738/**
739 * Map of query hash codes to requests.
740 */
741static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
742 619
743/** 620/**
744 * Map of peer IDs to requests (for those requests coming 621 * Destroy the given pending message (and call the respective
745 * from other peers). 622 * continuation).
623 *
624 * @param pm message to destroy
625 * @param tpid id of peer that the message was delivered to, or 0 for none
746 */ 626 */
747static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer; 627static void
628destroy_pending_message (struct PendingMessage *pm,
629 GNUNET_PEER_Id tpid)
630{
631 struct PendingMessageList *pml = pm->pml;
632
633 GNUNET_assert (pml->pm == pm);
634 GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
635 pm->cont (pm->cont_cls, 0);
636 destroy_pending_message_list_entry (pml);
637}
638
748 639
749/**
750 * Linked list of all of our clients and their requests.
751 */
752static struct ClientList *clients;
753 640
754/** 641/**
755 * Heap with the request that will expire next at the top. Contains 642 * We're done processing a particular request.
756 * pointers of type "struct PendingRequest*"; these will *also* be 643 * Free all associated resources.
757 * aliased from the "requests_by_peer" data structures and the 644 *
758 * "requests_by_query" table. Note that requests from our clients 645 * @param pr request to destroy
759 * don't expire and are thus NOT in the "requests_by_expiration"
760 * (or the "requests_by_peer" tables).
761 */ 646 */
762static struct GNUNET_CONTAINER_Heap *requests_by_expiration; 647static void
648destroy_pending_request (struct PendingRequest *pr)
649{
650 struct GNUNET_PeerIdentity pid;
651
652 if (pr->hnode != NULL)
653 {
654 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
655 pr->hnode);
656 pr->hnode = NULL;
657 }
658 /* might have already been removed from map
659 in 'process_reply' if there was a unique
660 reply; hence ignore the return value here */
661 (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
662 &pr->query,
663 pr);
664 if (pr->drq != NULL)
665 {
666 GNUNET_FS_drq_get_cancel (pr->drq);
667 pr->drq = NULL;
668 }
669 if (pr->client_request_list != NULL)
670 {
671 GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
672 pr->client_request_list->client_list->rl_tail,
673 pr->client_request_list);
674 GNUNET_free (pr->client_request_list);
675 pr->client_request_list = NULL;
676 }
677 if (pr->pht_entry != NULL)
678 {
679 GNUNET_PEER_resolve (pr->pht_entry->cp->pid,
680 &pid);
681 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
682 &pid.hashPubKey,
683 pr->pht_entry);
684 GNUNET_free (pr->pht_entry);
685 pr->pht_entry = NULL;
686 }
687 if (pr->bf != NULL)
688 {
689 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
690 pr->bf = NULL;
691 }
692 if (pr->irc != NULL)
693 {
694 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
695 pr->irc = NULL;
696 }
697 if (pr->replies_seen != NULL)
698 {
699 GNUNET_free (pr->replies_seen);
700 pr->replies_seen = NULL;
701 }
702 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
703 {
704 GNUNET_SCHEDULER_cancel (sched,
705 pr->task);
706 pr->task = GNUNET_SCHEDULER_NO_TASK;
707 }
708 while (NULL != pr->pending_head)
709 destroy_pending_message_list_entry (pr->pending_head);
710 GNUNET_PEER_change_rc (pr->target_pid, -1);
711 if (pr->used_pids != NULL)
712 {
713 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
714 GNUNET_free (pr->used_pids);
715 pr->used_pids_off = 0;
716 pr->used_pids_size = 0;
717 pr->used_pids = NULL;
718 }
719 GNUNET_free (pr);
720}
721
763 722
764/** 723/**
765 * Map of peer identifiers to "struct ConnectedPeer" (for that peer). 724 * Method called whenever a given peer connects.
725 *
726 * @param cls closure, not used
727 * @param peer peer identity this notification is about
728 * @param latency reported latency of the connection with 'other'
729 * @param distance reported distance (DV) to 'other'
766 */ 730 */
767static struct GNUNET_CONTAINER_MultiHashMap *connected_peers; 731static void
732peer_connect_handler (void *cls,
733 const struct
734 GNUNET_PeerIdentity * peer,
735 struct GNUNET_TIME_Relative latency,
736 uint32_t distance)
737{
738 struct ConnectedPeer *cp;
739
740 cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
741 cp->pid = GNUNET_PEER_intern (peer);
742 GNUNET_CONTAINER_multihashmap_put (connected_peers,
743 &peer->hashPubKey,
744 cp,
745 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
746}
747
768 748
769/** 749/**
770 * Maximum number of requests (from other peers) that we're 750 * Free (each) request made by the peer.
771 * willing to have pending at any given point in time. 751 *
772 * FIXME: set from configuration (and 32 is a tiny value for testing only). 752 * @param cls closure, points to peer that the request belongs to
753 * @param key current key code
754 * @param value value in the hash map
755 * @return GNUNET_YES (we should continue to iterate)
773 */ 756 */
774static uint64_t max_pending_requests = 32; 757static int
775 758destroy_request (void *cls,
759 const GNUNET_HashCode * key,
760 void *value)
761{
762 const struct GNUNET_PeerIdentity * peer = cls;
763 struct PendingRequest *pr = value;
764
765 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
766 &peer->hashPubKey,
767 pr);
768 destroy_pending_request (pr);
769 return GNUNET_YES;
770}
776 771
777 772
778/** 773/**
779 * Run the next DS request in our 774 * Method called whenever a peer disconnects.
780 * queue, we're done with the current one. 775 *
776 * @param cls closure, not used
777 * @param peer peer identity this notification is about
781 */ 778 */
782static void 779static void
783next_ds_request () 780peer_disconnect_handler (void *cls,
781 const struct
782 GNUNET_PeerIdentity * peer)
784{ 783{
785 struct DatastoreRequestQueue *e; 784 struct ConnectedPeer *cp;
786 785 struct PendingMessage *pm;
787 while (NULL != (e = drq_head)) 786 unsigned int i;
787
788 GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
789 &peer->hashPubKey,
790 &destroy_request,
791 (void*) peer);
792 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
793 &peer->hashPubKey);
794 if (cp == NULL)
795 return;
796 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
788 { 797 {
789 if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value) 798 if (NULL != cp->last_client_replies[i])
790 break; 799 {
791 if (e->task != GNUNET_SCHEDULER_NO_TASK) 800 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
792 GNUNET_SCHEDULER_cancel (sched, e->task); 801 cp->last_client_replies[i] = NULL;
793 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); 802 }
794 e->req (e->req_cls, GNUNET_NO);
795 GNUNET_free (e);
796 } 803 }
797 if (e == NULL) 804 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
798 return; 805 &peer->hashPubKey,
799 if (e->task != GNUNET_SCHEDULER_NO_TASK) 806 cp);
800 GNUNET_SCHEDULER_cancel (sched, e->task); 807 GNUNET_PEER_change_rc (cp->pid, -1);
801 e->task = GNUNET_SCHEDULER_NO_TASK; 808 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
802 e->req (e->req_cls, GNUNET_YES); 809 if (NULL != cp->cth)
803 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); 810 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
804 GNUNET_free (e); 811 while (NULL != (pm = cp->pending_messages_head))
812 destroy_pending_message (pm, 0 /* delivery failed */);
813 GNUNET_break (0 == cp->pending_requests);
814 GNUNET_free (cp);
805} 815}
806 816
807 817
808/** 818/**
809 * A datastore request had to be timed out. 819 * Iterator over hash map entries that removes all occurences
820 * of the given 'client' from the 'last_client_replies' of the
821 * given connected peer.
810 * 822 *
811 * @param cls closure (of type "struct DatastoreRequestQueue*") 823 * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
812 * @param tc task context, unused 824 * @param key current key code (unused)
825 * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
826 * @return GNUNET_YES (we should continue to iterate)
813 */ 827 */
814static void 828static int
815timeout_ds_request (void *cls, 829remove_client_from_last_client_replies (void *cls,
816 const struct GNUNET_SCHEDULER_TaskContext *tc) 830 const GNUNET_HashCode * key,
831 void *value)
817{ 832{
818 struct DatastoreRequestQueue *e = cls; 833 struct GNUNET_SERVER_Client *client = cls;
834 struct ConnectedPeer *cp = value;
835 unsigned int i;
819 836
820 e->task = GNUNET_SCHEDULER_NO_TASK; 837 for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
821 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e); 838 {
822 e->req (e->req_cls, GNUNET_NO); 839 if (cp->last_client_replies[i] == client)
823 GNUNET_free (e); 840 {
841 GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
842 cp->last_client_replies[i] = NULL;
843 }
844 }
845 return GNUNET_YES;
824} 846}
825 847
826 848
827/** 849/**
828 * Queue a request for the datastore. 850 * A client disconnected. Remove all of its pending queries.
829 * 851 *
830 * @param deadline by when the request should run 852 * @param cls closure, NULL
831 * @param fun function to call once the request can be run 853 * @param client identification of the client
832 * @param fun_cls closure for fun
833 */ 854 */
834static struct DatastoreRequestQueue * 855static void
835queue_ds_request (struct GNUNET_TIME_Relative deadline, 856handle_client_disconnect (void *cls,
836 RequestFunction fun, 857 struct GNUNET_SERVER_Client
837 void *fun_cls) 858 * client)
838{ 859{
839 struct DatastoreRequestQueue *e; 860 struct ClientList *pos;
840 struct DatastoreRequestQueue *bef; 861 struct ClientList *prev;
862 struct ClientRequestList *rcl;
863 struct ClientResponseMessage *creply;
841 864
842 if (drq_head == NULL) 865 if (client == NULL)
843 { 866 return; /* huh? is this allowed? */
844 /* no other requests pending, run immediately */ 867 prev = NULL;
845 fun (fun_cls, GNUNET_OK); 868 pos = client_list;
846 return NULL; 869 while ( (NULL != pos) &&
847 } 870 (pos->client != client) )
848 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
849 e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
850 e->req = fun;
851 e->req_cls = fun_cls;
852 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
853 { 871 {
854 /* local request, highest prio, put at head of queue 872 prev = pos;
855 regardless of deadline */ 873 pos = pos->next;
856 bef = NULL;
857 } 874 }
875 if (pos == NULL)
876 return; /* no requests pending for this client */
877 while (NULL != (rcl = pos->rl_head))
878 destroy_pending_request (rcl->req);
879 if (prev == NULL)
880 client_list = pos->next;
858 else 881 else
882 prev->next = pos->next;
883 if (pos->th != NULL)
859 { 884 {
860 bef = drq_tail; 885 GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
861 while ( (NULL != bef) && 886 pos->th = NULL;
862 (e->timeout.value < bef->timeout.value) )
863 bef = bef->prev;
864 } 887 }
865 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e); 888 while (NULL != (creply = pos->res_head))
866 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value) 889 {
867 return e; 890 GNUNET_CONTAINER_DLL_remove (pos->res_head,
868 e->task = GNUNET_SCHEDULER_add_delayed (sched, 891 pos->res_tail,
869 deadline, 892 creply);
870 &timeout_ds_request, 893 GNUNET_free (creply);
871 e); 894 }
872 return e; 895 GNUNET_SERVER_client_drop (pos->client);
896 GNUNET_free (pos);
897 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
898 &remove_client_from_last_client_replies,
899 client);
873} 900}
874 901
875 902
876/** 903/**
877 * Free the state associated with a local get context. 904 * Iterator to free peer entries.
878 * 905 *
879 * @param lgc the lgc to free 906 * @param cls closure, unused
907 * @param key current key code
908 * @param value value in the hash map (peer entry)
909 * @return GNUNET_YES (we should continue to iterate)
910 */
911static int
912clean_peer (void *cls,
913 const GNUNET_HashCode * key,
914 void *value)
915{
916 peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
917 return GNUNET_YES;
918}
919
920
921/**
922 * Task run during shutdown.
923 *
924 * @param cls unused
925 * @param tc unused
880 */ 926 */
881static void 927static void
882local_get_context_free (struct LocalGetContext *lgc) 928shutdown_task (void *cls,
929 const struct GNUNET_SCHEDULER_TaskContext *tc)
883{ 930{
884 GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); 931 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
885 GNUNET_SERVER_client_drop (lgc->client); 932 &clean_peer,
886 GNUNET_free_non_null (lgc->results); 933 NULL);
887 if (lgc->results_bf != NULL) 934 GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
888 GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf); 935 connected_peers = NULL;
889 if (lgc->req != NULL) 936 while (client_list != NULL)
890 { 937 handle_client_disconnect (NULL,
891 if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK) 938 client_list->client);
892 GNUNET_SCHEDULER_cancel (sched, lgc->req->task); 939 GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
893 GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc); 940 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
894 GNUNET_free (lgc->req); 941 requests_by_expiration_heap = 0;
895 } 942 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
896 GNUNET_free (lgc); 943 GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
944 query_request_map = NULL;
945 GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
946 GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
947 peer_request_map = NULL;
948 GNUNET_assert (NULL != core);
949 GNUNET_CORE_disconnect (core);
950 core = NULL;
951 sched = NULL;
952 cfg = NULL;
897} 953}
898 954
899 955
956/* ******************* Utility functions ******************** */
957
958
900/** 959/**
901 * We're able to transmit the next (local) result to the client. 960 * Transmit the given message by copying it to the target buffer
902 * Do it and ask the datastore for more. Or, on error, tell 961 * "buf". "buf" will be NULL and "size" zero if the socket was closed
903 * the datastore to stop giving us more. 962 * for writing in the meantime. In that case, do nothing
963 * (the disconnect or shutdown handler will take care of the rest).
964 * If we were able to transmit messages and there are still more
965 * pending, ask core again for further calls to this function.
904 * 966 *
905 * @param cls our closure (struct LocalGetContext) 967 * @param cls closure, pointer to the 'struct ConnectedPeer*'
906 * @param max maximum number of bytes we can transmit 968 * @param size number of bytes available in buf
907 * @param buf where to copy our message 969 * @param buf where the callee should write the message
908 * @return number of bytes copied to buf 970 * @return number of bytes written to buf
909 */ 971 */
910static size_t 972static size_t
911transmit_local_result (void *cls, 973transmit_to_peer (void *cls,
912 size_t max, 974 size_t size, void *buf)
913 void *buf)
914{ 975{
915 struct LocalGetContext *lgc = cls; 976 struct ConnectedPeer *cp = cls;
916 uint16_t msize; 977 char *cbuf = buf;
917 978 struct GNUNET_PeerIdentity pid;
979 struct PendingMessage *pm;
980 size_t msize;
981
982 cp->cth = NULL;
918 if (NULL == buf) 983 if (NULL == buf)
919 { 984 {
920#if DEBUG_FS 985#if DEBUG_FS
921 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 986 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922 "Failed to transmit result to local client, aborting datastore iteration.\n"); 987 "Dropping reply, core too busy.\n");
923#endif 988#endif
924 /* error, abort! */
925 GNUNET_free (lgc->result);
926 lgc->result = NULL;
927 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
928 return 0; 989 return 0;
929 } 990 }
930 msize = ntohs (lgc->result->header.size); 991 msize = 0;
931#if DEBUG_FS 992 while ( (NULL != (pm = cp->pending_messages_head) ) &&
932 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 993 (pm->msize <= size) )
933 "Transmitting %u bytes of result to local client.\n", 994 {
934 msize); 995 memcpy (&cbuf[msize], &pm[1], pm->msize);
935#endif 996 msize += pm->msize;
936 GNUNET_assert (max >= msize); 997 size -= pm->msize;
937 memcpy (buf, lgc->result, msize); 998 destroy_pending_message (pm, cp->pid);
938 GNUNET_free (lgc->result); 999 }
939 lgc->result = NULL; 1000 if (NULL != pm)
940 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 1001 {
1002 GNUNET_PEER_resolve (cp->pid,
1003 &pid);
1004 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1005 pm->priority,
1006 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1007 &pid,
1008 pm->msize,
1009 &transmit_to_peer,
1010 pm);
1011 }
941 return msize; 1012 return msize;
942} 1013}
943 1014
944 1015
945/** 1016/**
1017 * Add a message to the set of pending messages for the given peer.
1018 *
1019 * @param cp peer to send message to
1020 * @param pm message to queue
1021 * @param pr request on which behalf this message is being queued
1022 */
1023static void
1024add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1025 struct PendingMessage *pm,
1026 struct PendingRequest *pr)
1027{
1028 struct PendingMessage *pos;
1029 struct PendingMessageList *pml;
1030 struct GNUNET_PeerIdentity pid;
1031
1032 GNUNET_assert (pm->next == NULL);
1033 GNUNET_assert (pm->pml == NULL);
1034 pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1035 pml->req = pr;
1036 pml->target = cp;
1037 pml->pm = pm;
1038 pm->pml = pml;
1039 GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1040 pr->pending_tail,
1041 pml);
1042 pos = cp->pending_messages_head;
1043 while ( (pos != NULL) &&
1044 (pm->priority < pos->priority) )
1045 pos = pos->next;
1046 GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1047 cp->pending_messages_tail,
1048 pos,
1049 pm);
1050 cp->pending_requests++;
1051 if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1052 destroy_pending_message (cp->pending_messages_tail, 0);
1053 if (cp->cth == NULL)
1054 {
1055 /* need to schedule transmission */
1056 GNUNET_PEER_resolve (cp->pid, &pid);
1057 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1058 cp->pending_messages_head->priority,
1059 GNUNET_TIME_UNIT_FOREVER_REL,
1060 &pid,
1061 cp->pending_messages_head->msize,
1062 &transmit_to_peer,
1063 cp);
1064 }
1065 if (cp->cth == NULL)
1066 {
1067 /* FIXME: call stats (rare, bad case) */
1068 }
1069}
1070
1071
1072/**
946 * Mingle hash with the mingle_number to produce different bits. 1073 * Mingle hash with the mingle_number to produce different bits.
947 */ 1074 */
948static void 1075static void
@@ -960,6 +1087,48 @@ mingle_hash (const GNUNET_HashCode * in,
960 1087
961 1088
962/** 1089/**
1090 * Test if the load on this peer is too high
1091 * to even consider processing the query at
1092 * all.
1093 *
1094 * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1095 */
1096static int
1097test_load_too_high ()
1098{
1099 return GNUNET_NO; // FIXME
1100}
1101
1102
1103/* ******************* Pending Request Refresh Task ******************** */
1104
1105
1106/**
1107 * Function called after we either failed or succeeded
1108 * at transmitting a query to a peer.
1109 *
1110 * @param cls the requests "struct PendingRequest*"
1111 * @param pid ID of receiving peer, 0 on transmission error
1112 */
1113static void
1114transmit_query_continuation (void *cls,
1115 GNUNET_PEER_Id tpid)
1116{
1117 struct PendingRequest *pr = cls;
1118
1119 if (tpid == 0)
1120 return;
1121 GNUNET_PEER_change_rc (tpid, 1);
1122 if (pr->used_pids_off == pr->used_pids_size)
1123 GNUNET_array_grow (pr->used_pids,
1124 pr->used_pids_size,
1125 pr->used_pids_size * 2 + 2);
1126 pr->used_pids[pr->used_pids_off++] = tpid;
1127}
1128
1129
1130#if 0
1131/**
963 * How many bytes should a bloomfilter be if we have already seen 1132 * How many bytes should a bloomfilter be if we have already seen
964 * entry_count responses? Note that BLOOMFILTER_K gives us the number 1133 * entry_count responses? Note that BLOOMFILTER_K gives us the number
965 * of bits set per entry. Furthermore, we should not re-size the 1134 * of bits set per entry. Furthermore, we should not re-size the
@@ -1024,75 +1193,15 @@ refresh_bloomfilter (unsigned int count,
1024 } 1193 }
1025 return bf; 1194 return bf;
1026} 1195}
1196#endif
1027 1197
1028 1198
1029/** 1199/**
1030 * Closure used for "target_peer_select_cb". 1200 * We use a random delay to make the timing of requests less
1031 */ 1201 * predictable. This function returns such a random delay.
1032struct PeerSelectionContext
1033{
1034 /**
1035 * The request for which we are selecting
1036 * peers.
1037 */
1038 struct PendingRequest *pr;
1039
1040 /**
1041 * Current "prime" target.
1042 */
1043 struct GNUNET_PeerIdentity target;
1044
1045 /**
1046 * How much do we like this target?
1047 */
1048 double target_score;
1049
1050};
1051
1052
1053/**
1054 * Function called for each connected peer to determine
1055 * which one(s) would make good targets for forwarding.
1056 * 1202 *
1057 * @param cls closure (struct PeerSelectionContext) 1203 * FIXME: make schedule dependent on the specifics of the request?
1058 * @param key current key code (peer identity) 1204 * Or bandwidth and number of connected peers and load?
1059 * @param value value in the hash map (struct ConnectedPeer)
1060 * @return GNUNET_YES if we should continue to
1061 * iterate,
1062 * GNUNET_NO if not.
1063 */
1064static int
1065target_peer_select_cb (void *cls,
1066 const GNUNET_HashCode * key,
1067 void *value)
1068{
1069 struct PeerSelectionContext *psc = cls;
1070 struct ConnectedPeer *cp = value;
1071 struct PendingRequest *pr = psc->pr;
1072 double score;
1073 unsigned int i;
1074
1075 /* 1) check if we have already (recently) forwarded to this peer */
1076 for (i=0;i<pr->used_pids_off;i++)
1077 if (pr->used_pids[i] == cp->pid)
1078 return GNUNET_YES; /* skip */
1079 // 2) calculate how much we'd like to forward to this peer
1080 score = 0; // FIXME!
1081
1082 /* store best-fit in closure */
1083 if (score > psc->target_score)
1084 {
1085 psc->target_score = score;
1086 psc->target.hashPubKey = *key;
1087 }
1088 return GNUNET_YES;
1089}
1090
1091
1092/**
1093 * We use a random delay to make the timing of requests
1094 * less predictable. This function returns such a random
1095 * delay.
1096 * 1205 *
1097 * @return random delay to use for some request, between 0 and TTL_DECREMENT ms 1206 * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1098 */ 1207 */
@@ -1106,69 +1215,6 @@ get_processing_delay ()
1106 1215
1107 1216
1108/** 1217/**
1109 * Task that is run for each request with the goal of forwarding the
1110 * associated query to other peers. The task should re-schedule
1111 * itself to be re-run once the TTL has expired. (or at a later time
1112 * if more peers should be queried earlier).
1113 *
1114 * @param cls the requests "struct PendingRequest*"
1115 * @param tc task context (unused)
1116 */
1117static void
1118forward_request_task (void *cls,
1119 const struct GNUNET_SCHEDULER_TaskContext *tc);
1120
1121
1122/**
1123 * We've selected a peer for forwarding of a query. Construct the
1124 * message and then re-schedule the task to forward again to (other)
1125 * peers.
1126 *
1127 * @param cls closure
1128 * @param size number of bytes available in buf
1129 * @param buf where the callee should write the message
1130 * @return number of bytes written to buf
1131 */
1132static size_t
1133transmit_request_cb (void *cls,
1134 size_t size,
1135 void *buf)
1136{
1137 struct ConnectedPeer *cp = cls;
1138 char *cbuf = buf;
1139 struct GNUNET_PeerIdentity target;
1140 struct PendingMessage *pr;
1141 size_t tot;
1142
1143 cp->cth = NULL;
1144 tot = 0;
1145 while ( (NULL != (pr = cp->pending_messages)) &&
1146 (pr->msize < size - tot) )
1147 {
1148 memcpy (&cbuf[tot],
1149 &pr[1],
1150 pr->msize);
1151 tot += pr->msize;
1152 cp->pending_messages = pr->next;
1153 GNUNET_free (pr);
1154 }
1155 if (NULL != pr)
1156 {
1157 GNUNET_PEER_resolve (cp->pid,
1158 &target);
1159 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1160 pr->priority,
1161 GNUNET_TIME_UNIT_FOREVER_REL,
1162 &target,
1163 pr->msize,
1164 &transmit_request_cb,
1165 cp);
1166 }
1167 return tot;
1168}
1169
1170
1171/**
1172 * Function called after we've tried to reserve a certain amount of 1218 * Function called after we've tried to reserve a certain amount of
1173 * bandwidth for a reply. Check if we succeeded and if so send our 1219 * bandwidth for a reply. Check if we succeeded and if so send our
1174 * query. 1220 * query.
@@ -1192,18 +1238,12 @@ target_reservation_cb (void *cls,
1192 struct PendingRequest *pr = cls; 1238 struct PendingRequest *pr = cls;
1193 struct ConnectedPeer *cp; 1239 struct ConnectedPeer *cp;
1194 struct PendingMessage *pm; 1240 struct PendingMessage *pm;
1195 struct PendingMessage *pos;
1196 struct PendingMessage *prev;
1197 struct GetMessage *gm; 1241 struct GetMessage *gm;
1198 GNUNET_HashCode *ext; 1242 GNUNET_HashCode *ext;
1199 char *bfdata; 1243 char *bfdata;
1200 size_t msize; 1244 size_t msize;
1201 unsigned int k; 1245 unsigned int k;
1202 1246
1203 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1204 get_processing_delay (), // FIXME: longer?
1205 &forward_request_task,
1206 pr);
1207 pr->irc = NULL; 1247 pr->irc = NULL;
1208 GNUNET_assert (peer != NULL); 1248 GNUNET_assert (peer != NULL);
1209 if (amount != DBLOCK_SIZE) 1249 if (amount != DBLOCK_SIZE)
@@ -1211,15 +1251,12 @@ target_reservation_cb (void *cls,
1211 /* FIXME: call stats... */ 1251 /* FIXME: call stats... */
1212 return; /* this target round failed */ 1252 return; /* this target round failed */
1213 } 1253 }
1214 // (2) transmit, update ttl/priority 1254 // (3) transmit, update ttl/priority
1215 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, 1255 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1216 &peer->hashPubKey); 1256 &peer->hashPubKey);
1217 if (cp == NULL) 1257 if (cp == NULL)
1218 { 1258 {
1219 /* Peer must have just left; try again immediately */ 1259 /* Peer must have just left */
1220 pr->task = GNUNET_SCHEDULER_add_now (sched,
1221 &forward_request_task,
1222 pr);
1223 return; 1260 return;
1224 } 1261 }
1225 /* build message and insert message into priority queue */ 1262 /* build message and insert message into priority queue */
@@ -1228,7 +1265,6 @@ target_reservation_cb (void *cls,
1228 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); 1265 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1229 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); 1266 pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1230 pm->msize = msize; 1267 pm->msize = msize;
1231 pm->priority = 0; // FIXME: calculate priority properly!
1232 gm = (struct GetMessage*) &pm[1]; 1268 gm = (struct GetMessage*) &pm[1];
1233 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET); 1269 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1234 gm->header.size = htons (msize); 1270 gm->header.size = htons (msize);
@@ -1236,784 +1272,570 @@ target_reservation_cb (void *cls,
1236 pr->remaining_priority /= 2; 1272 pr->remaining_priority /= 2;
1237 gm->priority = htonl (pr->remaining_priority); 1273 gm->priority = htonl (pr->remaining_priority);
1238 gm->ttl = htonl (pr->ttl); 1274 gm->ttl = htonl (pr->ttl);
1239 gm->filter_mutator = htonl(pr->mingle); 1275 gm->filter_mutator = htonl(pr->mingle); // FIXME: bad endianess conversion?
1240 gm->hash_bitmap = htonl (42); 1276 gm->hash_bitmap = htonl (42); // FIXME!
1241 gm->query = pr->query; 1277 gm->query = pr->query;
1242 ext = (GNUNET_HashCode*) &gm[1]; 1278 ext = (GNUNET_HashCode*) &gm[1];
1243
1244 // FIXME: setup "ext[0]..[k-1]" 1279 // FIXME: setup "ext[0]..[k-1]"
1245 bfdata = (char *) &ext[k]; 1280 bfdata = (char *) &ext[k];
1246 if (pr->bf != NULL) 1281 if (pr->bf != NULL)
1247 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf, 1282 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1248 bfdata, 1283 bfdata,
1249 pr->bf_size); 1284 pr->bf_size);
1285 pm->cont = &transmit_query_continuation;
1286 pm->cont_cls = pr;
1287 add_to_pending_messages_for_peer (cp, pm, pr);
1288}
1250 1289
1251 1290
1252 prev = NULL; 1291
1253 pos = cp->pending_messages; 1292/**
1254 while ( (pos != NULL) && 1293 * Closure used for "target_peer_select_cb".
1255 (pm->priority < pos->priority) ) 1294 */
1256 { 1295struct PeerSelectionContext
1257 prev = pos; 1296{
1258 pos = pos->next; 1297 /**
1259 } 1298 * The request for which we are selecting
1260 if (prev == NULL) 1299 * peers.
1261 cp->pending_messages = pm; 1300 */
1262 else 1301 struct PendingRequest *pr;
1263 prev->next = pm; 1302
1264 pm->next = pos; 1303 /**
1265 if (cp->cth == NULL) 1304 * Current "prime" target.
1266 cp->cth = GNUNET_CORE_notify_transmit_ready (core, 1305 */
1267 cp->pending_messages->priority, 1306 struct GNUNET_PeerIdentity target;
1268 GNUNET_TIME_UNIT_FOREVER_REL, 1307
1269 peer, 1308 /**
1270 msize, 1309 * How much do we like this target?
1271 &transmit_request_cb, 1310 */
1272 cp); 1311 double target_score;
1273 if (cp->cth == NULL) 1312
1313};
1314
1315
1316/**
1317 * Function called for each connected peer to determine
1318 * which one(s) would make good targets for forwarding.
1319 *
1320 * @param cls closure (struct PeerSelectionContext)
1321 * @param key current key code (peer identity)
1322 * @param value value in the hash map (struct ConnectedPeer)
1323 * @return GNUNET_YES if we should continue to
1324 * iterate,
1325 * GNUNET_NO if not.
1326 */
1327static int
1328target_peer_select_cb (void *cls,
1329 const GNUNET_HashCode * key,
1330 void *value)
1331{
1332 struct PeerSelectionContext *psc = cls;
1333 struct ConnectedPeer *cp = value;
1334 struct PendingRequest *pr = psc->pr;
1335 double score;
1336 unsigned int i;
1337
1338 /* 1) check if we have already (recently) forwarded to this peer */
1339 for (i=0;i<pr->used_pids_off;i++)
1340 if (pr->used_pids[i] == cp->pid)
1341 return GNUNET_YES; /* skip */
1342 // 2) calculate how much we'd like to forward to this peer
1343 score = 42; // FIXME!
1344 // FIXME: also need API to gather data on responsiveness
1345 // of this peer (we have fields for that in 'cp', but
1346 // they are never set!)
1347
1348 /* store best-fit in closure */
1349 if (score > psc->target_score)
1274 { 1350 {
1275 /* technically, this should not be a 'break'; but 1351 psc->target_score = score;
1276 we don't handle this (rare) case yet, so let's warn 1352 psc->target.hashPubKey = *key;
1277 about it... */
1278 GNUNET_break (0);
1279 // FIXME: now what?
1280 } 1353 }
1354 return GNUNET_YES;
1281} 1355}
1282 1356
1283 1357
1284/** 1358/**
1285 * Task that is run for each request with the goal of forwarding the 1359 * We're processing a GET request from another peer and have decided
1286 * associated query to other peers. The task should re-schedule 1360 * to forward it to other peers. This function is called periodically
1287 * itself to be re-run once the TTL has expired. (or at a later time 1361 * and should forward the request to other peers until we have all
1288 * if more peers should be queried earlier). 1362 * possible replies. If we have transmitted the *only* reply to
1363 * the initiator we should destroy the pending request. If we have
1364 * many replies in the queue to the initiator, we should delay sending
1365 * out more queries until the reply queue has shrunk some.
1289 * 1366 *
1290 * @param cls the requests "struct PendingRequest*" 1367 * @param cls our "struct ProcessGetContext *"
1291 * @param tc task context (unused) 1368 * @param tc unused
1292 */ 1369 */
1293static void 1370static void
1294forward_request_task (void *cls, 1371forward_request_task (void *cls,
1295 const struct GNUNET_SCHEDULER_TaskContext *tc) 1372 const struct GNUNET_SCHEDULER_TaskContext *tc)
1296{ 1373{
1297 struct PendingRequest *pr = cls; 1374 struct PendingRequest *pr = cls;
1298 struct PeerSelectionContext psc; 1375 struct PeerSelectionContext psc;
1376 struct ConnectedPeer *cp;
1299 1377
1300 pr->task = GNUNET_SCHEDULER_NO_TASK; 1378 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1379 get_processing_delay (),
1380 &forward_request_task,
1381 pr);
1382 if (pr->irc != NULL)
1383 return; /* previous request still pending */
1301 /* (1) select target */ 1384 /* (1) select target */
1302 psc.pr = pr; 1385 psc.pr = pr;
1303 psc.target_score = DBL_MIN; 1386 psc.target_score = DBL_MIN;
1304 GNUNET_CONTAINER_multihashmap_iterate (connected_peers, 1387 GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1305 &target_peer_select_cb, 1388 &target_peer_select_cb,
1306 &psc); 1389 &psc);
1307 if (psc.target_score == DBL_MIN) 1390 if (psc.target_score == DBL_MIN)
1308 { 1391 return; /* nobody selected */
1309 /* no possible target found, wait some time */ 1392
1310 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1311 get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1312 &forward_request_task,
1313 pr);
1314 return;
1315 }
1316 /* (2) reserve reply bandwidth */ 1393 /* (2) reserve reply bandwidth */
1317 GNUNET_assert (NULL == pr->irc); 1394 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1395 &psc.target.hashPubKey);
1318 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg, 1396 pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1319 &psc.target, 1397 &psc.target,
1320 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 1398 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1321 -1, 1399 (uint32_t) -1 /* no limit */,
1322 DBLOCK_SIZE, // FIXME: make dependent on type? 1400 DBLOCK_SIZE,
1323 0, 1401 (uint64_t) cp->inc_preference,
1324 &target_reservation_cb, 1402 &target_reservation_cb,
1325 pr); 1403 pr);
1404 cp->inc_preference = 0.0;
1326} 1405}
1327 1406
1328 1407
1329/** 1408/* **************************** P2P PUT Handling ************************ */
1330 * We're processing (local) results for a search request
1331 * from a (local) client. Pass applicable results to the
1332 * client and if we are done either clean up (operation
1333 * complete) or switch to P2P search (more results possible).
1334 *
1335 * @param cls our closure (struct LocalGetContext)
1336 * @param key key for the content
1337 * @param size number of bytes in data
1338 * @param data content stored
1339 * @param type type of the content
1340 * @param priority priority of the content
1341 * @param anonymity anonymity-level for the content
1342 * @param expiration expiration time for the content
1343 * @param uid unique identifier for the datum;
1344 * maybe 0 if no unique identifier is available
1345 */
1346static void
1347process_local_get_result (void *cls,
1348 const GNUNET_HashCode * key,
1349 uint32_t size,
1350 const void *data,
1351 uint32_t type,
1352 uint32_t priority,
1353 uint32_t anonymity,
1354 struct GNUNET_TIME_Absolute
1355 expiration,
1356 uint64_t uid)
1357{
1358 struct LocalGetContext *lgc = cls;
1359 struct PendingRequest *pr;
1360 struct ClientRequestList *crl;
1361 struct ClientList *cl;
1362 size_t msize;
1363 unsigned int i;
1364
1365 if (key == NULL)
1366 {
1367#if DEBUG_FS
1368 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1369 "Received last result for `%s' from local datastore, deciding what to do next.\n",
1370 GNUNET_h2s (&lgc->query));
1371#endif
1372 /* no further results from datastore; continue
1373 processing further requests from the client and
1374 allow the next task to use the datastore; also,
1375 switch to P2P requests or clean up our state. */
1376 next_ds_request ();
1377 GNUNET_SERVER_receive_done (lgc->client,
1378 GNUNET_OK);
1379 if ( (lgc->results_used == 0) ||
1380 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1381 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1382 (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1383 {
1384#if DEBUG_FS
1385 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386 "Forwarding query for `%s' to network.\n",
1387 GNUNET_h2s (&lgc->query));
1388#endif
1389 cl = clients;
1390 while ( (NULL != cl) &&
1391 (cl->client != lgc->client) )
1392 cl = cl->next;
1393 if (cl == NULL)
1394 {
1395 cl = GNUNET_malloc (sizeof (struct ClientList));
1396 cl->client = lgc->client;
1397 cl->next = clients;
1398 clients = cl;
1399 }
1400 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1401 crl->cl = cl;
1402 GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1403 pr = GNUNET_malloc (sizeof (struct PendingRequest));
1404 pr->client = lgc->client;
1405 GNUNET_SERVER_client_keep (pr->client);
1406 pr->crl_entry = crl;
1407 crl->req = pr;
1408 if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1409 {
1410 pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1411 *pr->namespace = lgc->namespace;
1412 }
1413 pr->replies_seen = lgc->results;
1414 lgc->results = NULL;
1415 pr->start_time = GNUNET_TIME_absolute_get ();
1416 pr->query = lgc->query;
1417 pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1418 pr->replies_seen_off = lgc->results_used;
1419 pr->replies_seen_size = lgc->results_size;
1420 lgc->results_size = 0;
1421 pr->type = lgc->type;
1422 pr->anonymity_level = lgc->anonymity_level;
1423 pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1424 &pr->mingle,
1425 &pr->bf_size,
1426 pr->replies_seen);
1427 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1428 &pr->query,
1429 pr,
1430 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1431 pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1432 get_processing_delay (),
1433 &forward_request_task,
1434 pr);
1435 local_get_context_free (lgc);
1436 return;
1437 }
1438 /* got all possible results, clean up! */
1439#if DEBUG_FS
1440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441 "Found all possible results for query for `%s', done!\n",
1442 GNUNET_h2s (&lgc->query));
1443#endif
1444 local_get_context_free (lgc);
1445 return;
1446 }
1447 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1448 {
1449#if DEBUG_FS
1450 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1451 "Received on-demand block for `%s' from local datastore, fetching data.\n",
1452 GNUNET_h2s (&lgc->query));
1453#endif
1454 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1455 anonymity, expiration, uid,
1456 dsh,
1457 &process_local_get_result,
1458 lgc);
1459 return;
1460 }
1461 if ( (type != lgc->type) &&
1462 (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
1463 {
1464 /* this should be virtually impossible to reach (DBLOCK
1465 query hash being identical to KBLOCK/SBLOCK query hash);
1466 nevertheless, if it happens, the correct thing is to
1467 simply skip the result. */
1468#if DEBUG_FS
1469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1470 "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
1471 type,
1472 lgc->type,
1473 GNUNET_h2s (&lgc->query));
1474#endif
1475 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1476 return;
1477 }
1478 /* check if this is a result we've alredy
1479 received */
1480 for (i=0;i<lgc->results_used;i++)
1481 if (0 == memcmp (key,
1482 &lgc->results[i],
1483 sizeof (GNUNET_HashCode)))
1484 {
1485#if DEBUG_FS
1486 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487 "Received duplicate result for `%s' from local datastore, ignoring.\n",
1488 GNUNET_h2s (&lgc->query));
1489#endif
1490 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1491 return;
1492 }
1493 if (lgc->results_used == lgc->results_size)
1494 GNUNET_array_grow (lgc->results,
1495 lgc->results_size,
1496 lgc->results_size * 2 + 2);
1497 GNUNET_CRYPTO_hash (data,
1498 size,
1499 &lgc->results[lgc->results_used++]);
1500 msize = size + sizeof (struct ContentMessage);
1501 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1502 lgc->result = GNUNET_malloc (msize);
1503 lgc->result->header.size = htons (msize);
1504 lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1505 lgc->result->type = htonl (type);
1506 lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1507 memcpy (&lgc->result[1],
1508 data,
1509 size);
1510#if DEBUG_FS
1511 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512 "Received new result for `%s' from local datastore, passing to client.\n",
1513 GNUNET_h2s (&lgc->query));
1514#endif
1515 GNUNET_SERVER_notify_transmit_ready (lgc->client,
1516 msize,
1517 GNUNET_TIME_UNIT_FOREVER_REL,
1518 &transmit_local_result,
1519 lgc);
1520}
1521 1409
1522 1410
1523/** 1411/**
1524 * We're processing a search request from a local 1412 * Function called after we either failed or succeeded
1525 * client. Now it is our turn to query the datastore. 1413 * at transmitting a reply to a peer.
1526 *
1527 * @param cls our closure (struct LocalGetContext)
1528 * @param tc unused
1529 */
1530static void
1531transmit_local_get (void *cls,
1532 const struct GNUNET_SCHEDULER_TaskContext *tc)
1533{
1534 struct LocalGetContext *lgc = cls;
1535 uint32_t type;
1536
1537 type = lgc->type;
1538 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1539 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1540 GNUNET_DATASTORE_get (dsh,
1541 &lgc->query,
1542 type,
1543 &process_local_get_result,
1544 lgc,
1545 GNUNET_TIME_UNIT_FOREVER_REL);
1546}
1547
1548
1549/**
1550 * We're processing a search request from a local
1551 * client. Now it is our turn to query the datastore.
1552 *
1553 * @param cls our closure (struct LocalGetContext)
1554 * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1555 */
1556static void
1557transmit_local_get_ready (void *cls,
1558 int ok)
1559{
1560 struct LocalGetContext *lgc = cls;
1561
1562 GNUNET_assert (GNUNET_OK == ok);
1563 GNUNET_SCHEDULER_add_continuation (sched,
1564 &transmit_local_get,
1565 lgc,
1566 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1567}
1568
1569
1570/**
1571 * Handle START_SEARCH-message (search request from client).
1572 * 1414 *
1573 * @param cls closure 1415 * @param cls the requests "struct PendingRequest*"
1574 * @param client identification of the client 1416 * @param pid ID of receiving peer, 0 on transmission error
1575 * @param message the actual message
1576 */ 1417 */
1577static void 1418static void
1578handle_start_search (void *cls, 1419transmit_reply_continuation (void *cls,
1579 struct GNUNET_SERVER_Client *client, 1420 GNUNET_PEER_Id tpid)
1580 const struct GNUNET_MessageHeader *message)
1581{ 1421{
1582 const struct SearchMessage *sm; 1422 struct PendingRequest *pr = cls;
1583 struct LocalGetContext *lgc; 1423
1584 uint16_t msize; 1424 switch (pr->type)
1585 unsigned int sc;
1586
1587 msize = ntohs (message->size);
1588 if ( (msize < sizeof (struct SearchMessage)) ||
1589 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1590 {
1591 GNUNET_break (0);
1592 GNUNET_SERVER_receive_done (client,
1593 GNUNET_SYSERR);
1594 return;
1595 }
1596 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1597 sm = (const struct SearchMessage*) message;
1598 GNUNET_SERVER_client_keep (client);
1599 lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1600 if (sc > 0)
1601 {
1602 lgc->results_used = sc;
1603 GNUNET_array_grow (lgc->results,
1604 lgc->results_size,
1605 sc * 2);
1606 memcpy (lgc->results,
1607 &sm[1],
1608 sc * sizeof (GNUNET_HashCode));
1609 }
1610 lgc->client = client;
1611 lgc->type = ntohl (sm->type);
1612 lgc->anonymity_level = ntohl (sm->anonymity_level);
1613 switch (lgc->type)
1614 { 1425 {
1615 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: 1426 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1616 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: 1427 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1617 lgc->target.hashPubKey = sm->target; 1428 /* only one reply expected, done with the request! */
1429 destroy_pending_request (pr);
1618 break; 1430 break;
1431 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1619 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: 1432 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1620 lgc->namespace = sm->target;
1621 break; 1433 break;
1622 default: 1434 default:
1435 GNUNET_break (0);
1623 break; 1436 break;
1624 } 1437 }
1625 lgc->query = sm->query;
1626 GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1627 lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1628 &transmit_local_get_ready,
1629 lgc);
1630} 1438}
1631 1439
1632 1440
1633/** 1441/**
1634 * List of handlers for the messages understood by this 1442 * Check if the given KBlock is well-formed.
1635 * service.
1636 */
1637static struct GNUNET_SERVER_MessageHandler handlers[] = {
1638 {&GNUNET_FS_handle_index_start, NULL,
1639 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1640 {&GNUNET_FS_handle_index_list_get, NULL,
1641 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1642 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
1643 sizeof (struct UnindexMessage) },
1644 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
1645 0 },
1646 {NULL, NULL, 0, 0}
1647};
1648
1649
1650/**
1651 * Clean up the memory used by the PendingRequest structure (except
1652 * for the client or peer list that the request may be part of).
1653 * 1443 *
1654 * @param pr request to clean up 1444 * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
1445 * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
1446 * @param query where to store the query that this block answers
1447 * @return GNUNET_OK if this is actually a well-formed KBlock
1655 */ 1448 */
1656static void 1449static int
1657destroy_pending_request (struct PendingRequest *pr) 1450check_kblock (const struct KBlock *kb,
1451 size_t dsize,
1452 GNUNET_HashCode *query)
1658{ 1453{
1659 struct PendingMessage *reply; 1454 if (dsize < sizeof (struct KBlock))
1660 struct ClientList *cl;
1661
1662 GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1663 &pr->query,
1664 pr);
1665 // FIXME: not sure how this can work (efficiently)
1666 // also, what does the return value mean?
1667 if (pr->irc != NULL)
1668 {
1669 GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1670 pr->irc = NULL;
1671 }
1672 if (pr->client == NULL)
1673 { 1455 {
1674 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration, 1456 GNUNET_break_op (0);
1675 pr->hnode); 1457 return GNUNET_SYSERR;
1676 } 1458 }
1677 else 1459 if (dsize - sizeof (struct KBlock) !=
1460 ntohs (kb->purpose.size)
1461 - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
1462 - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
1678 { 1463 {
1679 cl = pr->crl_entry->cl; 1464 GNUNET_break_op (0);
1680 GNUNET_CONTAINER_DLL_remove (cl->head, 1465 return GNUNET_SYSERR;
1681 cl->tail,
1682 pr->crl_entry);
1683 } 1466 }
1684 if (GNUNET_SCHEDULER_NO_TASK != pr->task) 1467 if (GNUNET_OK !=
1685 GNUNET_SCHEDULER_cancel (sched, pr->task); 1468 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
1686 if (NULL != pr->bf) 1469 &kb->purpose,
1687 GNUNET_CONTAINER_bloomfilter_free (pr->bf); 1470 &kb->signature,
1688 if (NULL != pr->th) 1471 &kb->keyspace))
1689 GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
1690 while (NULL != (reply = pr->replies_pending))
1691 { 1472 {
1692 pr->replies_pending = reply->next; 1473 GNUNET_break_op (0);
1693 GNUNET_free (reply); 1474 return GNUNET_SYSERR;
1694 } 1475 }
1695 if (NULL != pr->cth) 1476 if (query != NULL)
1696 GNUNET_CORE_notify_transmit_ready_cancel (pr->cth); 1477 GNUNET_CRYPTO_hash (&kb->keyspace,
1697 GNUNET_PEER_change_rc (pr->source_pid, -1); 1478 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1698 GNUNET_PEER_change_rc (pr->target_pid, -1); 1479 query);
1699 GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off); 1480 return GNUNET_OK;
1700 GNUNET_free_non_null (pr->used_pids);
1701 GNUNET_free_non_null (pr->replies_seen);
1702 GNUNET_free_non_null (pr->namespace);
1703 GNUNET_free (pr);
1704} 1481}
1705 1482
1706 1483
1707/** 1484/**
1708 * A client disconnected. Remove all of its pending queries. 1485 * Check if the given SBlock is well-formed.
1709 * 1486 *
1710 * @param cls closure, NULL 1487 * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
1711 * @param client identification of the client 1488 * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
1489 * @param query where to store the query that this block answers
1490 * @param namespace where to store the namespace that this block belongs to
1491 * @return GNUNET_OK if this is actually a well-formed SBlock
1712 */ 1492 */
1713static void 1493static int
1714handle_client_disconnect (void *cls, 1494check_sblock (const struct SBlock *sb,
1715 struct GNUNET_SERVER_Client 1495 size_t dsize,
1716 * client) 1496 GNUNET_HashCode *query,
1497 GNUNET_HashCode *namespace)
1717{ 1498{
1718 struct LocalGetContext *lgc; 1499 if (dsize < sizeof (struct SBlock))
1719 struct ClientList *cpos;
1720 struct ClientList *cprev;
1721 struct ClientRequestList *rl;
1722
1723 if (client == NULL)
1724 return;
1725 lgc = lgc_head;
1726 while ( (NULL != lgc) &&
1727 (lgc->client != client) )
1728 lgc = lgc->next;
1729 if (lgc != NULL)
1730 local_get_context_free (lgc);
1731 cprev = NULL;
1732 cpos = clients;
1733 while ( (NULL != cpos) &&
1734 (clients->client != client) )
1735 { 1500 {
1736 cprev = cpos; 1501 GNUNET_break_op (0);
1737 cpos = cpos->next; 1502 return GNUNET_SYSERR;
1738 } 1503 }
1739 if (cpos != NULL) 1504 if (dsize !=
1505 ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
1740 { 1506 {
1741 if (cprev == NULL) 1507 GNUNET_break_op (0);
1742 clients = cpos->next; 1508 return GNUNET_SYSERR;
1743 else
1744 cprev->next = cpos->next;
1745 while (NULL != (rl = cpos->head))
1746 {
1747 cpos->head = rl->next;
1748 destroy_pending_request (rl->req);
1749 GNUNET_free (rl);
1750 }
1751 GNUNET_free (cpos);
1752 } 1509 }
1510 if (GNUNET_OK !=
1511 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
1512 &sb->purpose,
1513 &sb->signature,
1514 &sb->subspace))
1515 {
1516 GNUNET_break_op (0);
1517 return GNUNET_SYSERR;
1518 }
1519 if (query != NULL)
1520 *query = sb->identifier;
1521 if (namespace != NULL)
1522 GNUNET_CRYPTO_hash (&sb->subspace,
1523 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1524 namespace);
1525 return GNUNET_OK;
1753} 1526}
1754 1527
1755 1528
1756/** 1529/**
1757 * Iterator over entries in the "requests_by_query" map 1530 * Transmit the given message by copying it to the target buffer
1758 * that frees all the entries. 1531 * "buf". "buf" will be NULL and "size" zero if the socket was closed
1759 * 1532 * for writing in the meantime. In that case, do nothing
1760 * @param cls closure, NULL 1533 * (the disconnect or shutdown handler will take care of the rest).
1761 * @param key current key code (the query, unused) 1534 * If we were able to transmit messages and there are still more
1762 * @param value value in the hash map, of type "struct PendingRequest*" 1535 * pending, ask core again for further calls to this function.
1763 * @return GNUNET_YES (we should continue to iterate)
1764 */
1765static int
1766destroy_pending_request_cb (void *cls,
1767 const GNUNET_HashCode * key,
1768 void *value)
1769{
1770 struct PendingRequest *pr = value;
1771
1772 destroy_pending_request (pr);
1773 return GNUNET_YES;
1774}
1775
1776
1777/**
1778 * Task run during shutdown.
1779 * 1536 *
1780 * @param cls unused 1537 * @param cls closure, pointer to the 'struct ClientList*'
1781 * @param tc unused 1538 * @param size number of bytes available in buf
1539 * @param buf where the callee should write the message
1540 * @return number of bytes written to buf
1782 */ 1541 */
1783static void 1542static size_t
1784shutdown_task (void *cls, 1543transmit_to_client (void *cls,
1785 const struct GNUNET_SCHEDULER_TaskContext *tc) 1544 size_t size, void *buf)
1786{ 1545{
1787 if (NULL != core) 1546 struct ClientList *cl = cls;
1788 { 1547 char *cbuf = buf;
1789 GNUNET_CORE_disconnect (core); 1548 struct ClientResponseMessage *creply;
1790 core = NULL; 1549 size_t msize;
1791 } 1550
1792 if (NULL != dsh) 1551 cl->th = NULL;
1552 if (NULL == buf)
1793 { 1553 {
1794 GNUNET_DATASTORE_disconnect (dsh, 1554#if DEBUG_FS
1795 GNUNET_NO); 1555 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1796 dsh = NULL; 1556 "Not sending reply, client communication problem.\n");
1557#endif
1558 return 0;
1797 } 1559 }
1798 GNUNET_CONTAINER_multihashmap_iterate (requests_by_query, 1560 msize = 0;
1799 &destroy_pending_request_cb, 1561 while ( (NULL != (creply = cl->res_head) ) &&
1800 NULL); 1562 (creply->msize <= size) )
1801 while (clients != NULL) 1563 {
1802 handle_client_disconnect (NULL, 1564 memcpy (&cbuf[msize], &creply[1], creply->msize);
1803 clients->client); 1565 msize += creply->msize;
1804 GNUNET_CONTAINER_multihashmap_destroy (requests_by_query); 1566 size -= creply->msize;
1805 requests_by_query = NULL; 1567 GNUNET_CONTAINER_DLL_remove (cl->res_head,
1806 GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer); 1568 cl->res_tail,
1807 requests_by_peer = NULL; 1569 creply);
1808 GNUNET_CONTAINER_heap_destroy (requests_by_expiration); 1570 GNUNET_free (creply);
1809 requests_by_expiration = NULL; 1571 }
1810 // FIXME: iterate over entries and free individually? 1572 if (NULL != creply)
1811 // (or do we get disconnect notifications?) 1573 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1812 GNUNET_CONTAINER_multihashmap_destroy (connected_peers); 1574 creply->msize,
1813 connected_peers = NULL; 1575 GNUNET_TIME_UNIT_FOREVER_REL,
1576 &transmit_to_client,
1577 cl);
1578 return msize;
1814} 1579}
1815 1580
1816 1581
1817/** 1582/**
1818 * Free (each) request made by the peer. 1583 * Closure for "process_reply" function.
1819 *
1820 * @param cls closure, points to peer that the request belongs to
1821 * @param key current key code
1822 * @param value value in the hash map
1823 * @return GNUNET_YES (we should continue to iterate)
1824 */ 1584 */
1825static int 1585struct ProcessReplyClosure
1826destroy_request (void *cls,
1827 const GNUNET_HashCode * key,
1828 void *value)
1829{ 1586{
1830 const struct GNUNET_PeerIdentity * peer = cls; 1587 /**
1831 struct PendingRequest *pr = value; 1588 * The data for the reply.
1832 1589 */
1833 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer, 1590 const void *data;
1834 &peer->hashPubKey,
1835 pr);
1836 destroy_pending_request (pr);
1837 return GNUNET_YES;
1838}
1839 1591
1592 // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
1840 1593
1594 /**
1595 * When the reply expires.
1596 */
1597 struct GNUNET_TIME_Absolute expiration;
1841 1598
1842/** 1599 /**
1843 * Method called whenever a given peer connects. 1600 * Size of data.
1844 * 1601 */
1845 * @param cls closure, not used 1602 size_t size;
1846 * @param peer peer identity this notification is about
1847 * @param latency reported latency of the connection with 'other'
1848 * @param distance reported distance (DV) to 'other'
1849 */
1850static void
1851peer_connect_handler (void *cls,
1852 const struct
1853 GNUNET_PeerIdentity * peer,
1854 struct GNUNET_TIME_Relative latency,
1855 uint32_t distance)
1856{
1857 struct ConnectedPeer *cp;
1858 1603
1859 cp = GNUNET_malloc (sizeof (struct ConnectedPeer)); 1604 /**
1860 cp->pid = GNUNET_PEER_intern (peer); 1605 * Namespace that this reply belongs to
1861 GNUNET_CONTAINER_multihashmap_put (connected_peers, 1606 * (if it is of type SBLOCK).
1862 &peer->hashPubKey, 1607 */
1863 cp, 1608 GNUNET_HashCode namespace;
1864 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); 1609
1865} 1610 /**
1611 * Type of the block.
1612 */
1613 uint32_t type;
1614
1615 /**
1616 * How much was this reply worth to us?
1617 */
1618 uint32_t priority;
1619};
1866 1620
1867 1621
1868/** 1622/**
1869 * Method called whenever a peer disconnects. 1623 * We have received a reply; handle it!
1870 * 1624 *
1871 * @param cls closure, not used 1625 * @param cls response (struct ProcessReplyClosure)
1872 * @param peer peer identity this notification is about 1626 * @param key our query
1627 * @param value value in the hash map (info about the query)
1628 * @return GNUNET_YES (we should continue to iterate)
1873 */ 1629 */
1874static void 1630static int
1875peer_disconnect_handler (void *cls, 1631process_reply (void *cls,
1876 const struct 1632 const GNUNET_HashCode * key,
1877 GNUNET_PeerIdentity * peer) 1633 void *value)
1878{ 1634{
1635 struct ProcessReplyClosure *prq = cls;
1636 struct PendingRequest *pr = value;
1637 struct PendingMessage *reply;
1638 struct ClientResponseMessage *creply;
1639 struct ClientList *cl;
1640 struct PutMessage *pm;
1641 struct ContentMessage *cm;
1879 struct ConnectedPeer *cp; 1642 struct ConnectedPeer *cp;
1880 struct PendingMessage *pm; 1643 GNUNET_HashCode chash;
1644 GNUNET_HashCode mhash;
1645 size_t msize;
1646 uint32_t prio;
1881 1647
1882 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers, 1648
1883 &peer->hashPubKey); 1649 GNUNET_CRYPTO_hash (prq->data,
1884 if (cp != NULL) 1650 prq->size,
1651 &chash);
1652 switch (prq->type)
1885 { 1653 {
1886 GNUNET_PEER_change_rc (cp->pid, -1); 1654 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1887 GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE); 1655 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1888 if (NULL != cp->cth) 1656 /* only possible reply, stop requesting! */
1889 GNUNET_CORE_notify_transmit_ready_cancel (cp->cth); 1657 while (NULL != pr->pending_head)
1890 while (NULL != (pm = cp->pending_messages)) 1658 destroy_pending_message_list_entry (pr->pending_head);
1659 GNUNET_break (GNUNET_YES ==
1660 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1661 key,
1662 prq));
1663 break;
1664 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1665 if (0 != memcmp (pr->namespace,
1666 &prq->namespace,
1667 sizeof (GNUNET_HashCode)))
1668 return GNUNET_YES; /* wrong namespace */
1669 /* then: fall-through! */
1670 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1671 if (pr->bf != NULL)
1891 { 1672 {
1892 cp->pending_messages = pm->next; 1673 mingle_hash (&chash, pr->mingle, &mhash);
1893 GNUNET_free (pm); 1674 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
1675 &mhash))
1676 return GNUNET_YES; /* duplicate */
1677 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
1678 &mhash);
1894 } 1679 }
1895 GNUNET_free (cp); 1680 if (pr->client_request_list != NULL)
1681 {
1682 cl = pr->client_request_list->client_list;
1683 if (pr->replies_seen_size == pr->replies_seen_off)
1684 {
1685 GNUNET_array_grow (pr->replies_seen,
1686 pr->replies_seen_size,
1687 pr->replies_seen_size * 2 + 4);
1688 // FIXME: recalculate BF!
1689 }
1690 pr->replies_seen[pr->replies_seen_off++] = chash;
1691 }
1692 break;
1693 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
1694 // FIXME: any checks against duplicates for SKBlocks?
1695 break;
1696 default:
1697 GNUNET_break (0);
1698 return GNUNET_YES;
1896 } 1699 }
1897 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer, 1700 prio = pr->priority;
1898 &peer->hashPubKey, 1701 prq->priority += pr->remaining_priority;
1899 &destroy_request, 1702 pr->remaining_priority = 0;
1900 (void*) peer); 1703 if (pr->client_request_list != NULL)
1704 {
1705 msize = sizeof (struct PutMessage) + prq->size;
1706 creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
1707 creply->msize = msize;
1708 creply->client_list = cl;
1709 GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
1710 cl->res_tail,
1711 cl->res_tail,
1712 creply);
1713 pm = (struct PutMessage*) &creply[1];
1714 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1715 pm->header.size = htons (msize);
1716 pm->type = htonl (prq->type);
1717 pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
1718 memcpy (&creply[1], prq->data, prq->size);
1719 if (NULL == cl->th)
1720 cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1721 msize,
1722 GNUNET_TIME_UNIT_FOREVER_REL,
1723 &transmit_to_client,
1724 cl);
1725 GNUNET_break (cl->th != NULL);
1726 }
1727 else
1728 {
1729 cp = pr->pht_entry->cp;
1730 msize = sizeof (struct ContentMessage) + prq->size;
1731 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
1732 reply->cont = &transmit_reply_continuation;
1733 reply->cont_cls = pr;
1734 reply->msize = msize;
1735 reply->priority = (uint32_t) -1; /* send replies first! */
1736 cm = (struct ContentMessage*) &reply[1];
1737 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1738 cm->header.size = htons (msize);
1739 cm->type = htonl (prq->type);
1740 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
1741 memcpy (&reply[1], prq->data, prq->size);
1742 add_to_pending_messages_for_peer (cp, reply, pr);
1743 }
1744
1745
1746 // FIXME: implement hot-path routing statistics keeping!
1747 return GNUNET_YES;
1901} 1748}
1902 1749
1903 1750
1904/** 1751/**
1905 * We're processing a GET request from another peer and have decided 1752 * Handle P2P "PUT" message.
1906 * to forward it to other peers.
1907 * 1753 *
1908 * @param cls our "struct ProcessGetContext *" 1754 * @param cls closure, always NULL
1909 * @param tc unused 1755 * @param other the other peer involved (sender or receiver, NULL
1756 * for loopback messages where we are both sender and receiver)
1757 * @param message the actual message
1758 * @param latency reported latency of the connection with 'other'
1759 * @param distance reported distance (DV) to 'other'
1760 * @return GNUNET_OK to keep the connection open,
1761 * GNUNET_SYSERR to close it (signal serious error)
1910 */ 1762 */
1911static void 1763static int
1912forward_get_request (void *cls, 1764handle_p2p_put (void *cls,
1913 const struct GNUNET_SCHEDULER_TaskContext *tc) 1765 const struct GNUNET_PeerIdentity *other,
1766 const struct GNUNET_MessageHeader *message,
1767 struct GNUNET_TIME_Relative latency,
1768 uint32_t distance)
1914{ 1769{
1915 struct ProcessGetContext *pgc = cls; 1770 const struct PutMessage *put;
1916 struct PendingRequest *pr; 1771 uint16_t msize;
1917 struct PendingRequest *eer; 1772 size_t dsize;
1918 struct GNUNET_PeerIdentity target; 1773 uint32_t type;
1774 struct GNUNET_TIME_Absolute expiration;
1775 GNUNET_HashCode query;
1776 struct ProcessReplyClosure prq;
1919 1777
1920 pr = GNUNET_malloc (sizeof (struct PendingRequest)); 1778 msize = ntohs (message->size);
1921 if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm)) 1779 if (msize < sizeof (struct PutMessage))
1922 {
1923 pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
1924 *pr->namespace = pgc->namespace;
1925 }
1926 pr->bf = pgc->bf;
1927 pr->bf_size = pgc->bf_size;
1928 pgc->bf = NULL;
1929 pr->start_time = pgc->start_time;
1930 pr->query = pgc->query;
1931 pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
1932 if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
1933 pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
1934 pr->anonymity_level = 1; /* default */
1935 pr->priority = pgc->priority;
1936 pr->remaining_priority = pr->priority;
1937 pr->mingle = pgc->mingle;
1938 pr->ttl = pgc->ttl;
1939 pr->type = pgc->type;
1940 GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1941 &pr->query,
1942 pr,
1943 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1944 GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
1945 &pgc->reply_to.hashPubKey,
1946 pr,
1947 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1948 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
1949 pr,
1950 pr->start_time.value + pr->ttl);
1951 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
1952 { 1780 {
1953 /* expire oldest request! */ 1781 GNUNET_break_op(0);
1954 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration); 1782 return GNUNET_SYSERR;
1955 GNUNET_PEER_resolve (eer->source_pid,
1956 &target);
1957 GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1958 &target.hashPubKey,
1959 eer);
1960 destroy_pending_request (eer);
1961 } 1783 }
1962 pr->task = GNUNET_SCHEDULER_add_delayed (sched, 1784 put = (const struct PutMessage*) message;
1963 get_processing_delay (), 1785 dsize = msize - sizeof (struct PutMessage);
1964 &forward_request_task, 1786 type = ntohl (put->type);
1965 pr); 1787 expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
1966 GNUNET_free (pgc);
1967}
1968/**
1969 * Transmit the given message by copying it to
1970 * the target buffer "buf". "buf" will be
1971 * NULL and "size" zero if the socket was closed for
1972 * writing in the meantime. In that case, only
1973 1788
1974 * free the message 1789 /* first, validate! */
1975 * 1790 switch (type)
1976 * @param cls closure, pointer to the message
1977 * @param size number of bytes available in buf
1978 * @param buf where the callee should write the message
1979 * @return number of bytes written to buf
1980 */
1981static size_t
1982transmit_message (void *cls,
1983 size_t size, void *buf)
1984{
1985 struct GNUNET_MessageHeader *msg = cls;
1986 uint16_t msize;
1987
1988 if (NULL == buf)
1989 { 1791 {
1990#if DEBUG_FS 1792 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1991 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1793 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1992 "Dropping reply, core too busy.\n"); 1794 GNUNET_CRYPTO_hash (&put[1], dsize, &query);
1993#endif 1795 break;
1994 GNUNET_free (msg); 1796 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1995 return 0; 1797 if (GNUNET_OK !=
1798 check_kblock ((const struct KBlock*) &put[1],
1799 dsize,
1800 &query))
1801 return GNUNET_SYSERR;
1802 break;
1803 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1804 if (GNUNET_OK !=
1805 check_sblock ((const struct SBlock*) &put[1],
1806 dsize,
1807 &query,
1808 &prq.namespace))
1809 return GNUNET_SYSERR;
1810 break;
1811 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
1812 // FIXME -- validate SKBLOCK!
1813 GNUNET_break (0);
1814 return GNUNET_OK;
1815 default:
1816 /* unknown block type */
1817 GNUNET_break_op (0);
1818 return GNUNET_SYSERR;
1996 } 1819 }
1997 msize = ntohs (msg->size); 1820
1998 GNUNET_assert (size >= msize); 1821 /* now, lookup 'query' */
1999 memcpy (buf, msg, msize); 1822 prq.data = (const void*) &put[1];
2000 GNUNET_free (msg); 1823 prq.size = dsize;
2001 return msize; 1824 prq.type = type;
1825 prq.expiration = expiration;
1826 prq.priority = 0;
1827 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
1828 &query,
1829 &process_reply,
1830 &prq);
1831 // FIXME: if migration is on and load is low,
1832 // queue to store data in datastore;
1833 // use "prq.priority" for that!
1834 return GNUNET_OK;
2002} 1835}
2003 1836
2004 1837
2005/** 1838/* **************************** P2P GET Handling ************************ */
2006 * Test if the load on this peer is too high
2007 * to even consider processing the query at
2008 * all.
2009 *
2010 * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2011 */
2012static int
2013test_load_too_high ()
2014{
2015 return GNUNET_NO; // FIXME
2016}
2017 1839
2018 1840
2019/** 1841/**
@@ -2034,142 +1856,101 @@ test_load_too_high ()
2034 * maybe 0 if no unique identifier is available 1856 * maybe 0 if no unique identifier is available
2035 */ 1857 */
2036static void 1858static void
2037process_p2p_get_result (void *cls, 1859process_local_reply (void *cls,
2038 const GNUNET_HashCode * key, 1860 const GNUNET_HashCode * key,
2039 uint32_t size, 1861 uint32_t size,
2040 const void *data, 1862 const void *data,
2041 uint32_t type, 1863 uint32_t type,
2042 uint32_t priority, 1864 uint32_t priority,
2043 uint32_t anonymity, 1865 uint32_t anonymity,
2044 struct GNUNET_TIME_Absolute 1866 struct GNUNET_TIME_Absolute
2045 expiration, 1867 expiration,
2046 uint64_t uid) 1868 uint64_t uid)
2047{ 1869{
2048 struct ProcessGetContext *pgc = cls; 1870 struct PendingRequest *pr = cls;
1871 struct ProcessReplyClosure prq;
2049 GNUNET_HashCode dhash; 1872 GNUNET_HashCode dhash;
2050 GNUNET_HashCode mhash; 1873 GNUNET_HashCode mhash;
2051 struct PutMessage *reply; 1874 GNUNET_HashCode query;
2052 1875
1876 pr->drq = NULL;
2053 if (NULL == key) 1877 if (NULL == key)
2054 { 1878 {
2055 /* no more results */ 1879 /* no more results */
2056 if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) == ROUTING_POLICY_FORWARD) && 1880 if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2057 ( (0 == pgc->results_found) || 1881 pr->task = GNUNET_SCHEDULER_add_now (sched,
2058 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || 1882 &forward_request_task,
2059 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || 1883 pr);
2060 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2061 {
2062 GNUNET_SCHEDULER_add_continuation (sched,
2063 &forward_get_request,
2064 pgc,
2065 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2066 }
2067 else
2068 {
2069 if (pgc->bf != NULL)
2070 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2071 GNUNET_free (pgc);
2072 }
2073 next_ds_request ();
2074 return; 1884 return;
2075 } 1885 }
2076 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND) 1886 if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2077 { 1887 {
2078 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 1888 if (GNUNET_OK !=
2079 anonymity, expiration, uid, dsh, 1889 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
2080 &process_p2p_get_result, 1890 anonymity, expiration, uid,
2081 pgc); 1891 &process_local_reply,
1892 pr))
1893 GNUNET_FS_drq_get_next (GNUNET_YES);
2082 return; 1894 return;
2083 } 1895 }
2084 /* check for duplicates */ 1896 /* check for duplicates */
2085 GNUNET_CRYPTO_hash (data, size, &dhash); 1897 GNUNET_CRYPTO_hash (data, size, &dhash);
2086 mingle_hash (&dhash, 1898 mingle_hash (&dhash,
2087 pgc->mingle, 1899 pr->mingle,
2088 &mhash); 1900 &mhash);
2089 if ( (pgc->bf != NULL) && 1901 if ( (pr->bf != NULL) &&
2090 (GNUNET_YES == 1902 (GNUNET_YES ==
2091 GNUNET_CONTAINER_bloomfilter_test (pgc->bf, 1903 GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2092 &mhash)) ) 1904 &mhash)) )
2093 { 1905 {
2094#if DEBUG_FS 1906#if DEBUG_FS
2095 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 1907 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2096 "Result from datastore filtered by bloomfilter.\n"); 1908 "Result from datastore filtered by bloomfilter.\n");
2097#endif 1909#endif
2098 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 1910 GNUNET_FS_drq_get_next (GNUNET_YES);
2099 return; 1911 return;
2100 } 1912 }
2101 pgc->results_found++; 1913 pr->results_found++;
2102 if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) || 1914 if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2103 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) || 1915 (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2104 (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) 1916 (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2105 { 1917 {
2106 if (pgc->bf == NULL) 1918 if (pr->bf == NULL)
2107 { 1919 {
2108 pgc->bf_size = 32; 1920 pr->bf_size = 32;
2109 pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 1921 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2110 pgc->bf_size, 1922 pr->bf_size,
2111 BLOOMFILTER_K); 1923 BLOOMFILTER_K);
2112 } 1924 }
2113 GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 1925 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2114 &mhash); 1926 &mhash);
2115 } 1927 }
2116 1928 memset (&prq, 0, sizeof (prq));
2117 reply = GNUNET_malloc (sizeof (struct PutMessage) + size); 1929 prq.data = data;
2118 reply->header.size = htons (sizeof (struct PutMessage) + size); 1930 prq.expiration = expiration;
2119 reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT); 1931 prq.size = size;
2120 reply->type = htonl (type); 1932 if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) &&
2121 reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration)); 1933 (GNUNET_OK != check_sblock ((const struct SBlock*) data,
2122 memcpy (&reply[1], data, size); 1934 size,
2123 GNUNET_CORE_notify_transmit_ready (core, 1935 &query,
2124 pgc->priority, 1936 &prq.namespace)) )
2125 ACCEPTABLE_REPLY_DELAY,
2126 &pgc->reply_to,
2127 sizeof (struct PutMessage) + size,
2128 &transmit_message,
2129 reply);
2130 if ( (GNUNET_YES == test_load_too_high()) ||
2131 (pgc->results_found > 5 + 2 * pgc->priority) )
2132 { 1937 {
2133 GNUNET_DATASTORE_get_next (dsh, GNUNET_NO); 1938 GNUNET_break (0);
2134 pgc->policy &= ~ ROUTING_POLICY_FORWARD; 1939 /* FIXME: consider removing the block? */
1940 GNUNET_FS_drq_get_next (GNUNET_YES);
2135 return; 1941 return;
2136 } 1942 }
2137 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 1943 prq.type = type;
2138} 1944 prq.priority = priority;
1945 process_reply (&prq, key, pr);
2139 1946
2140 1947 if ( (GNUNET_YES == test_load_too_high()) ||
2141/** 1948 (pr->results_found > 5 + 2 * pr->priority) )
2142 * We're processing a GET request from another peer. Give it to our
2143 * local datastore.
2144 *
2145 * @param cls our "struct ProcessGetContext"
2146 * @param ok did we get a datastore slice or not?
2147 */
2148static void
2149ds_get_request (void *cls,
2150 int ok)
2151{
2152 struct ProcessGetContext *pgc = cls;
2153 uint32_t type;
2154 struct GNUNET_TIME_Relative timeout;
2155
2156 if (GNUNET_OK != ok)
2157 { 1949 {
2158 /* no point in doing P2P stuff if we can't even do local */ 1950 GNUNET_FS_drq_get_next (GNUNET_NO);
2159 GNUNET_free (dsh);
2160 return; 1951 return;
2161 } 1952 }
2162 type = pgc->type; 1953 GNUNET_FS_drq_get_next (GNUNET_YES);
2163 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2164 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2165 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2166 (pgc->priority + 1));
2167 GNUNET_DATASTORE_get (dsh,
2168 &pgc->query,
2169 type,
2170 &process_p2p_get_result,
2171 pgc,
2172 timeout);
2173} 1954}
2174 1955
2175 1956
@@ -2201,17 +1982,16 @@ bound_ttl (int32_t ttl_in, uint32_t prio)
2201 1982
2202 1983
2203/** 1984/**
2204 * We've received a request with the specified 1985 * We've received a request with the specified priority. Bound it
2205 * priority. Bound it according to how much 1986 * according to how much we trust the given peer.
2206 * we trust the given peer.
2207 * 1987 *
2208 * @param prio_in requested priority 1988 * @param prio_in requested priority
2209 * @param peer the peer making the request 1989 * @param cp the peer making the request
2210 * @return effective priority 1990 * @return effective priority
2211 */ 1991 */
2212static uint32_t 1992static uint32_t
2213bound_priority (uint32_t prio_in, 1993bound_priority (uint32_t prio_in,
2214 const struct GNUNET_PeerIdentity *peer) 1994 struct ConnectedPeer *cp)
2215{ 1995{
2216 return 0; // FIXME! 1996 return 0; // FIXME!
2217} 1997}
@@ -2233,20 +2013,23 @@ static int
2233handle_p2p_get (void *cls, 2013handle_p2p_get (void *cls,
2234 const struct GNUNET_PeerIdentity *other, 2014 const struct GNUNET_PeerIdentity *other,
2235 const struct GNUNET_MessageHeader *message, 2015 const struct GNUNET_MessageHeader *message,
2236 struct GNUNET_TIME_Relative latency, 2016 struct GNUNET_TIME_Relative latency,
2237 uint32_t distance) 2017 uint32_t distance)
2238{ 2018{
2019 struct PendingRequest *pr;
2020 struct PeerRequestEntry *pre;
2021 struct ConnectedPeer *cp;
2022 struct ConnectedPeer *cps;
2023 struct GNUNET_TIME_Relative timeout;
2239 uint16_t msize; 2024 uint16_t msize;
2240 const struct GetMessage *gm; 2025 const struct GetMessage *gm;
2241 unsigned int bits; 2026 unsigned int bits;
2242 const GNUNET_HashCode *opt; 2027 const GNUNET_HashCode *opt;
2243 struct ProcessGetContext *pgc;
2244 uint32_t bm; 2028 uint32_t bm;
2245 size_t bfsize; 2029 size_t bfsize;
2246 uint32_t ttl_decrement; 2030 uint32_t ttl_decrement;
2031 uint32_t type;
2247 double preference; 2032 double preference;
2248 int net_load_up;
2249 int net_load_down;
2250 2033
2251 msize = ntohs(message->size); 2034 msize = ntohs(message->size);
2252 if (msize < sizeof (struct GetMessage)) 2035 if (msize < sizeof (struct GetMessage))
@@ -2270,40 +2053,33 @@ handle_p2p_get (void *cls,
2270 } 2053 }
2271 opt = (const GNUNET_HashCode*) &gm[1]; 2054 opt = (const GNUNET_HashCode*) &gm[1];
2272 bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode); 2055 bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2273 pgc = GNUNET_malloc (sizeof (struct ProcessGetContext)); 2056
2274 if (bfsize > 0) 2057 bm = ntohl (gm->hash_bitmap);
2058 if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
2059 (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) )
2275 { 2060 {
2276 pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1], 2061 GNUNET_break_op (0);
2277 bfsize, 2062 return GNUNET_SYSERR;
2278 BLOOMFILTER_K);
2279 pgc->bf_size = bfsize;
2280 } 2063 }
2281 pgc->type = ntohl (gm->type);
2282 pgc->bm = ntohl (gm->hash_bitmap);
2283 pgc->mingle = gm->filter_mutator;
2284 bits = 0; 2064 bits = 0;
2285 if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO)) 2065 cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2286 pgc->reply_to.hashPubKey = opt[bits++]; 2066 &other->hashPubKey);
2067 GNUNET_assert (NULL != cps);
2068 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2069 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2070 &opt[bits++]);
2287 else 2071 else
2288 pgc->reply_to = *other; 2072 cp = cps;
2289 if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) 2073 if (cp == NULL)
2290 pgc->namespace = opt[bits++];
2291 else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2292 { 2074 {
2293 GNUNET_break_op (0); 2075 /* FIXME: try connect? */
2294 GNUNET_free (pgc); 2076 return GNUNET_OK;
2295 return GNUNET_SYSERR;
2296 } 2077 }
2297 if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2298 pgc->prime_target.hashPubKey = opt[bits++];
2299 /* note that we can really only check load here since otherwise 2078 /* note that we can really only check load here since otherwise
2300 peers could find out that we are overloaded by being disconnected 2079 peers could find out that we are overloaded by not being
2301 after sending us a malformed query... */ 2080 disconnected after sending us a malformed query... */
2302 if (GNUNET_YES == test_load_too_high ()) 2081 if (GNUNET_YES == test_load_too_high ())
2303 { 2082 {
2304 if (NULL != pgc->bf)
2305 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2306 GNUNET_free (pgc);
2307#if DEBUG_FS 2083#if DEBUG_FS
2308 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2084 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309 "Dropping query from `%s', this peer is too busy.\n", 2085 "Dropping query from `%s', this peer is too busy.\n",
@@ -2311,55 +2087,34 @@ handle_p2p_get (void *cls,
2311#endif 2087#endif
2312 return GNUNET_OK; 2088 return GNUNET_OK;
2313 } 2089 }
2314 net_load_up = 50; // FIXME 2090
2315 net_load_down = 50; // FIXME 2091 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
2316 pgc->policy = ROUTING_POLICY_NONE; 2092 (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)?sizeof(GNUNET_HashCode):0);
2317 if ( (net_load_up < IDLE_LOAD_THRESHOLD) && 2093 if ((bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2318 (net_load_down < IDLE_LOAD_THRESHOLD) ) 2094 pr->namespace = (GNUNET_HashCode*) &pr[1];
2319 { 2095 pr->type = ntohl (gm->type);
2320 pgc->policy |= ROUTING_POLICY_ALL; 2096 pr->mingle = gm->filter_mutator;
2321 pgc->priority = 0; /* no charge */ 2097 if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2322 } 2098 memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
2323 else 2099 else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2324 {
2325 pgc->priority = bound_priority (ntohl (gm->priority), other);
2326 if ( (net_load_up <
2327 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2328 (net_load_down <
2329 IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2330 {
2331 pgc->policy |= ROUTING_POLICY_ALL;
2332 }
2333 else
2334 {
2335 // FIXME: is this sound?
2336 if (net_load_up < 90 + 10 * pgc->priority)
2337 pgc->policy |= ROUTING_POLICY_FORWARD;
2338 if (net_load_down < 90 + 10 * pgc->priority)
2339 pgc->policy |= ROUTING_POLICY_ANSWER;
2340 }
2341 }
2342 if (pgc->policy == ROUTING_POLICY_NONE)
2343 { 2100 {
2344#if DEBUG_FS 2101 GNUNET_break_op (0);
2345 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2102 GNUNET_free (pr);
2346 "Dropping query from `%s', network saturated.\n", 2103 return GNUNET_SYSERR;
2347 GNUNET_i2s (other));
2348#endif
2349 if (NULL != pgc->bf)
2350 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2351 GNUNET_free (pgc);
2352 return GNUNET_OK; /* drop */
2353 } 2104 }
2354 if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT) 2105 if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2355 pgc->priority = 0; /* kill the priority (we cannot benefit) */ 2106 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
2356 pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority); 2107
2108 pr->anonymity_level = 1;
2109 pr->priority = bound_priority (ntohl (gm->priority), cps);
2110 pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
2111 pr->query = gm->query;
2357 /* decrement ttl (always) */ 2112 /* decrement ttl (always) */
2358 ttl_decrement = 2 * TTL_DECREMENT + 2113 ttl_decrement = 2 * TTL_DECREMENT +
2359 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 2114 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2360 TTL_DECREMENT); 2115 TTL_DECREMENT);
2361 if ( (pgc->ttl < 0) && 2116 if ( (pr->ttl < 0) &&
2362 (pgc->ttl - ttl_decrement > 0) ) 2117 (pr->ttl - ttl_decrement > 0) )
2363 { 2118 {
2364#if DEBUG_FS 2119#if DEBUG_FS
2365 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 2120 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2367,400 +2122,177 @@ handle_p2p_get (void *cls,
2367 GNUNET_i2s (other)); 2122 GNUNET_i2s (other));
2368#endif 2123#endif
2369 /* integer underflow => drop (should be very rare)! */ 2124 /* integer underflow => drop (should be very rare)! */
2370 if (NULL != pgc->bf) 2125 GNUNET_free (pr);
2371 GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2372 GNUNET_free (pgc);
2373 return GNUNET_OK; 2126 return GNUNET_OK;
2127 }
2128 pr->ttl -= ttl_decrement;
2129 pr->start_time = GNUNET_TIME_absolute_get ();
2130
2131 /* get bloom filter */
2132 if (bfsize > 0)
2133 {
2134 pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2135 bfsize,
2136 BLOOMFILTER_K);
2137 pr->bf_size = bfsize;
2374 } 2138 }
2375 pgc->ttl -= ttl_decrement;
2376 pgc->start_time = GNUNET_TIME_absolute_get ();
2377 preference = (double) pgc->priority;
2378 if (preference < QUERY_BANDWIDTH_VALUE)
2379 preference = QUERY_BANDWIDTH_VALUE;
2380 // FIXME: also reserve bandwidth for reply?
2381 (void) GNUNET_CORE_peer_change_preference (sched, cfg,
2382 other,
2383 GNUNET_TIME_UNIT_FOREVER_REL,
2384 0, 0, preference, NULL, NULL);
2385 if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2386 pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2387 &ds_get_request,
2388 pgc);
2389 else
2390 GNUNET_SCHEDULER_add_continuation (sched,
2391 &forward_get_request,
2392 pgc,
2393 GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2394 return GNUNET_OK;
2395}
2396 2139
2140 /* FIXME: check somewhere if request already exists, and if so,
2141 recycle old state... */
2142 pre = GNUNET_malloc (sizeof (struct PeerRequestEntry));
2143 pre->cp = cp;
2144 pre->req = pr;
2145 GNUNET_CONTAINER_multihashmap_put (query_request_map,
2146 &gm->query,
2147 pre,
2148 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2149
2150 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
2151 pr,
2152 GNUNET_TIME_absolute_get().value + pr->ttl);
2397 2153
2398/**
2399 * Function called to notify us that we can now transmit a reply to a
2400 * client or peer. "buf" will be NULL and "size" zero if the socket was
2401 * closed for writing in the meantime.
2402 *
2403 * @param cls closure, points to a "struct PendingRequest*" with
2404 * one or more pending replies
2405 * @param size number of bytes available in buf
2406 * @param buf where the callee should write the message
2407 * @return number of bytes written to buf
2408 */
2409static size_t
2410transmit_result (void *cls,
2411 size_t size,
2412 void *buf)
2413{
2414 struct PendingRequest *pr = cls;
2415 char *cbuf = buf;
2416 struct PendingMessage *reply;
2417 size_t ret;
2418 2154
2419 ret = 0; 2155 /* calculate change in traffic preference */
2420 while (NULL != (reply = pr->replies_pending)) 2156 preference = (double) pr->priority;
2421 { 2157 if (preference < QUERY_BANDWIDTH_VALUE)
2422 if ( (reply->msize + ret < ret) || 2158 preference = QUERY_BANDWIDTH_VALUE;
2423 (reply->msize + ret > size) ) 2159 cps->inc_preference += preference;
2424 break;
2425 pr->replies_pending = reply->next;
2426 memcpy (&cbuf[ret], &reply[1], reply->msize);
2427 ret += reply->msize;
2428 GNUNET_free (reply);
2429 }
2430 return ret;
2431}
2432 2160
2161 /* process locally */
2162 type = pr->type;
2163 if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2164 type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2165 timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2166 (pr->priority + 1));
2167 pr->drq = GNUNET_FS_drq_get (&gm->query,
2168 pr->type,
2169 &process_local_reply,
2170 pr,
2171 timeout);
2433 2172
2434/** 2173 /* Are multiple results possible? If so, start processing remotely now! */
2435 * We have received a reply; handle it! 2174 switch (pr->type)
2436 *
2437 * @param cls response (struct ProcessReplyClosure)
2438 * @param key our query
2439 * @param value value in the hash map (meta-info about the query)
2440 * @return GNUNET_YES (we should continue to iterate)
2441 */
2442static int
2443process_reply (void *cls,
2444 const GNUNET_HashCode * key,
2445 void *value)
2446{
2447 struct ProcessReplyClosure *prq = cls;
2448 struct PendingRequest *pr = value;
2449 struct PendingRequest *eer;
2450 struct PendingMessage *reply;
2451 struct PutMessage *pm;
2452 struct ContentMessage *cm;
2453 struct ConnectedPeer *cp;
2454 GNUNET_HashCode chash;
2455 GNUNET_HashCode mhash;
2456 struct GNUNET_PeerIdentity target;
2457 size_t msize;
2458 uint32_t prio;
2459 struct GNUNET_TIME_Relative max_delay;
2460
2461 GNUNET_CRYPTO_hash (prq->data,
2462 prq->size,
2463 &chash);
2464 switch (prq->type)
2465 { 2175 {
2466 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: 2176 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2467 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: 2177 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2178 /* only one result, wait for datastore */
2468 break; 2179 break;
2469 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: 2180 default:
2470 /* FIXME: does prq->namespace match our expectations? */ 2181 pr->task = GNUNET_SCHEDULER_add_now (sched,
2471 /* then: fall-through??? */ 2182 &forward_request_task,
2472 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: 2183 pr);
2473 if (pr->bf != NULL)
2474 {
2475 mingle_hash (&chash, pr->mingle, &mhash);
2476 if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2477 &mhash))
2478 return GNUNET_YES; /* duplicate */
2479 GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2480 &mhash);
2481 }
2482 break;
2483 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2484 // FIXME: any checks against duplicates for SKBlocks?
2485 break;
2486 }
2487 prio = pr->priority;
2488 prq->priority += pr->remaining_priority;
2489 pr->remaining_priority = 0;
2490 if (pr->client != NULL)
2491 {
2492 if (pr->replies_seen_size == pr->replies_seen_off)
2493 GNUNET_array_grow (pr->replies_seen,
2494 pr->replies_seen_size,
2495 pr->replies_seen_size * 2 + 4);
2496 pr->replies_seen[pr->replies_seen_off++] = chash;
2497 // FIXME: possibly recalculate BF!
2498 } 2184 }
2499 if (pr->client == NULL)
2500 {
2501 GNUNET_PEER_resolve (pr->source_pid,
2502 &target);
2503 cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2504 &target.hashPubKey);
2505 msize = sizeof (struct ContentMessage) + prq->size;
2506 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2507 reply->msize = msize;
2508 reply->priority = (uint32_t) -1; /* send replies first! */
2509 cm = (struct ContentMessage*) &reply[1];
2510 cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2511 cm->header.size = htons (msize);
2512 cm->type = htonl (prq->type);
2513 cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2514 memcpy (&reply[1], prq->data, prq->size);
2515 max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2516 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2517 {
2518 /* estimate expiration time from time difference between
2519 first request that will be discarded and this request */
2520 eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2521 max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2522 eer->start_time);
2523 }
2524 2185
2525 if (cp == NULL) 2186 /* make sure we don't track too many requests */
2526 { 2187 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
2527 /* FIXME: bound queue size! */
2528 reply->next = pr->replies_pending;
2529 pr->replies_pending = reply;
2530 if (pr->cth == NULL)
2531 {
2532 /* implicitly tries to connect */
2533 pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2534 prio,
2535 max_delay,
2536 &target,
2537 msize,
2538 &transmit_result,
2539 pr);
2540 }
2541 }
2542 else
2543 {
2544 /* insert replies always at the head */
2545 reply->next = cp->pending_messages;
2546 cp->pending_messages = reply;
2547 if (cp->cth == NULL)
2548 cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2549 reply->priority,
2550 GNUNET_TIME_UNIT_FOREVER_REL,
2551 &target,
2552 msize,
2553 &transmit_request_cb,
2554 cp);
2555 }
2556 }
2557 else
2558 { 2188 {
2559 msize = sizeof (struct PutMessage) + prq->size; 2189 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
2560 reply = GNUNET_malloc (msize + sizeof (struct PendingMessage)); 2190 destroy_pending_request (pr);
2561 reply->msize = msize;
2562 reply->next = pr->replies_pending;
2563 pm = (struct PutMessage*) &reply[1];
2564 pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2565 pm->header.size = htons (msize);
2566 pm->type = htonl (prq->type);
2567 pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2568 pr->replies_pending = reply;
2569 memcpy (&reply[1], prq->data, prq->size);
2570 if (pr->th != NULL)
2571 return GNUNET_YES;
2572 pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2573 msize,
2574 GNUNET_TIME_UNIT_FOREVER_REL,
2575 &transmit_result,
2576 pr);
2577 if (pr->th == NULL)
2578 {
2579 // FIXME: need to try again later (not much
2580 // to do here specifically, but we need to
2581 // check somewhere else to handle this case!)
2582 }
2583 } 2191 }
2584 // FIXME: implement hot-path routing statistics keeping!
2585 return GNUNET_YES;
2586}
2587
2588
2589/**
2590 * Check if the given KBlock is well-formed.
2591 *
2592 * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
2593 * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
2594 * @param query where to store the query that this block answers
2595 * @return GNUNET_OK if this is actually a well-formed KBlock
2596 */
2597static int
2598check_kblock (const struct KBlock *kb,
2599 size_t dsize,
2600 GNUNET_HashCode *query)
2601{
2602 if (dsize < sizeof (struct KBlock))
2603 {
2604 GNUNET_break_op (0);
2605 return GNUNET_SYSERR;
2606 }
2607 if (dsize - sizeof (struct KBlock) !=
2608 ntohs (kb->purpose.size)
2609 - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose)
2610 - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) )
2611 {
2612 GNUNET_break_op (0);
2613 return GNUNET_SYSERR;
2614 }
2615 if (GNUNET_OK !=
2616 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2617 &kb->purpose,
2618 &kb->signature,
2619 &kb->keyspace))
2620 {
2621 GNUNET_break_op (0);
2622 return GNUNET_SYSERR;
2623 }
2624 if (query != NULL)
2625 GNUNET_CRYPTO_hash (&kb->keyspace,
2626 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2627 query);
2628 return GNUNET_OK; 2192 return GNUNET_OK;
2629} 2193}
2630 2194
2631 2195
2632/** 2196/* **************************** CS GET Handling ************************ */
2633 * Check if the given SBlock is well-formed.
2634 *
2635 * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
2636 * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
2637 * @param query where to store the query that this block answers
2638 * @param namespace where to store the namespace that this block belongs to
2639 * @return GNUNET_OK if this is actually a well-formed SBlock
2640 */
2641static int
2642check_sblock (const struct SBlock *sb,
2643 size_t dsize,
2644 GNUNET_HashCode *query,
2645 GNUNET_HashCode *namespace)
2646{
2647 if (dsize < sizeof (struct SBlock))
2648 {
2649 GNUNET_break_op (0);
2650 return GNUNET_SYSERR;
2651 }
2652 if (dsize !=
2653 ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
2654 {
2655 GNUNET_break_op (0);
2656 return GNUNET_SYSERR;
2657 }
2658 if (GNUNET_OK !=
2659 GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
2660 &sb->purpose,
2661 &sb->signature,
2662 &sb->subspace))
2663 {
2664 GNUNET_break_op (0);
2665 return GNUNET_SYSERR;
2666 }
2667 if (query != NULL)
2668 *query = sb->identifier;
2669 if (namespace != NULL)
2670 GNUNET_CRYPTO_hash (&sb->subspace,
2671 sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2672 namespace);
2673 return GNUNET_OK;
2674}
2675 2197
2676 2198
2677/** 2199/**
2678 * Handle P2P "PUT" request. 2200 * Handle START_SEARCH-message (search request from client).
2679 * 2201 *
2680 * @param cls closure, always NULL 2202 * @param cls closure
2681 * @param other the other peer involved (sender or receiver, NULL 2203 * @param client identification of the client
2682 * for loopback messages where we are both sender and receiver)
2683 * @param message the actual message 2204 * @param message the actual message
2684 * @param latency reported latency of the connection with 'other'
2685 * @param distance reported distance (DV) to 'other'
2686 * @return GNUNET_OK to keep the connection open,
2687 * GNUNET_SYSERR to close it (signal serious error)
2688 */ 2205 */
2689static int 2206static void
2690handle_p2p_put (void *cls, 2207handle_start_search (void *cls,
2691 const struct GNUNET_PeerIdentity *other, 2208 struct GNUNET_SERVER_Client *client,
2692 const struct GNUNET_MessageHeader *message, 2209 const struct GNUNET_MessageHeader *message)
2693 struct GNUNET_TIME_Relative latency,
2694 uint32_t distance)
2695{ 2210{
2696 const struct PutMessage *put; 2211 static GNUNET_HashCode all_zeros;
2212 const struct SearchMessage *sm;
2213 struct ClientList *cl;
2214 struct ClientRequestList *crl;
2215 struct PendingRequest *pr;
2697 uint16_t msize; 2216 uint16_t msize;
2698 size_t dsize; 2217 unsigned int sc;
2699 uint32_t type; 2218 uint32_t type;
2700 struct GNUNET_TIME_Absolute expiration; 2219
2701 GNUNET_HashCode query;
2702 struct ProcessReplyClosure prq;
2703
2704 msize = ntohs (message->size); 2220 msize = ntohs (message->size);
2705 if (msize < sizeof (struct PutMessage)) 2221 if ( (msize < sizeof (struct SearchMessage)) ||
2222 (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2706 { 2223 {
2707 GNUNET_break_op(0); 2224 GNUNET_break (0);
2708 return GNUNET_SYSERR; 2225 GNUNET_SERVER_receive_done (client,
2226 GNUNET_SYSERR);
2227 return;
2709 } 2228 }
2710 put = (const struct PutMessage*) message; 2229 sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2711 dsize = msize - sizeof (struct PutMessage); 2230 sm = (const struct SearchMessage*) message;
2712 type = ntohl (put->type);
2713 expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2714 2231
2715 /* first, validate! */ 2232 cl = client_list;
2233 while ( (cl != NULL) &&
2234 (cl->client != client) )
2235 cl = cl->next;
2236 if (cl == NULL)
2237 {
2238 cl = GNUNET_malloc (sizeof (struct ClientList));
2239 cl->client = client;
2240 GNUNET_SERVER_client_keep (client);
2241 cl->next = client_list;
2242 client_list = cl;
2243 }
2244 type = ntohl (sm->type);
2245
2246 /* FIXME: detect duplicate request; if duplicate, simply update (merge)
2247 'pr->replies_seen'! */
2248 pr = GNUNET_malloc (sizeof (struct PendingRequest) +
2249 (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0);
2250 crl = GNUNET_malloc (sizeof (struct ClientRequestList));
2251 crl->client_list = cl;
2252 GNUNET_CONTAINER_DLL_insert (cl->rl_head,
2253 cl->rl_tail,
2254 crl);
2255 crl->req = pr;
2256 pr->type = type;
2257 pr->client_request_list = crl;
2258 GNUNET_array_grow (pr->replies_seen,
2259 pr->replies_seen_size,
2260 sc);
2261 memcpy (pr->replies_seen,
2262 &sm[1],
2263 sc * sizeof (GNUNET_HashCode));
2264 pr->replies_seen_off = sc;
2265 pr->anonymity_level = ntohl (sm->anonymity_level);
2266 pr->mingle = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
2267 (uint32_t) -1);
2268 pr->query = sm->query;
2716 switch (type) 2269 switch (type)
2717 { 2270 {
2718 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK: 2271 case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2719 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK: 2272 case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2720 GNUNET_CRYPTO_hash (&put[1], dsize, &query); 2273 if (0 != memcmp (&sm->target,
2721 break; 2274 &all_zeros,
2722 case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK: 2275 sizeof (GNUNET_HashCode)))
2723 if (GNUNET_OK != 2276 pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
2724 check_kblock ((const struct KBlock*) &put[1],
2725 dsize,
2726 &query))
2727 return GNUNET_SYSERR;
2728 break; 2277 break;
2729 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK: 2278 case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2730 if (GNUNET_OK != 2279 pr->namespace = (GNUNET_HashCode*) &pr[1];
2731 check_sblock ((const struct SBlock*) &put[1], 2280 memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
2732 dsize,
2733 &query,
2734 &prq.namespace))
2735 return GNUNET_SYSERR;
2736 break; 2281 break;
2737 case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2738 // FIXME -- validate SKBLOCK!
2739 GNUNET_break (0);
2740 return GNUNET_OK;
2741 default: 2282 default:
2742 /* unknown block type */ 2283 break;
2743 GNUNET_break_op (0);
2744 return GNUNET_SYSERR;
2745 } 2284 }
2746 2285 pr->drq = GNUNET_FS_drq_get (&sm->query,
2747 /* now, lookup 'query' */ 2286 pr->type,
2748 prq.data = (const void*) &put[1]; 2287 &process_local_reply,
2749 prq.size = dsize; 2288 pr,
2750 prq.type = type; 2289 GNUNET_TIME_UNIT_FOREVER_REL);
2751 prq.expiration = expiration;
2752 prq.priority = 0;
2753 GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
2754 &query,
2755 &process_reply,
2756 &prq);
2757 // FIXME: if migration is on and load is low,
2758 // queue to store data in datastore;
2759 // use "prq.priority" for that!
2760 return GNUNET_OK;
2761} 2290}
2762 2291
2763 2292
2293/* **************************** Startup ************************ */
2294
2295
2764/** 2296/**
2765 * List of handlers for P2P messages 2297 * List of handlers for P2P messages
2766 * that we care about. 2298 * that we care about.
@@ -2776,6 +2308,23 @@ static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2776 2308
2777 2309
2778/** 2310/**
2311 * List of handlers for the messages understood by this
2312 * service.
2313 */
2314static struct GNUNET_SERVER_MessageHandler handlers[] = {
2315 {&GNUNET_FS_handle_index_start, NULL,
2316 GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2317 {&GNUNET_FS_handle_index_list_get, NULL,
2318 GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2319 {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX,
2320 sizeof (struct UnindexMessage) },
2321 {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH,
2322 0 },
2323 {NULL, NULL, 0, 0}
2324};
2325
2326
2327/**
2779 * Process fs requests. 2328 * Process fs requests.
2780 * 2329 *
2781 * @param cls closure 2330 * @param cls closure
@@ -2783,22 +2332,13 @@ static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2783 * @param server the initialized server 2332 * @param server the initialized server
2784 * @param c configuration to use 2333 * @param c configuration to use
2785 */ 2334 */
2786static void 2335static int
2787run (void *cls, 2336main_init (struct GNUNET_SCHEDULER_Handle *s,
2788 struct GNUNET_SCHEDULER_Handle *s, 2337 struct GNUNET_SERVER_Handle *server,
2789 struct GNUNET_SERVER_Handle *server, 2338 const struct GNUNET_CONFIGURATION_Handle *c)
2790 const struct GNUNET_CONFIGURATION_Handle *c)
2791{ 2339{
2792 sched = s; 2340 sched = s;
2793 cfg = c; 2341 cfg = c;
2794
2795 requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2796 requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2797 connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
2798 requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2799 GNUNET_FS_init_indexing (sched, cfg);
2800 dsh = GNUNET_DATASTORE_connect (cfg,
2801 sched);
2802 core = GNUNET_CORE_connect (sched, 2342 core = GNUNET_CORE_connect (sched,
2803 cfg, 2343 cfg,
2804 GNUNET_TIME_UNIT_FOREVER_REL, 2344 GNUNET_TIME_UNIT_FOREVER_REL,
@@ -2810,7 +2350,16 @@ run (void *cls,
2810 NULL, GNUNET_NO, 2350 NULL, GNUNET_NO,
2811 NULL, GNUNET_NO, 2351 NULL, GNUNET_NO,
2812 p2p_handlers); 2352 p2p_handlers);
2813 2353 if (NULL == core)
2354 {
2355 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2356 _("Failed to connect to `%s' service.\n"),
2357 "core");
2358 return GNUNET_SYSERR;
2359 }
2360 query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2361 peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2362 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
2814 GNUNET_SERVER_disconnect_notify (server, 2363 GNUNET_SERVER_disconnect_notify (server,
2815 &handle_client_disconnect, 2364 &handle_client_disconnect,
2816 NULL); 2365 NULL);
@@ -2819,21 +2368,30 @@ run (void *cls,
2819 GNUNET_TIME_UNIT_FOREVER_REL, 2368 GNUNET_TIME_UNIT_FOREVER_REL,
2820 &shutdown_task, 2369 &shutdown_task,
2821 NULL); 2370 NULL);
2822 if (NULL == dsh) 2371 return GNUNET_OK;
2823 { 2372}
2824 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2373
2825 _("Failed to connect to `%s' service.\n"), 2374
2826 "datastore"); 2375/**
2827 GNUNET_SCHEDULER_shutdown (sched); 2376 * Process fs requests.
2828 return; 2377 *
2829 } 2378 * @param cls closure
2830 if (NULL == core) 2379 * @param sched scheduler to use
2831 { 2380 * @param server the initialized server
2832 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 2381 * @param cfg configuration to use
2833 _("Failed to connect to `%s' service.\n"), 2382 */
2834 "core"); 2383static void
2384run (void *cls,
2385 struct GNUNET_SCHEDULER_Handle *sched,
2386 struct GNUNET_SERVER_Handle *server,
2387 const struct GNUNET_CONFIGURATION_Handle *cfg)
2388{
2389 if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) ||
2390 (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) ||
2391 (GNUNET_OK != main_init (sched, server, cfg)) )
2392 {
2835 GNUNET_SCHEDULER_shutdown (sched); 2393 GNUNET_SCHEDULER_shutdown (sched);
2836 return; 2394 return;
2837 } 2395 }
2838} 2396}
2839 2397
diff --git a/src/fs/gnunet-service-fs_drq.c b/src/fs/gnunet-service-fs_drq.c
new file mode 100644
index 000000000..c15e37a0d
--- /dev/null
+++ b/src/fs/gnunet-service-fs_drq.c
@@ -0,0 +1,416 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_drq.c
23 * @brief queueing of requests to the datastore service
24 * @author Christian Grothoff
25 */
26#include "platform.h"
27#include "gnunet-service-fs_drq.h"
28
29
30/**
31 * Signature of a function that is called whenever a datastore
32 * request can be processed (or an entry put on the queue times out).
33 *
34 * @param cls closure
35 * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
36 */
37typedef void (*RequestFunction)(void *cls,
38 int ok);
39
40
41/**
42 * Doubly-linked list of our requests for the datastore.
43 */
44struct DatastoreRequestQueue
45{
46
47 /**
48 * This is a doubly-linked list.
49 */
50 struct DatastoreRequestQueue *next;
51
52 /**
53 * This is a doubly-linked list.
54 */
55 struct DatastoreRequestQueue *prev;
56
57 /**
58 * Function to call (will issue the request).
59 */
60 RequestFunction req;
61
62 /**
63 * Closure for req.
64 */
65 void *req_cls;
66
67 /**
68 * When should this request time-out because we don't care anymore?
69 */
70 struct GNUNET_TIME_Absolute timeout;
71
72 /**
73 * ID of task used for signaling timeout.
74 */
75 GNUNET_SCHEDULER_TaskIdentifier task;
76
77};
78
79/**
80 * Our scheduler.
81 */
82static struct GNUNET_SCHEDULER_Handle *sched;
83
84/**
85 * Our configuration.
86 */
87const struct GNUNET_CONFIGURATION_Handle *cfg;
88
89/**
90 * Head of request queue for the datastore, sorted by timeout.
91 */
92static struct DatastoreRequestQueue *drq_head;
93
94/**
95 * Tail of request queue for the datastore.
96 */
97static struct DatastoreRequestQueue *drq_tail;
98
99/**
100 * Our connection to the datastore.
101 */
102static struct GNUNET_DATASTORE_Handle *dsh;
103
104
105/**
106 * Run the next DS request in our
107 * queue, we're done with the current one.
108 */
109static void
110next_ds_request ()
111{
112 struct DatastoreRequestQueue *e;
113
114 while (NULL != (e = drq_head))
115 {
116 if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
117 break;
118 if (e->task != GNUNET_SCHEDULER_NO_TASK)
119 GNUNET_SCHEDULER_cancel (sched, e->task);
120 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
121 e->req (e->req_cls, GNUNET_NO);
122 GNUNET_free (e);
123 }
124 if (e == NULL)
125 return;
126 if (e->task != GNUNET_SCHEDULER_NO_TASK)
127 GNUNET_SCHEDULER_cancel (sched, e->task);
128 e->task = GNUNET_SCHEDULER_NO_TASK;
129 e->req (e->req_cls, GNUNET_YES);
130 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
131 GNUNET_free (e);
132}
133
134
135/**
136 * A datastore request had to be timed out.
137 *
138 * @param cls closure (of type "struct DatastoreRequestQueue*")
139 * @param tc task context, unused
140 */
141static void
142timeout_ds_request (void *cls,
143 const struct GNUNET_SCHEDULER_TaskContext *tc)
144{
145 struct DatastoreRequestQueue *e = cls;
146
147 e->task = GNUNET_SCHEDULER_NO_TASK;
148 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
149 e->req (e->req_cls, GNUNET_NO);
150 GNUNET_free (e);
151}
152
153
154static void
155dequeue_ds_request (struct DatastoreRequestQueue *req)
156{
157 if (req->task != GNUNET_SCHEDULER_NO_TASK)
158 GNUNET_SCHEDULER_cancel (sched, req->task);
159 GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, req);
160 GNUNET_free (req);
161}
162
163
164/**
165 * Queue a request for the datastore.
166 *
167 * @param deadline by when the request should run
168 * @param fun function to call once the request can be run
169 * @param fun_cls closure for fun
170 * @return handle that can be used to dequeue the request
171 */
172static struct DatastoreRequestQueue *
173queue_ds_request (struct GNUNET_TIME_Relative deadline,
174 RequestFunction fun,
175 void *fun_cls)
176{
177 struct DatastoreRequestQueue *e;
178 struct DatastoreRequestQueue *bef;
179
180 if (drq_head == NULL)
181 {
182 /* no other requests pending, run immediately */
183 // FIXME: should probably use scheduler nevertheless
184 // and return non-null!
185 fun (fun_cls, GNUNET_OK);
186 return NULL;
187 }
188 e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
189 e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
190 e->req = fun;
191 e->req_cls = fun_cls;
192 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
193 {
194 /* local request, highest prio, put at head of queue
195 regardless of deadline */
196 bef = NULL;
197 }
198 else
199 {
200 bef = drq_tail;
201 while ( (NULL != bef) &&
202 (e->timeout.value < bef->timeout.value) )
203 bef = bef->prev;
204 }
205 GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
206 if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
207 return e;
208 e->task = GNUNET_SCHEDULER_add_delayed (sched,
209 deadline,
210 &timeout_ds_request,
211 e);
212 return e;
213}
214
215
216/**
217 * Task run during shutdown.
218 *
219 * @param cls unused
220 * @param tc unused
221 */
222static void
223shutdown_task (void *cls,
224 const struct GNUNET_SCHEDULER_TaskContext *tc)
225{
226 struct DatastoreRequestQueue *drq;
227
228 GNUNET_assert (NULL != dsh);
229 GNUNET_DATASTORE_disconnect (dsh,
230 GNUNET_NO);
231 dsh = NULL;
232 while (NULL != (drq = drq_head))
233 {
234 drq_head = drq->next;
235 drq->req (drq->req_cls, GNUNET_NO);
236 dequeue_ds_request (drq);
237 }
238 drq_tail = NULL;
239}
240
241
242struct GetClosure
243{
244 GNUNET_HashCode key;
245 uint32_t type;
246 GNUNET_DATASTORE_Iterator iter;
247 void *iter_cls;
248 struct GNUNET_TIME_Absolute timeout;
249};
250
251
252static void
253get_iterator (void *cls,
254 const GNUNET_HashCode * key,
255 uint32_t size,
256 const void *data,
257 uint32_t type,
258 uint32_t priority,
259 uint32_t anonymity,
260 struct GNUNET_TIME_Absolute
261 expiration,
262 uint64_t uid)
263{
264 struct GetClosure *gc = cls;
265
266 gc->iter (gc->iter_cls,
267 key, size, data, type,
268 priority, anonymity, expiration, uid);
269 if (key == NULL)
270 {
271 next_ds_request ();
272 GNUNET_free (gc);
273 }
274}
275
276
277static void
278do_get (void *cls,
279 int ok)
280{
281 struct GetClosure *gc = cls;
282
283 if (ok != GNUNET_OK)
284 {
285 gc->iter (gc->iter_cls,
286 NULL, 0, NULL, 0, 0, 0,
287 GNUNET_TIME_UNIT_ZERO_ABS, 0);
288 GNUNET_free (gc);
289 next_ds_request ();
290 return;
291 }
292 GNUNET_DATASTORE_get (dsh, &gc->key, gc->type,
293 &get_iterator,
294 gc,
295 GNUNET_TIME_absolute_get_remaining(gc->timeout));
296}
297
298
299/**
300 * Iterate over the results for a particular key
301 * in the datastore. The iterator will only be called
302 * once initially; if the first call did contain a
303 * result, further results can be obtained by calling
304 * "GNUNET_DATASTORE_get_next" with the given argument.
305 *
306 * @param key maybe NULL (to match all entries)
307 * @param type desired type, 0 for any
308 * @param iter function to call on each matching value;
309 * will be called once with a NULL value at the end
310 * @param iter_cls closure for iter
311 * @param timeout how long to wait at most for a response
312 */
313struct DatastoreRequestQueue *
314GNUNET_FS_drq_get (const GNUNET_HashCode * key,
315 uint32_t type,
316 GNUNET_DATASTORE_Iterator iter,
317 void *iter_cls,
318 struct GNUNET_TIME_Relative timeout)
319{
320 struct GetClosure *gc;
321
322 gc = GNUNET_malloc (sizeof (struct GetClosure));
323 gc->key = *key;
324 gc->type = type;
325 gc->iter = iter;
326 gc->iter_cls = iter_cls;
327 gc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
328 return queue_ds_request (timeout,
329 &do_get,
330 gc);
331}
332
333
334void
335GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq)
336{
337 dequeue_ds_request (drq);
338}
339
340
341/**
342 * Function called to trigger obtaining the next result
343 * from the datastore.
344 *
345 * @param more GNUNET_YES to get more results, GNUNET_NO to abort
346 * iteration (with a final call to "iter" with key/data == NULL).
347 */
348void
349GNUNET_FS_drq_get_next (int more)
350{
351 GNUNET_DATASTORE_get_next (dsh, more);
352}
353
354
355/**
356 * Explicitly remove some content from the database.
357 * The "cont"inuation will be called with status
358 * "GNUNET_OK" if content was removed, "GNUNET_NO"
359 * if no matching entry was found and "GNUNET_SYSERR"
360 * on all other types of errors.
361 *
362 * @param key key for the value
363 * @param size number of bytes in data
364 * @param data content stored
365 * @param cont continuation to call when done
366 * @param cont_cls closure for cont
367 * @param timeout how long to wait at most for a response
368 */
369void
370GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
371 uint32_t size, const void *data,
372 GNUNET_DATASTORE_ContinuationWithStatus cont,
373 void *cont_cls,
374 struct GNUNET_TIME_Relative timeout)
375{
376 if (dsh == NULL)
377 {
378 GNUNET_break (0);
379 return;
380 }
381 GNUNET_DATASTORE_remove (dsh, key, size, data,
382 cont, cont_cls, timeout);
383}
384
385
386/**
387 * Setup datastore request queues.
388 *
389 * @param s scheduler to use
390 * @param c configuration to use
391 * @return GNUNET_OK on success
392 */
393int
394GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
395 const struct GNUNET_CONFIGURATION_Handle *c)
396{
397 sched = s;
398 cfg = c;
399 dsh = GNUNET_DATASTORE_connect (cfg,
400 sched);
401 if (NULL == dsh)
402 {
403 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
404 _("Failed to connect to `%s' service.\n"),
405 "datastore");
406 return GNUNET_SYSERR;
407 }
408 GNUNET_SCHEDULER_add_delayed (sched,
409 GNUNET_TIME_UNIT_FOREVER_REL,
410 &shutdown_task,
411 NULL);
412 return GNUNET_OK;
413}
414
415
416/* end of gnunet-service-fs_drq.c */
diff --git a/src/fs/gnunet-service-fs_drq.h b/src/fs/gnunet-service-fs_drq.h
new file mode 100644
index 000000000..306db9bad
--- /dev/null
+++ b/src/fs/gnunet-service-fs_drq.h
@@ -0,0 +1,137 @@
1/*
2 This file is part of GNUnet.
3 (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 2, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_drq.h
23 * @brief queueing of requests to the datastore service
24 * @author Christian Grothoff
25 */
26#ifndef GNUNET_SERVICE_FS_DRQ_H
27#define GNUNET_SERVICE_FS_DRQ_H
28
29#include "gnunet_datastore_service.h"
30#include "gnunet_util_lib.h"
31
32
33/**
34 * Handle for pending, abortable requests for the datastore.
35 */
36struct DatastoreRequestQueue;
37
38
39/**
40 * Iterate over the results for a particular key
41 * in the datastore. The iterator will only be called
42 * once initially; if the first call did contain a
43 * result, further results can be obtained by calling
44 * "GNUNET_DATASTORE_get_next" with the given argument.
45 *
46 * @param key maybe NULL (to match all entries)
47 * @param type desired type, 0 for any
48 * @param iter function to call on each matching value;
49 * will be called once with a NULL value at the end
50 * @param iter_cls closure for iter
51 * @param timeout how long to wait at most for a response
52 */
53struct DatastoreRequestQueue *
54GNUNET_FS_drq_get (const GNUNET_HashCode * key,
55 uint32_t type,
56 GNUNET_DATASTORE_Iterator iter,
57 void *iter_cls,
58 struct GNUNET_TIME_Relative timeout);
59
60
61
62void
63GNUNET_FS_drq_get_cancel (struct DatastoreRequestQueue *drq);
64
65
66/**
67 * Function called to trigger obtaining the next result
68 * from the datastore. Must be called (directly or indirectly)
69 * from the 'iter' callback given to 'GNUNET_FS_drq_get'.
70 * Not calling 'get_next' means no other datastore
71 * interactions (other than remove) will happen.
72 *
73 * @param more GNUNET_YES to get more results, GNUNET_NO to abort
74 * iteration (with a final call to "iter" with key/data == NULL).
75 */
76void
77GNUNET_FS_drq_get_next (int more);
78
79
80/**
81 * Explicitly remove some content from the database.
82 * The "cont"inuation will be called with status
83 * "GNUNET_OK" if content was removed, "GNUNET_NO"
84 * if no matching entry was found and "GNUNET_SYSERR"
85 * on all other types of errors.
86 *
87 * @param key key for the value
88 * @param size number of bytes in data
89 * @param data content stored
90 * @param cont continuation to call when done
91 * @param cont_cls closure for cont
92 * @param timeout how long to wait at most for a response
93 */
94void
95GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
96 uint32_t size, const void *data,
97 GNUNET_DATASTORE_ContinuationWithStatus cont,
98 void *cont_cls,
99 struct GNUNET_TIME_Relative timeout);
100
101
102
103/**
104 * Explicitly remove some content from the database.
105 * The "cont"inuation will be called with status
106 * "GNUNET_OK" if content was removed, "GNUNET_NO"
107 * if no matching entry was found and "GNUNET_SYSERR"
108 * on all other types of errors.
109 *
110 * @param key key for the value
111 * @param size number of bytes in data
112 * @param data content stored
113 * @param cont continuation to call when done
114 * @param cont_cls closure for cont
115 * @param timeout how long to wait at most for a response
116 */
117void
118GNUNET_FS_drq_remove (const GNUNET_HashCode *key,
119 uint32_t size, const void *data,
120 GNUNET_DATASTORE_ContinuationWithStatus cont,
121 void *cont_cls,
122 struct GNUNET_TIME_Relative timeout);
123/**
124 * Setup datastore request queues.
125 *
126 * @param s scheduler to use
127 * @param c configuration to use
128 * @return GNUNET_OK on success
129 */
130int
131GNUNET_FS_drq_init (struct GNUNET_SCHEDULER_Handle *s,
132 const struct GNUNET_CONFIGURATION_Handle *c);
133
134
135
136/* end of gnunet-service-fs_drq.h */
137#endif
diff --git a/src/fs/gnunet-service-fs_indexing.c b/src/fs/gnunet-service-fs_indexing.c
index ebd7114d3..7dea53ee9 100644
--- a/src/fs/gnunet-service-fs_indexing.c
+++ b/src/fs/gnunet-service-fs_indexing.c
@@ -34,6 +34,7 @@
34#include "gnunet_protocols.h" 34#include "gnunet_protocols.h"
35#include "gnunet_signatures.h" 35#include "gnunet_signatures.h"
36#include "gnunet_util_lib.h" 36#include "gnunet_util_lib.h"
37#include "gnunet-service-fs_drq.h"
37#include "gnunet-service-fs_indexing.h" 38#include "gnunet-service-fs_indexing.h"
38#include "fs.h" 39#include "fs.h"
39 40
@@ -508,13 +509,10 @@ remove_cont (void *cls,
508 int success, 509 int success,
509 const char *msg) 510 const char *msg)
510{ 511{
511 struct GNUNET_DATASTORE_Handle *dsh = cls;
512
513 if (GNUNET_OK != success) 512 if (GNUNET_OK != success)
514 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 513 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
515 _("Failed to delete bogus block: %s\n"), 514 _("Failed to delete bogus block: %s\n"),
516 msg); 515 msg);
517 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
518} 516}
519 517
520 518
@@ -533,10 +531,11 @@ remove_cont (void *cls,
533 * @param expiration expiration time for the content 531 * @param expiration expiration time for the content
534 * @param uid unique identifier for the datum; 532 * @param uid unique identifier for the datum;
535 * maybe 0 if no unique identifier is available 533 * maybe 0 if no unique identifier is available
536 * @param cont function to call with the actual block 534 * @param cont function to call with the actual block (at most once, on success)
537 * @param cont_cls closure for cont 535 * @param cont_cls closure for cont
536 * @return GNUNET_OK on success
538 */ 537 */
539void 538int
540GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, 539GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
541 uint32_t size, 540 uint32_t size,
542 const void *data, 541 const void *data,
@@ -545,7 +544,6 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
545 uint32_t anonymity, 544 uint32_t anonymity,
546 struct GNUNET_TIME_Absolute 545 struct GNUNET_TIME_Absolute
547 expiration, uint64_t uid, 546 expiration, uint64_t uid,
548 struct GNUNET_DATASTORE_Handle *dsh,
549 GNUNET_DATASTORE_Iterator cont, 547 GNUNET_DATASTORE_Iterator cont,
550 void *cont_cls) 548 void *cont_cls)
551{ 549{
@@ -564,14 +562,13 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
564 if (size != sizeof (struct OnDemandBlock)) 562 if (size != sizeof (struct OnDemandBlock))
565 { 563 {
566 GNUNET_break (0); 564 GNUNET_break (0);
567 GNUNET_DATASTORE_remove (dsh, 565 GNUNET_FS_drq_remove (key,
568 key, 566 size,
569 size, 567 data,
570 data, 568 &remove_cont,
571 &remove_cont, 569 NULL,
572 dsh, 570 GNUNET_TIME_UNIT_FOREVER_REL);
573 GNUNET_TIME_UNIT_FOREVER_REL); 571 return GNUNET_SYSERR;
574 return;
575 } 572 }
576 odb = (const struct OnDemandBlock*) data; 573 odb = (const struct OnDemandBlock*) data;
577 off = GNUNET_ntohll (odb->offset); 574 off = GNUNET_ntohll (odb->offset);
@@ -600,8 +597,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
600 GNUNET_DISK_file_close (fh); 597 GNUNET_DISK_file_close (fh);
601 /* FIXME: if this happens often, we need 598 /* FIXME: if this happens often, we need
602 to remove the OnDemand block from the DS! */ 599 to remove the OnDemand block from the DS! */
603 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 600 return GNUNET_SYSERR;
604 return;
605 } 601 }
606 GNUNET_DISK_file_close (fh); 602 GNUNET_DISK_file_close (fh);
607 GNUNET_CRYPTO_hash (ndata, 603 GNUNET_CRYPTO_hash (ndata,
@@ -626,8 +622,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
626 (unsigned long long) off); 622 (unsigned long long) off);
627 /* FIXME: if this happens often, we need 623 /* FIXME: if this happens often, we need
628 to remove the OnDemand block from the DS! */ 624 to remove the OnDemand block from the DS! */
629 GNUNET_DATASTORE_get_next (dsh, GNUNET_YES); 625 return GNUNET_SYSERR;
630 return;
631 } 626 }
632 cont (cont_cls, 627 cont (cont_cls,
633 key, 628 key,
@@ -638,6 +633,7 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
638 anonymity, 633 anonymity,
639 expiration, 634 expiration,
640 uid); 635 uid);
636 return GNUNET_OK;
641} 637}
642 638
643 639
@@ -671,8 +667,8 @@ shutdown_task (void *cls,
671 * @param s scheduler to use 667 * @param s scheduler to use
672 * @param c configuration to use 668 * @param c configuration to use
673 */ 669 */
674void 670int
675GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, 671GNUNET_FS_indexing_init (struct GNUNET_SCHEDULER_Handle *s,
676 const struct GNUNET_CONFIGURATION_Handle *c) 672 const struct GNUNET_CONFIGURATION_Handle *c)
677{ 673{
678 sched = s; 674 sched = s;
@@ -683,6 +679,7 @@ GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s,
683 &shutdown_task, 679 &shutdown_task,
684 NULL); 680 NULL);
685 read_index_list (); 681 read_index_list ();
682 return GNUNET_OK;
686} 683}
687 684
688/* end of gnunet-service-fs_indexing.c */ 685/* end of gnunet-service-fs_indexing.c */
diff --git a/src/fs/gnunet-service-fs_indexing.h b/src/fs/gnunet-service-fs_indexing.h
index dc6427234..9749b42a0 100644
--- a/src/fs/gnunet-service-fs_indexing.h
+++ b/src/fs/gnunet-service-fs_indexing.h
@@ -49,11 +49,11 @@
49 * @param expiration expiration time for the content 49 * @param expiration expiration time for the content
50 * @param uid unique identifier for the datum; 50 * @param uid unique identifier for the datum;
51 * maybe 0 if no unique identifier is available 51 * maybe 0 if no unique identifier is available
52 * @param dsh connection to the datastore (to ask for more) 52 * @param cont function to call with the actual block (at most once, on success)
53 * @param cont function to call with the actual block
54 * @param cont_cls closure for cont 53 * @param cont_cls closure for cont
54 * @return GNUNET_OK on success
55 */ 55 */
56void 56int
57GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key, 57GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
58 uint32_t size, 58 uint32_t size,
59 const void *data, 59 const void *data,
@@ -62,7 +62,6 @@ GNUNET_FS_handle_on_demand_block (const GNUNET_HashCode * key,
62 uint32_t anonymity, 62 uint32_t anonymity,
63 struct GNUNET_TIME_Absolute 63 struct GNUNET_TIME_Absolute
64 expiration, uint64_t uid, 64 expiration, uint64_t uid,
65 struct GNUNET_DATASTORE_Handle *dsh,
66 GNUNET_DATASTORE_Iterator cont, 65 GNUNET_DATASTORE_Iterator cont,
67 void *cont_cls); 66 void *cont_cls);
68 67
@@ -112,9 +111,10 @@ GNUNET_FS_handle_unindex (void *cls,
112 * 111 *
113 * @param s scheduler to use 112 * @param s scheduler to use
114 * @param c configuration to use 113 * @param c configuration to use
114 * @return GNUNET_OK on success
115 */ 115 */
116void 116int
117GNUNET_FS_init_indexing (struct GNUNET_SCHEDULER_Handle *s, 117GNUNET_FS_indexing_init (struct GNUNET_SCHEDULER_Handle *s,
118 const struct GNUNET_CONFIGURATION_Handle *c); 118 const struct GNUNET_CONFIGURATION_Handle *c);
119 119
120 120