aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_server.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c546
1 files changed, 0 insertions, 546 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
deleted file mode 100644
index 395842ebb..000000000
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ /dev/null
@@ -1,546 +0,0 @@
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21/**
22 * @file fs/gnunet-service-fs_cadet_server.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - PORT is set to old application type, unsure if we should keep
28 * it that way (fine for now)
29 */
30#include "platform.h"
31#include "gnunet_constants.h"
32#include "gnunet_util_lib.h"
33#include "gnunet_cadet_service.h"
34#include "gnunet_protocols.h"
35#include "gnunet_applications.h"
36#include "gnunet-service-fs.h"
37#include "gnunet-service-fs_indexing.h"
38#include "gnunet-service-fs_cadet.h"
39
40/**
41 * After how long do we termiante idle connections?
42 */
43#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
44
45
46/**
47 * A message in the queue to be written to the cadet.
48 */
49struct WriteQueueItem
50{
51 /**
52 * Kept in a DLL.
53 */
54 struct WriteQueueItem *next;
55
56 /**
57 * Kept in a DLL.
58 */
59 struct WriteQueueItem *prev;
60
61 /**
62 * Number of bytes of payload, allocated at the end of this struct.
63 */
64 size_t msize;
65};
66
67
68/**
69 * Information we keep around for each active cadeting client.
70 */
71struct CadetClient
72{
73 /**
74 * DLL
75 */
76 struct CadetClient *next;
77
78 /**
79 * DLL
80 */
81 struct CadetClient *prev;
82
83 /**
84 * Channel for communication.
85 */
86 struct GNUNET_CADET_Channel *channel;
87
88 /**
89 * Head of write queue.
90 */
91 struct WriteQueueItem *wqi_head;
92
93 /**
94 * Tail of write queue.
95 */
96 struct WriteQueueItem *wqi_tail;
97
98 /**
99 * Current active request to the datastore, if we have one pending.
100 */
101 struct GNUNET_DATASTORE_QueueEntry *qe;
102
103 /**
104 * Task that is scheduled to asynchronously terminate the connection.
105 */
106 struct GNUNET_SCHEDULER_Task *terminate_task;
107
108 /**
109 * Task that is scheduled to terminate idle connections.
110 */
111 struct GNUNET_SCHEDULER_Task *timeout_task;
112
113 /**
114 * Size of the last write that was initiated.
115 */
116 size_t reply_size;
117};
118
119
120/**
121 * Listen port for incoming requests.
122 */
123static struct GNUNET_CADET_Port *cadet_port;
124
125/**
126 * Head of DLL of cadet clients.
127 */
128static struct CadetClient *sc_head;
129
130/**
131 * Tail of DLL of cadet clients.
132 */
133static struct CadetClient *sc_tail;
134
135/**
136 * Number of active cadet clients in the 'sc_*'-DLL.
137 */
138static unsigned int sc_count;
139
140/**
141 * Maximum allowed number of cadet clients.
142 */
143static unsigned long long sc_count_max;
144
145
146/**
147 * Task run to asynchronously terminate the cadet due to timeout.
148 *
149 * @param cls the 'struct CadetClient'
150 */
151static void
152timeout_cadet_task (void *cls)
153{
154 struct CadetClient *sc = cls;
155 struct GNUNET_CADET_Channel *tun;
156
157 sc->timeout_task = NULL;
158 tun = sc->channel;
159 sc->channel = NULL;
160 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
161 "Timeout for inactive cadet client %p\n",
162 sc);
163 GNUNET_CADET_channel_destroy (tun);
164}
165
166
167/**
168 * Reset the timeout for the cadet client (due to activity).
169 *
170 * @param sc client handle to reset timeout for
171 */
172static void
173refresh_timeout_task (struct CadetClient *sc)
174{
175 if (NULL != sc->timeout_task)
176 GNUNET_SCHEDULER_cancel (sc->timeout_task);
177 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
178 &timeout_cadet_task,
179 sc);
180}
181
182
183/**
184 * Check if we are done with the write queue, and if so tell CADET
185 * that we are ready to read more.
186 *
187 * @param cls where to process the write queue
188 */
189static void
190continue_writing (void *cls)
191{
192 struct CadetClient *sc = cls;
193 struct GNUNET_MQ_Handle *mq;
194
195 mq = GNUNET_CADET_get_mq (sc->channel);
196 if (0 != GNUNET_MQ_get_length (mq))
197 {
198 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
199 "Write pending, waiting for it to complete\n");
200 return;
201 }
202 refresh_timeout_task (sc);
203 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
204 "Finished processing cadet request from client %p, ready to receive the next one\n",
205 sc);
206 GNUNET_CADET_receive_done (sc->channel);
207}
208
209
210/**
211 * Process a datum that was stored in the datastore.
212 *
213 * @param cls closure with the `struct CadetClient` which sent the query
214 * @param key key for the content
215 * @param size number of bytes in @a data
216 * @param data content stored
217 * @param type type of the content
218 * @param priority priority of the content
219 * @param anonymity anonymity-level for the content
220 * @param replication replication-level for the content
221 * @param expiration expiration time for the content
222 * @param uid unique identifier for the datum;
223 * maybe 0 if no unique identifier is available
224 */
225static void
226handle_datastore_reply (void *cls,
227 const struct GNUNET_HashCode *key,
228 size_t size,
229 const void *data,
230 enum GNUNET_BLOCK_Type type,
231 uint32_t priority,
232 uint32_t anonymity,
233 uint32_t replication,
234 struct GNUNET_TIME_Absolute expiration,
235 uint64_t uid)
236{
237 struct CadetClient *sc = cls;
238 size_t msize = size + sizeof(struct CadetReplyMessage);
239 struct GNUNET_MQ_Envelope *env;
240 struct CadetReplyMessage *srm;
241
242 sc->qe = NULL;
243 if (NULL == data)
244 {
245 /* no result, this should not really happen, as for
246 non-anonymous routing only peers that HAVE the
247 answers should be queried; OTOH, this is not a
248 hard error as we might have had the answer in the
249 past and the user might have unindexed it. Hence
250 we log at level "INFO" for now. */if (NULL == key)
251 {
252 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
253 "Have no answer and the query was NULL\n");
254 }
255 else
256 {
257 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
258 "Have no answer for query `%s'\n",
259 GNUNET_h2s (key));
260 }
261 GNUNET_STATISTICS_update (GSF_stats,
262 gettext_noop (
263 "# queries received via CADET not answered"),
264 1,
265 GNUNET_NO);
266 continue_writing (sc);
267 return;
268 }
269 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
270 {
271 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
272 "Performing on-demand encoding for query %s\n",
273 GNUNET_h2s (key));
274 if (GNUNET_OK !=
275 GNUNET_FS_handle_on_demand_block (key,
276 size,
277 data,
278 type,
279 priority,
280 anonymity,
281 replication,
282 expiration,
283 uid,
284 &handle_datastore_reply,
285 sc))
286 {
287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288 "On-demand encoding request failed\n");
289 continue_writing (sc);
290 }
291 return;
292 }
293 if (msize > GNUNET_MAX_MESSAGE_SIZE)
294 {
295 GNUNET_break (0);
296 continue_writing (sc);
297 return;
298 }
299 GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
300 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
301 "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
302 (unsigned int) size,
303 (unsigned int) type,
304 GNUNET_h2s (key),
305 sc);
306 env = GNUNET_MQ_msg_extra (srm,
307 size,
308 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
309 srm->type = htonl (type);
310 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
311 GNUNET_memcpy (&srm[1],
312 data,
313 size);
314 GNUNET_MQ_notify_sent (env,
315 &continue_writing,
316 sc);
317 GNUNET_STATISTICS_update (GSF_stats,
318 gettext_noop ("# Blocks transferred via cadet"),
319 1,
320 GNUNET_NO);
321 GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
322 env);
323}
324
325
326/**
327 * Functions with this signature are called whenever a
328 * complete query message is received.
329 *
330 * @param cls closure with the `struct CadetClient`
331 * @param sqm the actual message
332 */
333static void
334handle_request (void *cls,
335 const struct CadetQueryMessage *sqm)
336{
337 struct CadetClient *sc = cls;
338
339 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340 "Received query for `%s' via cadet from client %p\n",
341 GNUNET_h2s (&sqm->query),
342 sc);
343 GNUNET_STATISTICS_update (GSF_stats,
344 gettext_noop ("# queries received via cadet"),
345 1,
346 GNUNET_NO);
347 refresh_timeout_task (sc);
348 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
349 0 /* next_uid */,
350 false /* random */,
351 &sqm->query,
352 ntohl (sqm->type),
353 0 /* priority */,
354 GSF_datastore_queue_size,
355 &handle_datastore_reply,
356 sc);
357 if (NULL == sc->qe)
358 {
359 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
360 "Queueing request with datastore failed (queue full?)\n");
361 continue_writing (sc);
362 }
363}
364
365
366/**
367 * Functions of this type are called upon new cadet connection from other peers.
368 *
369 * @param cls the closure from GNUNET_CADET_connect
370 * @param channel the channel representing the cadet
371 * @param initiator the identity of the peer who wants to establish a cadet
372 * with us; NULL on binding error
373 * @return initial channel context (our `struct CadetClient`)
374 */
375static void *
376connect_cb (void *cls,
377 struct GNUNET_CADET_Channel *channel,
378 const struct GNUNET_PeerIdentity *initiator)
379{
380 struct CadetClient *sc;
381
382 GNUNET_assert (NULL != channel);
383 if (sc_count >= sc_count_max)
384 {
385 GNUNET_STATISTICS_update (GSF_stats,
386 gettext_noop (
387 "# cadet client connections rejected"),
388 1,
389 GNUNET_NO);
390 GNUNET_CADET_channel_destroy (channel);
391 return NULL;
392 }
393 GNUNET_STATISTICS_update (GSF_stats,
394 gettext_noop ("# cadet connections active"),
395 1,
396 GNUNET_NO);
397 sc = GNUNET_new (struct CadetClient);
398 sc->channel = channel;
399 GNUNET_CONTAINER_DLL_insert (sc_head,
400 sc_tail,
401 sc);
402 sc_count++;
403 refresh_timeout_task (sc);
404 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
405 "Accepting inbound cadet connection from `%s' as client %p\n",
406 GNUNET_i2s (initiator),
407 sc);
408 return sc;
409}
410
411
412/**
413 * Function called by cadet when a client disconnects.
414 * Cleans up our `struct CadetClient` of that channel.
415 *
416 * @param cls our `struct CadetClient`
417 * @param channel channel of the disconnecting client
418 * @param channel_ctx
419 */
420static void
421disconnect_cb (void *cls,
422 const struct GNUNET_CADET_Channel *channel)
423{
424 struct CadetClient *sc = cls;
425 struct WriteQueueItem *wqi;
426
427 if (NULL == sc)
428 return;
429 sc->channel = NULL;
430 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
431 "Terminating cadet connection with client %p\n",
432 sc);
433 GNUNET_STATISTICS_update (GSF_stats,
434 gettext_noop ("# cadet connections active"), -1,
435 GNUNET_NO);
436 if (NULL != sc->terminate_task)
437 GNUNET_SCHEDULER_cancel (sc->terminate_task);
438 if (NULL != sc->timeout_task)
439 GNUNET_SCHEDULER_cancel (sc->timeout_task);
440 if (NULL != sc->qe)
441 GNUNET_DATASTORE_cancel (sc->qe);
442 while (NULL != (wqi = sc->wqi_head))
443 {
444 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
445 sc->wqi_tail,
446 wqi);
447 GNUNET_free (wqi);
448 }
449 GNUNET_CONTAINER_DLL_remove (sc_head,
450 sc_tail,
451 sc);
452 sc_count--;
453 GNUNET_free (sc);
454}
455
456
457/**
458 * Function called whenever an MQ-channel's transmission window size changes.
459 *
460 * The first callback in an outgoing channel will be with a non-zero value
461 * and will mean the channel is connected to the destination.
462 *
463 * For an incoming channel it will be called immediately after the
464 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
465 *
466 * @param cls Channel closure.
467 * @param channel Connection to the other end (henceforth invalid).
468 * @param window_size New window size. If the is more messages than buffer size
469 * this value will be negative..
470 */
471static void
472window_change_cb (void *cls,
473 const struct GNUNET_CADET_Channel *channel,
474 int window_size)
475{
476 /* FIXME: could do flow control here... */
477}
478
479
480/**
481 * Initialize subsystem for non-anonymous file-sharing.
482 */
483void
484GSF_cadet_start_server ()
485{
486 struct GNUNET_MQ_MessageHandler handlers[] = {
487 GNUNET_MQ_hd_fixed_size (request,
488 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
489 struct CadetQueryMessage,
490 NULL),
491 GNUNET_MQ_handler_end ()
492 };
493 struct GNUNET_HashCode port;
494
495 if (GNUNET_YES !=
496 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
497 "fs",
498 "MAX_CADET_CLIENTS",
499 &sc_count_max))
500 return;
501 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
502 "Initializing cadet FS server with a limit of %llu connections\n",
503 sc_count_max);
504 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
505 cadet_handle = GNUNET_CADET_connect (GSF_cfg);
506 GNUNET_assert (NULL != cadet_handle);
507 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
508 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
509 &port);
510 cadet_port = GNUNET_CADET_open_port (cadet_handle,
511 &port,
512 &connect_cb,
513 NULL,
514 &window_change_cb,
515 &disconnect_cb,
516 handlers);
517}
518
519
520/**
521 * Shutdown subsystem for non-anonymous file-sharing.
522 */
523void
524GSF_cadet_stop_server ()
525{
526 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
527 &GSF_cadet_release_clients,
528 NULL);
529 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
530 cadet_map = NULL;
531 if (NULL != cadet_port)
532 {
533 GNUNET_CADET_close_port (cadet_port);
534 cadet_port = NULL;
535 }
536 if (NULL != cadet_handle)
537 {
538 GNUNET_CADET_disconnect (cadet_handle);
539 cadet_handle = NULL;
540 }
541 GNUNET_assert (NULL == sc_head);
542 GNUNET_assert (0 == sc_count);
543}
544
545
546/* end of gnunet-service-fs_cadet.c */