aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.c16
-rw-r--r--src/fs/gnunet-service-fs.h6
-rw-r--r--src/fs/gnunet-service-fs_pr.c26
-rw-r--r--src/fs/gnunet-service-fs_stream.c269
4 files changed, 290 insertions, 27 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 3a8a076f6..125a14118 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -116,6 +116,11 @@ struct GNUNET_TIME_Relative GSF_avg_latency = { 500 };
116double GSF_current_priorities; 116double GSF_current_priorities;
117 117
118/** 118/**
119 * Size of the datastore queue we assume for common requests.
120 */
121unsigned int GSF_datastore_queue_size;
122
123/**
119 * How many query messages have we received 'recently' that 124 * How many query messages have we received 'recently' that
120 * have not yet been claimed as cover traffic? 125 * have not yet been claimed as cover traffic?
121 */ 126 */
@@ -615,7 +620,18 @@ static void
615run (void *cls, struct GNUNET_SERVER_Handle *server, 620run (void *cls, struct GNUNET_SERVER_Handle *server,
616 const struct GNUNET_CONFIGURATION_Handle *cfg) 621 const struct GNUNET_CONFIGURATION_Handle *cfg)
617{ 622{
623 unsigned long long dqs;
624
618 GSF_cfg = cfg; 625 GSF_cfg = cfg;
626 if (GNUNET_OK !=
627 GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
628 &dqs))
629 {
630 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
631 "fs", "DATASTORE_QUEUE_SIZE");
632 dqs = 1024;
633 }
634 GSF_datastore_queue_size = (unsigned int) dqs;
619 GSF_enable_randomized_delays = 635 GSF_enable_randomized_delays =
620 GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); 636 GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY");
621 GSF_dsh = GNUNET_DATASTORE_connect (cfg); 637 GSF_dsh = GNUNET_DATASTORE_connect (cfg);
diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h
index d198d864d..3213712c8 100644
--- a/src/fs/gnunet-service-fs.h
+++ b/src/fs/gnunet-service-fs.h
@@ -258,6 +258,12 @@ extern struct GNUNET_BLOCK_Context *GSF_block_ctx;
258extern int GSF_enable_randomized_delays; 258extern int GSF_enable_randomized_delays;
259 259
260/** 260/**
261 * Size of the datastore queue we assume for common requests.
262 */
263extern unsigned int GSF_datastore_queue_size;
264
265
266/**
261 * Test if the DATABASE (GET) load on this peer is too high 267 * Test if the DATABASE (GET) load on this peer is too high
262 * to even consider processing the query at 268 * to even consider processing the query at
263 * all. 269 * all.
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 349232daf..9233658eb 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -196,12 +196,6 @@ static int active_to_migration;
196 196
197 197
198/** 198/**
199 * Size of the datastore queue we assume for common requests.
200 * Determined based on the network quota.
201 */
202static unsigned int datastore_queue_size;
203
204/**
205 * Heap with the request that will expire next at the top. Contains 199 * Heap with the request that will expire next at the top. Contains
206 * pointers of type "struct PendingRequest*"; these will *also* be 200 * pointers of type "struct PendingRequest*"; these will *also* be
207 * aliased from the "requests_by_peer" data structures and the 201 * aliased from the "requests_by_peer" data structures and the
@@ -1307,7 +1301,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
1307 (0 != 1301 (0 !=
1308 (GSF_PRO_PRIORITY_UNLIMITED & 1302 (GSF_PRO_PRIORITY_UNLIMITED &
1309 pr->public_data.options)) ? UINT_MAX : 1303 pr->public_data.options)) ? UINT_MAX :
1310 datastore_queue_size 1304 GSF_datastore_queue_size
1311 /* max queue size */ , 1305 /* max queue size */ ,
1312 GNUNET_TIME_UNIT_FOREVER_REL, 1306 GNUNET_TIME_UNIT_FOREVER_REL,
1313 &process_local_reply, pr); 1307 &process_local_reply, pr);
@@ -1347,7 +1341,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
1347 (0 != 1341 (0 !=
1348 (GSF_PRO_PRIORITY_UNLIMITED & 1342 (GSF_PRO_PRIORITY_UNLIMITED &
1349 pr->public_data.options)) ? UINT_MAX : 1343 pr->public_data.options)) ? UINT_MAX :
1350 datastore_queue_size 1344 GSF_datastore_queue_size
1351 /* max queue size */ , 1345 /* max queue size */ ,
1352 GNUNET_TIME_UNIT_FOREVER_REL, 1346 GNUNET_TIME_UNIT_FOREVER_REL,
1353 &process_local_reply, pr); 1347 &process_local_reply, pr);
@@ -1405,7 +1399,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
1405 (0 != 1399 (0 !=
1406 (GSF_PRO_PRIORITY_UNLIMITED & pr-> 1400 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1407 public_data.options)) ? UINT_MAX : 1401 public_data.options)) ? UINT_MAX :
1408 datastore_queue_size 1402 GSF_datastore_queue_size
1409 /* max queue size */ , 1403 /* max queue size */ ,
1410 GNUNET_TIME_UNIT_FOREVER_REL, 1404 GNUNET_TIME_UNIT_FOREVER_REL,
1411 &process_local_reply, pr); 1405 &process_local_reply, pr);
@@ -1487,7 +1481,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1487 (0 != 1481 (0 !=
1488 (GSF_PRO_PRIORITY_UNLIMITED & pr-> 1482 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1489 public_data.options)) ? UINT_MAX : 1483 public_data.options)) ? UINT_MAX :
1490 datastore_queue_size 1484 GSF_datastore_queue_size
1491 /* max queue size */ , 1485 /* max queue size */ ,
1492 GNUNET_TIME_UNIT_FOREVER_REL, 1486 GNUNET_TIME_UNIT_FOREVER_REL,
1493 &process_local_reply, pr); 1487 &process_local_reply, pr);
@@ -1639,8 +1633,6 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1639void 1633void
1640GSF_pending_request_init_ () 1634GSF_pending_request_init_ ()
1641{ 1635{
1642 unsigned long long dqs;
1643
1644 if (GNUNET_OK != 1636 if (GNUNET_OK !=
1645 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs", 1637 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
1646 "MAX_PENDING_REQUESTS", 1638 "MAX_PENDING_REQUESTS",
@@ -1649,16 +1641,6 @@ GSF_pending_request_init_ ()
1649 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, 1641 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
1650 "fs", "MAX_PENDING_REQUESTS"); 1642 "fs", "MAX_PENDING_REQUESTS");
1651 } 1643 }
1652 if (GNUNET_OK !=
1653 GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
1654 &dqs))
1655 {
1656 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
1657 "fs", "DATASTORE_QUEUE_SIZE");
1658 dqs = 1024;
1659 }
1660 datastore_queue_size = (unsigned int) dqs;
1661
1662 active_to_migration = 1644 active_to_migration =
1663 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING"); 1645 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1664 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 1646 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c
index befa90dd5..f49b4246e 100644
--- a/src/fs/gnunet-service-fs_stream.c
+++ b/src/fs/gnunet-service-fs_stream.c
@@ -24,6 +24,7 @@
24 * @author Christian Grothoff 24 * @author Christian Grothoff
25 * 25 *
26 * TODO: 26 * TODO:
27 * - add statistics
27 * - limit # concurrent clients, timeout for read 28 * - limit # concurrent clients, timeout for read
28 */ 29 */
29#include "platform.h" 30#include "platform.h"
@@ -33,6 +34,7 @@
33#include "gnunet_protocols.h" 34#include "gnunet_protocols.h"
34#include "gnunet_applications.h" 35#include "gnunet_applications.h"
35#include "gnunet-service-fs.h" 36#include "gnunet-service-fs.h"
37#include "gnunet-service-fs_indexing.h"
36#include "gnunet-service-fs_stream.h" 38#include "gnunet-service-fs_stream.h"
37 39
38/** 40/**
@@ -66,6 +68,16 @@ struct StreamClient
66 struct GNUNET_STREAM_IOWriteHandle *wh; 68 struct GNUNET_STREAM_IOWriteHandle *wh;
67 69
68 /** 70 /**
71 * Tokenizer for requests.
72 */
73 struct GNUNET_SERVER_MessageStreamTokenizer *mst;
74
75 /**
76 * Current active request to the datastore, if we have one pending.
77 */
78 struct GNUNET_DATASTORE_QueueEntry *qe;
79
80 /**
69 * Size of the last write that was initiated. 81 * Size of the last write that was initiated.
70 */ 82 */
71 size_t reply_size; 83 size_t reply_size;
@@ -74,6 +86,56 @@ struct StreamClient
74 86
75 87
76/** 88/**
89 * Query from one peer, asking the other for CHK-data.
90 */
91struct StreamQueryMessage
92{
93
94 /**
95 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
96 */
97 struct GNUNET_MessageHeader header;
98
99 /**
100 * Block type must be DBLOCK or IBLOCK.
101 */
102 uint32_t type;
103
104 /**
105 * Query hash from CHK (hash of encrypted block).
106 */
107 struct GNUNET_HashCode query;
108
109};
110
111
112/**
113 * Reply to a StreamQueryMessage.
114 */
115struct StreamReplyMessage
116{
117
118 /**
119 * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
120 */
121 struct GNUNET_MessageHeader header;
122
123 /**
124 * Block type must be DBLOCK or IBLOCK.
125 */
126 uint32_t type;
127
128 /**
129 * Expiration time for the block.
130 */
131 struct GNUNET_TIME_AbsoluteNBO expiration;
132
133 /* followed by the encrypted block */
134
135};
136
137
138/**
77 * Listen socket for incoming requests. 139 * Listen socket for incoming requests.
78 */ 140 */
79static struct GNUNET_STREAM_ListenSocket *listen_socket; 141static struct GNUNET_STREAM_ListenSocket *listen_socket;
@@ -101,6 +163,9 @@ terminate_stream (struct StreamClient *sc)
101 GNUNET_STREAM_io_read_cancel (sc->rh); 163 GNUNET_STREAM_io_read_cancel (sc->rh);
102 if (NULL != sc->wh) 164 if (NULL != sc->wh)
103 GNUNET_STREAM_io_write_cancel (sc->wh); 165 GNUNET_STREAM_io_write_cancel (sc->wh);
166 if (NULL != sc->qe)
167 GNUNET_DATASTORE_cancel (sc->qe);
168 GNUNET_SERVER_mst_destroy (sc->mst);
104 GNUNET_STREAM_close (sc->socket); 169 GNUNET_STREAM_close (sc->socket);
105 GNUNET_CONTAINER_DLL_remove (sc_head, 170 GNUNET_CONTAINER_DLL_remove (sc_head,
106 sc_tail, 171 sc_tail,
@@ -124,15 +189,70 @@ static size_t
124process_request (void *cls, 189process_request (void *cls,
125 enum GNUNET_STREAM_Status status, 190 enum GNUNET_STREAM_Status status,
126 const void *data, 191 const void *data,
192 size_t size);
193
194
195/**
196 * We're done handling a request from a client, read the next one.
197 *
198 * @param sc client to continue reading requests from
199 */
200static void
201continue_reading (struct StreamClient *sc)
202{
203 int ret;
204
205 ret =
206 GNUNET_SERVER_mst_receive (sc->mst,
207 NULL,
208 NULL, 0,
209 GNUNET_NO, GNUNET_YES);
210 if (GNUNET_NO == ret)
211 return;
212 sc->rh = GNUNET_STREAM_read (sc->socket,
213 GNUNET_TIME_UNIT_FOREVER_REL,
214 &process_request,
215 sc);
216}
217
218
219/**
220 * Functions of this signature are called whenever data is available from the
221 * stream.
222 *
223 * @param cls the closure from GNUNET_STREAM_read
224 * @param status the status of the stream at the time this function is called
225 * @param data traffic from the other side
226 * @param size the number of bytes available in data read; will be 0 on timeout
227 * @return number of bytes of processed from 'data' (any data remaining should be
228 * given to the next time the read processor is called).
229 */
230static size_t
231process_request (void *cls,
232 enum GNUNET_STREAM_Status status,
233 const void *data,
127 size_t size) 234 size_t size)
128{ 235{
129 struct StreamClient *sc = cls; 236 struct StreamClient *sc = cls;
237 int ret;
130 238
131 sc->rh = NULL; 239 sc->rh = NULL;
132 switch (status) 240 switch (status)
133 { 241 {
134 case GNUNET_STREAM_OK: 242 case GNUNET_STREAM_OK:
135 // fixme: handle request... 243 ret =
244 GNUNET_SERVER_mst_receive (sc->mst,
245 NULL,
246 data, size,
247 GNUNET_NO, GNUNET_YES);
248 if (GNUNET_NO == ret)
249 return size; /* more messages in MST */
250 if (GNUNET_SYSERR == ret)
251 {
252 GNUNET_break_op (0);
253 terminate_stream (sc);
254 return size;
255 }
136 break; 256 break;
137 case GNUNET_STREAM_TIMEOUT: 257 case GNUNET_STREAM_TIMEOUT:
138 case GNUNET_STREAM_SHUTDOWN: 258 case GNUNET_STREAM_SHUTDOWN:
@@ -144,15 +264,152 @@ process_request (void *cls,
144 GNUNET_break (0); 264 GNUNET_break (0);
145 return size; 265 return size;
146 } 266 }
147 sc->rh = GNUNET_STREAM_read (sc->socket, 267 continue_reading (sc);
148 GNUNET_TIME_UNIT_FOREVER_REL,
149 &process_request,
150 sc);
151 return size; 268 return size;
152} 269}
153 270
154 271
155/** 272/**
273 * Sending a reply was completed, continue processing.
274 *
275 * @param cls closure with the struct StreamClient which sent the query
276 */
277static void
278write_continuation (void *cls,
279 enum GNUNET_STREAM_Status status,
280 size_t size)
281{
282 struct StreamClient *sc = cls;
283
284 sc->wh = NULL;
285 if ( (GNUNET_STREAM_OK == status) &&
286 (size == sc->reply_size) )
287 continue_reading (sc);
288 else
289 terminate_stream (sc);
290}
291
292
293/**
294 * Process a datum that was stored in the datastore.
295 *
296 * @param cls closure with the struct StreamClient which sent the query
297 * @param key key for the content
298 * @param size number of bytes in data
299 * @param data content stored
300 * @param type type of the content
301 * @param priority priority of the content
302 * @param anonymity anonymity-level for the content
303 * @param expiration expiration time for the content
304 * @param uid unique identifier for the datum;
305 * maybe 0 if no unique identifier is available
306 */
307static void
308handle_datastore_reply (void *cls,
309 const struct GNUNET_HashCode * key,
310 size_t size, const void *data,
311 enum GNUNET_BLOCK_Type type,
312 uint32_t priority,
313 uint32_t anonymity,
314 struct GNUNET_TIME_Absolute
315 expiration, uint64_t uid)
316{
317 struct StreamClient *sc = cls;
318 size_t msize = size + sizeof (struct StreamReplyMessage);
319 char buf[msize] GNUNET_ALIGN;
320 struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
321
322 sc->qe = NULL;
323 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
324 {
325 if (GNUNET_OK !=
326 GNUNET_FS_handle_on_demand_block (key,
327 size, data, type,
328 priority, anonymity,
329 expiration, uid,
330 &handle_datastore_reply,
331 sc))
332 {
333 continue_reading (sc);
334 }
335 return;
336 }
337 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
338 {
339 GNUNET_break (0);
340 continue_reading (sc);
341 return;
342 }
343 srm->header.size = htons ((uint16_t) msize);
344 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
345 srm->type = htonl (type);
346 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
347 memcpy (&srm[1], data, size);
348 sc->reply_size = msize;
349 sc->wh = GNUNET_STREAM_write (sc->socket,
350 buf, msize,
351 GNUNET_TIME_UNIT_FOREVER_REL,
352 &write_continuation,
353 sc);
354 if (NULL == sc->wh)
355 {
356 terminate_stream (sc);
357 return;
358 }
359}
360
361
362/**
363 * Functions with this signature are called whenever a
364 * complete message is received.
365 *
366 * Do not call GNUNET_SERVER_mst_destroy in callback
367 *
368 * @param cls closure with the 'struct StreamClient'
369 * @param client identification of the client, NULL
370 * @param message the actual message
371 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
372 */
373static int
374request_cb (void *cls,
375 void *client,
376 const struct GNUNET_MessageHeader *message)
377{
378 struct StreamClient *sc = cls;
379 const struct StreamQueryMessage *sqm;
380
381 switch (ntohs (message->type))
382 {
383 case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
384 if (sizeof (struct StreamQueryMessage) !=
385 ntohs (message->size))
386 {
387 GNUNET_break_op (0);
388 return GNUNET_SYSERR;
389 }
390 sqm = (const struct StreamQueryMessage *) message;
391 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
392 "Received query for `%s' via stream\n",
393 GNUNET_h2s (&sqm->query));
394 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
395 0,
396 &sqm->query,
397 ntohl (sqm->type),
398 0 /* FIXME: priority */,
399 GSF_datastore_queue_size,
400 GNUNET_TIME_UNIT_FOREVER_REL,
401 &handle_datastore_reply, sc);
402 if (NULL == sc->qe)
403 continue_reading (sc);
404 return GNUNET_OK;
405 default:
406 GNUNET_break_op (0);
407 return GNUNET_SYSERR;
408 }
409}
410
411
412/**
156 * Functions of this type are called upon new stream connection from other peers 413 * Functions of this type are called upon new stream connection from other peers
157 * or upon binding error which happen when the app_port given in 414 * or upon binding error which happen when the app_port given in
158 * GNUNET_STREAM_listen() is already taken. 415 * GNUNET_STREAM_listen() is already taken.
@@ -175,6 +432,8 @@ accept_cb (void *cls,
175 return GNUNET_SYSERR; 432 return GNUNET_SYSERR;
176 sc = GNUNET_malloc (sizeof (struct StreamClient)); 433 sc = GNUNET_malloc (sizeof (struct StreamClient));
177 sc->socket = socket; 434 sc->socket = socket;
435 sc->mst = GNUNET_SERVER_mst_create (&request_cb,
436 sc);
178 sc->rh = GNUNET_STREAM_read (sc->socket, 437 sc->rh = GNUNET_STREAM_read (sc->socket,
179 GNUNET_TIME_UNIT_FOREVER_REL, 438 GNUNET_TIME_UNIT_FOREVER_REL,
180 &process_request, 439 &process_request,