aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_server.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c595
1 files changed, 595 insertions, 0 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
new file mode 100644
index 000000000..77add757d
--- /dev/null
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -0,0 +1,595 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_cadet_server.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
29 */
30#include "platform.h"
31#include "gnunet_constants.h"
32#include "gnunet_util_lib.h"
33#include "gnunet_cadet_service.h"
34#include "gnunet_protocols.h"
35#include "gnunet_applications.h"
36#include "gnunet-service-fs.h"
37#include "gnunet-service-fs_indexing.h"
38#include "gnunet-service-fs_cadet.h"
39
40/**
41 * After how long do we termiante idle connections?
42 */
43#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46/**
47 * A message in the queue to be written to the cadet.
48 */
49struct WriteQueueItem
50{
51 /**
52 * Kept in a DLL.
53 */
54 struct WriteQueueItem *next;
55
56 /**
57 * Kept in a DLL.
58 */
59 struct WriteQueueItem *prev;
60
61 /**
62 * Number of bytes of payload, allocated at the end of this struct.
63 */
64 size_t msize;
65};
66
67
68/**
69 * Information we keep around for each active cadeting client.
70 */
71struct CadetClient
72{
73 /**
74 * DLL
75 */
76 struct CadetClient *next;
77
78 /**
79 * DLL
80 */
81 struct CadetClient *prev;
82
83 /**
84 * Channel for communication.
85 */
86 struct GNUNET_CADET_Channel *channel;
87
88 /**
89 * Handle for active write operation, or NULL.
90 */
91 struct GNUNET_CADET_TransmitHandle *wh;
92
93 /**
94 * Head of write queue.
95 */
96 struct WriteQueueItem *wqi_head;
97
98 /**
99 * Tail of write queue.
100 */
101 struct WriteQueueItem *wqi_tail;
102
103 /**
104 * Current active request to the datastore, if we have one pending.
105 */
106 struct GNUNET_DATASTORE_QueueEntry *qe;
107
108 /**
109 * Task that is scheduled to asynchronously terminate the connection.
110 */
111 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
112
113 /**
114 * Task that is scheduled to terminate idle connections.
115 */
116 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
117
118 /**
119 * Size of the last write that was initiated.
120 */
121 size_t reply_size;
122
123};
124
125
126/**
127 * Listen channel for incoming requests.
128 */
129static struct GNUNET_CADET_Handle *listen_channel;
130
131/**
132 * Head of DLL of cadet clients.
133 */
134static struct CadetClient *sc_head;
135
136/**
137 * Tail of DLL of cadet clients.
138 */
139static struct CadetClient *sc_tail;
140
141/**
142 * Number of active cadet clients in the 'sc_*'-DLL.
143 */
144static unsigned int sc_count;
145
146/**
147 * Maximum allowed number of cadet clients.
148 */
149static unsigned long long sc_count_max;
150
151
152
153/**
154 * Task run to asynchronously terminate the cadet due to timeout.
155 *
156 * @param cls the 'struct CadetClient'
157 * @param tc scheduler context
158 */
159static void
160timeout_cadet_task (void *cls,
161 const struct GNUNET_SCHEDULER_TaskContext *tc)
162{
163 struct CadetClient *sc = cls;
164 struct GNUNET_CADET_Channel *tun;
165
166 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
167 tun = sc->channel;
168 sc->channel = NULL;
169 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
170 "Timeout for inactive cadet client %p\n",
171 sc);
172 GNUNET_CADET_channel_destroy (tun);
173}
174
175
176/**
177 * Reset the timeout for the cadet client (due to activity).
178 *
179 * @param sc client handle to reset timeout for
180 */
181static void
182refresh_timeout_task (struct CadetClient *sc)
183{
184 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
185 GNUNET_SCHEDULER_cancel (sc->timeout_task);
186 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
187 &timeout_cadet_task,
188 sc);
189}
190
191
192/**
193 * We're done handling a request from a client, read the next one.
194 *
195 * @param sc client to continue reading requests from
196 */
197static void
198continue_reading (struct CadetClient *sc)
199{
200 refresh_timeout_task (sc);
201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
202 "Finished processing cadet request from client %p, ready to receive the next one\n",
203 sc);
204 GNUNET_CADET_receive_done (sc->channel);
205}
206
207
208/**
209 * Transmit the next entry from the write queue.
210 *
211 * @param sc where to process the write queue
212 */
213static void
214continue_writing (struct CadetClient *sc);
215
216
217/**
218 * Send a reply now, cadet is ready.
219 *
220 * @param cls closure with the `struct CadetClient` which sent the query
221 * @param size number of bytes available in @a buf
222 * @param buf where to write the message
223 * @return number of bytes written to @a buf
224 */
225static size_t
226write_continuation (void *cls,
227 size_t size,
228 void *buf)
229{
230 struct CadetClient *sc = cls;
231 struct GNUNET_CADET_Channel *tun;
232 struct WriteQueueItem *wqi;
233 size_t ret;
234
235 sc->wh = NULL;
236 if (NULL == (wqi = sc->wqi_head))
237 {
238 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
239 "Write queue empty, reading more requests\n");
240 return 0;
241 }
242 if ( (0 == size) ||
243 (size < wqi->msize) )
244 {
245 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
246 "Transmission of reply failed, terminating cadet\n");
247 tun = sc->channel;
248 sc->channel = NULL;
249 GNUNET_CADET_channel_destroy (tun);
250 return 0;
251 }
252 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
253 sc->wqi_tail,
254 wqi);
255 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
256 "Transmitted %u byte reply via cadet to %p\n",
257 (unsigned int) size,
258 sc);
259 GNUNET_STATISTICS_update (GSF_stats,
260 gettext_noop ("# Blocks transferred via cadet"), 1,
261 GNUNET_NO);
262 memcpy (buf, &wqi[1], ret = wqi->msize);
263 GNUNET_free (wqi);
264 continue_writing (sc);
265 return ret;
266}
267
268
269/**
270 * Transmit the next entry from the write queue.
271 *
272 * @param sc where to process the write queue
273 */
274static void
275continue_writing (struct CadetClient *sc)
276{
277 struct WriteQueueItem *wqi;
278 struct GNUNET_CADET_Channel *tun;
279
280 if (NULL != sc->wh)
281 {
282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
283 "Write pending, waiting for it to complete\n");
284 return; /* write already pending */
285 }
286 if (NULL == (wqi = sc->wqi_head))
287 {
288 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
289 "Write queue empty, reading more requests\n");
290 continue_reading (sc);
291 return;
292 }
293 sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
294 GNUNET_TIME_UNIT_FOREVER_REL,
295 wqi->msize,
296 &write_continuation,
297 sc);
298 if (NULL == sc->wh)
299 {
300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301 "Write failed; terminating cadet\n");
302 tun = sc->channel;
303 sc->channel = NULL;
304 GNUNET_CADET_channel_destroy (tun);
305 return;
306 }
307}
308
309
310/**
311 * Process a datum that was stored in the datastore.
312 *
313 * @param cls closure with the `struct CadetClient` which sent the query
314 * @param key key for the content
315 * @param size number of bytes in @a data
316 * @param data content stored
317 * @param type type of the content
318 * @param priority priority of the content
319 * @param anonymity anonymity-level for the content
320 * @param expiration expiration time for the content
321 * @param uid unique identifier for the datum;
322 * maybe 0 if no unique identifier is available
323 */
324static void
325handle_datastore_reply (void *cls,
326 const struct GNUNET_HashCode *key,
327 size_t size, const void *data,
328 enum GNUNET_BLOCK_Type type,
329 uint32_t priority,
330 uint32_t anonymity,
331 struct GNUNET_TIME_Absolute expiration,
332 uint64_t uid)
333{
334 struct CadetClient *sc = cls;
335 size_t msize = size + sizeof (struct CadetReplyMessage);
336 struct WriteQueueItem *wqi;
337 struct CadetReplyMessage *srm;
338
339 sc->qe = NULL;
340 if (NULL == data)
341 {
342 /* no result, this should not really happen, as for
343 non-anonymous routing only peers that HAVE the
344 answers should be queried; OTOH, this is not a
345 hard error as we might have had the answer in the
346 past and the user might have unindexed it. Hence
347 we log at level "INFO" for now. */
348 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
349 "Have no answer for query `%s'\n",
350 GNUNET_h2s (key));
351 GNUNET_STATISTICS_update (GSF_stats,
352 gettext_noop ("# queries received via cadet not answered"), 1,
353 GNUNET_NO);
354 continue_writing (sc);
355 return;
356 }
357 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
358 {
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360 "Performing on-demand encoding for query %s\n",
361 GNUNET_h2s (key));
362 if (GNUNET_OK !=
363 GNUNET_FS_handle_on_demand_block (key,
364 size, data, type,
365 priority, anonymity,
366 expiration, uid,
367 &handle_datastore_reply,
368 sc))
369 {
370 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
371 "On-demand encoding request failed\n");
372 continue_writing (sc);
373 }
374 return;
375 }
376 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
377 {
378 GNUNET_break (0);
379 continue_writing (sc);
380 return;
381 }
382 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
383 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384 "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
385 (unsigned int) size,
386 (unsigned int) type,
387 GNUNET_h2s (key),
388 sc);
389 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
390 wqi->msize = msize;
391 srm = (struct CadetReplyMessage *) &wqi[1];
392 srm->header.size = htons ((uint16_t) msize);
393 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
394 srm->type = htonl (type);
395 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
396 memcpy (&srm[1], data, size);
397 sc->reply_size = msize;
398 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
399 sc->wqi_tail,
400 wqi);
401 continue_writing (sc);
402}
403
404
405/**
406 * Functions with this signature are called whenever a
407 * complete query message is received.
408 *
409 * Do not call #GNUNET_SERVER_mst_destroy in callback
410 *
411 * @param cls closure with the 'struct CadetClient'
412 * @param channel channel handle
413 * @param channel_ctx channel context
414 * @param message the actual message
415 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
416 */
417static int
418request_cb (void *cls,
419 struct GNUNET_CADET_Channel *channel,
420 void **channel_ctx,
421 const struct GNUNET_MessageHeader *message)
422{
423 struct CadetClient *sc = *channel_ctx;
424 const struct CadetQueryMessage *sqm;
425
426 sqm = (const struct CadetQueryMessage *) message;
427 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
428 "Received query for `%s' via cadet from client %p\n",
429 GNUNET_h2s (&sqm->query),
430 sc);
431 GNUNET_STATISTICS_update (GSF_stats,
432 gettext_noop ("# queries received via cadet"), 1,
433 GNUNET_NO);
434 refresh_timeout_task (sc);
435 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
436 0,
437 &sqm->query,
438 ntohl (sqm->type),
439 0 /* priority */,
440 GSF_datastore_queue_size,
441 GNUNET_TIME_UNIT_FOREVER_REL,
442 &handle_datastore_reply, sc);
443 if (NULL == sc->qe)
444 {
445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446 "Queueing request with datastore failed (queue full?)\n");
447 continue_writing (sc);
448 }
449 return GNUNET_OK;
450}
451
452
453/**
454 * Functions of this type are called upon new cadet connection from other peers.
455 *
456 * @param cls the closure from GNUNET_CADET_connect
457 * @param channel the channel representing the cadet
458 * @param initiator the identity of the peer who wants to establish a cadet
459 * with us; NULL on binding error
460 * @param port cadet port used for the incoming connection
461 * @param options channel option flags
462 * @return initial channel context (our 'struct CadetClient')
463 */
464static void *
465accept_cb (void *cls,
466 struct GNUNET_CADET_Channel *channel,
467 const struct GNUNET_PeerIdentity *initiator,
468 uint32_t port, enum GNUNET_CADET_ChannelOption options)
469{
470 struct CadetClient *sc;
471
472 GNUNET_assert (NULL != channel);
473 if (sc_count >= sc_count_max)
474 {
475 GNUNET_STATISTICS_update (GSF_stats,
476 gettext_noop ("# cadet client connections rejected"), 1,
477 GNUNET_NO);
478 GNUNET_CADET_channel_destroy (channel);
479 return NULL;
480 }
481 GNUNET_STATISTICS_update (GSF_stats,
482 gettext_noop ("# cadet connections active"), 1,
483 GNUNET_NO);
484 sc = GNUNET_new (struct CadetClient);
485 sc->channel = channel;
486 GNUNET_CONTAINER_DLL_insert (sc_head,
487 sc_tail,
488 sc);
489 sc_count++;
490 refresh_timeout_task (sc);
491 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
492 "Accepting inbound cadet connection from `%s' as client %p\n",
493 GNUNET_i2s (initiator),
494 sc);
495 return sc;
496}
497
498
499/**
500 * Function called by cadet when a client disconnects.
501 * Cleans up our 'struct CadetClient' of that channel.
502 *
503 * @param cls NULL
504 * @param channel channel of the disconnecting client
505 * @param channel_ctx our 'struct CadetClient'
506 */
507static void
508cleaner_cb (void *cls,
509 const struct GNUNET_CADET_Channel *channel,
510 void *channel_ctx)
511{
512 struct CadetClient *sc = channel_ctx;
513 struct WriteQueueItem *wqi;
514
515 if (NULL == sc)
516 return;
517 sc->channel = NULL;
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "Terminating cadet connection with client %p\n",
520 sc);
521 GNUNET_STATISTICS_update (GSF_stats,
522 gettext_noop ("# cadet connections active"), -1,
523 GNUNET_NO);
524 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
525 GNUNET_SCHEDULER_cancel (sc->terminate_task);
526 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
527 GNUNET_SCHEDULER_cancel (sc->timeout_task);
528 if (NULL != sc->wh)
529 GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
530 if (NULL != sc->qe)
531 GNUNET_DATASTORE_cancel (sc->qe);
532 while (NULL != (wqi = sc->wqi_head))
533 {
534 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
535 sc->wqi_tail,
536 wqi);
537 GNUNET_free (wqi);
538 }
539 GNUNET_CONTAINER_DLL_remove (sc_head,
540 sc_tail,
541 sc);
542 sc_count--;
543 GNUNET_free (sc);
544}
545
546
547/**
548 * Initialize subsystem for non-anonymous file-sharing.
549 */
550void
551GSF_cadet_start_server ()
552{
553 static const struct GNUNET_CADET_MessageHandler handlers[] = {
554 { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
555 { NULL, 0, 0 }
556 };
557 static const uint32_t ports[] = {
558 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
559 0
560 };
561
562 if (GNUNET_YES !=
563 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
564 "fs",
565 "MAX_CADET_CLIENTS",
566 &sc_count_max))
567 return;
568 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
569 "Initializing cadet FS server with a limit of %llu connections\n",
570 sc_count_max);
571 listen_channel = GNUNET_CADET_connect (GSF_cfg,
572 NULL,
573 &accept_cb,
574 &cleaner_cb,
575 handlers,
576 ports);
577}
578
579
580/**
581 * Shutdown subsystem for non-anonymous file-sharing.
582 */
583void
584GSF_cadet_stop_server ()
585{
586 if (NULL != listen_channel)
587 {
588 GNUNET_CADET_disconnect (listen_channel);
589 listen_channel = NULL;
590 }
591 GNUNET_assert (NULL == sc_head);
592 GNUNET_assert (0 == sc_count);
593}
594
595/* end of gnunet-service-fs_cadet.c */