diff options
author | Christian Grothoff <christian@grothoff.org> | 2013-07-18 11:06:23 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2013-07-18 11:06:23 +0000 |
commit | 943f9aa9278e67ed95daa5a85491868067523a95 (patch) | |
tree | bdd22800cbc893ad0e25bdefa60efc7e58287d97 | |
parent | 3f5dc2ddcfc59ca54f18a3688fa1f8aa89340ce6 (diff) | |
download | gnunet-943f9aa9278e67ed95daa5a85491868067523a95.tar.gz gnunet-943f9aa9278e67ed95daa5a85491868067523a95.zip |
-splitting mesh into server and client parts
-rw-r--r-- | src/fs/Makefile.am | 3 | ||||
-rw-r--r-- | src/fs/fs.conf.in | 2 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.c | 6 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_mesh.c | 1245 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_mesh.h | 72 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_mesh_server.c | 589 | ||||
-rw-r--r-- | src/fs/test_gnunet_service_fs_p2p_mesh.conf | 4 |
7 files changed, 670 insertions, 1251 deletions
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index a0d88051a..e8928f770 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am | |||
@@ -190,7 +190,8 @@ gnunet_service_fs_SOURCES = \ | |||
190 | gnunet-service-fs_pr.c gnunet-service-fs_pr.h \ | 190 | gnunet-service-fs_pr.c gnunet-service-fs_pr.h \ |
191 | gnunet-service-fs_push.c gnunet-service-fs_push.h \ | 191 | gnunet-service-fs_push.c gnunet-service-fs_push.h \ |
192 | gnunet-service-fs_put.c gnunet-service-fs_put.h \ | 192 | gnunet-service-fs_put.c gnunet-service-fs_put.h \ |
193 | gnunet-service-fs_mesh.c gnunet-service-fs_mesh.h | 193 | gnunet-service-fs_mesh_client.c gnunet-service-fs_mesh.h \ |
194 | gnunet-service-fs_mesh_server.c | ||
194 | gnunet_service_fs_LDADD = \ | 195 | gnunet_service_fs_LDADD = \ |
195 | $(top_builddir)/src/fs/libgnunetfs.la \ | 196 | $(top_builddir)/src/fs/libgnunetfs.la \ |
196 | $(top_builddir)/src/dht/libgnunetdht.la \ | 197 | $(top_builddir)/src/dht/libgnunetdht.la \ |
diff --git a/src/fs/fs.conf.in b/src/fs/fs.conf.in index f45a78fc4..c9748a73a 100644 --- a/src/fs/fs.conf.in +++ b/src/fs/fs.conf.in | |||
@@ -60,7 +60,7 @@ DISABLE_ANON_TRANSFER = NO | |||
60 | # some reasonable level. And if we have a very, very large | 60 | # some reasonable level. And if we have a very, very large |
61 | # number, we probably won't have enough bandwidth to suppor them | 61 | # number, we probably won't have enough bandwidth to suppor them |
62 | # well anyway, so better have a moderate cap. | 62 | # well anyway, so better have a moderate cap. |
63 | MAX_STREAM_CLIENTS = 128 | 63 | MAX_MESH_CLIENTS = 128 |
64 | 64 | ||
65 | 65 | ||
66 | [gnunet-auto-share] | 66 | [gnunet-auto-share] |
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index 2aac9e165..9074f053e 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c | |||
@@ -475,7 +475,8 @@ handle_start_search (void *cls, struct GNUNET_SERVER_Client *client, | |||
475 | static void | 475 | static void |
476 | shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | 476 | shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) |
477 | { | 477 | { |
478 | GSF_mesh_stop (); | 478 | GSF_mesh_stop_client (); |
479 | GSF_mesh_stop_server (); | ||
479 | if (NULL != GSF_core) | 480 | if (NULL != GSF_core) |
480 | { | 481 | { |
481 | GNUNET_CORE_disconnect (GSF_core); | 482 | GNUNET_CORE_disconnect (GSF_core); |
@@ -646,7 +647,8 @@ main_init (struct GNUNET_SERVER_Handle *server, | |||
646 | GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, | 647 | GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, |
647 | NULL); | 648 | NULL); |
648 | datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); | 649 | datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); |
649 | GSF_mesh_start (); | 650 | GSF_mesh_start_server (); |
651 | GSF_mesh_start_client (); | ||
650 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, | 652 | GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, |
651 | NULL); | 653 | NULL); |
652 | return GNUNET_OK; | 654 | return GNUNET_OK; |
diff --git a/src/fs/gnunet-service-fs_mesh.c b/src/fs/gnunet-service-fs_mesh.c deleted file mode 100644 index ff40a2621..000000000 --- a/src/fs/gnunet-service-fs_mesh.c +++ /dev/null | |||
@@ -1,1245 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2012, 2013 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file fs/gnunet-service-fs_mesh.c | ||
23 | * @brief non-anonymous file-transfer | ||
24 | * @author Christian Grothoff | ||
25 | * | ||
26 | * TODO: | ||
27 | * - MESH2 API doesn't allow flow control for server yet (needed!) | ||
28 | * - likely need to register clean up handler with mesh to handle | ||
29 | * client disconnect (likely leaky right now) | ||
30 | * - server is optional, currently client code will NPE if we have | ||
31 | * no server, again MESH2 API requirement forcing this for now | ||
32 | * - message handlers are symmetric for client/server, should be | ||
33 | * separated (currently clients can get requests and servers can | ||
34 | * handle answers, not good) | ||
35 | * - code is entirely untested | ||
36 | * - might have overlooked a few possible simplifications | ||
37 | * - PORT is set to old application type, unsure if we should keep | ||
38 | * it that way (fine for now) | ||
39 | */ | ||
40 | #include "platform.h" | ||
41 | #include "gnunet_constants.h" | ||
42 | #include "gnunet_util_lib.h" | ||
43 | #include "gnunet_mesh_service.h" | ||
44 | #include "gnunet_protocols.h" | ||
45 | #include "gnunet_applications.h" | ||
46 | #include "gnunet-service-fs.h" | ||
47 | #include "gnunet-service-fs_indexing.h" | ||
48 | #include "gnunet-service-fs_mesh.h" | ||
49 | |||
50 | /** | ||
51 | * After how long do we termiante idle connections? | ||
52 | */ | ||
53 | #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) | ||
54 | |||
55 | /** | ||
56 | * After how long do we reset connections without replies? | ||
57 | */ | ||
58 | #define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) | ||
59 | |||
60 | |||
61 | /** | ||
62 | * A message in the queue to be written to the mesh. | ||
63 | */ | ||
64 | struct WriteQueueItem | ||
65 | { | ||
66 | /** | ||
67 | * Kept in a DLL. | ||
68 | */ | ||
69 | struct WriteQueueItem *next; | ||
70 | |||
71 | /** | ||
72 | * Kept in a DLL. | ||
73 | */ | ||
74 | struct WriteQueueItem *prev; | ||
75 | |||
76 | /** | ||
77 | * Number of bytes of payload, allocated at the end of this struct. | ||
78 | */ | ||
79 | size_t msize; | ||
80 | }; | ||
81 | |||
82 | |||
83 | /** | ||
84 | * Information we keep around for each active meshing client. | ||
85 | */ | ||
86 | struct MeshClient | ||
87 | { | ||
88 | /** | ||
89 | * DLL | ||
90 | */ | ||
91 | struct MeshClient *next; | ||
92 | |||
93 | /** | ||
94 | * DLL | ||
95 | */ | ||
96 | struct MeshClient *prev; | ||
97 | |||
98 | /** | ||
99 | * Socket for communication. | ||
100 | */ | ||
101 | struct GNUNET_MESH_Tunnel *socket; | ||
102 | |||
103 | /** | ||
104 | * Handle for active write operation, or NULL. | ||
105 | */ | ||
106 | struct GNUNET_MESH_TransmitHandle *wh; | ||
107 | |||
108 | /** | ||
109 | * Head of write queue. | ||
110 | */ | ||
111 | struct WriteQueueItem *wqi_head; | ||
112 | |||
113 | /** | ||
114 | * Tail of write queue. | ||
115 | */ | ||
116 | struct WriteQueueItem *wqi_tail; | ||
117 | |||
118 | /** | ||
119 | * Current active request to the datastore, if we have one pending. | ||
120 | */ | ||
121 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
122 | |||
123 | /** | ||
124 | * Task that is scheduled to asynchronously terminate the connection. | ||
125 | */ | ||
126 | GNUNET_SCHEDULER_TaskIdentifier terminate_task; | ||
127 | |||
128 | /** | ||
129 | * Task that is scheduled to terminate idle connections. | ||
130 | */ | ||
131 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
132 | |||
133 | /** | ||
134 | * Size of the last write that was initiated. | ||
135 | */ | ||
136 | size_t reply_size; | ||
137 | |||
138 | }; | ||
139 | |||
140 | |||
141 | /** | ||
142 | * Query from one peer, asking the other for CHK-data. | ||
143 | */ | ||
144 | struct MeshQueryMessage | ||
145 | { | ||
146 | |||
147 | /** | ||
148 | * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY. | ||
149 | */ | ||
150 | struct GNUNET_MessageHeader header; | ||
151 | |||
152 | /** | ||
153 | * Block type must be DBLOCK or IBLOCK. | ||
154 | */ | ||
155 | uint32_t type; | ||
156 | |||
157 | /** | ||
158 | * Query hash from CHK (hash of encrypted block). | ||
159 | */ | ||
160 | struct GNUNET_HashCode query; | ||
161 | |||
162 | }; | ||
163 | |||
164 | |||
165 | /** | ||
166 | * Reply to a MeshQueryMessage. | ||
167 | */ | ||
168 | struct MeshReplyMessage | ||
169 | { | ||
170 | |||
171 | /** | ||
172 | * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY. | ||
173 | */ | ||
174 | struct GNUNET_MessageHeader header; | ||
175 | |||
176 | /** | ||
177 | * Block type must be DBLOCK or IBLOCK. | ||
178 | */ | ||
179 | uint32_t type; | ||
180 | |||
181 | /** | ||
182 | * Expiration time for the block. | ||
183 | */ | ||
184 | struct GNUNET_TIME_AbsoluteNBO expiration; | ||
185 | |||
186 | /* followed by the encrypted block */ | ||
187 | |||
188 | }; | ||
189 | |||
190 | |||
191 | /** | ||
192 | * Handle for a mesh to another peer. | ||
193 | */ | ||
194 | struct MeshHandle; | ||
195 | |||
196 | |||
197 | /** | ||
198 | * Handle for a request that is going out via mesh API. | ||
199 | */ | ||
200 | struct GSF_MeshRequest | ||
201 | { | ||
202 | |||
203 | /** | ||
204 | * DLL. | ||
205 | */ | ||
206 | struct GSF_MeshRequest *next; | ||
207 | |||
208 | /** | ||
209 | * DLL. | ||
210 | */ | ||
211 | struct GSF_MeshRequest *prev; | ||
212 | |||
213 | /** | ||
214 | * Which mesh is this request associated with? | ||
215 | */ | ||
216 | struct MeshHandle *sh; | ||
217 | |||
218 | /** | ||
219 | * Function to call with the result. | ||
220 | */ | ||
221 | GSF_MeshReplyProcessor proc; | ||
222 | |||
223 | /** | ||
224 | * Closure for 'proc' | ||
225 | */ | ||
226 | void *proc_cls; | ||
227 | |||
228 | /** | ||
229 | * Query to transmit to the other peer. | ||
230 | */ | ||
231 | struct GNUNET_HashCode query; | ||
232 | |||
233 | /** | ||
234 | * Desired type for the reply. | ||
235 | */ | ||
236 | enum GNUNET_BLOCK_Type type; | ||
237 | |||
238 | /** | ||
239 | * Did we transmit this request already? YES if we are | ||
240 | * in the 'waiting' DLL, NO if we are in the 'pending' DLL. | ||
241 | */ | ||
242 | int was_transmitted; | ||
243 | }; | ||
244 | |||
245 | |||
246 | /** | ||
247 | * Handle for a mesh to another peer. | ||
248 | */ | ||
249 | struct MeshHandle | ||
250 | { | ||
251 | /** | ||
252 | * Head of DLL of pending requests on this mesh. | ||
253 | */ | ||
254 | struct GSF_MeshRequest *pending_head; | ||
255 | |||
256 | /** | ||
257 | * Tail of DLL of pending requests on this mesh. | ||
258 | */ | ||
259 | struct GSF_MeshRequest *pending_tail; | ||
260 | |||
261 | /** | ||
262 | * Map from query to 'struct GSF_MeshRequest's waiting for | ||
263 | * a reply. | ||
264 | */ | ||
265 | struct GNUNET_CONTAINER_MultiHashMap *waiting_map; | ||
266 | |||
267 | /** | ||
268 | * Connection to the other peer. | ||
269 | */ | ||
270 | struct GNUNET_MESH_Tunnel *mesh; | ||
271 | |||
272 | /** | ||
273 | * Handle for active write operation, or NULL. | ||
274 | */ | ||
275 | struct GNUNET_MESH_TransmitHandle *wh; | ||
276 | |||
277 | /** | ||
278 | * Which peer does this mesh go to? | ||
279 | */ | ||
280 | struct GNUNET_PeerIdentity target; | ||
281 | |||
282 | /** | ||
283 | * Task to kill inactive meshs (we keep them around for | ||
284 | * a few seconds to give the application a chance to give | ||
285 | * us another query). | ||
286 | */ | ||
287 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
288 | |||
289 | /** | ||
290 | * Task to reset meshs that had errors (asynchronously, | ||
291 | * as we may not be able to do it immediately during a | ||
292 | * callback from the mesh API). | ||
293 | */ | ||
294 | GNUNET_SCHEDULER_TaskIdentifier reset_task; | ||
295 | |||
296 | /** | ||
297 | * Is this mesh ready for transmission? | ||
298 | */ | ||
299 | int is_ready; | ||
300 | |||
301 | }; | ||
302 | |||
303 | |||
304 | /** | ||
305 | * Listen socket for incoming requests. | ||
306 | */ | ||
307 | static struct GNUNET_MESH_Handle *listen_socket; | ||
308 | |||
309 | /** | ||
310 | * Head of DLL of mesh clients. | ||
311 | */ | ||
312 | static struct MeshClient *sc_head; | ||
313 | |||
314 | /** | ||
315 | * Tail of DLL of mesh clients. | ||
316 | */ | ||
317 | static struct MeshClient *sc_tail; | ||
318 | |||
319 | /** | ||
320 | * Number of active mesh clients in the 'sc_*'-DLL. | ||
321 | */ | ||
322 | static unsigned int sc_count; | ||
323 | |||
324 | /** | ||
325 | * Maximum allowed number of mesh clients. | ||
326 | */ | ||
327 | static unsigned long long sc_count_max; | ||
328 | |||
329 | /** | ||
330 | * Map from peer identities to 'struct MeshHandles' with meshs to | ||
331 | * those peers. | ||
332 | */ | ||
333 | static struct GNUNET_CONTAINER_MultiHashMap *mesh_map; | ||
334 | |||
335 | |||
336 | /* ********************* client-side code ************************* */ | ||
337 | |||
338 | /** | ||
339 | * Iterator called on each entry in a waiting map to | ||
340 | * call the 'proc' continuation and release associated | ||
341 | * resources. | ||
342 | * | ||
343 | * @param cls the 'struct MeshHandle' | ||
344 | * @param key the key of the entry in the map (the query) | ||
345 | * @param value the 'struct GSF_MeshRequest' to clean up | ||
346 | * @return GNUNET_YES (continue to iterate) | ||
347 | */ | ||
348 | static int | ||
349 | free_waiting_entry (void *cls, | ||
350 | const struct GNUNET_HashCode *key, | ||
351 | void *value) | ||
352 | { | ||
353 | struct GSF_MeshRequest *sr = value; | ||
354 | |||
355 | sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, | ||
356 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
357 | 0, NULL); | ||
358 | GSF_mesh_query_cancel (sr); | ||
359 | return GNUNET_YES; | ||
360 | } | ||
361 | |||
362 | |||
363 | /** | ||
364 | * Destroy a mesh handle. | ||
365 | * | ||
366 | * @param sh mesh to process | ||
367 | */ | ||
368 | static void | ||
369 | destroy_mesh_handle (struct MeshHandle *sh) | ||
370 | { | ||
371 | struct GSF_MeshRequest *sr; | ||
372 | |||
373 | while (NULL != (sr = sh->pending_head)) | ||
374 | { | ||
375 | sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, | ||
376 | GNUNET_TIME_UNIT_FOREVER_ABS, | ||
377 | 0, NULL); | ||
378 | GSF_mesh_query_cancel (sr); | ||
379 | } | ||
380 | GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map, | ||
381 | &free_waiting_entry, | ||
382 | sh); | ||
383 | if (NULL != sh->wh) | ||
384 | GNUNET_MESH_notify_transmit_ready_cancel (sh->wh); | ||
385 | if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) | ||
386 | GNUNET_SCHEDULER_cancel (sh->timeout_task); | ||
387 | if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task) | ||
388 | GNUNET_SCHEDULER_cancel (sh->reset_task); | ||
389 | GNUNET_MESH_tunnel_destroy (sh->mesh); | ||
390 | GNUNET_assert (GNUNET_OK == | ||
391 | GNUNET_CONTAINER_multihashmap_remove (mesh_map, | ||
392 | &sh->target.hashPubKey, | ||
393 | sh)); | ||
394 | GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map); | ||
395 | GNUNET_free (sh); | ||
396 | } | ||
397 | |||
398 | |||
399 | /** | ||
400 | * Transmit pending requests via the mesh. | ||
401 | * | ||
402 | * @param sh mesh to process | ||
403 | */ | ||
404 | static void | ||
405 | transmit_pending (struct MeshHandle *sh); | ||
406 | |||
407 | |||
408 | /** | ||
409 | * Iterator called on each entry in a waiting map to | ||
410 | * move it back to the pending list. | ||
411 | * | ||
412 | * @param cls the 'struct MeshHandle' | ||
413 | * @param key the key of the entry in the map (the query) | ||
414 | * @param value the 'struct GSF_MeshRequest' to move to pending | ||
415 | * @return GNUNET_YES (continue to iterate) | ||
416 | */ | ||
417 | static int | ||
418 | move_to_pending (void *cls, | ||
419 | const struct GNUNET_HashCode *key, | ||
420 | void *value) | ||
421 | { | ||
422 | struct MeshHandle *sh = cls; | ||
423 | struct GSF_MeshRequest *sr = value; | ||
424 | |||
425 | GNUNET_assert (GNUNET_YES == | ||
426 | GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map, | ||
427 | key, | ||
428 | value)); | ||
429 | GNUNET_CONTAINER_DLL_insert (sh->pending_head, | ||
430 | sh->pending_tail, | ||
431 | sr); | ||
432 | sr->was_transmitted = GNUNET_NO; | ||
433 | return GNUNET_YES; | ||
434 | } | ||
435 | |||
436 | |||
437 | /** | ||
438 | * We had a serious error, tear down and re-create mesh from scratch. | ||
439 | * | ||
440 | * @param sh mesh to reset | ||
441 | */ | ||
442 | static void | ||
443 | reset_mesh (struct MeshHandle *sh) | ||
444 | { | ||
445 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
446 | "Resetting mesh to %s\n", | ||
447 | GNUNET_i2s (&sh->target)); | ||
448 | GNUNET_MESH_tunnel_destroy (sh->mesh); | ||
449 | sh->is_ready = GNUNET_NO; | ||
450 | GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map, | ||
451 | &move_to_pending, | ||
452 | sh); | ||
453 | sh->mesh = GNUNET_MESH_tunnel_create (listen_socket, | ||
454 | sh, | ||
455 | &sh->target, | ||
456 | GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, | ||
457 | GNUNET_YES, | ||
458 | GNUNET_YES); | ||
459 | } | ||
460 | |||
461 | |||
462 | /** | ||
463 | * Task called when it is time to destroy an inactive mesh. | ||
464 | * | ||
465 | * @param cls the 'struct MeshHandle' to tear down | ||
466 | * @param tc scheduler context, unused | ||
467 | */ | ||
468 | static void | ||
469 | mesh_timeout (void *cls, | ||
470 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
471 | { | ||
472 | struct MeshHandle *sh = cls; | ||
473 | |||
474 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
475 | "Timeout on mesh to %s\n", | ||
476 | GNUNET_i2s (&sh->target)); | ||
477 | sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
478 | destroy_mesh_handle (sh); | ||
479 | } | ||
480 | |||
481 | |||
482 | /** | ||
483 | * Task called when it is time to reset an mesh. | ||
484 | * | ||
485 | * @param cls the 'struct MeshHandle' to tear down | ||
486 | * @param tc scheduler context, unused | ||
487 | */ | ||
488 | static void | ||
489 | reset_mesh_task (void *cls, | ||
490 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
491 | { | ||
492 | struct MeshHandle *sh = cls; | ||
493 | |||
494 | sh->reset_task = GNUNET_SCHEDULER_NO_TASK; | ||
495 | reset_mesh (sh); | ||
496 | } | ||
497 | |||
498 | |||
499 | /** | ||
500 | * We had a serious error, tear down and re-create mesh from scratch, | ||
501 | * but do so asynchronously. | ||
502 | * | ||
503 | * @param sh mesh to reset | ||
504 | */ | ||
505 | static void | ||
506 | reset_mesh_async (struct MeshHandle *sh) | ||
507 | { | ||
508 | if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task) | ||
509 | GNUNET_SCHEDULER_cancel (sh->reset_task); | ||
510 | sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_mesh_task, | ||
511 | sh); | ||
512 | } | ||
513 | |||
514 | |||
515 | /** | ||
516 | * Functions of this signature are called whenever we are ready to transmit | ||
517 | * query via a mesh. | ||
518 | * | ||
519 | * @param cls the struct MeshHandle for which we did the write call | ||
520 | * @param size the number of bytes that can be written to 'buf' | ||
521 | * @param buf where to write the message | ||
522 | * @return number of bytes written to 'buf' | ||
523 | */ | ||
524 | static size_t | ||
525 | transmit_sqm (void *cls, | ||
526 | size_t size, | ||
527 | void *buf) | ||
528 | { | ||
529 | struct MeshHandle *sh = cls; | ||
530 | struct MeshQueryMessage sqm; | ||
531 | struct GSF_MeshRequest *sr; | ||
532 | |||
533 | sh->wh = NULL; | ||
534 | if (NULL == buf) | ||
535 | { | ||
536 | reset_mesh (sh); | ||
537 | return 0; | ||
538 | } | ||
539 | sr = sh->pending_head; | ||
540 | if (NULL == sr) | ||
541 | return 0; | ||
542 | GNUNET_assert (size >= sizeof (struct MeshQueryMessage)); | ||
543 | GNUNET_CONTAINER_DLL_remove (sh->pending_head, | ||
544 | sh->pending_tail, | ||
545 | sr); | ||
546 | GNUNET_CONTAINER_multihashmap_put (sh->waiting_map, | ||
547 | &sr->query, | ||
548 | sr, | ||
549 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); | ||
550 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
551 | "Sending query via mesh to %s\n", | ||
552 | GNUNET_i2s (&sh->target)); | ||
553 | sr->was_transmitted = GNUNET_YES; | ||
554 | sqm.header.size = htons (sizeof (sqm)); | ||
555 | sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_QUERY); | ||
556 | sqm.type = htonl (sr->type); | ||
557 | sqm.query = sr->query; | ||
558 | memcpy (buf, &sqm, sizeof (sqm)); | ||
559 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
560 | "Successfully transmitted %u bytes via mesh to %s\n", | ||
561 | (unsigned int) size, | ||
562 | GNUNET_i2s (&sh->target)); | ||
563 | transmit_pending (sh); | ||
564 | return sizeof (sqm); | ||
565 | } | ||
566 | |||
567 | |||
568 | /** | ||
569 | * Transmit pending requests via the mesh. | ||
570 | * | ||
571 | * @param sh mesh to process | ||
572 | */ | ||
573 | static void | ||
574 | transmit_pending (struct MeshHandle *sh) | ||
575 | { | ||
576 | if (NULL != sh->wh) | ||
577 | return; | ||
578 | sh->wh = GNUNET_MESH_notify_transmit_ready (sh->mesh, GNUNET_YES /* allow cork */, | ||
579 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
580 | sizeof (struct MeshQueryMessage), | ||
581 | &transmit_sqm, sh); | ||
582 | } | ||
583 | |||
584 | |||
585 | /** | ||
586 | * Closure for 'handle_reply'. | ||
587 | */ | ||
588 | struct HandleReplyClosure | ||
589 | { | ||
590 | |||
591 | /** | ||
592 | * Reply payload. | ||
593 | */ | ||
594 | const void *data; | ||
595 | |||
596 | /** | ||
597 | * Expiration time for the block. | ||
598 | */ | ||
599 | struct GNUNET_TIME_Absolute expiration; | ||
600 | |||
601 | /** | ||
602 | * Number of bytes in 'data'. | ||
603 | */ | ||
604 | size_t data_size; | ||
605 | |||
606 | /** | ||
607 | * Type of the block. | ||
608 | */ | ||
609 | enum GNUNET_BLOCK_Type type; | ||
610 | |||
611 | /** | ||
612 | * Did we have a matching query? | ||
613 | */ | ||
614 | int found; | ||
615 | }; | ||
616 | |||
617 | |||
618 | /** | ||
619 | * Iterator called on each entry in a waiting map to | ||
620 | * process a result. | ||
621 | * | ||
622 | * @param cls the 'struct HandleReplyClosure' | ||
623 | * @param key the key of the entry in the map (the query) | ||
624 | * @param value the 'struct GSF_MeshRequest' to handle result for | ||
625 | * @return GNUNET_YES (continue to iterate) | ||
626 | */ | ||
627 | static int | ||
628 | handle_reply (void *cls, | ||
629 | const struct GNUNET_HashCode *key, | ||
630 | void *value) | ||
631 | { | ||
632 | struct HandleReplyClosure *hrc = cls; | ||
633 | struct GSF_MeshRequest *sr = value; | ||
634 | |||
635 | sr->proc (sr->proc_cls, | ||
636 | hrc->type, | ||
637 | hrc->expiration, | ||
638 | hrc->data_size, | ||
639 | hrc->data); | ||
640 | GSF_mesh_query_cancel (sr); | ||
641 | hrc->found = GNUNET_YES; | ||
642 | return GNUNET_YES; | ||
643 | } | ||
644 | |||
645 | |||
646 | /** | ||
647 | * Functions with this signature are called whenever a | ||
648 | * complete reply is received. | ||
649 | * | ||
650 | * @param cls closure with the 'struct MeshHandle' | ||
651 | * @param tunnel tunnel handle | ||
652 | * @param tunnel_ctx tunnel context | ||
653 | * @param message the actual message | ||
654 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
655 | */ | ||
656 | static int | ||
657 | reply_cb (void *cls, | ||
658 | struct GNUNET_MESH_Tunnel *tunnel, | ||
659 | void **tunnel_ctx, | ||
660 | const struct GNUNET_MessageHeader *message) | ||
661 | { | ||
662 | struct MeshHandle *sh = *tunnel_ctx; | ||
663 | const struct MeshReplyMessage *srm; | ||
664 | struct HandleReplyClosure hrc; | ||
665 | uint16_t msize; | ||
666 | enum GNUNET_BLOCK_Type type; | ||
667 | struct GNUNET_HashCode query; | ||
668 | |||
669 | msize = ntohs (message->size); | ||
670 | if (sizeof (struct MeshReplyMessage) > msize) | ||
671 | { | ||
672 | GNUNET_break_op (0); | ||
673 | reset_mesh_async (sh); | ||
674 | return GNUNET_SYSERR; | ||
675 | } | ||
676 | srm = (const struct MeshReplyMessage *) message; | ||
677 | msize -= sizeof (struct MeshReplyMessage); | ||
678 | type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); | ||
679 | if (GNUNET_YES != | ||
680 | GNUNET_BLOCK_get_key (GSF_block_ctx, | ||
681 | type, | ||
682 | &srm[1], msize, &query)) | ||
683 | { | ||
684 | GNUNET_break_op (0); | ||
685 | reset_mesh_async (sh); | ||
686 | return GNUNET_SYSERR; | ||
687 | } | ||
688 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
689 | "Received reply `%s' via mesh\n", | ||
690 | GNUNET_h2s (&query)); | ||
691 | GNUNET_STATISTICS_update (GSF_stats, | ||
692 | gettext_noop ("# replies received via mesh"), 1, | ||
693 | GNUNET_NO); | ||
694 | hrc.data = &srm[1]; | ||
695 | hrc.data_size = msize; | ||
696 | hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); | ||
697 | hrc.type = type; | ||
698 | hrc.found = GNUNET_NO; | ||
699 | GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map, | ||
700 | &query, | ||
701 | &handle_reply, | ||
702 | &hrc); | ||
703 | if (GNUNET_NO == hrc.found) | ||
704 | { | ||
705 | GNUNET_STATISTICS_update (GSF_stats, | ||
706 | gettext_noop ("# replies received via mesh dropped"), 1, | ||
707 | GNUNET_NO); | ||
708 | return GNUNET_OK; | ||
709 | } | ||
710 | return GNUNET_OK; | ||
711 | } | ||
712 | |||
713 | |||
714 | /** | ||
715 | * Get (or create) a mesh to talk to the given peer. | ||
716 | * | ||
717 | * @param target peer we want to communicate with | ||
718 | */ | ||
719 | static struct MeshHandle * | ||
720 | get_mesh (const struct GNUNET_PeerIdentity *target) | ||
721 | { | ||
722 | struct MeshHandle *sh; | ||
723 | |||
724 | sh = GNUNET_CONTAINER_multihashmap_get (mesh_map, | ||
725 | &target->hashPubKey); | ||
726 | if (NULL != sh) | ||
727 | { | ||
728 | if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) | ||
729 | { | ||
730 | GNUNET_SCHEDULER_cancel (sh->timeout_task); | ||
731 | sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
732 | } | ||
733 | return sh; | ||
734 | } | ||
735 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
736 | "Creating mesh to %s\n", | ||
737 | GNUNET_i2s (target)); | ||
738 | sh = GNUNET_malloc (sizeof (struct MeshHandle)); | ||
739 | sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, | ||
740 | &reset_mesh_task, | ||
741 | sh); | ||
742 | sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES); | ||
743 | sh->target = *target; | ||
744 | sh->mesh = GNUNET_MESH_tunnel_create (listen_socket, | ||
745 | sh, | ||
746 | &sh->target, | ||
747 | GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, | ||
748 | GNUNET_NO, | ||
749 | GNUNET_YES); | ||
750 | GNUNET_assert (GNUNET_OK == | ||
751 | GNUNET_CONTAINER_multihashmap_put (mesh_map, | ||
752 | &sh->target.hashPubKey, | ||
753 | sh, | ||
754 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
755 | return sh; | ||
756 | } | ||
757 | |||
758 | |||
759 | /** | ||
760 | * Look for a block by directly contacting a particular peer. | ||
761 | * | ||
762 | * @param target peer that should have the block | ||
763 | * @param query hash to query for the block | ||
764 | * @param type desired type for the block | ||
765 | * @param proc function to call with result | ||
766 | * @param proc_cls closure for 'proc' | ||
767 | * @return handle to cancel the operation | ||
768 | */ | ||
769 | struct GSF_MeshRequest * | ||
770 | GSF_mesh_query (const struct GNUNET_PeerIdentity *target, | ||
771 | const struct GNUNET_HashCode *query, | ||
772 | enum GNUNET_BLOCK_Type type, | ||
773 | GSF_MeshReplyProcessor proc, void *proc_cls) | ||
774 | { | ||
775 | struct MeshHandle *sh; | ||
776 | struct GSF_MeshRequest *sr; | ||
777 | |||
778 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
779 | "Preparing to send query for %s via mesh to %s\n", | ||
780 | GNUNET_h2s (query), | ||
781 | GNUNET_i2s (target)); | ||
782 | sh = get_mesh (target); | ||
783 | sr = GNUNET_malloc (sizeof (struct GSF_MeshRequest)); | ||
784 | sr->sh = sh; | ||
785 | sr->proc = proc; | ||
786 | sr->proc_cls = proc_cls; | ||
787 | sr->type = type; | ||
788 | sr->query = *query; | ||
789 | GNUNET_CONTAINER_DLL_insert (sh->pending_head, | ||
790 | sh->pending_tail, | ||
791 | sr); | ||
792 | if (GNUNET_YES == sh->is_ready) | ||
793 | transmit_pending (sh); | ||
794 | return sr; | ||
795 | } | ||
796 | |||
797 | |||
798 | /** | ||
799 | * Cancel an active request; must not be called after 'proc' | ||
800 | * was calld. | ||
801 | * | ||
802 | * @param sr request to cancel | ||
803 | */ | ||
804 | void | ||
805 | GSF_mesh_query_cancel (struct GSF_MeshRequest *sr) | ||
806 | { | ||
807 | struct MeshHandle *sh = sr->sh; | ||
808 | |||
809 | if (GNUNET_YES == sr->was_transmitted) | ||
810 | GNUNET_assert (GNUNET_OK == | ||
811 | GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map, | ||
812 | &sr->query, | ||
813 | sr)); | ||
814 | else | ||
815 | GNUNET_CONTAINER_DLL_remove (sh->pending_head, | ||
816 | sh->pending_tail, | ||
817 | sr); | ||
818 | GNUNET_free (sr); | ||
819 | if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) && | ||
820 | (NULL == sh->pending_head) ) | ||
821 | sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | ||
822 | &mesh_timeout, | ||
823 | sh); | ||
824 | } | ||
825 | |||
826 | |||
827 | /* ********************* server-side code ************************* */ | ||
828 | |||
829 | |||
830 | /** | ||
831 | * We're done with a particular client, clean up. | ||
832 | * | ||
833 | * @param sc client to clean up | ||
834 | */ | ||
835 | static void | ||
836 | terminate_mesh (struct MeshClient *sc) | ||
837 | { | ||
838 | GNUNET_STATISTICS_update (GSF_stats, | ||
839 | gettext_noop ("# mesh connections active"), -1, | ||
840 | GNUNET_NO); | ||
841 | if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) | ||
842 | GNUNET_SCHEDULER_cancel (sc->terminate_task); | ||
843 | if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) | ||
844 | GNUNET_SCHEDULER_cancel (sc->timeout_task); | ||
845 | if (NULL != sc->wh) | ||
846 | GNUNET_MESH_notify_transmit_ready_cancel (sc->wh); | ||
847 | if (NULL != sc->qe) | ||
848 | GNUNET_DATASTORE_cancel (sc->qe); | ||
849 | GNUNET_MESH_tunnel_destroy (sc->socket); | ||
850 | struct WriteQueueItem *wqi; | ||
851 | while (NULL != (wqi = sc->wqi_head)) | ||
852 | { | ||
853 | GNUNET_CONTAINER_DLL_remove (sc->wqi_head, | ||
854 | sc->wqi_tail, | ||
855 | wqi); | ||
856 | GNUNET_free (wqi); | ||
857 | } | ||
858 | GNUNET_CONTAINER_DLL_remove (sc_head, | ||
859 | sc_tail, | ||
860 | sc); | ||
861 | sc_count--; | ||
862 | GNUNET_free (sc); | ||
863 | } | ||
864 | |||
865 | |||
866 | /** | ||
867 | * Task run to asynchronously terminate the mesh due to timeout. | ||
868 | * | ||
869 | * @param cls the 'struct MeshClient' | ||
870 | * @param tc scheduler context | ||
871 | */ | ||
872 | static void | ||
873 | timeout_mesh_task (void *cls, | ||
874 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
875 | { | ||
876 | struct MeshClient *sc = cls; | ||
877 | |||
878 | sc->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
879 | terminate_mesh (sc); | ||
880 | } | ||
881 | |||
882 | |||
883 | /** | ||
884 | * Reset the timeout for the mesh client (due to activity). | ||
885 | * | ||
886 | * @param sc client handle to reset timeout for | ||
887 | */ | ||
888 | static void | ||
889 | refresh_timeout_task (struct MeshClient *sc) | ||
890 | { | ||
891 | if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) | ||
892 | GNUNET_SCHEDULER_cancel (sc->timeout_task); | ||
893 | sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT, | ||
894 | &timeout_mesh_task, | ||
895 | sc); | ||
896 | } | ||
897 | |||
898 | |||
899 | /** | ||
900 | * We're done handling a request from a client, read the next one. | ||
901 | * | ||
902 | * @param sc client to continue reading requests from | ||
903 | */ | ||
904 | static void | ||
905 | continue_reading (struct MeshClient *sc) | ||
906 | { | ||
907 | refresh_timeout_task (sc); | ||
908 | } | ||
909 | |||
910 | |||
911 | /** | ||
912 | * Transmit the next entry from the write queue. | ||
913 | * | ||
914 | * @param sc where to process the write queue | ||
915 | */ | ||
916 | static void | ||
917 | continue_writing (struct MeshClient *sc); | ||
918 | |||
919 | |||
920 | /** | ||
921 | * Send a reply now, mesh is ready. | ||
922 | * | ||
923 | * @param cls closure with the struct MeshClient which sent the query | ||
924 | * @param size number of bytes available in 'buf' | ||
925 | * @param buf where to write the message | ||
926 | * @return number of bytes written to 'buf' | ||
927 | */ | ||
928 | static size_t | ||
929 | write_continuation (void *cls, | ||
930 | size_t size, | ||
931 | void *buf) | ||
932 | { | ||
933 | struct MeshClient *sc = cls; | ||
934 | struct WriteQueueItem *wqi; | ||
935 | size_t ret; | ||
936 | |||
937 | sc->wh = NULL; | ||
938 | if (NULL == (wqi = sc->wqi_head)) | ||
939 | { | ||
940 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
941 | "Write queue empty, reading more requests\n"); | ||
942 | return 0; | ||
943 | } | ||
944 | if (0 == size) | ||
945 | { | ||
946 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
947 | "Transmission of reply failed, terminating mesh\n"); | ||
948 | terminate_mesh (sc); | ||
949 | return 0; | ||
950 | } | ||
951 | GNUNET_CONTAINER_DLL_remove (sc->wqi_head, | ||
952 | sc->wqi_tail, | ||
953 | wqi); | ||
954 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
955 | "Transmitted %u byte reply via mesh\n", | ||
956 | (unsigned int) size); | ||
957 | GNUNET_STATISTICS_update (GSF_stats, | ||
958 | gettext_noop ("# Blocks transferred via mesh"), 1, | ||
959 | GNUNET_NO); | ||
960 | memcpy (buf, &wqi[1], ret = wqi->msize); | ||
961 | GNUNET_free (wqi); | ||
962 | continue_writing (sc); | ||
963 | return ret; | ||
964 | } | ||
965 | |||
966 | |||
967 | /** | ||
968 | * Transmit the next entry from the write queue. | ||
969 | * | ||
970 | * @param sc where to process the write queue | ||
971 | */ | ||
972 | static void | ||
973 | continue_writing (struct MeshClient *sc) | ||
974 | { | ||
975 | struct WriteQueueItem *wqi; | ||
976 | |||
977 | if (NULL != sc->wh) | ||
978 | { | ||
979 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
980 | "Write pending, waiting for it to complete\n"); | ||
981 | return; /* write already pending */ | ||
982 | } | ||
983 | if (NULL == (wqi = sc->wqi_head)) | ||
984 | { | ||
985 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
986 | "Write queue empty, reading more requests\n"); | ||
987 | continue_reading (sc); | ||
988 | return; | ||
989 | } | ||
990 | sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO, | ||
991 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
992 | wqi->msize, | ||
993 | &write_continuation, | ||
994 | sc); | ||
995 | if (NULL == sc->wh) | ||
996 | { | ||
997 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
998 | "Write failed; terminating mesh\n"); | ||
999 | terminate_mesh (sc); | ||
1000 | return; | ||
1001 | } | ||
1002 | } | ||
1003 | |||
1004 | |||
1005 | /** | ||
1006 | * Process a datum that was stored in the datastore. | ||
1007 | * | ||
1008 | * @param cls closure with the struct MeshClient which sent the query | ||
1009 | * @param key key for the content | ||
1010 | * @param size number of bytes in data | ||
1011 | * @param data content stored | ||
1012 | * @param type type of the content | ||
1013 | * @param priority priority of the content | ||
1014 | * @param anonymity anonymity-level for the content | ||
1015 | * @param expiration expiration time for the content | ||
1016 | * @param uid unique identifier for the datum; | ||
1017 | * maybe 0 if no unique identifier is available | ||
1018 | */ | ||
1019 | static void | ||
1020 | handle_datastore_reply (void *cls, | ||
1021 | const struct GNUNET_HashCode * key, | ||
1022 | size_t size, const void *data, | ||
1023 | enum GNUNET_BLOCK_Type type, | ||
1024 | uint32_t priority, | ||
1025 | uint32_t anonymity, | ||
1026 | struct GNUNET_TIME_Absolute | ||
1027 | expiration, uint64_t uid) | ||
1028 | { | ||
1029 | struct MeshClient *sc = cls; | ||
1030 | size_t msize = size + sizeof (struct MeshReplyMessage); | ||
1031 | struct WriteQueueItem *wqi; | ||
1032 | struct MeshReplyMessage *srm; | ||
1033 | |||
1034 | sc->qe = NULL; | ||
1035 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) | ||
1036 | { | ||
1037 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1038 | "Performing on-demand encoding\n"); | ||
1039 | if (GNUNET_OK != | ||
1040 | GNUNET_FS_handle_on_demand_block (key, | ||
1041 | size, data, type, | ||
1042 | priority, anonymity, | ||
1043 | expiration, uid, | ||
1044 | &handle_datastore_reply, | ||
1045 | sc)) | ||
1046 | { | ||
1047 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1048 | "On-demand encoding request failed\n"); | ||
1049 | continue_writing (sc); | ||
1050 | } | ||
1051 | return; | ||
1052 | } | ||
1053 | if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
1054 | { | ||
1055 | GNUNET_break (0); | ||
1056 | continue_writing (sc); | ||
1057 | return; | ||
1058 | } | ||
1059 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1060 | "Starting transmission of %u byte reply for query `%s' via mesh\n", | ||
1061 | (unsigned int) size, | ||
1062 | GNUNET_h2s (key)); | ||
1063 | wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); | ||
1064 | wqi->msize = msize; | ||
1065 | srm = (struct MeshReplyMessage *) &wqi[1]; | ||
1066 | srm->header.size = htons ((uint16_t) msize); | ||
1067 | srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY); | ||
1068 | srm->type = htonl (type); | ||
1069 | srm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
1070 | memcpy (&srm[1], data, size); | ||
1071 | sc->reply_size = msize; | ||
1072 | GNUNET_CONTAINER_DLL_insert (sc->wqi_head, | ||
1073 | sc->wqi_tail, | ||
1074 | wqi); | ||
1075 | continue_writing (sc); | ||
1076 | } | ||
1077 | |||
1078 | |||
1079 | /** | ||
1080 | * Functions with this signature are called whenever a | ||
1081 | * complete query message is received. | ||
1082 | * | ||
1083 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
1084 | * | ||
1085 | * @param cls closure with the 'struct MeshClient' | ||
1086 | * @param tunnel tunnel handle | ||
1087 | * @param tunnel_ctx tunnel context | ||
1088 | * @param message the actual message | ||
1089 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
1090 | */ | ||
1091 | static int | ||
1092 | request_cb (void *cls, | ||
1093 | struct GNUNET_MESH_Tunnel *tunnel, | ||
1094 | void **tunnel_ctx, | ||
1095 | const struct GNUNET_MessageHeader *message) | ||
1096 | { | ||
1097 | struct MeshClient *sc = *tunnel_ctx; | ||
1098 | const struct MeshQueryMessage *sqm; | ||
1099 | |||
1100 | sqm = (const struct MeshQueryMessage *) message; | ||
1101 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1102 | "Received query for `%s' via mesh\n", | ||
1103 | GNUNET_h2s (&sqm->query)); | ||
1104 | GNUNET_STATISTICS_update (GSF_stats, | ||
1105 | gettext_noop ("# queries received via mesh"), 1, | ||
1106 | GNUNET_NO); | ||
1107 | refresh_timeout_task (sc); | ||
1108 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, | ||
1109 | 0, | ||
1110 | &sqm->query, | ||
1111 | ntohl (sqm->type), | ||
1112 | 0 /* priority */, | ||
1113 | GSF_datastore_queue_size, | ||
1114 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
1115 | &handle_datastore_reply, sc); | ||
1116 | if (NULL == sc->qe) | ||
1117 | { | ||
1118 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1119 | "Queueing request with datastore failed (queue full?)\n"); | ||
1120 | continue_writing (sc); | ||
1121 | } | ||
1122 | return GNUNET_OK; | ||
1123 | } | ||
1124 | |||
1125 | |||
1126 | /** | ||
1127 | * Functions of this type are called upon new mesh connection from other peers. | ||
1128 | * | ||
1129 | * @param cls the closure from GNUNET_MESH_connect | ||
1130 | * @param socket the socket representing the mesh | ||
1131 | * @param initiator the identity of the peer who wants to establish a mesh | ||
1132 | * with us; NULL on binding error | ||
1133 | * @param port mesh port used for the incoming connection | ||
1134 | * @return initial tunnel context (our 'struct MeshClient') | ||
1135 | */ | ||
1136 | static void * | ||
1137 | accept_cb (void *cls, | ||
1138 | struct GNUNET_MESH_Tunnel *socket, | ||
1139 | const struct GNUNET_PeerIdentity *initiator, | ||
1140 | uint32_t port) | ||
1141 | { | ||
1142 | struct MeshClient *sc; | ||
1143 | |||
1144 | GNUNET_assert (NULL != socket); | ||
1145 | if (sc_count >= sc_count_max) | ||
1146 | { | ||
1147 | GNUNET_STATISTICS_update (GSF_stats, | ||
1148 | gettext_noop ("# mesh client connections rejected"), 1, | ||
1149 | GNUNET_NO); | ||
1150 | GNUNET_MESH_tunnel_destroy (socket); | ||
1151 | return NULL; | ||
1152 | } | ||
1153 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
1154 | "Accepting inbound mesh connection from `%s'\n", | ||
1155 | GNUNET_i2s (initiator)); | ||
1156 | GNUNET_STATISTICS_update (GSF_stats, | ||
1157 | gettext_noop ("# mesh connections active"), 1, | ||
1158 | GNUNET_NO); | ||
1159 | sc = GNUNET_malloc (sizeof (struct MeshClient)); | ||
1160 | sc->socket = socket; | ||
1161 | GNUNET_CONTAINER_DLL_insert (sc_head, | ||
1162 | sc_tail, | ||
1163 | sc); | ||
1164 | sc_count++; | ||
1165 | refresh_timeout_task (sc); | ||
1166 | return sc; | ||
1167 | } | ||
1168 | |||
1169 | |||
1170 | /** | ||
1171 | * Initialize subsystem for non-anonymous file-sharing. | ||
1172 | */ | ||
1173 | void | ||
1174 | GSF_mesh_start () | ||
1175 | { | ||
1176 | static const struct GNUNET_MESH_MessageHandler handlers[] = { | ||
1177 | { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)}, | ||
1178 | { &reply_cb, GNUNET_MESSAGE_TYPE_FS_MESH_REPLY, 0 }, | ||
1179 | { NULL, 0, 0 } | ||
1180 | }; | ||
1181 | static const uint32_t ports[] = { | ||
1182 | GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, | ||
1183 | 0 | ||
1184 | }; | ||
1185 | |||
1186 | mesh_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); | ||
1187 | if (GNUNET_YES == | ||
1188 | GNUNET_CONFIGURATION_get_value_number (GSF_cfg, | ||
1189 | "fs", | ||
1190 | "MAX_MESH_CLIENTS", | ||
1191 | &sc_count_max)) | ||
1192 | { | ||
1193 | listen_socket = GNUNET_MESH_connect (GSF_cfg, | ||
1194 | NULL, | ||
1195 | &accept_cb, | ||
1196 | NULL /* FIXME: have a cleanup callback? */, | ||
1197 | handlers, | ||
1198 | ports); | ||
1199 | } | ||
1200 | } | ||
1201 | |||
1202 | |||
1203 | /** | ||
1204 | * Function called on each active meshs to shut them down. | ||
1205 | * | ||
1206 | * @param cls NULL | ||
1207 | * @param key target peer, unused | ||
1208 | * @param value the 'struct MeshHandle' to destroy | ||
1209 | * @return GNUNET_YES (continue to iterate) | ||
1210 | */ | ||
1211 | static int | ||
1212 | release_meshs (void *cls, | ||
1213 | const struct GNUNET_HashCode *key, | ||
1214 | void *value) | ||
1215 | { | ||
1216 | struct MeshHandle *sh = value; | ||
1217 | |||
1218 | destroy_mesh_handle (sh); | ||
1219 | return GNUNET_YES; | ||
1220 | } | ||
1221 | |||
1222 | |||
1223 | /** | ||
1224 | * Shutdown subsystem for non-anonymous file-sharing. | ||
1225 | */ | ||
1226 | void | ||
1227 | GSF_mesh_stop () | ||
1228 | { | ||
1229 | struct MeshClient *sc; | ||
1230 | |||
1231 | while (NULL != (sc = sc_head)) | ||
1232 | terminate_mesh (sc); | ||
1233 | GNUNET_CONTAINER_multihashmap_iterate (mesh_map, | ||
1234 | &release_meshs, | ||
1235 | NULL); | ||
1236 | GNUNET_CONTAINER_multihashmap_destroy (mesh_map); | ||
1237 | mesh_map = NULL; | ||
1238 | if (NULL != listen_socket) | ||
1239 | { | ||
1240 | GNUNET_MESH_disconnect (listen_socket); | ||
1241 | listen_socket = NULL; | ||
1242 | } | ||
1243 | } | ||
1244 | |||
1245 | /* end of gnunet-service-fs_mesh.c */ | ||
diff --git a/src/fs/gnunet-service-fs_mesh.h b/src/fs/gnunet-service-fs_mesh.h index a718ed132..f136940aa 100644 --- a/src/fs/gnunet-service-fs_mesh.h +++ b/src/fs/gnunet-service-fs_mesh.h | |||
@@ -79,13 +79,81 @@ GSF_mesh_query_cancel (struct GSF_MeshRequest *sr); | |||
79 | * Initialize subsystem for non-anonymous file-sharing. | 79 | * Initialize subsystem for non-anonymous file-sharing. |
80 | */ | 80 | */ |
81 | void | 81 | void |
82 | GSF_mesh_start (void); | 82 | GSF_mesh_start_server (void); |
83 | 83 | ||
84 | 84 | ||
85 | /** | 85 | /** |
86 | * Shutdown subsystem for non-anonymous file-sharing. | 86 | * Shutdown subsystem for non-anonymous file-sharing. |
87 | */ | 87 | */ |
88 | void | 88 | void |
89 | GSF_mesh_stop (void); | 89 | GSF_mesh_stop_server (void); |
90 | |||
91 | /** | ||
92 | * Initialize subsystem for non-anonymous file-sharing. | ||
93 | */ | ||
94 | void | ||
95 | GSF_mesh_start_client (void); | ||
96 | |||
97 | |||
98 | /** | ||
99 | * Shutdown subsystem for non-anonymous file-sharing. | ||
100 | */ | ||
101 | void | ||
102 | GSF_mesh_stop_client (void); | ||
103 | |||
104 | |||
105 | GNUNET_NETWORK_STRUCT_BEGIN | ||
106 | |||
107 | /** | ||
108 | * Query from one peer, asking the other for CHK-data. | ||
109 | */ | ||
110 | struct MeshQueryMessage | ||
111 | { | ||
112 | |||
113 | /** | ||
114 | * Type is GNUNET_MESSAGE_TYPE_FS_MESH_QUERY. | ||
115 | */ | ||
116 | struct GNUNET_MessageHeader header; | ||
117 | |||
118 | /** | ||
119 | * Block type must be DBLOCK or IBLOCK. | ||
120 | */ | ||
121 | uint32_t type GNUNET_PACKED; | ||
122 | |||
123 | /** | ||
124 | * Query hash from CHK (hash of encrypted block). | ||
125 | */ | ||
126 | struct GNUNET_HashCode query; | ||
127 | |||
128 | }; | ||
129 | |||
130 | |||
131 | /** | ||
132 | * Reply to a MeshQueryMessage. | ||
133 | */ | ||
134 | struct MeshReplyMessage | ||
135 | { | ||
136 | |||
137 | /** | ||
138 | * Type is GNUNET_MESSAGE_TYPE_FS_MESH_REPLY. | ||
139 | */ | ||
140 | struct GNUNET_MessageHeader header; | ||
141 | |||
142 | /** | ||
143 | * Block type must be DBLOCK or IBLOCK. | ||
144 | */ | ||
145 | uint32_t type GNUNET_PACKED; | ||
146 | |||
147 | /** | ||
148 | * Expiration time for the block. | ||
149 | */ | ||
150 | struct GNUNET_TIME_AbsoluteNBO expiration; | ||
151 | |||
152 | /* followed by the encrypted block */ | ||
153 | |||
154 | }; | ||
155 | |||
156 | GNUNET_NETWORK_STRUCT_END | ||
157 | |||
90 | 158 | ||
91 | #endif | 159 | #endif |
diff --git a/src/fs/gnunet-service-fs_mesh_server.c b/src/fs/gnunet-service-fs_mesh_server.c new file mode 100644 index 000000000..182408ef0 --- /dev/null +++ b/src/fs/gnunet-service-fs_mesh_server.c | |||
@@ -0,0 +1,589 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | (C) 2012, 2013 Christian Grothoff (and other contributing authors) | ||
4 | |||
5 | GNUnet is free software; you can redistribute it and/or modify | ||
6 | it under the terms of the GNU General Public License as published | ||
7 | by the Free Software Foundation; either version 3, or (at your | ||
8 | option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU General Public License | ||
16 | along with GNUnet; see the file COPYING. If not, write to the | ||
17 | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | ||
18 | Boston, MA 02111-1307, USA. | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file fs/gnunet-service-fs_mesh.c | ||
23 | * @brief non-anonymous file-transfer | ||
24 | * @author Christian Grothoff | ||
25 | * | ||
26 | * TODO: | ||
27 | * - MESH2 API doesn't allow flow control for server yet (needed!) | ||
28 | * - likely need to register clean up handler with mesh to handle | ||
29 | * client disconnect (likely leaky right now) | ||
30 | * - server is optional, currently client code will NPE if we have | ||
31 | * no server, again MESH2 API requirement forcing this for now | ||
32 | * - message handlers are symmetric for client/server, should be | ||
33 | * separated (currently clients can get requests and servers can | ||
34 | * handle answers, not good) | ||
35 | * - code is entirely untested | ||
36 | * - might have overlooked a few possible simplifications | ||
37 | * - PORT is set to old application type, unsure if we should keep | ||
38 | * it that way (fine for now) | ||
39 | */ | ||
40 | #include "platform.h" | ||
41 | #include "gnunet_constants.h" | ||
42 | #include "gnunet_util_lib.h" | ||
43 | #include "gnunet_mesh_service.h" | ||
44 | #include "gnunet_protocols.h" | ||
45 | #include "gnunet_applications.h" | ||
46 | #include "gnunet-service-fs.h" | ||
47 | #include "gnunet-service-fs_indexing.h" | ||
48 | #include "gnunet-service-fs_mesh.h" | ||
49 | |||
50 | /** | ||
51 | * After how long do we termiante idle connections? | ||
52 | */ | ||
53 | #define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) | ||
54 | |||
55 | |||
56 | /** | ||
57 | * A message in the queue to be written to the mesh. | ||
58 | */ | ||
59 | struct WriteQueueItem | ||
60 | { | ||
61 | /** | ||
62 | * Kept in a DLL. | ||
63 | */ | ||
64 | struct WriteQueueItem *next; | ||
65 | |||
66 | /** | ||
67 | * Kept in a DLL. | ||
68 | */ | ||
69 | struct WriteQueueItem *prev; | ||
70 | |||
71 | /** | ||
72 | * Number of bytes of payload, allocated at the end of this struct. | ||
73 | */ | ||
74 | size_t msize; | ||
75 | }; | ||
76 | |||
77 | |||
78 | /** | ||
79 | * Information we keep around for each active meshing client. | ||
80 | */ | ||
81 | struct MeshClient | ||
82 | { | ||
83 | /** | ||
84 | * DLL | ||
85 | */ | ||
86 | struct MeshClient *next; | ||
87 | |||
88 | /** | ||
89 | * DLL | ||
90 | */ | ||
91 | struct MeshClient *prev; | ||
92 | |||
93 | /** | ||
94 | * Socket for communication. | ||
95 | */ | ||
96 | struct GNUNET_MESH_Tunnel *socket; | ||
97 | |||
98 | /** | ||
99 | * Handle for active write operation, or NULL. | ||
100 | */ | ||
101 | struct GNUNET_MESH_TransmitHandle *wh; | ||
102 | |||
103 | /** | ||
104 | * Head of write queue. | ||
105 | */ | ||
106 | struct WriteQueueItem *wqi_head; | ||
107 | |||
108 | /** | ||
109 | * Tail of write queue. | ||
110 | */ | ||
111 | struct WriteQueueItem *wqi_tail; | ||
112 | |||
113 | /** | ||
114 | * Current active request to the datastore, if we have one pending. | ||
115 | */ | ||
116 | struct GNUNET_DATASTORE_QueueEntry *qe; | ||
117 | |||
118 | /** | ||
119 | * Task that is scheduled to asynchronously terminate the connection. | ||
120 | */ | ||
121 | GNUNET_SCHEDULER_TaskIdentifier terminate_task; | ||
122 | |||
123 | /** | ||
124 | * Task that is scheduled to terminate idle connections. | ||
125 | */ | ||
126 | GNUNET_SCHEDULER_TaskIdentifier timeout_task; | ||
127 | |||
128 | /** | ||
129 | * Size of the last write that was initiated. | ||
130 | */ | ||
131 | size_t reply_size; | ||
132 | |||
133 | }; | ||
134 | |||
135 | |||
136 | /** | ||
137 | * Listen socket for incoming requests. | ||
138 | */ | ||
139 | static struct GNUNET_MESH_Handle *listen_socket; | ||
140 | |||
141 | /** | ||
142 | * Head of DLL of mesh clients. | ||
143 | */ | ||
144 | static struct MeshClient *sc_head; | ||
145 | |||
146 | /** | ||
147 | * Tail of DLL of mesh clients. | ||
148 | */ | ||
149 | static struct MeshClient *sc_tail; | ||
150 | |||
151 | /** | ||
152 | * Number of active mesh clients in the 'sc_*'-DLL. | ||
153 | */ | ||
154 | static unsigned int sc_count; | ||
155 | |||
156 | /** | ||
157 | * Maximum allowed number of mesh clients. | ||
158 | */ | ||
159 | static unsigned long long sc_count_max; | ||
160 | |||
161 | |||
162 | |||
163 | /* ********************* server-side code ************************* */ | ||
164 | |||
165 | |||
166 | /** | ||
167 | * We're done with a particular client, clean up. | ||
168 | * | ||
169 | * @param sc client to clean up | ||
170 | */ | ||
171 | static void | ||
172 | terminate_mesh (struct MeshClient *sc) | ||
173 | { | ||
174 | struct WriteQueueItem *wqi; | ||
175 | |||
176 | fprintf (stderr, | ||
177 | "terminate mesh called for %p\n", | ||
178 | sc); | ||
179 | GNUNET_STATISTICS_update (GSF_stats, | ||
180 | gettext_noop ("# mesh connections active"), -1, | ||
181 | GNUNET_NO); | ||
182 | if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) | ||
183 | GNUNET_SCHEDULER_cancel (sc->terminate_task); | ||
184 | if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) | ||
185 | GNUNET_SCHEDULER_cancel (sc->timeout_task); | ||
186 | if (NULL != sc->wh) | ||
187 | GNUNET_MESH_notify_transmit_ready_cancel (sc->wh); | ||
188 | if (NULL != sc->qe) | ||
189 | GNUNET_DATASTORE_cancel (sc->qe); | ||
190 | while (NULL != (wqi = sc->wqi_head)) | ||
191 | { | ||
192 | GNUNET_CONTAINER_DLL_remove (sc->wqi_head, | ||
193 | sc->wqi_tail, | ||
194 | wqi); | ||
195 | GNUNET_free (wqi); | ||
196 | } | ||
197 | GNUNET_CONTAINER_DLL_remove (sc_head, | ||
198 | sc_tail, | ||
199 | sc); | ||
200 | sc_count--; | ||
201 | GNUNET_free (sc); | ||
202 | } | ||
203 | |||
204 | |||
205 | /** | ||
206 | * Task run to asynchronously terminate the mesh due to timeout. | ||
207 | * | ||
208 | * @param cls the 'struct MeshClient' | ||
209 | * @param tc scheduler context | ||
210 | */ | ||
211 | static void | ||
212 | timeout_mesh_task (void *cls, | ||
213 | const struct GNUNET_SCHEDULER_TaskContext *tc) | ||
214 | { | ||
215 | struct MeshClient *sc = cls; | ||
216 | struct GNUNET_MESH_Tunnel *tun; | ||
217 | |||
218 | sc->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||
219 | tun = sc->socket; | ||
220 | sc->socket = NULL; | ||
221 | GNUNET_MESH_tunnel_destroy (tun); | ||
222 | } | ||
223 | |||
224 | |||
225 | /** | ||
226 | * Reset the timeout for the mesh client (due to activity). | ||
227 | * | ||
228 | * @param sc client handle to reset timeout for | ||
229 | */ | ||
230 | static void | ||
231 | refresh_timeout_task (struct MeshClient *sc) | ||
232 | { | ||
233 | if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) | ||
234 | GNUNET_SCHEDULER_cancel (sc->timeout_task); | ||
235 | sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT, | ||
236 | &timeout_mesh_task, | ||
237 | sc); | ||
238 | } | ||
239 | |||
240 | |||
241 | /** | ||
242 | * We're done handling a request from a client, read the next one. | ||
243 | * | ||
244 | * @param sc client to continue reading requests from | ||
245 | */ | ||
246 | static void | ||
247 | continue_reading (struct MeshClient *sc) | ||
248 | { | ||
249 | refresh_timeout_task (sc); | ||
250 | GNUNET_MESH_receive_done (sc->socket); | ||
251 | } | ||
252 | |||
253 | |||
254 | /** | ||
255 | * Transmit the next entry from the write queue. | ||
256 | * | ||
257 | * @param sc where to process the write queue | ||
258 | */ | ||
259 | static void | ||
260 | continue_writing (struct MeshClient *sc); | ||
261 | |||
262 | |||
263 | /** | ||
264 | * Send a reply now, mesh is ready. | ||
265 | * | ||
266 | * @param cls closure with the struct MeshClient which sent the query | ||
267 | * @param size number of bytes available in 'buf' | ||
268 | * @param buf where to write the message | ||
269 | * @return number of bytes written to 'buf' | ||
270 | */ | ||
271 | static size_t | ||
272 | write_continuation (void *cls, | ||
273 | size_t size, | ||
274 | void *buf) | ||
275 | { | ||
276 | struct MeshClient *sc = cls; | ||
277 | struct WriteQueueItem *wqi; | ||
278 | size_t ret; | ||
279 | |||
280 | sc->wh = NULL; | ||
281 | if (NULL == (wqi = sc->wqi_head)) | ||
282 | { | ||
283 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
284 | "Write queue empty, reading more requests\n"); | ||
285 | return 0; | ||
286 | } | ||
287 | if (0 == size) | ||
288 | { | ||
289 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
290 | "Transmission of reply failed, terminating mesh\n"); | ||
291 | terminate_mesh (sc); | ||
292 | return 0; | ||
293 | } | ||
294 | GNUNET_CONTAINER_DLL_remove (sc->wqi_head, | ||
295 | sc->wqi_tail, | ||
296 | wqi); | ||
297 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
298 | "Transmitted %u byte reply via mesh\n", | ||
299 | (unsigned int) size); | ||
300 | GNUNET_STATISTICS_update (GSF_stats, | ||
301 | gettext_noop ("# Blocks transferred via mesh"), 1, | ||
302 | GNUNET_NO); | ||
303 | memcpy (buf, &wqi[1], ret = wqi->msize); | ||
304 | GNUNET_free (wqi); | ||
305 | continue_writing (sc); | ||
306 | return ret; | ||
307 | } | ||
308 | |||
309 | |||
310 | /** | ||
311 | * Transmit the next entry from the write queue. | ||
312 | * | ||
313 | * @param sc where to process the write queue | ||
314 | */ | ||
315 | static void | ||
316 | continue_writing (struct MeshClient *sc) | ||
317 | { | ||
318 | struct WriteQueueItem *wqi; | ||
319 | |||
320 | if (NULL != sc->wh) | ||
321 | { | ||
322 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
323 | "Write pending, waiting for it to complete\n"); | ||
324 | return; /* write already pending */ | ||
325 | } | ||
326 | if (NULL == (wqi = sc->wqi_head)) | ||
327 | { | ||
328 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
329 | "Write queue empty, reading more requests\n"); | ||
330 | continue_reading (sc); | ||
331 | return; | ||
332 | } | ||
333 | sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO, | ||
334 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
335 | wqi->msize, | ||
336 | &write_continuation, | ||
337 | sc); | ||
338 | if (NULL == sc->wh) | ||
339 | { | ||
340 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
341 | "Write failed; terminating mesh\n"); | ||
342 | terminate_mesh (sc); | ||
343 | return; | ||
344 | } | ||
345 | } | ||
346 | |||
347 | |||
348 | /** | ||
349 | * Process a datum that was stored in the datastore. | ||
350 | * | ||
351 | * @param cls closure with the struct MeshClient which sent the query | ||
352 | * @param key key for the content | ||
353 | * @param size number of bytes in data | ||
354 | * @param data content stored | ||
355 | * @param type type of the content | ||
356 | * @param priority priority of the content | ||
357 | * @param anonymity anonymity-level for the content | ||
358 | * @param expiration expiration time for the content | ||
359 | * @param uid unique identifier for the datum; | ||
360 | * maybe 0 if no unique identifier is available | ||
361 | */ | ||
362 | static void | ||
363 | handle_datastore_reply (void *cls, | ||
364 | const struct GNUNET_HashCode * key, | ||
365 | size_t size, const void *data, | ||
366 | enum GNUNET_BLOCK_Type type, | ||
367 | uint32_t priority, | ||
368 | uint32_t anonymity, | ||
369 | struct GNUNET_TIME_Absolute | ||
370 | expiration, uint64_t uid) | ||
371 | { | ||
372 | struct MeshClient *sc = cls; | ||
373 | size_t msize = size + sizeof (struct MeshReplyMessage); | ||
374 | struct WriteQueueItem *wqi; | ||
375 | struct MeshReplyMessage *srm; | ||
376 | |||
377 | sc->qe = NULL; | ||
378 | if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) | ||
379 | { | ||
380 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
381 | "Performing on-demand encoding\n"); | ||
382 | if (GNUNET_OK != | ||
383 | GNUNET_FS_handle_on_demand_block (key, | ||
384 | size, data, type, | ||
385 | priority, anonymity, | ||
386 | expiration, uid, | ||
387 | &handle_datastore_reply, | ||
388 | sc)) | ||
389 | { | ||
390 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
391 | "On-demand encoding request failed\n"); | ||
392 | continue_writing (sc); | ||
393 | } | ||
394 | return; | ||
395 | } | ||
396 | if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) | ||
397 | { | ||
398 | GNUNET_break (0); | ||
399 | continue_writing (sc); | ||
400 | return; | ||
401 | } | ||
402 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
403 | "Starting transmission of %u byte reply for query `%s' via mesh\n", | ||
404 | (unsigned int) size, | ||
405 | GNUNET_h2s (key)); | ||
406 | wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); | ||
407 | wqi->msize = msize; | ||
408 | srm = (struct MeshReplyMessage *) &wqi[1]; | ||
409 | srm->header.size = htons ((uint16_t) msize); | ||
410 | srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MESH_REPLY); | ||
411 | srm->type = htonl (type); | ||
412 | srm->expiration = GNUNET_TIME_absolute_hton (expiration); | ||
413 | memcpy (&srm[1], data, size); | ||
414 | sc->reply_size = msize; | ||
415 | GNUNET_CONTAINER_DLL_insert (sc->wqi_head, | ||
416 | sc->wqi_tail, | ||
417 | wqi); | ||
418 | continue_writing (sc); | ||
419 | } | ||
420 | |||
421 | |||
422 | /** | ||
423 | * Functions with this signature are called whenever a | ||
424 | * complete query message is received. | ||
425 | * | ||
426 | * Do not call GNUNET_SERVER_mst_destroy in callback | ||
427 | * | ||
428 | * @param cls closure with the 'struct MeshClient' | ||
429 | * @param tunnel tunnel handle | ||
430 | * @param tunnel_ctx tunnel context | ||
431 | * @param message the actual message | ||
432 | * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing | ||
433 | */ | ||
434 | static int | ||
435 | request_cb (void *cls, | ||
436 | struct GNUNET_MESH_Tunnel *tunnel, | ||
437 | void **tunnel_ctx, | ||
438 | const struct GNUNET_MessageHeader *message) | ||
439 | { | ||
440 | struct MeshClient *sc = *tunnel_ctx; | ||
441 | const struct MeshQueryMessage *sqm; | ||
442 | |||
443 | fprintf (stderr, | ||
444 | "Request gets %p\n", | ||
445 | sc); | ||
446 | sqm = (const struct MeshQueryMessage *) message; | ||
447 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
448 | "Received query for `%s' via mesh\n", | ||
449 | GNUNET_h2s (&sqm->query)); | ||
450 | GNUNET_STATISTICS_update (GSF_stats, | ||
451 | gettext_noop ("# queries received via mesh"), 1, | ||
452 | GNUNET_NO); | ||
453 | refresh_timeout_task (sc); | ||
454 | sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, | ||
455 | 0, | ||
456 | &sqm->query, | ||
457 | ntohl (sqm->type), | ||
458 | 0 /* priority */, | ||
459 | GSF_datastore_queue_size, | ||
460 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
461 | &handle_datastore_reply, sc); | ||
462 | if (NULL == sc->qe) | ||
463 | { | ||
464 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
465 | "Queueing request with datastore failed (queue full?)\n"); | ||
466 | continue_writing (sc); | ||
467 | } | ||
468 | return GNUNET_OK; | ||
469 | } | ||
470 | |||
471 | |||
472 | /** | ||
473 | * Functions of this type are called upon new mesh connection from other peers. | ||
474 | * | ||
475 | * @param cls the closure from GNUNET_MESH_connect | ||
476 | * @param socket the socket representing the mesh | ||
477 | * @param initiator the identity of the peer who wants to establish a mesh | ||
478 | * with us; NULL on binding error | ||
479 | * @param port mesh port used for the incoming connection | ||
480 | * @return initial tunnel context (our 'struct MeshClient') | ||
481 | */ | ||
482 | static void * | ||
483 | accept_cb (void *cls, | ||
484 | struct GNUNET_MESH_Tunnel *socket, | ||
485 | const struct GNUNET_PeerIdentity *initiator, | ||
486 | uint32_t port) | ||
487 | { | ||
488 | struct MeshClient *sc; | ||
489 | |||
490 | GNUNET_assert (NULL != socket); | ||
491 | if (sc_count >= sc_count_max) | ||
492 | { | ||
493 | GNUNET_STATISTICS_update (GSF_stats, | ||
494 | gettext_noop ("# mesh client connections rejected"), 1, | ||
495 | GNUNET_NO); | ||
496 | GNUNET_MESH_tunnel_destroy (socket); | ||
497 | return NULL; | ||
498 | } | ||
499 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
500 | "Accepting inbound mesh connection from `%s'\n", | ||
501 | GNUNET_i2s (initiator)); | ||
502 | GNUNET_STATISTICS_update (GSF_stats, | ||
503 | gettext_noop ("# mesh connections active"), 1, | ||
504 | GNUNET_NO); | ||
505 | sc = GNUNET_new (struct MeshClient); | ||
506 | sc->socket = socket; | ||
507 | GNUNET_CONTAINER_DLL_insert (sc_head, | ||
508 | sc_tail, | ||
509 | sc); | ||
510 | sc_count++; | ||
511 | refresh_timeout_task (sc); | ||
512 | fprintf (stderr, | ||
513 | "Accept returns %p\n", | ||
514 | sc); | ||
515 | return sc; | ||
516 | } | ||
517 | |||
518 | |||
519 | /** | ||
520 | * Function called by mesh when a client disconnects. | ||
521 | * Cleans up our 'struct MeshClient' of that tunnel. | ||
522 | * | ||
523 | * @param cls NULL | ||
524 | * @param tunnel tunnel of the disconnecting client | ||
525 | * @param tunnel_ctx our 'struct MeshClient' | ||
526 | */ | ||
527 | static void | ||
528 | cleaner_cb (void *cls, | ||
529 | const struct GNUNET_MESH_Tunnel *tunnel, | ||
530 | void *tunnel_ctx) | ||
531 | { | ||
532 | struct MeshClient *sc = tunnel_ctx; | ||
533 | |||
534 | fprintf (stderr, | ||
535 | "Cleaner called with %p\n", | ||
536 | sc); | ||
537 | if (NULL != sc) | ||
538 | terminate_mesh (sc); | ||
539 | } | ||
540 | |||
541 | |||
542 | /** | ||
543 | * Initialize subsystem for non-anonymous file-sharing. | ||
544 | */ | ||
545 | void | ||
546 | GSF_mesh_start_server () | ||
547 | { | ||
548 | static const struct GNUNET_MESH_MessageHandler handlers[] = { | ||
549 | { &request_cb, GNUNET_MESSAGE_TYPE_FS_MESH_QUERY, sizeof (struct MeshQueryMessage)}, | ||
550 | { NULL, 0, 0 } | ||
551 | }; | ||
552 | static const uint32_t ports[] = { | ||
553 | GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, | ||
554 | 0 | ||
555 | }; | ||
556 | |||
557 | if (GNUNET_YES != | ||
558 | GNUNET_CONFIGURATION_get_value_number (GSF_cfg, | ||
559 | "fs", | ||
560 | "MAX_MESH_CLIENTS", | ||
561 | &sc_count_max)) | ||
562 | return; | ||
563 | listen_socket = GNUNET_MESH_connect (GSF_cfg, | ||
564 | NULL, | ||
565 | &accept_cb, | ||
566 | &cleaner_cb, | ||
567 | handlers, | ||
568 | ports); | ||
569 | } | ||
570 | |||
571 | |||
572 | /** | ||
573 | * Shutdown subsystem for non-anonymous file-sharing. | ||
574 | */ | ||
575 | void | ||
576 | GSF_mesh_stop_server () | ||
577 | { | ||
578 | struct MeshClient *sc; | ||
579 | |||
580 | while (NULL != (sc = sc_head)) | ||
581 | terminate_mesh (sc); | ||
582 | if (NULL != listen_socket) | ||
583 | { | ||
584 | GNUNET_MESH_disconnect (listen_socket); | ||
585 | listen_socket = NULL; | ||
586 | } | ||
587 | } | ||
588 | |||
589 | /* end of gnunet-service-fs_mesh.c */ | ||
diff --git a/src/fs/test_gnunet_service_fs_p2p_mesh.conf b/src/fs/test_gnunet_service_fs_p2p_mesh.conf index 9d01208cb..328bc986c 100644 --- a/src/fs/test_gnunet_service_fs_p2p_mesh.conf +++ b/src/fs/test_gnunet_service_fs_p2p_mesh.conf | |||
@@ -14,3 +14,7 @@ CONTENT_CACHING = NO | |||
14 | # (may improve anonymity, probably not a good idea if content_caching is NO) | 14 | # (may improve anonymity, probably not a good idea if content_caching is NO) |
15 | CONTENT_PUSHING = NO | 15 | CONTENT_PUSHING = NO |
16 | 16 | ||
17 | #PREFIX = valgrind | ||
18 | |||
19 | [mesh] | ||
20 | #PREFIX = valgrind | ||