diff options
author | Martin Schanzenbach <schanzen@gnunet.org> | 2023-10-19 11:55:21 +0200 |
---|---|---|
committer | Martin Schanzenbach <schanzen@gnunet.org> | 2023-10-19 11:55:21 +0200 |
commit | 579d9473bb75072303789599b23be9b0203336fc (patch) | |
tree | 687506d1968bd2a391b71b8832d1e97905db3ca8 /src/fs/gnunet-service-fs_cadet_client.c | |
parent | b56e4e05ad919c7191260fcf1d78b1f8d739871a (diff) | |
download | gnunet-579d9473bb75072303789599b23be9b0203336fc.tar.gz gnunet-579d9473bb75072303789599b23be9b0203336fc.zip |
BUILD: Move fs to contrib/service
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r-- | src/fs/gnunet-service-fs_cadet_client.c | 728 |
1 files changed, 0 insertions, 728 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c deleted file mode 100644 index 398fcd604..000000000 --- a/src/fs/gnunet-service-fs_cadet_client.c +++ /dev/null | |||
@@ -1,728 +0,0 @@ | |||
1 | /* | ||
2 | This file is part of GNUnet. | ||
3 | Copyright (C) 2012, 2013 GNUnet e.V. | ||
4 | |||
5 | GNUnet is free software: you can redistribute it and/or modify it | ||
6 | under the terms of the GNU Affero General Public License as published | ||
7 | by the Free Software Foundation, either version 3 of the License, | ||
8 | or (at your option) any later version. | ||
9 | |||
10 | GNUnet is distributed in the hope that it will be useful, but | ||
11 | WITHOUT ANY WARRANTY; without even the implied warranty of | ||
12 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
13 | Affero General Public License for more details. | ||
14 | |||
15 | You should have received a copy of the GNU Affero General Public License | ||
16 | along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
17 | |||
18 | SPDX-License-Identifier: AGPL3.0-or-later | ||
19 | */ | ||
20 | |||
21 | /** | ||
22 | * @file fs/gnunet-service-fs_cadet_client.c | ||
23 | * @brief non-anonymous file-transfer | ||
24 | * @author Christian Grothoff | ||
25 | * | ||
26 | * TODO: | ||
27 | * - PORT is set to old application type, unsure if we should keep | ||
28 | * it that way (fine for now) | ||
29 | */ | ||
30 | #include "platform.h" | ||
31 | #include "gnunet_constants.h" | ||
32 | #include "gnunet_util_lib.h" | ||
33 | #include "gnunet_cadet_service.h" | ||
34 | #include "gnunet_protocols.h" | ||
35 | #include "gnunet_applications.h" | ||
36 | #include "gnunet-service-fs.h" | ||
37 | #include "gnunet-service-fs_indexing.h" | ||
38 | #include "gnunet-service-fs_cadet.h" | ||
39 | |||
40 | |||
41 | /** | ||
42 | * After how long do we reset connections without replies? | ||
43 | */ | ||
44 | #define CLIENT_RETRY_TIMEOUT \ | ||
45 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) | ||
46 | |||
47 | |||
48 | /** | ||
49 | * Handle for a cadet to another peer. | ||
50 | */ | ||
51 | struct CadetHandle; | ||
52 | |||
53 | |||
54 | /** | ||
55 | * Handle for a request that is going out via cadet API. | ||
56 | */ | ||
57 | struct GSF_CadetRequest | ||
58 | { | ||
59 | /** | ||
60 | * DLL. | ||
61 | */ | ||
62 | struct GSF_CadetRequest *next; | ||
63 | |||
64 | /** | ||
65 | * DLL. | ||
66 | */ | ||
67 | struct GSF_CadetRequest *prev; | ||
68 | |||
69 | /** | ||
70 | * Which cadet is this request associated with? | ||
71 | */ | ||
72 | struct CadetHandle *mh; | ||
73 | |||
74 | /** | ||
75 | * Function to call with the result. | ||
76 | */ | ||
77 | GSF_CadetReplyProcessor proc; | ||
78 | |||
79 | /** | ||
80 | * Closure for @e proc | ||
81 | */ | ||
82 | void *proc_cls; | ||
83 | |||
84 | /** | ||
85 | * Query to transmit to the other peer. | ||
86 | */ | ||
87 | struct GNUNET_HashCode query; | ||
88 | |||
89 | /** | ||
90 | * Desired type for the reply. | ||
91 | */ | ||
92 | enum GNUNET_BLOCK_Type type; | ||
93 | |||
94 | /** | ||
95 | * Did we transmit this request already? #GNUNET_YES if we are | ||
96 | * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL. | ||
97 | */ | ||
98 | int was_transmitted; | ||
99 | }; | ||
100 | |||
101 | |||
102 | /** | ||
103 | * Handle for a cadet to another peer. | ||
104 | */ | ||
105 | struct CadetHandle | ||
106 | { | ||
107 | /** | ||
108 | * Head of DLL of pending requests on this cadet. | ||
109 | */ | ||
110 | struct GSF_CadetRequest *pending_head; | ||
111 | |||
112 | /** | ||
113 | * Tail of DLL of pending requests on this cadet. | ||
114 | */ | ||
115 | struct GSF_CadetRequest *pending_tail; | ||
116 | |||
117 | /** | ||
118 | * Map from query to `struct GSF_CadetRequest`s waiting for | ||
119 | * a reply. | ||
120 | */ | ||
121 | struct GNUNET_CONTAINER_MultiHashMap *waiting_map; | ||
122 | |||
123 | /** | ||
124 | * Channel to the other peer. | ||
125 | */ | ||
126 | struct GNUNET_CADET_Channel *channel; | ||
127 | |||
128 | /** | ||
129 | * Which peer does this cadet go to? | ||
130 | */ | ||
131 | struct GNUNET_PeerIdentity target; | ||
132 | |||
133 | /** | ||
134 | * Task to kill inactive cadets (we keep them around for | ||
135 | * a few seconds to give the application a chance to give | ||
136 | * us another query). | ||
137 | */ | ||
138 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
139 | |||
140 | /** | ||
141 | * Task to reset cadets that had errors (asynchronously, | ||
142 | * as we may not be able to do it immediately during a | ||
143 | * callback from the cadet API). | ||
144 | */ | ||
145 | struct GNUNET_SCHEDULER_Task *reset_task; | ||
146 | }; | ||
147 | |||
148 | |||
149 | /** | ||
150 | * Cadet channel for creating outbound channels. | ||
151 | */ | ||
152 | struct GNUNET_CADET_Handle *cadet_handle; | ||
153 | |||
154 | /** | ||
155 | * Map from peer identities to 'struct CadetHandles' with cadet | ||
156 | * channels to those peers. | ||
157 | */ | ||
158 | struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; | ||
159 | |||
160 | |||
161 | /* ********************* client-side code ************************* */ | ||
162 | |||
163 | |||
164 | /** | ||
165 | * Transmit pending requests via the cadet. | ||
166 | * | ||
167 | * @param cls `struct CadetHandle` to process | ||
168 | */ | ||
169 | static void | ||
170 | transmit_pending (void *cls); | ||
171 | |||
172 | |||
173 | /** | ||
174 | * Iterator called on each entry in a waiting map to | ||
175 | * move it back to the pending list. | ||
176 | * | ||
177 | * @param cls the `struct CadetHandle` | ||
178 | * @param key the key of the entry in the map (the query) | ||
179 | * @param value the `struct GSF_CadetRequest` to move to pending | ||
180 | * @return #GNUNET_YES (continue to iterate) | ||
181 | */ | ||
182 | static int | ||
183 | move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
184 | { | ||
185 | struct CadetHandle *mh = cls; | ||
186 | struct GSF_CadetRequest *sr = value; | ||
187 | |||
188 | GNUNET_assert ( | ||
189 | GNUNET_YES == | ||
190 | GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value)); | ||
191 | GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); | ||
192 | sr->was_transmitted = GNUNET_NO; | ||
193 | return GNUNET_YES; | ||
194 | } | ||
195 | |||
196 | |||
197 | /** | ||
198 | * Functions with this signature are called whenever a complete reply | ||
199 | * is received. | ||
200 | * | ||
201 | * @param cls closure with the `struct CadetHandle` | ||
202 | * @param srm the actual message | ||
203 | * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing | ||
204 | */ | ||
205 | static int | ||
206 | check_reply (void *cls, const struct CadetReplyMessage *srm) | ||
207 | { | ||
208 | /* We check later... */ | ||
209 | return GNUNET_OK; | ||
210 | } | ||
211 | |||
212 | |||
213 | /** | ||
214 | * Task called when it is time to reset an cadet. | ||
215 | * | ||
216 | * @param cls the `struct CadetHandle` to tear down | ||
217 | */ | ||
218 | static void | ||
219 | reset_cadet_task (void *cls); | ||
220 | |||
221 | |||
222 | /** | ||
223 | * We had a serious error, tear down and re-create cadet from scratch, | ||
224 | * but do so asynchronously. | ||
225 | * | ||
226 | * @param mh cadet to reset | ||
227 | */ | ||
228 | static void | ||
229 | reset_cadet_async (struct CadetHandle *mh) | ||
230 | { | ||
231 | if (NULL != mh->reset_task) | ||
232 | GNUNET_SCHEDULER_cancel (mh->reset_task); | ||
233 | mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh); | ||
234 | } | ||
235 | |||
236 | |||
237 | /** | ||
238 | * Closure for handle_reply(). | ||
239 | */ | ||
240 | struct HandleReplyClosure | ||
241 | { | ||
242 | /** | ||
243 | * Reply payload. | ||
244 | */ | ||
245 | const void *data; | ||
246 | |||
247 | /** | ||
248 | * Expiration time for the block. | ||
249 | */ | ||
250 | struct GNUNET_TIME_Absolute expiration; | ||
251 | |||
252 | /** | ||
253 | * Number of bytes in @e data. | ||
254 | */ | ||
255 | size_t data_size; | ||
256 | |||
257 | /** | ||
258 | * Type of the block. | ||
259 | */ | ||
260 | enum GNUNET_BLOCK_Type type; | ||
261 | |||
262 | /** | ||
263 | * Did we have a matching query? | ||
264 | */ | ||
265 | int found; | ||
266 | }; | ||
267 | |||
268 | |||
269 | /** | ||
270 | * Iterator called on each entry in a waiting map to | ||
271 | * process a result. | ||
272 | * | ||
273 | * @param cls the `struct HandleReplyClosure` | ||
274 | * @param key the key of the entry in the map (the query) | ||
275 | * @param value the `struct GSF_CadetRequest` to handle result for | ||
276 | * @return #GNUNET_YES (continue to iterate) | ||
277 | */ | ||
278 | static int | ||
279 | process_reply (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
280 | { | ||
281 | struct HandleReplyClosure *hrc = cls; | ||
282 | struct GSF_CadetRequest *sr = value; | ||
283 | |||
284 | sr->proc (sr->proc_cls, | ||
285 | hrc->type, | ||
286 | hrc->expiration, | ||
287 | hrc->data_size, | ||
288 | hrc->data); | ||
289 | sr->proc = NULL; | ||
290 | GSF_cadet_query_cancel (sr); | ||
291 | hrc->found = GNUNET_YES; | ||
292 | return GNUNET_YES; | ||
293 | } | ||
294 | |||
295 | |||
296 | /** | ||
297 | * Iterator called on each entry in a waiting map to | ||
298 | * call the 'proc' continuation and release associated | ||
299 | * resources. | ||
300 | * | ||
301 | * @param cls the `struct CadetHandle` | ||
302 | * @param key the key of the entry in the map (the query) | ||
303 | * @param value the `struct GSF_CadetRequest` to clean up | ||
304 | * @return #GNUNET_YES (continue to iterate) | ||
305 | */ | ||
306 | static int | ||
307 | free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value) | ||
308 | { | ||
309 | struct GSF_CadetRequest *sr = value; | ||
310 | |||
311 | GSF_cadet_query_cancel (sr); | ||
312 | return GNUNET_YES; | ||
313 | } | ||
314 | |||
315 | |||
316 | /** | ||
317 | * Functions with this signature are called whenever a complete reply | ||
318 | * is received. | ||
319 | * | ||
320 | * @param cls closure with the `struct CadetHandle` | ||
321 | * @param srm the actual message | ||
322 | */ | ||
323 | static void | ||
324 | handle_reply (void *cls, const struct CadetReplyMessage *srm) | ||
325 | { | ||
326 | struct CadetHandle *mh = cls; | ||
327 | struct HandleReplyClosure hrc; | ||
328 | uint16_t msize; | ||
329 | enum GNUNET_BLOCK_Type type; | ||
330 | struct GNUNET_HashCode query; | ||
331 | |||
332 | msize = ntohs (srm->header.size) - sizeof(struct CadetReplyMessage); | ||
333 | type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); | ||
334 | if (GNUNET_YES != | ||
335 | GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query)) | ||
336 | { | ||
337 | GNUNET_break_op (0); | ||
338 | GNUNET_log ( | ||
339 | GNUNET_ERROR_TYPE_WARNING, | ||
340 | "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", | ||
341 | type, | ||
342 | msize, | ||
343 | GNUNET_i2s (&mh->target)); | ||
344 | reset_cadet_async (mh); | ||
345 | return; | ||
346 | } | ||
347 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
348 | "Received reply `%s' via cadet from peer %s\n", | ||
349 | GNUNET_h2s (&query), | ||
350 | GNUNET_i2s (&mh->target)); | ||
351 | GNUNET_CADET_receive_done (mh->channel); | ||
352 | GNUNET_STATISTICS_update (GSF_stats, | ||
353 | gettext_noop ("# replies received via cadet"), | ||
354 | 1, | ||
355 | GNUNET_NO); | ||
356 | hrc.data = &srm[1]; | ||
357 | hrc.data_size = msize; | ||
358 | hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); | ||
359 | hrc.type = type; | ||
360 | hrc.found = GNUNET_NO; | ||
361 | GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, | ||
362 | &query, | ||
363 | &process_reply, | ||
364 | &hrc); | ||
365 | if (GNUNET_NO == hrc.found) | ||
366 | { | ||
367 | GNUNET_STATISTICS_update (GSF_stats, | ||
368 | gettext_noop ( | ||
369 | "# replies received via cadet dropped"), | ||
370 | 1, | ||
371 | GNUNET_NO); | ||
372 | } | ||
373 | } | ||
374 | |||
375 | |||
376 | /** | ||
377 | * Function called by cadet when a client disconnects. | ||
378 | * Cleans up our `struct CadetClient` of that channel. | ||
379 | * | ||
380 | * @param cls our `struct CadetClient` | ||
381 | * @param channel channel of the disconnecting client | ||
382 | */ | ||
383 | static void | ||
384 | disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel) | ||
385 | { | ||
386 | struct CadetHandle *mh = cls; | ||
387 | struct GSF_CadetRequest *sr; | ||
388 | |||
389 | if (NULL == mh->channel) | ||
390 | return; /* being destroyed elsewhere */ | ||
391 | GNUNET_assert (channel == mh->channel); | ||
392 | mh->channel = NULL; | ||
393 | while (NULL != (sr = mh->pending_head)) | ||
394 | GSF_cadet_query_cancel (sr); | ||
395 | /* first remove `mh` from the `cadet_map`, so that if the | ||
396 | callback from `free_waiting_entry()` happens to re-issue | ||
397 | the request, we don't immediately have it back in the | ||
398 | `waiting_map`. */ | ||
399 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map, | ||
400 | &mh->target, | ||
401 | mh)); | ||
402 | GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, | ||
403 | &free_waiting_entry, | ||
404 | mh); | ||
405 | if (NULL != mh->timeout_task) | ||
406 | GNUNET_SCHEDULER_cancel (mh->timeout_task); | ||
407 | if (NULL != mh->reset_task) | ||
408 | GNUNET_SCHEDULER_cancel (mh->reset_task); | ||
409 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); | ||
410 | GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); | ||
411 | GNUNET_free (mh); | ||
412 | } | ||
413 | |||
414 | |||
415 | /** | ||
416 | * Function called whenever an MQ-channel's transmission window size changes. | ||
417 | * | ||
418 | * The first callback in an outgoing channel will be with a non-zero value | ||
419 | * and will mean the channel is connected to the destination. | ||
420 | * | ||
421 | * For an incoming channel it will be called immediately after the | ||
422 | * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. | ||
423 | * | ||
424 | * @param cls Channel closure. | ||
425 | * @param channel Connection to the other end (henceforth invalid). | ||
426 | * @param window_size New window size. If the is more messages than buffer size | ||
427 | * this value will be negative.. | ||
428 | */ | ||
429 | static void | ||
430 | window_change_cb (void *cls, | ||
431 | const struct GNUNET_CADET_Channel *channel, | ||
432 | int window_size) | ||
433 | { | ||
434 | /* FIXME: for flow control, implement? */ | ||
435 | #if 0 | ||
436 | /* Something like this instead of the GNUNET_MQ_notify_sent() in | ||
437 | transmit_pending() might be good (once the window change CB works...) */ | ||
438 | if (0 < window_size) /* test needed? */ | ||
439 | transmit_pending (mh); | ||
440 | #endif | ||
441 | } | ||
442 | |||
443 | |||
444 | /** | ||
445 | * We had a serious error, tear down and re-create cadet from scratch. | ||
446 | * | ||
447 | * @param mh cadet to reset | ||
448 | */ | ||
449 | static void | ||
450 | reset_cadet (struct CadetHandle *mh) | ||
451 | { | ||
452 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
453 | "Resetting cadet channel to %s\n", | ||
454 | GNUNET_i2s (&mh->target)); | ||
455 | if (NULL != mh->channel) | ||
456 | { | ||
457 | GNUNET_CADET_channel_destroy (mh->channel); | ||
458 | mh->channel = NULL; | ||
459 | } | ||
460 | GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh); | ||
461 | { | ||
462 | struct GNUNET_MQ_MessageHandler handlers[] = | ||
463 | { GNUNET_MQ_hd_var_size (reply, | ||
464 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, | ||
465 | struct CadetReplyMessage, | ||
466 | mh), | ||
467 | GNUNET_MQ_handler_end () }; | ||
468 | struct GNUNET_HashCode port; | ||
469 | |||
470 | GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, | ||
471 | strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), | ||
472 | &port); | ||
473 | mh->channel = GNUNET_CADET_channel_create (cadet_handle, | ||
474 | mh, | ||
475 | &mh->target, | ||
476 | &port, | ||
477 | &window_change_cb, | ||
478 | &disconnect_cb, | ||
479 | handlers); | ||
480 | } | ||
481 | transmit_pending (mh); | ||
482 | } | ||
483 | |||
484 | |||
485 | /** | ||
486 | * Task called when it is time to destroy an inactive cadet channel. | ||
487 | * | ||
488 | * @param cls the `struct CadetHandle` to tear down | ||
489 | */ | ||
490 | static void | ||
491 | cadet_timeout (void *cls) | ||
492 | { | ||
493 | struct CadetHandle *mh = cls; | ||
494 | struct GNUNET_CADET_Channel *tun; | ||
495 | |||
496 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
497 | "Timeout on cadet channel to %s\n", | ||
498 | GNUNET_i2s (&mh->target)); | ||
499 | mh->timeout_task = NULL; | ||
500 | tun = mh->channel; | ||
501 | mh->channel = NULL; | ||
502 | if (NULL != tun) | ||
503 | GNUNET_CADET_channel_destroy (tun); | ||
504 | } | ||
505 | |||
506 | |||
507 | /** | ||
508 | * Task called when it is time to reset an cadet. | ||
509 | * | ||
510 | * @param cls the `struct CadetHandle` to tear down | ||
511 | */ | ||
512 | static void | ||
513 | reset_cadet_task (void *cls) | ||
514 | { | ||
515 | struct CadetHandle *mh = cls; | ||
516 | |||
517 | mh->reset_task = NULL; | ||
518 | reset_cadet (mh); | ||
519 | } | ||
520 | |||
521 | |||
522 | /** | ||
523 | * Transmit pending requests via the cadet. | ||
524 | * | ||
525 | * @param cls `struct CadetHandle` to process | ||
526 | */ | ||
527 | static void | ||
528 | transmit_pending (void *cls) | ||
529 | { | ||
530 | struct CadetHandle *mh = cls; | ||
531 | struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel); | ||
532 | struct GSF_CadetRequest *sr; | ||
533 | struct GNUNET_MQ_Envelope *env; | ||
534 | struct CadetQueryMessage *sqm; | ||
535 | |||
536 | if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head))) | ||
537 | return; | ||
538 | GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); | ||
539 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( | ||
540 | mh->waiting_map, | ||
541 | &sr->query, | ||
542 | sr, | ||
543 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | ||
544 | sr->was_transmitted = GNUNET_YES; | ||
545 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
546 | "Sending query for %s via cadet to %s\n", | ||
547 | GNUNET_h2s (&sr->query), | ||
548 | GNUNET_i2s (&mh->target)); | ||
549 | env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); | ||
550 | GNUNET_MQ_env_set_options (env, | ||
551 | GNUNET_MQ_PREF_GOODPUT | ||
552 | | GNUNET_MQ_PREF_CORK_ALLOWED | ||
553 | | GNUNET_MQ_PREF_OUT_OF_ORDER); | ||
554 | sqm->type = htonl (sr->type); | ||
555 | sqm->query = sr->query; | ||
556 | GNUNET_MQ_notify_sent (env, &transmit_pending, mh); | ||
557 | GNUNET_MQ_send (mq, env); | ||
558 | } | ||
559 | |||
560 | |||
561 | /** | ||
562 | * Get (or create) a cadet to talk to the given peer. | ||
563 | * | ||
564 | * @param target peer we want to communicate with | ||
565 | */ | ||
566 | static struct CadetHandle * | ||
567 | get_cadet (const struct GNUNET_PeerIdentity *target) | ||
568 | { | ||
569 | struct CadetHandle *mh; | ||
570 | |||
571 | mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target); | ||
572 | if (NULL != mh) | ||
573 | { | ||
574 | if (NULL != mh->timeout_task) | ||
575 | { | ||
576 | GNUNET_SCHEDULER_cancel (mh->timeout_task); | ||
577 | mh->timeout_task = NULL; | ||
578 | } | ||
579 | return mh; | ||
580 | } | ||
581 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
582 | "Creating cadet channel to %s\n", | ||
583 | GNUNET_i2s (target)); | ||
584 | mh = GNUNET_new (struct CadetHandle); | ||
585 | mh->reset_task = | ||
586 | GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh); | ||
587 | mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); | ||
588 | mh->target = *target; | ||
589 | GNUNET_assert (GNUNET_OK == | ||
590 | GNUNET_CONTAINER_multipeermap_put ( | ||
591 | cadet_map, | ||
592 | &mh->target, | ||
593 | mh, | ||
594 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | ||
595 | { | ||
596 | struct GNUNET_MQ_MessageHandler handlers[] = | ||
597 | { GNUNET_MQ_hd_var_size (reply, | ||
598 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, | ||
599 | struct CadetReplyMessage, | ||
600 | mh), | ||
601 | GNUNET_MQ_handler_end () }; | ||
602 | struct GNUNET_HashCode port; | ||
603 | |||
604 | GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, | ||
605 | strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), | ||
606 | &port); | ||
607 | mh->channel = GNUNET_CADET_channel_create (cadet_handle, | ||
608 | mh, | ||
609 | &mh->target, | ||
610 | &port, | ||
611 | &window_change_cb, | ||
612 | &disconnect_cb, | ||
613 | handlers); | ||
614 | } | ||
615 | return mh; | ||
616 | } | ||
617 | |||
618 | |||
619 | /** | ||
620 | * Look for a block by directly contacting a particular peer. | ||
621 | * | ||
622 | * @param target peer that should have the block | ||
623 | * @param query hash to query for the block | ||
624 | * @param type desired type for the block | ||
625 | * @param proc function to call with result | ||
626 | * @param proc_cls closure for @a proc | ||
627 | * @return handle to cancel the operation | ||
628 | */ | ||
629 | struct GSF_CadetRequest * | ||
630 | GSF_cadet_query (const struct GNUNET_PeerIdentity *target, | ||
631 | const struct GNUNET_HashCode *query, | ||
632 | enum GNUNET_BLOCK_Type type, | ||
633 | GSF_CadetReplyProcessor proc, | ||
634 | void *proc_cls) | ||
635 | { | ||
636 | struct CadetHandle *mh; | ||
637 | struct GSF_CadetRequest *sr; | ||
638 | |||
639 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
640 | "Preparing to send query for %s via cadet to %s\n", | ||
641 | GNUNET_h2s (query), | ||
642 | GNUNET_i2s (target)); | ||
643 | mh = get_cadet (target); | ||
644 | sr = GNUNET_new (struct GSF_CadetRequest); | ||
645 | sr->mh = mh; | ||
646 | sr->proc = proc; | ||
647 | sr->proc_cls = proc_cls; | ||
648 | sr->type = type; | ||
649 | sr->query = *query; | ||
650 | GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); | ||
651 | transmit_pending (mh); | ||
652 | return sr; | ||
653 | } | ||
654 | |||
655 | |||
656 | /** | ||
657 | * Cancel an active request; must not be called after 'proc' | ||
658 | * was called. | ||
659 | * | ||
660 | * @param sr request to cancel | ||
661 | */ | ||
662 | void | ||
663 | GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) | ||
664 | { | ||
665 | struct CadetHandle *mh = sr->mh; | ||
666 | GSF_CadetReplyProcessor p; | ||
667 | |||
668 | p = sr->proc; | ||
669 | sr->proc = NULL; | ||
670 | if (NULL != p) | ||
671 | { | ||
672 | /* signal failure / cancellation to callback */ | ||
673 | p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL); | ||
674 | } | ||
675 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
676 | "Cancelled query for %s via cadet to %s\n", | ||
677 | GNUNET_h2s (&sr->query), | ||
678 | GNUNET_i2s (&sr->mh->target)); | ||
679 | if (GNUNET_YES == sr->was_transmitted) | ||
680 | GNUNET_assert ( | ||
681 | GNUNET_OK == | ||
682 | GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr)); | ||
683 | else | ||
684 | GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); | ||
685 | GNUNET_free (sr); | ||
686 | if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && | ||
687 | (NULL == mh->pending_head)) | ||
688 | mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | ||
689 | &cadet_timeout, | ||
690 | mh); | ||
691 | } | ||
692 | |||
693 | |||
694 | /** | ||
695 | * Function called on each active cadets to shut them down. | ||
696 | * | ||
697 | * @param cls NULL | ||
698 | * @param key target peer, unused | ||
699 | * @param value the `struct CadetHandle` to destroy | ||
700 | * @return #GNUNET_YES (continue to iterate) | ||
701 | */ | ||
702 | int | ||
703 | GSF_cadet_release_clients (void *cls, | ||
704 | const struct GNUNET_PeerIdentity *key, | ||
705 | void *value) | ||
706 | { | ||
707 | struct CadetHandle *mh = value; | ||
708 | |||
709 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||
710 | "Timeout on cadet channel to %s\n", | ||
711 | GNUNET_i2s (&mh->target)); | ||
712 | if (NULL != mh->channel) | ||
713 | { | ||
714 | struct GNUNET_CADET_Channel *channel = mh->channel; | ||
715 | |||
716 | mh->channel = NULL; | ||
717 | GNUNET_CADET_channel_destroy (channel); | ||
718 | } | ||
719 | if (NULL != mh->reset_task) | ||
720 | { | ||
721 | GNUNET_SCHEDULER_cancel (mh->reset_task); | ||
722 | mh->reset_task = NULL; | ||
723 | } | ||
724 | return GNUNET_YES; | ||
725 | } | ||
726 | |||
727 | |||
728 | /* end of gnunet-service-fs_cadet_client.c */ | ||