diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/fs/gnunet-service-fs.c | 16 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.h | 6 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_pr.c | 26 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_stream.c | 269 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 10 |
5 files changed, 300 insertions, 27 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 3a8a076f6..125a14118 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -116,6 +116,11 @@ struct GNUNET_TIME_Relative GSF_avg_latency = { 500 }; | |||
116 | double GSF_current_priorities; | 116 | double GSF_current_priorities; |
117 | 117 | ||
118 | /** | 118 | /** |
119 | * Size of the datastore queue we assume for common requests. | ||
120 | */ | ||
121 | unsigned int GSF_datastore_queue_size; | ||
122 | |||
123 | /** | ||
119 | * How many query messages have we received 'recently' that | 124 | * How many query messages have we received 'recently' that |
120 | * have not yet been claimed as cover traffic? | 125 | * have not yet been claimed as cover traffic? |
121 | */ | 126 | */ |
@@ -615,7 +620,18 @@ static void | |||
615 | run (void *cls, struct GNUNET_SERVER_Handle *server, | 620 | run (void *cls, struct GNUNET_SERVER_Handle *server, |
616 | const struct GNUNET_CONFIGURATION_Handle *cfg) | 621 | const struct GNUNET_CONFIGURATION_Handle *cfg) |
617 | { | 622 | { |
623 | unsigned long long dqs; | ||
624 | |||
618 | GSF_cfg = cfg; | 625 | GSF_cfg = cfg; |
626 | if (GNUNET_OK != | ||
627 | GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE", | ||
628 | &dqs)) | ||
629 | { | ||
630 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, | ||
631 | "fs", "DATASTORE_QUEUE_SIZE"); | ||
632 | dqs = 1024; | ||
633 | } | ||
634 | GSF_datastore_queue_size = (unsigned int) dqs; | ||
619 | GSF_enable_randomized_delays = | 635 | GSF_enable_randomized_delays = |
620 | GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); | 636 | GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY"); |
621 | GSF_dsh = GNUNET_DATASTORE_connect (cfg); | 637 | GSF_dsh = GNUNET_DATASTORE_connect (cfg); |
diff --git a/src/fs/gnunet-service-fs.h b/src/fs/gnunet-service-fs.h index d198d864d..3213712c8 100644 --- a/src/fs/gnunet-service-fs.h +++ b/src/fs/gnunet-service-fs.h | |||
@@ -258,6 +258,12 @@ extern struct GNUNET_BLOCK_Context *GSF_block_ctx; | |||
258 | extern int GSF_enable_randomized_delays; | 258 | extern int GSF_enable_randomized_delays; |
259 | 259 | ||
260 | /** | 260 | /** |
261 | * Size of the datastore queue we assume for common requests. | ||
262 | */ | ||
263 | extern unsigned int GSF_datastore_queue_size; | ||
264 | |||
265 | |||
266 | /** | ||
261 | * Test if the DATABASE (GET) load on this peer is too high | 267 | * Test if the DATABASE (GET) load on this peer is too high |
262 | * to even consider processing the query at | 268 | * to even consider processing the query at |
263 | * all. | 269 | * all. |
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c index 349232daf..9233658eb 100644 --- a/src/fs/gnunet-service-fs_pr.c +++ b/src/fs/gnunet-service-fs_pr.c | |||
@@ -196,12 +196,6 @@ static int active_to_migration; | |||
196 | 196 | ||
197 | 197 | ||
198 | /** | 198 | /** |
199 | * Size of the datastore queue we assume for common requests. | ||
200 | * Determined based on the network quota. | ||
201 | */ | ||
202 | static unsigned int datastore_queue_size; | ||
203 | |||
204 | /** | ||
205 | * Heap with the request that will expire next at the top. Contains | 199 | * Heap with the request that will expire next at the top. Contains |
206 | * pointers of type "struct PendingRequest*"; these will *also* be | 200 | * pointers of type "struct PendingRequest*"; these will *also* be |
207 | * aliased from the "requests_by_peer" data structures and the | 201 | * aliased from the "requests_by_peer" data structures and the |
@@ -1307,7 +1301,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size, | |||
1307 | (0 != | 1301 | (0 != |
1308 | (GSF_PRO_PRIORITY_UNLIMITED & | 1302 | (GSF_PRO_PRIORITY_UNLIMITED & |
1309 | pr->public_data.options)) ? UINT_MAX : | 1303 | pr->public_data.options)) ? UINT_MAX : |
1310 | datastore_queue_size | 1304 | GSF_datastore_queue_size |
1311 | /* max queue size */ , | 1305 | /* max queue size */ , |
1312 | GNUNET_TIME_UNIT_FOREVER_REL, | 1306 | GNUNET_TIME_UNIT_FOREVER_REL, |
1313 | &process_local_reply, pr); | 1307 | &process_local_reply, pr); |
@@ -1347,7 +1341,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size, | |||
1347 | (0 != | 1341 | (0 != |
1348 | (GSF_PRO_PRIORITY_UNLIMITED & | 1342 | (GSF_PRO_PRIORITY_UNLIMITED & |
1349 | pr->public_data.options)) ? UINT_MAX : | 1343 | pr->public_data.options)) ? UINT_MAX : |
1350 | datastore_queue_size | 1344 | GSF_datastore_queue_size |
1351 | /* max queue size */ , | 1345 | /* max queue size */ , |
1352 | GNUNET_TIME_UNIT_FOREVER_REL, | 1346 | GNUNET_TIME_UNIT_FOREVER_REL, |
1353 | &process_local_reply, pr); | 1347 | &process_local_reply, pr); |
@@ -1405,7 +1399,7 @@ process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size, | |||
1405 | (0 != | 1399 | (0 != |
1406 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | 1400 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> |
1407 | public_data.options)) ? UINT_MAX : | 1401 | public_data.options)) ? UINT_MAX : |
1408 | datastore_queue_size | 1402 | GSF_datastore_queue_size |
1409 | /* max queue size */ , | 1403 | /* max queue size */ , |
1410 | GNUNET_TIME_UNIT_FOREVER_REL, | 1404 | GNUNET_TIME_UNIT_FOREVER_REL, |
1411 | &process_local_reply, pr); | 1405 | &process_local_reply, pr); |
@@ -1487,7 +1481,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr, | |||
1487 | (0 != | 1481 | (0 != |
1488 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> | 1482 | (GSF_PRO_PRIORITY_UNLIMITED & pr-> |
1489 | public_data.options)) ? UINT_MAX : | 1483 | public_data.options)) ? UINT_MAX : |
1490 | datastore_queue_size | 1484 | GSF_datastore_queue_size |
1491 | /* max queue size */ , | 1485 | /* max queue size */ , |
1492 | GNUNET_TIME_UNIT_FOREVER_REL, | 1486 | GNUNET_TIME_UNIT_FOREVER_REL, |
1493 | &process_local_reply, pr); | 1487 | &process_local_reply, pr); |
@@ -1639,8 +1633,6 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp, | |||
1639 | void | 1633 | void |
1640 | GSF_pending_request_init_ () | 1634 | GSF_pending_request_init_ () |
1641 | { | 1635 | { |
1642 | unsigned long long dqs; | ||
1643 | |||
1644 | if (GNUNET_OK != | 1636 | if (GNUNET_OK != |
1645 | GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs", | 1637 | GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs", |
1646 | "MAX_PENDING_REQUESTS", | 1638 | "MAX_PENDING_REQUESTS", |
@@ -1649,16 +1641,6 @@ GSF_pending_request_init_ () | |||
1649 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, | 1641 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, |
1650 | "fs", "MAX_PENDING_REQUESTS"); | 1642 | "fs", "MAX_PENDING_REQUESTS"); |
1651 | } | 1643 | } |
1652 | if (GNUNET_OK != | ||
1653 | GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE", | ||
1654 | &dqs)) | ||
1655 | { | ||
1656 | GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO, | ||
1657 | "fs", "DATASTORE_QUEUE_SIZE"); | ||
1658 | dqs = 1024; | ||
1659 | } | ||
1660 | datastore_queue_size = (unsigned int) dqs; | ||
1661 | |||
1662 | active_to_migration = | 1644 | active_to_migration = |
1663 | GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING"); | 1645 | GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING"); |
1664 | datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); | 1646 | datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); |
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c index befa90dd5..f49b4246e 100644 --- a/src/fs/gnunet-service-fs_stream.c +++ b/src/fs/gnunet-service-fs_stream.c | |||
@@ -24,6 +24,7 @@ | |||
24 | * @author Christian Grothoff | 24 | * @author Christian Grothoff |
25 | * | 25 | * |
26 | * TODO: | 26 | * TODO: |
27 | * - add statistics | ||
27 | * - limit # concurrent clients, timeout for read | 28 | * - limit # concurrent clients, timeout for read |
28 | */ | 29 | */ |
29 | #include "platform.h" | 30 | #include "platform.h" |
@@ -33,6 +34,7 @@ | |||
33 | #include "gnunet_protocols.h" | 34 | #include "gnunet_protocols.h" |
34 | #include "gnunet_applications.h" | 35 | #include "gnunet_applications.h" |
35 | #include "gnunet-service-fs.h" | 36 | #include "gnunet-service-fs.h" |
37 | #include "gnunet-service-fs_indexing.h" | ||
36 | #include "gnunet-service-fs_stream.h" | 38 | #include "gnunet-service-fs_stream.h" |
37 | 39 | ||
38 | /** | 40 | /** |
@@ -66,6 +68,16 @@ struct StreamClient | |||
66 | struct GNUNET_STREAM_IOWriteHandle *wh; | 68 | struct GNUNET_STREAM_IOWriteHandle *wh; |
67 | 69 | ||
68 | /** | 70 | /** |
71 | * Tokenizer for requests. | ||
72 | */ | ||
73 | struct GNUNET_SERVER_MessageStreamTokenizer *mst; | ||
74 | |||
75 | /** | ||
76 | * Current active request to the datastore, if we have one pending. | ||
77 | */ | ||
78 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
79 | |||
80 | /** | ||
69 | * Size of the last write that was initiated. | 81 | * Size of the last write that was initiated. |
70 | */ | 82 | */ |
71 | size_t reply_size; | 83 | size_t reply_size; |
@@ -74,6 +86,56 @@ struct StreamClient | |||
74 | 86 | ||
75 | 87 | ||
76 | /** | 88 | /** |
89 | * Query from one peer, asking the other for CHK-data. | ||
90 | */ | ||
91 | struct StreamQueryMessage | ||
92 | { | ||
93 | |||
94 | /** | ||
95 | * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY. | ||
96 | */ | ||
97 | struct GNUNET_MessageHeader header; | ||
98 | |||
99 | /** | ||
100 | * Block type must be DBLOCK or IBLOCK. | ||
101 | */ | ||
102 | uint32_t type; | ||
103 | |||
104 | /** | ||
105 | * Query hash from CHK (hash of encrypted block). | ||
106 | */ | ||
107 | struct GNUNET_HashCode query; | ||
108 | |||
109 | }; | ||
110 | |||
111 | |||
112 | /** | ||
113 | * Reply to a StreamQueryMessage. | ||
114 | */ | ||
115 | struct StreamReplyMessage | ||
116 | { | ||
117 | |||
118 | /** | ||
119 | * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY. | ||
120 | */ | ||
121 | struct GNUNET_MessageHeader header; | ||
122 | |||
123 | /** | ||
124 | * Block type must be DBLOCK or IBLOCK. | ||
125 | */ | ||
126 | uint32_t type; | ||
127 | |||
128 | /** | ||
129 | * Expiration time for the block. | ||
130 | */ | ||
131 | struct GNUNET_TIME_AbsoluteNBO expiration; | ||
132 | |||
133 | /* followed by the encrypted block */ | ||
134 | |||
135 | }; | ||
136 | |||
137 | |||
138 | /** | ||
77 | * Listen socket for incoming requests. | 139 | * Listen socket for incoming requests. |
78 | */ | 140 | */ |
79 | static struct GNUNET_STREAM_ListenSocket *listen_socket; | 141 | static struct GNUNET_STREAM_ListenSocket *listen_socket; |
@@ -101,6 +163,9 @@ terminate_stream (struct StreamClient *sc) | |||
101 | GNUNET_STREAM_io_read_cancel (sc->rh); | 163 | GNUNET_STREAM_io_read_cancel (sc->rh); |
102 | if (NULL != sc->wh) | 164 | if (NULL != sc->wh) |
103 | GNUNET_STREAM_io_write_cancel (sc->wh); | 165 | GNUNET_STREAM_io_write_cancel (sc->wh); |
166 | if (NULL != sc->qe) | ||
167 | GNUNET_DATASTORE_cancel (sc->qe); | ||
168 | GNUNET_SERVER_mst_destroy (sc->mst); | ||
104 | GNUNET_STREAM_close (sc->socket); | 169 | GNUNET_STREAM_close (sc->socket); |
105 | GNUNET_CONTAINER_DLL_remove (sc_head, | 170 | GNUNET_CONTAINER_DLL_remove (sc_head, |
106 | sc_tail, | 171 | sc_tail, |
@@ -124,15 +189,70 @@ static size_t | |||
124 | process_request (void *cls, | 189 | process_request (void *cls, |
125 | enum GNUNET_STREAM_Status status, | 190 | enum GNUNET_STREAM_Status status, |
126 | const void *data, | 191 | const void *data, |
192 | size_t size); | ||
193 | |||
194 | |||
195 | /** | ||
196 | * We're done handling a request from a client, read the next one. | ||
197 | * | ||
198 | * @param sc client to continue reading requests from | ||
199 | */ | ||
200 | static void | ||
201 | continue_reading (struct StreamClient *sc) | ||
202 | { | ||
203 | int ret; | ||
204 | |||
205 | ret = | ||
206 | GNUNET_SERVER_mst_receive (sc->mst, | ||
207 | NULL, | ||
208 | NULL, 0, | ||
209 | GNUNET_NO, GNUNET_YES); | ||
210 | if (GNUNET_NO == ret) | ||
211 | return; | ||
212 | sc->rh = GNUNET_STREAM_read (sc->socket, | ||
213 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
214 | &process_request, | ||
215 | sc); | ||
216 | } | ||
217 | |||
218 | |||
219 | /** | ||
220 | * Functions of this signature are called whenever data is available from the | ||
221 | * stream. | ||
222 | * | ||
223 | * @param cls the closure from GNUNET_STREAM_read | ||
224 | * @param status the status of the stream at the time this function is called | ||
225 | * @param data traffic from the other side | ||
226 | * @param size the number of bytes available in data read; will be 0 on timeout | ||
227 | * @return number of bytes of processed from 'data' (any data remaining should be | ||
228 | * given to the next time the read processor is called). | ||
229 | */ | ||
230 | static size_t | ||
231 | process_request (void *cls, | ||
232 | enum GNUNET_STREAM_Status status, | ||
233 | const void *data, | ||
127 | size_t size) | 234 | size_t size) |
128 | { | 235 | { |
129 | struct StreamClient *sc = cls; | 236 | struct StreamClient *sc = cls; |
237 | int ret; | ||
130 | 238 | ||
131 | sc->rh = NULL; | 239 | sc->rh = NULL; |
132 | switch (status) | 240 | switch (status) |
133 | { | 241 | { |
134 | case GNUNET_STREAM_OK: | 242 | case GNUNET_STREAM_OK: |
135 | // fixme: handle request... | 243 | ret = |
244 | GNUNET_SERVER_mst_receive (sc->mst, | ||
245 | NULL, | ||
246 | data, size, | ||
247 | GNUNET_NO, GNUNET_YES); | ||
248 | if (GNUNET_NO == ret) | ||
249 | return size; /* more messages in MST */ | ||
250 | if (GNUNET_SYSERR == ret) | ||
251 | { | ||
252 | GNUNET_break_op (0); | ||
253 | terminate_stream (sc); | ||
254 | return size; | ||
255 | } | ||
136 | break; | 256 | break; |
137 | case GNUNET_STREAM_TIMEOUT: | 257 | case GNUNET_STREAM_TIMEOUT: |
138 | case GNUNET_STREAM_SHUTDOWN: | 258 | case GNUNET_STREAM_SHUTDOWN: |
@@ -144,15 +264,152 @@ process_request (void *cls, | |||
144 | GNUNET_break (0); | 264 | GNUNET_break (0); |
145 | return size; | 265 | return size; |
146 | } | 266 | } |
147 | sc->rh = GNUNET_STREAM_read (sc->socket, | 267 | continue_reading (sc); |
148 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
149 | &process_request, | ||
150 | sc); | ||
151 | return size; | 268 | return size; |
152 | } | 269 | } |
153 | 270 | ||
154 | 271 | ||
155 | /** | 272 | /** |
273 | * Sending a reply was completed, continue processing. | ||
274 | * | ||
275 | * @param cls closure with the struct StreamClient which sent the query | ||
276 | */ | ||
277 | static void | ||
278 | write_continuation (void *cls, | ||
279 | enum GNUNET_STREAM_Status status, | ||
280 | size_t size) | ||
281 | { | ||
282 | struct StreamClient *sc = cls; | ||
283 | |||
284 | sc->wh = NULL; | ||
285 | if ( (GNUNET_STREAM_OK == status) && | ||
286 | (size == sc->reply_size) ) | ||
287 | continue_reading (sc); | ||
288 | else | ||
289 | terminate_stream (sc); | ||
290 | } | ||
291 | |||
292 | |||
293 | /** | ||
294 | * Process a datum that was stored in the datastore. | ||
295 | * | ||
296 | * @param cls closure with the struct StreamClient which sent the query | ||
297 | * @param key key for the content | ||
298 | * @param size number of bytes in data | ||
299 | * @param data content stored | ||
300 | * @param type type of the content | ||
301 | * @param priority priority of the content | ||
302 | * @param anonymity anonymity-level for the content | ||
303 | * @param expiration expiration time for the content | ||
304 | * @param uid unique identifier for the datum; | ||
305 | * maybe 0 if no unique identifier is available | ||
306 | */ | ||
307 | static void | ||
308 | handle_datastore_reply (void *cls, | ||
309 | const struct GNUNET_HashCode * key, | ||
310 | size_t size, const void *data, | ||
311 | enum GNUNET_BLOCK_Type type, | ||
312 | uint32_t priority, | ||
313 | uint32_t anonymity, | ||
314 | struct GNUNET_TIME_Absolute | ||
315 | expiration, uint64_t uid) | ||
316 | { | ||
317 | struct StreamClient *sc = cls; | ||
318 | size_t msize = size + sizeof (struct StreamReplyMessage); | ||
319 | char buf[msize] GNUNET_ALIGN; | ||
320 | struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf; | ||
321 | |||
322 | sc->qe = NULL; | ||
323 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) | ||
324 | { | ||
325 | if (GNUNET_OK != | ||
326 | GNUNET_FS_handle_on_demand_block (key, | ||
327 | size, data, type, | ||
328 | priority, anonymity, | ||
329 | expiration, uid, | ||
330 | &handle_datastore_reply, | ||
331 | sc)) | ||
332 | { | ||
333 | continue_reading (sc); | ||
334 | } | ||
335 | return; | ||
336 | } | ||
337 | if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
338 | { | ||
339 | GNUNET_break (0); | ||
340 | continue_reading (sc); | ||
341 | return; | ||
342 | } | ||
343 | srm->header.size = htons ((uint16_t) msize); | ||
344 | srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY); | ||
345 | srm->type = htonl (type); | ||
346 | srm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
347 | memcpy (&srm[1], data, size); | ||
348 | sc->reply_size = msize; | ||
349 | sc->wh = GNUNET_STREAM_write (sc->socket, | ||
350 | buf, msize, | ||
351 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
352 | &write_continuation, | ||
353 | sc); | ||
354 | if (NULL == sc->wh) | ||
355 | { | ||
356 | terminate_stream (sc); | ||
357 | return; | ||
358 | } | ||
359 | } | ||
360 | |||
361 | |||
362 | /** | ||
363 | * Functions with this signature are called whenever a | ||
364 | * complete message is received. | ||
365 | * | ||
366 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
367 | * | ||
368 | * @param cls closure with the 'struct StreamClient' | ||
369 | * @param client identification of the client, NULL | ||
370 | * @param message the actual message | ||
371 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
372 | */ | ||
373 | static int | ||
374 | request_cb (void *cls, | ||
375 | void *client, | ||
376 | const struct GNUNET_MessageHeader *message) | ||
377 | { | ||
378 | struct StreamClient *sc = cls; | ||
379 | const struct StreamQueryMessage *sqm; | ||
380 | |||
381 | switch (ntohs (message->type)) | ||
382 | { | ||
383 | case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY: | ||
384 | if (sizeof (struct StreamQueryMessage) != | ||
385 | ntohs (message->size)) | ||
386 | { | ||
387 | GNUNET_break_op (0); | ||
388 | return GNUNET_SYSERR; | ||
389 | } | ||
390 | sqm = (const struct StreamQueryMessage *) message; | ||
391 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
392 | "Received query for `%s' via stream\n", | ||
393 | GNUNET_h2s (&sqm->query)); | ||
394 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, | ||
395 | 0, | ||
396 | &sqm->query, | ||
397 | ntohl (sqm->type), | ||
398 | 0 /* FIXME: priority */, | ||
399 | GSF_datastore_queue_size, | ||
400 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
401 | &handle_datastore_reply, sc); | ||
402 | if (NULL == sc->qe) | ||
403 | continue_reading (sc); | ||
404 | return GNUNET_OK; | ||
405 | default: | ||
406 | GNUNET_break_op (0); | ||
407 | return GNUNET_SYSERR; | ||
408 | } | ||
409 | } | ||
410 | |||
411 | |||
412 | /** | ||
156 | * Functions of this type are called upon new stream connection from other peers | 413 | * Functions of this type are called upon new stream connection from other peers |
157 | * or upon binding error which happen when the app_port given in | 414 | * or upon binding error which happen when the app_port given in |
158 | * GNUNET_STREAM_listen() is already taken. | 415 | * GNUNET_STREAM_listen() is already taken. |
@@ -175,6 +432,8 @@ accept_cb (void *cls, | |||
175 | return GNUNET_SYSERR; | 432 | return GNUNET_SYSERR; |
176 | sc = GNUNET_malloc (sizeof (struct StreamClient)); | 433 | sc = GNUNET_malloc (sizeof (struct StreamClient)); |
177 | sc->socket = socket; | 434 | sc->socket = socket; |
435 | sc->mst = GNUNET_SERVER_mst_create (&request_cb, | ||
436 | sc); | ||
178 | sc->rh = GNUNET_STREAM_read (sc->socket, | 437 | sc->rh = GNUNET_STREAM_read (sc->socket, |
179 | GNUNET_TIME_UNIT_FOREVER_REL, | 438 | GNUNET_TIME_UNIT_FOREVER_REL, |
180 | &process_request, | 439 | &process_request, |
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 2f84b0ade..12f8ab886 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h | |||
@@ -483,6 +483,16 @@ extern "C" | |||
483 | */ | 483 | */ |
484 | #define GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP 139 | 484 | #define GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP 139 |
485 | 485 | ||
486 | /** | ||
487 | * P2P request for content (one FS to another via a stream). | ||
488 | */ | ||
489 | #define GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY 140 | ||
490 | |||
491 | /** | ||
492 | * P2P answer for content (one FS to another via a stream). | ||
493 | */ | ||
494 | #define GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY 141 | ||
495 | |||
486 | 496 | ||
487 | /******************************************************************************* | 497 | /******************************************************************************* |
488 | * DHT message types | 498 | * DHT message types |