aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2013-07-18 11:06:23 +0000
committerChristian Grothoff <christian@grothoff.org>2013-07-18 11:06:23 +0000
commit943f9aa9278e67ed95daa5a85491868067523a95 (patch)
treebdd22800cbc893ad0e25bdefa60efc7e58287d97 /src/fs
parent3f5dc2ddcfc59ca54f18a3688fa1f8aa89340ce6 (diff)
downloadgnunet-943f9aa9278e67ed95daa5a85491868067523a95.tar.gz
gnunet-943f9aa9278e67ed95daa5a85491868067523a95.zip
-splitting mesh into server and client parts
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/Makefile.am3
-rw-r--r--src/fs/fs.conf.in2
-rw-r--r--src/fs/gnunet-service-fs.c6
-rw-r--r--src/fs/gnunet-service-fs_mesh.c1245
-rw-r--r--src/fs/gnunet-service-fs_mesh.h72
-rw-r--r--src/fs/gnunet-service-fs_mesh_server.c589
-rw-r--r--src/fs/test_gnunet_service_fs_p2p_mesh.conf4
7 files changed, 670 insertions, 1251 deletions
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index a0d88051a..e8928f770 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -190,7 +190,8 @@ gnunet_service_fs_SOURCES = \
190 gnunet-service-fs_pr.c gnunet-service-fs_pr.h \ 190 gnunet-service-fs_pr.c gnunet-service-fs_pr.h \
191 gnunet-service-fs_push.c gnunet-service-fs_push.h \ 191 gnunet-service-fs_push.c gnunet-service-fs_push.h \
192 gnunet-service-fs_put.c gnunet-service-fs_put.h \ 192 gnunet-service-fs_put.c gnunet-service-fs_put.h \
193 gnunet-service-fs_mesh.c gnunet-service-fs_mesh.h 193 gnunet-service-fs_mesh_client.c gnunet-service-fs_mesh.h \
194 gnunet-service-fs_mesh_server.c
194gnunet_service_fs_LDADD = \ 195gnunet_service_fs_LDADD = \
195 $(top_builddir)/src/fs/libgnunetfs.la \ 196 $(top_builddir)/src/fs/libgnunetfs.la \
196 $(top_builddir)/src/dht/libgnunetdht.la \ 197 $(top_builddir)/src/dht/libgnunetdht.la \
diff --git a/src/fs/fs.conf.in b/src/fs/fs.conf.in
index f45a78fc4..c9748a73a 100644
--- a/src/fs/fs.conf.in
+++ b/src/fs/fs.conf.in
@@ -60,7 +60,7 @@ DISABLE_ANON_TRANSFER = NO
60# some reasonable level. And if we have a very, very large 60# some reasonable level. And if we have a very, very large
61# number, we probably won't have enough bandwidth to suppor them 61# number, we probably won't have enough bandwidth to suppor them
62# well anyway, so better have a moderate cap. 62# well anyway, so better have a moderate cap.
63MAX_STREAM_CLIENTS = 128 63MAX_MESH_CLIENTS = 128
64 64
65 65
66[gnunet-auto-share] 66[gnunet-auto-share]
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index 2aac9e165..9074f053e 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -475,7 +475,8 @@ handle_start_search (void *cls, struct GNUNET_SERVER_Client *client,
475static void 475static void
476shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) 476shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
477{ 477{
478 GSF_mesh_stop (); 478 GSF_mesh_stop_client ();
479 GSF_mesh_stop_server ();
479 if (NULL != GSF_core) 480 if (NULL != GSF_core)
480 { 481 {
481 GNUNET_CORE_disconnect (GSF_core); 482 GNUNET_CORE_disconnect (GSF_core);
@@ -646,7 +647,8 @@ main_init (struct GNUNET_SERVER_Handle *server,
646 GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, 647 GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters,
647 NULL); 648 NULL);
648 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 649 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
649 GSF_mesh_start (); 650 GSF_mesh_start_server ();
651 GSF_mesh_start_client ();
650 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, 652 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
651 NULL); 653 NULL);
652 return GNUNET_OK; 654 return GNUNET_OK;
diff --git a/src/fs/gnunet-service-fs_mesh.c b/src/fs/gnunet-service-fs_mesh.c
deleted file mode 100644
index ff40a2621..000000000
--- a/src/fs/gnunet-service-fs_mesh.c
+++ /dev/null
@@ -1,1245 +0,0 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_mesh.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - MESH2 API doesn't allow flow control for server yet (needed!)
28 * - likely need to register clean up handler with mesh to handle
29 * client disconnect (likely leaky right now)
30 * - server is optional, currently client code will NPE if we have
31 * no server, again MESH2 API requirement forcing this for now
32 * - message handlers are symmetric for client/server, should be
33 * separated (currently clients can get requests and servers can
34 * handle answers, not good)
35 * - code is entirely untested
36 * - might have overlooked a few possible simplifications
37 * - PORT is set to old application type, unsure if we should keep
38 * it that way (fine for now)
39 */
40#include "platform.h"
41#include "gnunet_constants.h"
42#include "gnunet_util_lib.h"
43#include "gnunet_mesh_service.h"
44#include "gnunet_protocols.h"
45#include "gnunet_applications.h"
46#include "gnunet-service-fs.h"
47#include "gnunet-service-fs_indexing.h"
48#include "gnunet-service-fs_mesh.h"
49
50/**
51 * After how long do we termiante idle connections?
52 */
53#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
54
55/**
56 * After how long do we reset connections without replies?
57 */
58#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
59
60
61/**
62 * A message in the queue to be written to the mesh.
63 */
64struct WriteQueueItem
65{
66 /**
67 * Kept in a DLL.
68 */
69 struct WriteQueueItem *next;
70
71 /**
72 * Kept in a DLL.
73 */
74 struct WriteQueueItem *prev;
75
76 /**
77 * Number of bytes of payload, allocated at the end of this struct.
78 */
79 size_t msize;
80};
81
82
83/**
84 * Information we keep around for each active meshing client.
85 */
86struct MeshClient
87{
88 /**
89 * DLL
90 */
91 struct MeshClient *next;
92
93 /**
94 * DLL
95 */
96 struct MeshClient *prev;
97
98 /**
99 * Socket for communication.
100 */
101 struct GNUNET_MESH_Tunnel *socket;
102
103 /**
104 * Handle for active write operation, or NULL.
105 */
106 struct GNUNET_MESH_TransmitHandle *wh;
107
108 /**
109 * Head of write queue.
110 */
111 struct WriteQueueItem *wqi_head;
112
113 /**
114 * Tail of write queue.
115 */
116 struct WriteQueueItem *wqi_tail;
117
118 /**
119 * Current active request to the datastore, if we have one pending.
120 */
121 struct GNUNET_DATASTORE_QueueEntry *qe;
122
123 /**
124 * Task that is scheduled to asynchronously terminate the connection.
125 */
126 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
127
128 /**
129 * Task that is scheduled to terminate idle connections.
130 */
131 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
132
133 /**
134 * Size of the last write that was initiated.
135 */
136 size_t reply_size;
137
138};
139
140
141/**
142 * Query from one peer, asking the other for CHK-data.
143 */
144struct MeshQueryMessage
145{
146
147 /**
148 * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY.
149 */
150 struct GNUNET_MessageHeader header;
151
152 /**
153 * Block type must be DBLOCK or IBLOCK.
154 */
155 uint32_t type;
156
157 /**
158 * Query hash from CHK (hash of encrypted block).
159 */
160 struct GNUNET_HashCode query;
161
162};
163
164
165/**
166 * Reply to a MeshQueryMessage.
167 */
168struct MeshReplyMessage
169{
170
171 /**
172 * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY.
173 */
174 struct GNUNET_MessageHeader header;
175
176 /**
177 * Block type must be DBLOCK or IBLOCK.
178 */
179 uint32_t type;
180
181 /**
182 * Expiration time for the block.
183 */
184 struct GNUNET_TIME_AbsoluteNBO expiration;
185
186 /* followed by the encrypted block */
187
188};
189
190
191/**
192 * Handle for a mesh to another peer.
193 */
194struct MeshHandle;
195
196
197/**
198 * Handle for a request that is going out via mesh API.
199 */
200struct GSF_MeshRequest
201{
202
203 /**
204 * DLL.
205 */
206 struct GSF_MeshRequest *next;
207
208 /**
209 * DLL.
210 */
211 struct GSF_MeshRequest *prev;
212
213 /**
214 * Which mesh is this request associated with?
215 */
216 struct MeshHandle *sh;
217
218 /**
219 * Function to call with the result.
220 */
221 GSF_MeshReplyProcessor proc;
222
223 /**
224 * Closure for 'proc'
225 */
226 void *proc_cls;
227
228 /**
229 * Query to transmit to the other peer.
230 */
231 struct GNUNET_HashCode query;
232
233 /**
234 * Desired type for the reply.
235 */
236 enum GNUNET_BLOCK_Type type;
237
238 /**
239 * Did we transmit this request already? YES if we are
240 * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
241 */
242 int was_transmitted;
243};
244
245
246/**
247 * Handle for a mesh to another peer.
248 */
249struct MeshHandle
250{
251 /**
252 * Head of DLL of pending requests on this mesh.
253 */
254 struct GSF_MeshRequest *pending_head;
255
256 /**
257 * Tail of DLL of pending requests on this mesh.
258 */
259 struct GSF_MeshRequest *pending_tail;
260
261 /**
262 * Map from query to 'struct GSF_MeshRequest's waiting for
263 * a reply.
264 */
265 struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
266
267 /**
268 * Connection to the other peer.
269 */
270 struct GNUNET_MESH_Tunnel *mesh;
271
272 /**
273 * Handle for active write operation, or NULL.
274 */
275 struct GNUNET_MESH_TransmitHandle *wh;
276
277 /**
278 * Which peer does this mesh go to?
279 */
280 struct GNUNET_PeerIdentity target;
281
282 /**
283 * Task to kill inactive meshs (we keep them around for
284 * a few seconds to give the application a chance to give
285 * us another query).
286 */
287 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
288
289 /**
290 * Task to reset meshs that had errors (asynchronously,
291 * as we may not be able to do it immediately during a
292 * callback from the mesh API).
293 */
294 GNUNET_SCHEDULER_TaskIdentifier reset_task;
295
296 /**
297 * Is this mesh ready for transmission?
298 */
299 int is_ready;
300
301};
302
303
304/**
305 * Listen socket for incoming requests.
306 */
307static struct GNUNET_MESH_Handle *listen_socket;
308
309/**
310 * Head of DLL of mesh clients.
311 */
312static struct MeshClient *sc_head;
313
314/**
315 * Tail of DLL of mesh clients.
316 */
317static struct MeshClient *sc_tail;
318
319/**
320 * Number of active mesh clients in the 'sc_*'-DLL.
321 */
322static unsigned int sc_count;
323
324/**
325 * Maximum allowed number of mesh clients.
326 */
327static unsigned long long sc_count_max;
328
329/**
330 * Map from peer identities to 'struct MeshHandles' with meshs to
331 * those peers.
332 */
333static struct GNUNET_CONTAINER_MultiHashMap *mesh_map;
334
335
336/* ********************* client-side code ************************* */
337
338/**
339 * Iterator called on each entry in a waiting map to
340 * call the 'proc' continuation and release associated
341 * resources.
342 *
343 * @param cls the 'struct MeshHandle'
344 * @param key the key of the entry in the map (the query)
345 * @param value the 'struct GSF_MeshRequest' to clean up
346 * @return GNUNET_YES (continue to iterate)
347 */
348static int
349free_waiting_entry (void *cls,
350 const struct GNUNET_HashCode *key,
351 void *value)
352{
353 struct GSF_MeshRequest *sr = value;
354
355 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
356 GNUNET_TIME_UNIT_FOREVER_ABS,
357 0, NULL);
358 GSF_mesh_query_cancel (sr);
359 return GNUNET_YES;
360}
361
362
363/**
364 * Destroy a mesh handle.
365 *
366 * @param sh mesh to process
367 */
368static void
369destroy_mesh_handle (struct MeshHandle *sh)
370{
371 struct GSF_MeshRequest *sr;
372
373 while (NULL != (sr = sh->pending_head))
374 {
375 sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
376 GNUNET_TIME_UNIT_FOREVER_ABS,
377 0, NULL);
378 GSF_mesh_query_cancel (sr);
379 }
380 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
381 &free_waiting_entry,
382 sh);
383 if (NULL != sh->wh)
384 GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
385 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
386 GNUNET_SCHEDULER_cancel (sh->timeout_task);
387 if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
388 GNUNET_SCHEDULER_cancel (sh->reset_task);
389 GNUNET_MESH_tunnel_destroy (sh->mesh);
390 GNUNET_assert (GNUNET_OK ==
391 GNUNET_CONTAINER_multihashmap_remove (mesh_map,
392 &sh->target.hashPubKey,
393 sh));
394 GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
395 GNUNET_free (sh);
396}
397
398
399/**
400 * Transmit pending requests via the mesh.
401 *
402 * @param sh mesh to process
403 */
404static void
405transmit_pending (struct MeshHandle *sh);
406
407
408/**
409 * Iterator called on each entry in a waiting map to
410 * move it back to the pending list.
411 *
412 * @param cls the 'struct MeshHandle'
413 * @param key the key of the entry in the map (the query)
414 * @param value the 'struct GSF_MeshRequest' to move to pending
415 * @return GNUNET_YES (continue to iterate)
416 */
417static int
418move_to_pending (void *cls,
419 const struct GNUNET_HashCode *key,
420 void *value)
421{
422 struct MeshHandle *sh = cls;
423 struct GSF_MeshRequest *sr = value;
424
425 GNUNET_assert (GNUNET_YES ==
426 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
427 key,
428 value));
429 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
430 sh->pending_tail,
431 sr);
432 sr->was_transmitted = GNUNET_NO;
433 return GNUNET_YES;
434}
435
436
437/**
438 * We had a serious error, tear down and re-create mesh from scratch.
439 *
440 * @param sh mesh to reset
441 */
442static void
443reset_mesh (struct MeshHandle *sh)
444{
445 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
446 "Resetting mesh to %s\n",
447 GNUNET_i2s (&sh->target));
448 GNUNET_MESH_tunnel_destroy (sh->mesh);
449 sh->is_ready = GNUNET_NO;
450 GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
451 &move_to_pending,
452 sh);
453 sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
454 sh,
455 &sh->target,
456 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
457 GNUNET_YES,
458 GNUNET_YES);
459}
460
461
462/**
463 * Task called when it is time to destroy an inactive mesh.
464 *
465 * @param cls the 'struct MeshHandle' to tear down
466 * @param tc scheduler context, unused
467 */
468static void
469mesh_timeout (void *cls,
470 const struct GNUNET_SCHEDULER_TaskContext *tc)
471{
472 struct MeshHandle *sh = cls;
473
474 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475 "Timeout on mesh to %s\n",
476 GNUNET_i2s (&sh->target));
477 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
478 destroy_mesh_handle (sh);
479}
480
481
482/**
483 * Task called when it is time to reset an mesh.
484 *
485 * @param cls the 'struct MeshHandle' to tear down
486 * @param tc scheduler context, unused
487 */
488static void
489reset_mesh_task (void *cls,
490 const struct GNUNET_SCHEDULER_TaskContext *tc)
491{
492 struct MeshHandle *sh = cls;
493
494 sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
495 reset_mesh (sh);
496}
497
498
499/**
500 * We had a serious error, tear down and re-create mesh from scratch,
501 * but do so asynchronously.
502 *
503 * @param sh mesh to reset
504 */
505static void
506reset_mesh_async (struct MeshHandle *sh)
507{
508 if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
509 GNUNET_SCHEDULER_cancel (sh->reset_task);
510 sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task,
511 sh);
512}
513
514
515/**
516 * Functions of this signature are called whenever we are ready to transmit
517 * query via a mesh.
518 *
519 * @param cls the struct MeshHandle for which we did the write call
520 * @param size the number of bytes that can be written to 'buf'
521 * @param buf where to write the message
522 * @return number of bytes written to 'buf'
523 */
524static size_t
525transmit_sqm (void *cls,
526 size_t size,
527 void *buf)
528{
529 struct MeshHandle *sh = cls;
530 struct MeshQueryMessage sqm;
531 struct GSF_MeshRequest *sr;
532
533 sh->wh = NULL;
534 if (NULL == buf)
535 {
536 reset_mesh (sh);
537 return 0;
538 }
539 sr = sh->pending_head;
540 if (NULL == sr)
541 return 0;
542 GNUNET_assert (size >= sizeof (struct MeshQueryMessage));
543 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
544 sh->pending_tail,
545 sr);
546 GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
547 &sr->query,
548 sr,
549 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
550 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
551 "Sending query via mesh to %s\n",
552 GNUNET_i2s (&sh->target));
553 sr->was_transmitted = GNUNET_YES;
554 sqm.header.size = htons (sizeof (sqm));
555 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY);
556 sqm.type = htonl (sr->type);
557 sqm.query = sr->query;
558 memcpy (buf, &sqm, sizeof (sqm));
559 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
560 "Successfully transmitted %u bytes via mesh to %s\n",
561 (unsigned int) size,
562 GNUNET_i2s (&sh->target));
563 transmit_pending (sh);
564 return sizeof (sqm);
565}
566
567
568/**
569 * Transmit pending requests via the mesh.
570 *
571 * @param sh mesh to process
572 */
573static void
574transmit_pending (struct MeshHandle *sh)
575{
576 if (NULL != sh->wh)
577 return;
578 sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow cork */,
579 GNUNET_TIME_UNIT_FOREVER_REL,
580 sizeof (struct MeshQueryMessage),
581 &transmit_sqm, sh);
582}
583
584
585/**
586 * Closure for 'handle_reply'.
587 */
588struct HandleReplyClosure
589{
590
591 /**
592 * Reply payload.
593 */
594 const void *data;
595
596 /**
597 * Expiration time for the block.
598 */
599 struct GNUNET_TIME_Absolute expiration;
600
601 /**
602 * Number of bytes in 'data'.
603 */
604 size_t data_size;
605
606 /**
607 * Type of the block.
608 */
609 enum GNUNET_BLOCK_Type type;
610
611 /**
612 * Did we have a matching query?
613 */
614 int found;
615};
616
617
618/**
619 * Iterator called on each entry in a waiting map to
620 * process a result.
621 *
622 * @param cls the 'struct HandleReplyClosure'
623 * @param key the key of the entry in the map (the query)
624 * @param value the 'struct GSF_MeshRequest' to handle result for
625 * @return GNUNET_YES (continue to iterate)
626 */
627static int
628handle_reply (void *cls,
629 const struct GNUNET_HashCode *key,
630 void *value)
631{
632 struct HandleReplyClosure *hrc = cls;
633 struct GSF_MeshRequest *sr = value;
634
635 sr->proc (sr->proc_cls,
636 hrc->type,
637 hrc->expiration,
638 hrc->data_size,
639 hrc->data);
640 GSF_mesh_query_cancel (sr);
641 hrc->found = GNUNET_YES;
642 return GNUNET_YES;
643}
644
645
646/**
647 * Functions with this signature are called whenever a
648 * complete reply is received.
649 *
650 * @param cls closure with the 'struct MeshHandle'
651 * @param tunnel tunnel handle
652 * @param tunnel_ctx tunnel context
653 * @param message the actual message
654 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
655 */
656static int
657reply_cb (void *cls,
658 struct GNUNET_MESH_Tunnel *tunnel,
659 void **tunnel_ctx,
660 const struct GNUNET_MessageHeader *message)
661{
662 struct MeshHandle *sh = *tunnel_ctx;
663 const struct MeshReplyMessage *srm;
664 struct HandleReplyClosure hrc;
665 uint16_t msize;
666 enum GNUNET_BLOCK_Type type;
667 struct GNUNET_HashCode query;
668
669 msize = ntohs (message->size);
670 if (sizeof (struct MeshReplyMessage) > msize)
671 {
672 GNUNET_break_op (0);
673 reset_mesh_async (sh);
674 return GNUNET_SYSERR;
675 }
676 srm = (const struct MeshReplyMessage *) message;
677 msize -= sizeof (struct MeshReplyMessage);
678 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
679 if (GNUNET_YES !=
680 GNUNET_BLOCK_get_key (GSF_block_ctx,
681 type,
682 &srm[1], msize, &query))
683 {
684 GNUNET_break_op (0);
685 reset_mesh_async (sh);
686 return GNUNET_SYSERR;
687 }
688 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689 "Received reply `%s' via mesh\n",
690 GNUNET_h2s (&query));
691 GNUNET_STATISTICS_update (GSF_stats,
692 gettext_noop ("# replies received via mesh"), 1,
693 GNUNET_NO);
694 hrc.data = &srm[1];
695 hrc.data_size = msize;
696 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
697 hrc.type = type;
698 hrc.found = GNUNET_NO;
699 GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
700 &query,
701 &handle_reply,
702 &hrc);
703 if (GNUNET_NO == hrc.found)
704 {
705 GNUNET_STATISTICS_update (GSF_stats,
706 gettext_noop ("# replies received via mesh dropped"), 1,
707 GNUNET_NO);
708 return GNUNET_OK;
709 }
710 return GNUNET_OK;
711}
712
713
714/**
715 * Get (or create) a mesh to talk to the given peer.
716 *
717 * @param target peer we want to communicate with
718 */
719static struct MeshHandle *
720get_mesh (const struct GNUNET_PeerIdentity *target)
721{
722 struct MeshHandle *sh;
723
724 sh = GNUNET_CONTAINER_multihashmap_get (mesh_map,
725 &target->hashPubKey);
726 if (NULL != sh)
727 {
728 if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
729 {
730 GNUNET_SCHEDULER_cancel (sh->timeout_task);
731 sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
732 }
733 return sh;
734 }
735 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
736 "Creating mesh to %s\n",
737 GNUNET_i2s (target));
738 sh = GNUNET_malloc (sizeof (struct MeshHandle));
739 sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
740 &reset_mesh_task,
741 sh);
742 sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
743 sh->target = *target;
744 sh->mesh = GNUNET_MESH_tunnel_create (listen_socket,
745 sh,
746 &sh->target,
747 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
748 GNUNET_NO,
749 GNUNET_YES);
750 GNUNET_assert (GNUNET_OK ==
751 GNUNET_CONTAINER_multihashmap_put (mesh_map,
752 &sh->target.hashPubKey,
753 sh,
754 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
755 return sh;
756}
757
758
759/**
760 * Look for a block by directly contacting a particular peer.
761 *
762 * @param target peer that should have the block
763 * @param query hash to query for the block
764 * @param type desired type for the block
765 * @param proc function to call with result
766 * @param proc_cls closure for 'proc'
767 * @return handle to cancel the operation
768 */
769struct GSF_MeshRequest *
770GSF_mesh_query (const struct GNUNET_PeerIdentity *target,
771 const struct GNUNET_HashCode *query,
772 enum GNUNET_BLOCK_Type type,
773 GSF_MeshReplyProcessor proc, void *proc_cls)
774{
775 struct MeshHandle *sh;
776 struct GSF_MeshRequest *sr;
777
778 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779 "Preparing to send query for %s via mesh to %s\n",
780 GNUNET_h2s (query),
781 GNUNET_i2s (target));
782 sh = get_mesh (target);
783 sr = GNUNET_malloc (sizeof (struct GSF_MeshRequest));
784 sr->sh = sh;
785 sr->proc = proc;
786 sr->proc_cls = proc_cls;
787 sr->type = type;
788 sr->query = *query;
789 GNUNET_CONTAINER_DLL_insert (sh->pending_head,
790 sh->pending_tail,
791 sr);
792 if (GNUNET_YES == sh->is_ready)
793 transmit_pending (sh);
794 return sr;
795}
796
797
798/**
799 * Cancel an active request; must not be called after 'proc'
800 * was calld.
801 *
802 * @param sr request to cancel
803 */
804void
805GSF_mesh_query_cancel (struct GSF_MeshRequest *sr)
806{
807 struct MeshHandle *sh = sr->sh;
808
809 if (GNUNET_YES == sr->was_transmitted)
810 GNUNET_assert (GNUNET_OK ==
811 GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
812 &sr->query,
813 sr));
814 else
815 GNUNET_CONTAINER_DLL_remove (sh->pending_head,
816 sh->pending_tail,
817 sr);
818 GNUNET_free (sr);
819 if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
820 (NULL == sh->pending_head) )
821 sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
822 &mesh_timeout,
823 sh);
824}
825
826
827/* ********************* server-side code ************************* */
828
829
830/**
831 * We're done with a particular client, clean up.
832 *
833 * @param sc client to clean up
834 */
835static void
836terminate_mesh (struct MeshClient *sc)
837{
838 GNUNET_STATISTICS_update (GSF_stats,
839 gettext_noop ("# mesh connections active"), -1,
840 GNUNET_NO);
841 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
842 GNUNET_SCHEDULER_cancel (sc->terminate_task);
843 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
844 GNUNET_SCHEDULER_cancel (sc->timeout_task);
845 if (NULL != sc->wh)
846 GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
847 if (NULL != sc->qe)
848 GNUNET_DATASTORE_cancel (sc->qe);
849 GNUNET_MESH_tunnel_destroy (sc->socket);
850 struct WriteQueueItem *wqi;
851 while (NULL != (wqi = sc->wqi_head))
852 {
853 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
854 sc->wqi_tail,
855 wqi);
856 GNUNET_free (wqi);
857 }
858 GNUNET_CONTAINER_DLL_remove (sc_head,
859 sc_tail,
860 sc);
861 sc_count--;
862 GNUNET_free (sc);
863}
864
865
866/**
867 * Task run to asynchronously terminate the mesh due to timeout.
868 *
869 * @param cls the 'struct MeshClient'
870 * @param tc scheduler context
871 */
872static void
873timeout_mesh_task (void *cls,
874 const struct GNUNET_SCHEDULER_TaskContext *tc)
875{
876 struct MeshClient *sc = cls;
877
878 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
879 terminate_mesh (sc);
880}
881
882
883/**
884 * Reset the timeout for the mesh client (due to activity).
885 *
886 * @param sc client handle to reset timeout for
887 */
888static void
889refresh_timeout_task (struct MeshClient *sc)
890{
891 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
892 GNUNET_SCHEDULER_cancel (sc->timeout_task);
893 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
894 &timeout_mesh_task,
895 sc);
896}
897
898
899/**
900 * We're done handling a request from a client, read the next one.
901 *
902 * @param sc client to continue reading requests from
903 */
904static void
905continue_reading (struct MeshClient *sc)
906{
907 refresh_timeout_task (sc);
908}
909
910
911/**
912 * Transmit the next entry from the write queue.
913 *
914 * @param sc where to process the write queue
915 */
916static void
917continue_writing (struct MeshClient *sc);
918
919
920/**
921 * Send a reply now, mesh is ready.
922 *
923 * @param cls closure with the struct MeshClient which sent the query
924 * @param size number of bytes available in 'buf'
925 * @param buf where to write the message
926 * @return number of bytes written to 'buf'
927 */
928static size_t
929write_continuation (void *cls,
930 size_t size,
931 void *buf)
932{
933 struct MeshClient *sc = cls;
934 struct WriteQueueItem *wqi;
935 size_t ret;
936
937 sc->wh = NULL;
938 if (NULL == (wqi = sc->wqi_head))
939 {
940 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941 "Write queue empty, reading more requests\n");
942 return 0;
943 }
944 if (0 == size)
945 {
946 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
947 "Transmission of reply failed, terminating mesh\n");
948 terminate_mesh (sc);
949 return 0;
950 }
951 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
952 sc->wqi_tail,
953 wqi);
954 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955 "Transmitted %u byte reply via mesh\n",
956 (unsigned int) size);
957 GNUNET_STATISTICS_update (GSF_stats,
958 gettext_noop ("# Blocks transferred via mesh"), 1,
959 GNUNET_NO);
960 memcpy (buf, &wqi[1], ret = wqi->msize);
961 GNUNET_free (wqi);
962 continue_writing (sc);
963 return ret;
964}
965
966
967/**
968 * Transmit the next entry from the write queue.
969 *
970 * @param sc where to process the write queue
971 */
972static void
973continue_writing (struct MeshClient *sc)
974{
975 struct WriteQueueItem *wqi;
976
977 if (NULL != sc->wh)
978 {
979 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980 "Write pending, waiting for it to complete\n");
981 return; /* write already pending */
982 }
983 if (NULL == (wqi = sc->wqi_head))
984 {
985 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
986 "Write queue empty, reading more requests\n");
987 continue_reading (sc);
988 return;
989 }
990 sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
991 GNUNET_TIME_UNIT_FOREVER_REL,
992 wqi->msize,
993 &write_continuation,
994 sc);
995 if (NULL == sc->wh)
996 {
997 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998 "Write failed; terminating mesh\n");
999 terminate_mesh (sc);
1000 return;
1001 }
1002}
1003
1004
1005/**
1006 * Process a datum that was stored in the datastore.
1007 *
1008 * @param cls closure with the struct MeshClient which sent the query
1009 * @param key key for the content
1010 * @param size number of bytes in data
1011 * @param data content stored
1012 * @param type type of the content
1013 * @param priority priority of the content
1014 * @param anonymity anonymity-level for the content
1015 * @param expiration expiration time for the content
1016 * @param uid unique identifier for the datum;
1017 * maybe 0 if no unique identifier is available
1018 */
1019static void
1020handle_datastore_reply (void *cls,
1021 const struct GNUNET_HashCode * key,
1022 size_t size, const void *data,
1023 enum GNUNET_BLOCK_Type type,
1024 uint32_t priority,
1025 uint32_t anonymity,
1026 struct GNUNET_TIME_Absolute
1027 expiration, uint64_t uid)
1028{
1029 struct MeshClient *sc = cls;
1030 size_t msize = size + sizeof (struct MeshReplyMessage);
1031 struct WriteQueueItem *wqi;
1032 struct MeshReplyMessage *srm;
1033
1034 sc->qe = NULL;
1035 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1036 {
1037 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038 "Performing on-demand encoding\n");
1039 if (GNUNET_OK !=
1040 GNUNET_FS_handle_on_demand_block (key,
1041 size, data, type,
1042 priority, anonymity,
1043 expiration, uid,
1044 &handle_datastore_reply,
1045 sc))
1046 {
1047 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048 "On-demand encoding request failed\n");
1049 continue_writing (sc);
1050 }
1051 return;
1052 }
1053 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
1054 {
1055 GNUNET_break (0);
1056 continue_writing (sc);
1057 return;
1058 }
1059 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060 "Starting transmission of %u byte reply for query `%s' via mesh\n",
1061 (unsigned int) size,
1062 GNUNET_h2s (key));
1063 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
1064 wqi->msize = msize;
1065 srm = (struct MeshReplyMessage *) &wqi[1];
1066 srm->header.size = htons ((uint16_t) msize);
1067 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
1068 srm->type = htonl (type);
1069 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
1070 memcpy (&srm[1], data, size);
1071 sc->reply_size = msize;
1072 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
1073 sc->wqi_tail,
1074 wqi);
1075 continue_writing (sc);
1076}
1077
1078
1079/**
1080 * Functions with this signature are called whenever a
1081 * complete query message is received.
1082 *
1083 * Do not call GNUNET_SERVER_mst_destroy in callback
1084 *
1085 * @param cls closure with the 'struct MeshClient'
1086 * @param tunnel tunnel handle
1087 * @param tunnel_ctx tunnel context
1088 * @param message the actual message
1089 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
1090 */
1091static int
1092request_cb (void *cls,
1093 struct GNUNET_MESH_Tunnel *tunnel,
1094 void **tunnel_ctx,
1095 const struct GNUNET_MessageHeader *message)
1096{
1097 struct MeshClient *sc = *tunnel_ctx;
1098 const struct MeshQueryMessage *sqm;
1099
1100 sqm = (const struct MeshQueryMessage *) message;
1101 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1102 "Received query for `%s' via mesh\n",
1103 GNUNET_h2s (&sqm->query));
1104 GNUNET_STATISTICS_update (GSF_stats,
1105 gettext_noop ("# queries received via mesh"), 1,
1106 GNUNET_NO);
1107 refresh_timeout_task (sc);
1108 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1109 0,
1110 &sqm->query,
1111 ntohl (sqm->type),
1112 0 /* priority */,
1113 GSF_datastore_queue_size,
1114 GNUNET_TIME_UNIT_FOREVER_REL,
1115 &handle_datastore_reply, sc);
1116 if (NULL == sc->qe)
1117 {
1118 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119 "Queueing request with datastore failed (queue full?)\n");
1120 continue_writing (sc);
1121 }
1122 return GNUNET_OK;
1123}
1124
1125
1126/**
1127 * Functions of this type are called upon new mesh connection from other peers.
1128 *
1129 * @param cls the closure from GNUNET_MESH_connect
1130 * @param socket the socket representing the mesh
1131 * @param initiator the identity of the peer who wants to establish a mesh
1132 * with us; NULL on binding error
1133 * @param port mesh port used for the incoming connection
1134 * @return initial tunnel context (our 'struct MeshClient')
1135 */
1136static void *
1137accept_cb (void *cls,
1138 struct GNUNET_MESH_Tunnel *socket,
1139 const struct GNUNET_PeerIdentity *initiator,
1140 uint32_t port)
1141{
1142 struct MeshClient *sc;
1143
1144 GNUNET_assert (NULL != socket);
1145 if (sc_count >= sc_count_max)
1146 {
1147 GNUNET_STATISTICS_update (GSF_stats,
1148 gettext_noop ("# mesh client connections rejected"), 1,
1149 GNUNET_NO);
1150 GNUNET_MESH_tunnel_destroy (socket);
1151 return NULL;
1152 }
1153 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154 "Accepting inbound mesh connection from `%s'\n",
1155 GNUNET_i2s (initiator));
1156 GNUNET_STATISTICS_update (GSF_stats,
1157 gettext_noop ("# mesh connections active"), 1,
1158 GNUNET_NO);
1159 sc = GNUNET_malloc (sizeof (struct MeshClient));
1160 sc->socket = socket;
1161 GNUNET_CONTAINER_DLL_insert (sc_head,
1162 sc_tail,
1163 sc);
1164 sc_count++;
1165 refresh_timeout_task (sc);
1166 return sc;
1167}
1168
1169
1170/**
1171 * Initialize subsystem for non-anonymous file-sharing.
1172 */
1173void
1174GSF_mesh_start ()
1175{
1176 static const struct GNUNET_MESH_MessageHandler handlers[] = {
1177 { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)},
1178 { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 },
1179 { NULL, 0, 0 }
1180 };
1181 static const uint32_t ports[] = {
1182 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
1183 0
1184 };
1185
1186 mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
1187 if (GNUNET_YES ==
1188 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1189 "fs",
1190 "MAX_MESH_CLIENTS",
1191 &sc_count_max))
1192 {
1193 listen_socket = GNUNET_MESH_connect (GSF_cfg,
1194 NULL,
1195 &accept_cb,
1196 NULL /* FIXME: have a cleanup callback? */,
1197 handlers,
1198 ports);
1199 }
1200}
1201
1202
1203/**
1204 * Function called on each active meshs to shut them down.
1205 *
1206 * @param cls NULL
1207 * @param key target peer, unused
1208 * @param value the 'struct MeshHandle' to destroy
1209 * @return GNUNET_YES (continue to iterate)
1210 */
1211static int
1212release_meshs (void *cls,
1213 const struct GNUNET_HashCode *key,
1214 void *value)
1215{
1216 struct MeshHandle *sh = value;
1217
1218 destroy_mesh_handle (sh);
1219 return GNUNET_YES;
1220}
1221
1222
1223/**
1224 * Shutdown subsystem for non-anonymous file-sharing.
1225 */
1226void
1227GSF_mesh_stop ()
1228{
1229 struct MeshClient *sc;
1230
1231 while (NULL != (sc = sc_head))
1232 terminate_mesh (sc);
1233 GNUNET_CONTAINER_multihashmap_iterate (mesh_map,
1234 &release_meshs,
1235 NULL);
1236 GNUNET_CONTAINER_multihashmap_destroy (mesh_map);
1237 mesh_map = NULL;
1238 if (NULL != listen_socket)
1239 {
1240 GNUNET_MESH_disconnect (listen_socket);
1241 listen_socket = NULL;
1242 }
1243}
1244
1245/* end of gnunet-service-fs_mesh.c */
diff --git a/src/fs/gnunet-service-fs_mesh.h b/src/fs/gnunet-service-fs_mesh.h
index a718ed132..f136940aa 100644
--- a/src/fs/gnunet-service-fs_mesh.h
+++ b/src/fs/gnunet-service-fs_mesh.h
@@ -79,13 +79,81 @@ GSF_mesh_query_cancel (struct GSF_MeshRequest *sr);
79 * Initialize subsystem for non-anonymous file-sharing. 79 * Initialize subsystem for non-anonymous file-sharing.
80 */ 80 */
81void 81void
82GSF_mesh_start (void); 82GSF_mesh_start_server (void);
83 83
84 84
85/** 85/**
86 * Shutdown subsystem for non-anonymous file-sharing. 86 * Shutdown subsystem for non-anonymous file-sharing.
87 */ 87 */
88void 88void
89GSF_mesh_stop (void); 89GSF_mesh_stop_server (void);
90
91/**
92 * Initialize subsystem for non-anonymous file-sharing.
93 */
94void
95GSF_mesh_start_client (void);
96
97
98/**
99 * Shutdown subsystem for non-anonymous file-sharing.
100 */
101void
102GSF_mesh_stop_client (void);
103
104
105GNUNET_NETWORK_STRUCT_BEGIN
106
107/**
108 * Query from one peer, asking the other for CHK-data.
109 */
110struct MeshQueryMessage
111{
112
113 /**
114 * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY.
115 */
116 struct GNUNET_MessageHeader header;
117
118 /**
119 * Block type must be DBLOCK or IBLOCK.
120 */
121 uint32_t type GNUNET_PACKED;
122
123 /**
124 * Query hash from CHK (hash of encrypted block).
125 */
126 struct GNUNET_HashCode query;
127
128};
129
130
131/**
132 * Reply to a MeshQueryMessage.
133 */
134struct MeshReplyMessage
135{
136
137 /**
138 * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY.
139 */
140 struct GNUNET_MessageHeader header;
141
142 /**
143 * Block type must be DBLOCK or IBLOCK.
144 */
145 uint32_t type GNUNET_PACKED;
146
147 /**
148 * Expiration time for the block.
149 */
150 struct GNUNET_TIME_AbsoluteNBO expiration;
151
152 /* followed by the encrypted block */
153
154};
155
156GNUNET_NETWORK_STRUCT_END
157
90 158
91#endif 159#endif
diff --git a/src/fs/gnunet-service-fs_mesh_server.c b/src/fs/gnunet-service-fs_mesh_server.c
new file mode 100644
index 000000000..182408ef0
--- /dev/null
+++ b/src/fs/gnunet-service-fs_mesh_server.c
@@ -0,0 +1,589 @@
1/*
2 This file is part of GNUnet.
3 (C) 2012, 2013 Christian Grothoff (and other contributing authors)
4
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
19*/
20
21/**
22 * @file fs/gnunet-service-fs_mesh.c
23 * @brief non-anonymous file-transfer
24 * @author Christian Grothoff
25 *
26 * TODO:
27 * - MESH2 API doesn't allow flow control for server yet (needed!)
28 * - likely need to register clean up handler with mesh to handle
29 * client disconnect (likely leaky right now)
30 * - server is optional, currently client code will NPE if we have
31 * no server, again MESH2 API requirement forcing this for now
32 * - message handlers are symmetric for client/server, should be
33 * separated (currently clients can get requests and servers can
34 * handle answers, not good)
35 * - code is entirely untested
36 * - might have overlooked a few possible simplifications
37 * - PORT is set to old application type, unsure if we should keep
38 * it that way (fine for now)
39 */
40#include "platform.h"
41#include "gnunet_constants.h"
42#include "gnunet_util_lib.h"
43#include "gnunet_mesh_service.h"
44#include "gnunet_protocols.h"
45#include "gnunet_applications.h"
46#include "gnunet-service-fs.h"
47#include "gnunet-service-fs_indexing.h"
48#include "gnunet-service-fs_mesh.h"
49
50/**
51 * After how long do we termiante idle connections?
52 */
53#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
54
55
56/**
57 * A message in the queue to be written to the mesh.
58 */
59struct WriteQueueItem
60{
61 /**
62 * Kept in a DLL.
63 */
64 struct WriteQueueItem *next;
65
66 /**
67 * Kept in a DLL.
68 */
69 struct WriteQueueItem *prev;
70
71 /**
72 * Number of bytes of payload, allocated at the end of this struct.
73 */
74 size_t msize;
75};
76
77
78/**
79 * Information we keep around for each active meshing client.
80 */
81struct MeshClient
82{
83 /**
84 * DLL
85 */
86 struct MeshClient *next;
87
88 /**
89 * DLL
90 */
91 struct MeshClient *prev;
92
93 /**
94 * Socket for communication.
95 */
96 struct GNUNET_MESH_Tunnel *socket;
97
98 /**
99 * Handle for active write operation, or NULL.
100 */
101 struct GNUNET_MESH_TransmitHandle *wh;
102
103 /**
104 * Head of write queue.
105 */
106 struct WriteQueueItem *wqi_head;
107
108 /**
109 * Tail of write queue.
110 */
111 struct WriteQueueItem *wqi_tail;
112
113 /**
114 * Current active request to the datastore, if we have one pending.
115 */
116 struct GNUNET_DATASTORE_QueueEntry *qe;
117
118 /**
119 * Task that is scheduled to asynchronously terminate the connection.
120 */
121 GNUNET_SCHEDULER_TaskIdentifier terminate_task;
122
123 /**
124 * Task that is scheduled to terminate idle connections.
125 */
126 GNUNET_SCHEDULER_TaskIdentifier timeout_task;
127
128 /**
129 * Size of the last write that was initiated.
130 */
131 size_t reply_size;
132
133};
134
135
136/**
137 * Listen socket for incoming requests.
138 */
139static struct GNUNET_MESH_Handle *listen_socket;
140
141/**
142 * Head of DLL of mesh clients.
143 */
144static struct MeshClient *sc_head;
145
146/**
147 * Tail of DLL of mesh clients.
148 */
149static struct MeshClient *sc_tail;
150
151/**
152 * Number of active mesh clients in the 'sc_*'-DLL.
153 */
154static unsigned int sc_count;
155
156/**
157 * Maximum allowed number of mesh clients.
158 */
159static unsigned long long sc_count_max;
160
161
162
163/* ********************* server-side code ************************* */
164
165
166/**
167 * We're done with a particular client, clean up.
168 *
169 * @param sc client to clean up
170 */
171static void
172terminate_mesh (struct MeshClient *sc)
173{
174 struct WriteQueueItem *wqi;
175
176 fprintf (stderr,
177 "terminate mesh called for %p\n",
178 sc);
179 GNUNET_STATISTICS_update (GSF_stats,
180 gettext_noop ("# mesh connections active"), -1,
181 GNUNET_NO);
182 if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
183 GNUNET_SCHEDULER_cancel (sc->terminate_task);
184 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
185 GNUNET_SCHEDULER_cancel (sc->timeout_task);
186 if (NULL != sc->wh)
187 GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
188 if (NULL != sc->qe)
189 GNUNET_DATASTORE_cancel (sc->qe);
190 while (NULL != (wqi = sc->wqi_head))
191 {
192 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
193 sc->wqi_tail,
194 wqi);
195 GNUNET_free (wqi);
196 }
197 GNUNET_CONTAINER_DLL_remove (sc_head,
198 sc_tail,
199 sc);
200 sc_count--;
201 GNUNET_free (sc);
202}
203
204
205/**
206 * Task run to asynchronously terminate the mesh due to timeout.
207 *
208 * @param cls the 'struct MeshClient'
209 * @param tc scheduler context
210 */
211static void
212timeout_mesh_task (void *cls,
213 const struct GNUNET_SCHEDULER_TaskContext *tc)
214{
215 struct MeshClient *sc = cls;
216 struct GNUNET_MESH_Tunnel *tun;
217
218 sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
219 tun = sc->socket;
220 sc->socket = NULL;
221 GNUNET_MESH_tunnel_destroy (tun);
222}
223
224
225/**
226 * Reset the timeout for the mesh client (due to activity).
227 *
228 * @param sc client handle to reset timeout for
229 */
230static void
231refresh_timeout_task (struct MeshClient *sc)
232{
233 if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
234 GNUNET_SCHEDULER_cancel (sc->timeout_task);
235 sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
236 &timeout_mesh_task,
237 sc);
238}
239
240
241/**
242 * We're done handling a request from a client, read the next one.
243 *
244 * @param sc client to continue reading requests from
245 */
246static void
247continue_reading (struct MeshClient *sc)
248{
249 refresh_timeout_task (sc);
250 GNUNET_MESH_receive_done (sc->socket);
251}
252
253
254/**
255 * Transmit the next entry from the write queue.
256 *
257 * @param sc where to process the write queue
258 */
259static void
260continue_writing (struct MeshClient *sc);
261
262
263/**
264 * Send a reply now, mesh is ready.
265 *
266 * @param cls closure with the struct MeshClient which sent the query
267 * @param size number of bytes available in 'buf'
268 * @param buf where to write the message
269 * @return number of bytes written to 'buf'
270 */
271static size_t
272write_continuation (void *cls,
273 size_t size,
274 void *buf)
275{
276 struct MeshClient *sc = cls;
277 struct WriteQueueItem *wqi;
278 size_t ret;
279
280 sc->wh = NULL;
281 if (NULL == (wqi = sc->wqi_head))
282 {
283 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
284 "Write queue empty, reading more requests\n");
285 return 0;
286 }
287 if (0 == size)
288 {
289 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
290 "Transmission of reply failed, terminating mesh\n");
291 terminate_mesh (sc);
292 return 0;
293 }
294 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
295 sc->wqi_tail,
296 wqi);
297 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
298 "Transmitted %u byte reply via mesh\n",
299 (unsigned int) size);
300 GNUNET_STATISTICS_update (GSF_stats,
301 gettext_noop ("# Blocks transferred via mesh"), 1,
302 GNUNET_NO);
303 memcpy (buf, &wqi[1], ret = wqi->msize);
304 GNUNET_free (wqi);
305 continue_writing (sc);
306 return ret;
307}
308
309
310/**
311 * Transmit the next entry from the write queue.
312 *
313 * @param sc where to process the write queue
314 */
315static void
316continue_writing (struct MeshClient *sc)
317{
318 struct WriteQueueItem *wqi;
319
320 if (NULL != sc->wh)
321 {
322 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
323 "Write pending, waiting for it to complete\n");
324 return; /* write already pending */
325 }
326 if (NULL == (wqi = sc->wqi_head))
327 {
328 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
329 "Write queue empty, reading more requests\n");
330 continue_reading (sc);
331 return;
332 }
333 sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
334 GNUNET_TIME_UNIT_FOREVER_REL,
335 wqi->msize,
336 &write_continuation,
337 sc);
338 if (NULL == sc->wh)
339 {
340 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341 "Write failed; terminating mesh\n");
342 terminate_mesh (sc);
343 return;
344 }
345}
346
347
348/**
349 * Process a datum that was stored in the datastore.
350 *
351 * @param cls closure with the struct MeshClient which sent the query
352 * @param key key for the content
353 * @param size number of bytes in data
354 * @param data content stored
355 * @param type type of the content
356 * @param priority priority of the content
357 * @param anonymity anonymity-level for the content
358 * @param expiration expiration time for the content
359 * @param uid unique identifier for the datum;
360 * maybe 0 if no unique identifier is available
361 */
362static void
363handle_datastore_reply (void *cls,
364 const struct GNUNET_HashCode * key,
365 size_t size, const void *data,
366 enum GNUNET_BLOCK_Type type,
367 uint32_t priority,
368 uint32_t anonymity,
369 struct GNUNET_TIME_Absolute
370 expiration, uint64_t uid)
371{
372 struct MeshClient *sc = cls;
373 size_t msize = size + sizeof (struct MeshReplyMessage);
374 struct WriteQueueItem *wqi;
375 struct MeshReplyMessage *srm;
376
377 sc->qe = NULL;
378 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
379 {
380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
381 "Performing on-demand encoding\n");
382 if (GNUNET_OK !=
383 GNUNET_FS_handle_on_demand_block (key,
384 size, data, type,
385 priority, anonymity,
386 expiration, uid,
387 &handle_datastore_reply,
388 sc))
389 {
390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
391 "On-demand encoding request failed\n");
392 continue_writing (sc);
393 }
394 return;
395 }
396 if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
397 {
398 GNUNET_break (0);
399 continue_writing (sc);
400 return;
401 }
402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
403 "Starting transmission of %u byte reply for query `%s' via mesh\n",
404 (unsigned int) size,
405 GNUNET_h2s (key));
406 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
407 wqi->msize = msize;
408 srm = (struct MeshReplyMessage *) &wqi[1];
409 srm->header.size = htons ((uint16_t) msize);
410 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY);
411 srm->type = htonl (type);
412 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
413 memcpy (&srm[1], data, size);
414 sc->reply_size = msize;
415 GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
416 sc->wqi_tail,
417 wqi);
418 continue_writing (sc);
419}
420
421
422/**
423 * Functions with this signature are called whenever a
424 * complete query message is received.
425 *
426 * Do not call GNUNET_SERVER_mst_destroy in callback
427 *
428 * @param cls closure with the 'struct MeshClient'
429 * @param tunnel tunnel handle
430 * @param tunnel_ctx tunnel context
431 * @param message the actual message
432 * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
433 */
434static int
435request_cb (void *cls,
436 struct GNUNET_MESH_Tunnel *tunnel,
437 void **tunnel_ctx,
438 const struct GNUNET_MessageHeader *message)
439{
440 struct MeshClient *sc = *tunnel_ctx;
441 const struct MeshQueryMessage *sqm;
442
443 fprintf (stderr,
444 "Request gets %p\n",
445 sc);
446 sqm = (const struct MeshQueryMessage *) message;
447 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
448 "Received query for `%s' via mesh\n",
449 GNUNET_h2s (&sqm->query));
450 GNUNET_STATISTICS_update (GSF_stats,
451 gettext_noop ("# queries received via mesh"), 1,
452 GNUNET_NO);
453 refresh_timeout_task (sc);
454 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
455 0,
456 &sqm->query,
457 ntohl (sqm->type),
458 0 /* priority */,
459 GSF_datastore_queue_size,
460 GNUNET_TIME_UNIT_FOREVER_REL,
461 &handle_datastore_reply, sc);
462 if (NULL == sc->qe)
463 {
464 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
465 "Queueing request with datastore failed (queue full?)\n");
466 continue_writing (sc);
467 }
468 return GNUNET_OK;
469}
470
471
472/**
473 * Functions of this type are called upon new mesh connection from other peers.
474 *
475 * @param cls the closure from GNUNET_MESH_connect
476 * @param socket the socket representing the mesh
477 * @param initiator the identity of the peer who wants to establish a mesh
478 * with us; NULL on binding error
479 * @param port mesh port used for the incoming connection
480 * @return initial tunnel context (our 'struct MeshClient')
481 */
482static void *
483accept_cb (void *cls,
484 struct GNUNET_MESH_Tunnel *socket,
485 const struct GNUNET_PeerIdentity *initiator,
486 uint32_t port)
487{
488 struct MeshClient *sc;
489
490 GNUNET_assert (NULL != socket);
491 if (sc_count >= sc_count_max)
492 {
493 GNUNET_STATISTICS_update (GSF_stats,
494 gettext_noop ("# mesh client connections rejected"), 1,
495 GNUNET_NO);
496 GNUNET_MESH_tunnel_destroy (socket);
497 return NULL;
498 }
499 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
500 "Accepting inbound mesh connection from `%s'\n",
501 GNUNET_i2s (initiator));
502 GNUNET_STATISTICS_update (GSF_stats,
503 gettext_noop ("# mesh connections active"), 1,
504 GNUNET_NO);
505 sc = GNUNET_new (struct MeshClient);
506 sc->socket = socket;
507 GNUNET_CONTAINER_DLL_insert (sc_head,
508 sc_tail,
509 sc);
510 sc_count++;
511 refresh_timeout_task (sc);
512 fprintf (stderr,
513 "Accept returns %p\n",
514 sc);
515 return sc;
516}
517
518
519/**
520 * Function called by mesh when a client disconnects.
521 * Cleans up our 'struct MeshClient' of that tunnel.
522 *
523 * @param cls NULL
524 * @param tunnel tunnel of the disconnecting client
525 * @param tunnel_ctx our 'struct MeshClient'
526 */
527static void
528cleaner_cb (void *cls,
529 const struct GNUNET_MESH_Tunnel *tunnel,
530 void *tunnel_ctx)
531{
532 struct MeshClient *sc = tunnel_ctx;
533
534 fprintf (stderr,
535 "Cleaner called with %p\n",
536 sc);
537 if (NULL != sc)
538 terminate_mesh (sc);
539}
540
541
542/**
543 * Initialize subsystem for non-anonymous file-sharing.
544 */
545void
546GSF_mesh_start_server ()
547{
548 static const struct GNUNET_MESH_MessageHandler handlers[] = {
549 { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)},
550 { NULL, 0, 0 }
551 };
552 static const uint32_t ports[] = {
553 GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
554 0
555 };
556
557 if (GNUNET_YES !=
558 GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
559 "fs",
560 "MAX_MESH_CLIENTS",
561 &sc_count_max))
562 return;
563 listen_socket = GNUNET_MESH_connect (GSF_cfg,
564 NULL,
565 &accept_cb,
566 &cleaner_cb,
567 handlers,
568 ports);
569}
570
571
572/**
573 * Shutdown subsystem for non-anonymous file-sharing.
574 */
575void
576GSF_mesh_stop_server ()
577{
578 struct MeshClient *sc;
579
580 while (NULL != (sc = sc_head))
581 terminate_mesh (sc);
582 if (NULL != listen_socket)
583 {
584 GNUNET_MESH_disconnect (listen_socket);
585 listen_socket = NULL;
586 }
587}
588
589/* end of gnunet-service-fs_mesh.c */
diff --git a/src/fs/test_gnunet_service_fs_p2p_mesh.conf b/src/fs/test_gnunet_service_fs_p2p_mesh.conf
index 9d01208cb..328bc986c 100644
--- a/src/fs/test_gnunet_service_fs_p2p_mesh.conf
+++ b/src/fs/test_gnunet_service_fs_p2p_mesh.conf
@@ -14,3 +14,7 @@ CONTENT_CACHING = NO
14# (may improve anonymity, probably not a good idea if content_caching is NO) 14# (may improve anonymity, probably not a good idea if content_caching is NO)
15CONTENT_PUSHING = NO 15CONTENT_PUSHING = NO
16 16
17#PREFIX = valgrind
18
19[mesh]
20#PREFIX = valgrind