summaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c427
1 files changed, 211 insertions, 216 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 81afe0411..a494ba751 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -16,7 +16,7 @@
16 along with this program. If not, see <http://www.gnu.org/licenses/>. 16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 17
18 SPDX-License-Identifier: AGPL3.0-or-later 18 SPDX-License-Identifier: AGPL3.0-or-later
19*/ 19 */
20 20
21/** 21/**
22 * @file fs/gnunet-service-fs_cadet_client.c 22 * @file fs/gnunet-service-fs_cadet_client.c
@@ -42,7 +42,7 @@
42 * After how long do we reset connections without replies? 42 * After how long do we reset connections without replies?
43 */ 43 */
44#define CLIENT_RETRY_TIMEOUT \ 44#define CLIENT_RETRY_TIMEOUT \
45 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) 45 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30)
46 46
47 47
48/** 48/**
@@ -54,9 +54,7 @@ struct CadetHandle;
54/** 54/**
55 * Handle for a request that is going out via cadet API. 55 * Handle for a request that is going out via cadet API.
56 */ 56 */
57struct GSF_CadetRequest 57struct GSF_CadetRequest {
58{
59
60 /** 58 /**
61 * DLL. 59 * DLL.
62 */ 60 */
@@ -103,8 +101,7 @@ struct GSF_CadetRequest
103/** 101/**
104 * Handle for a cadet to another peer. 102 * Handle for a cadet to another peer.
105 */ 103 */
106struct CadetHandle 104struct CadetHandle {
107{
108 /** 105 /**
109 * Head of DLL of pending requests on this cadet. 106 * Head of DLL of pending requests on this cadet.
110 */ 107 */
@@ -168,7 +165,7 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
168 * @param cls `struct CadetHandle` to process 165 * @param cls `struct CadetHandle` to process
169 */ 166 */
170static void 167static void
171transmit_pending (void *cls); 168transmit_pending(void *cls);
172 169
173 170
174/** 171/**
@@ -181,15 +178,15 @@ transmit_pending (void *cls);
181 * @return #GNUNET_YES (continue to iterate) 178 * @return #GNUNET_YES (continue to iterate)
182 */ 179 */
183static int 180static int
184move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value) 181move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value)
185{ 182{
186 struct CadetHandle *mh = cls; 183 struct CadetHandle *mh = cls;
187 struct GSF_CadetRequest *sr = value; 184 struct GSF_CadetRequest *sr = value;
188 185
189 GNUNET_assert ( 186 GNUNET_assert(
190 GNUNET_YES == 187 GNUNET_YES ==
191 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value)); 188 GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, key, value));
192 GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); 189 GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr);
193 sr->was_transmitted = GNUNET_NO; 190 sr->was_transmitted = GNUNET_NO;
194 return GNUNET_YES; 191 return GNUNET_YES;
195} 192}
@@ -204,7 +201,7 @@ move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value)
204 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing 201 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
205 */ 202 */
206static int 203static int
207check_reply (void *cls, const struct CadetReplyMessage *srm) 204check_reply(void *cls, const struct CadetReplyMessage *srm)
208{ 205{
209 /* We check later... */ 206 /* We check later... */
210 return GNUNET_OK; 207 return GNUNET_OK;
@@ -217,7 +214,7 @@ check_reply (void *cls, const struct CadetReplyMessage *srm)
217 * @param cls the `struct CadetHandle` to tear down 214 * @param cls the `struct CadetHandle` to tear down
218 */ 215 */
219static void 216static void
220reset_cadet_task (void *cls); 217reset_cadet_task(void *cls);
221 218
222 219
223/** 220/**
@@ -227,20 +224,18 @@ reset_cadet_task (void *cls);
227 * @param mh cadet to reset 224 * @param mh cadet to reset
228 */ 225 */
229static void 226static void
230reset_cadet_async (struct CadetHandle *mh) 227reset_cadet_async(struct CadetHandle *mh)
231{ 228{
232 if (NULL != mh->reset_task) 229 if (NULL != mh->reset_task)
233 GNUNET_SCHEDULER_cancel (mh->reset_task); 230 GNUNET_SCHEDULER_cancel(mh->reset_task);
234 mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh); 231 mh->reset_task = GNUNET_SCHEDULER_add_now(&reset_cadet_task, mh);
235} 232}
236 233
237 234
238/** 235/**
239 * Closure for handle_reply(). 236 * Closure for handle_reply().
240 */ 237 */
241struct HandleReplyClosure 238struct HandleReplyClosure {
242{
243
244 /** 239 /**
245 * Reply payload. 240 * Reply payload.
246 */ 241 */
@@ -278,18 +273,18 @@ struct HandleReplyClosure
278 * @return #GNUNET_YES (continue to iterate) 273 * @return #GNUNET_YES (continue to iterate)
279 */ 274 */
280static int 275static int
281process_reply (void *cls, const struct GNUNET_HashCode *key, void *value) 276process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
282{ 277{
283 struct HandleReplyClosure *hrc = cls; 278 struct HandleReplyClosure *hrc = cls;
284 struct GSF_CadetRequest *sr = value; 279 struct GSF_CadetRequest *sr = value;
285 280
286 sr->proc (sr->proc_cls, 281 sr->proc(sr->proc_cls,
287 hrc->type, 282 hrc->type,
288 hrc->expiration, 283 hrc->expiration,
289 hrc->data_size, 284 hrc->data_size,
290 hrc->data); 285 hrc->data);
291 sr->proc = NULL; 286 sr->proc = NULL;
292 GSF_cadet_query_cancel (sr); 287 GSF_cadet_query_cancel(sr);
293 hrc->found = GNUNET_YES; 288 hrc->found = GNUNET_YES;
294 return GNUNET_YES; 289 return GNUNET_YES;
295} 290}
@@ -306,11 +301,11 @@ process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
306 * @return #GNUNET_YES (continue to iterate) 301 * @return #GNUNET_YES (continue to iterate)
307 */ 302 */
308static int 303static int
309free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value) 304free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value)
310{ 305{
311 struct GSF_CadetRequest *sr = value; 306 struct GSF_CadetRequest *sr = value;
312 307
313 GSF_cadet_query_cancel (sr); 308 GSF_cadet_query_cancel(sr);
314 return GNUNET_YES; 309 return GNUNET_YES;
315} 310}
316 311
@@ -323,7 +318,7 @@ free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value)
323 * @param srm the actual message 318 * @param srm the actual message
324 */ 319 */
325static void 320static void
326handle_reply (void *cls, const struct CadetReplyMessage *srm) 321handle_reply(void *cls, const struct CadetReplyMessage *srm)
327{ 322{
328 struct CadetHandle *mh = cls; 323 struct CadetHandle *mh = cls;
329 struct HandleReplyClosure hrc; 324 struct HandleReplyClosure hrc;
@@ -331,47 +326,47 @@ handle_reply (void *cls, const struct CadetReplyMessage *srm)
331 enum GNUNET_BLOCK_Type type; 326 enum GNUNET_BLOCK_Type type;
332 struct GNUNET_HashCode query; 327 struct GNUNET_HashCode query;
333 328
334 msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage); 329 msize = ntohs(srm->header.size) - sizeof(struct CadetReplyMessage);
335 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); 330 type = (enum GNUNET_BLOCK_Type)ntohl(srm->type);
336 if (GNUNET_YES != 331 if (GNUNET_YES !=
337 GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query)) 332 GNUNET_BLOCK_get_key(GSF_block_ctx, type, &srm[1], msize, &query))
338 { 333 {
339 GNUNET_break_op (0); 334 GNUNET_break_op(0);
340 GNUNET_log ( 335 GNUNET_log(
341 GNUNET_ERROR_TYPE_WARNING, 336 GNUNET_ERROR_TYPE_WARNING,
342 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", 337 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
343 type, 338 type,
344 msize, 339 msize,
345 GNUNET_i2s (&mh->target)); 340 GNUNET_i2s(&mh->target));
346 reset_cadet_async (mh); 341 reset_cadet_async(mh);
347 return; 342 return;
348 } 343 }
349 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 344 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
350 "Received reply `%s' via cadet from peer %s\n", 345 "Received reply `%s' via cadet from peer %s\n",
351 GNUNET_h2s (&query), 346 GNUNET_h2s(&query),
352 GNUNET_i2s (&mh->target)); 347 GNUNET_i2s(&mh->target));
353 GNUNET_CADET_receive_done (mh->channel); 348 GNUNET_CADET_receive_done(mh->channel);
354 GNUNET_STATISTICS_update (GSF_stats, 349 GNUNET_STATISTICS_update(GSF_stats,
355 gettext_noop ("# replies received via cadet"), 350 gettext_noop("# replies received via cadet"),
356 1, 351 1,
357 GNUNET_NO); 352 GNUNET_NO);
358 hrc.data = &srm[1]; 353 hrc.data = &srm[1];
359 hrc.data_size = msize; 354 hrc.data_size = msize;
360 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); 355 hrc.expiration = GNUNET_TIME_absolute_ntoh(srm->expiration);
361 hrc.type = type; 356 hrc.type = type;
362 hrc.found = GNUNET_NO; 357 hrc.found = GNUNET_NO;
363 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, 358 GNUNET_CONTAINER_multihashmap_get_multiple(mh->waiting_map,
364 &query, 359 &query,
365 &process_reply, 360 &process_reply,
366 &hrc); 361 &hrc);
367 if (GNUNET_NO == hrc.found) 362 if (GNUNET_NO == hrc.found)
368 { 363 {
369 GNUNET_STATISTICS_update (GSF_stats, 364 GNUNET_STATISTICS_update(GSF_stats,
370 gettext_noop ( 365 gettext_noop(
371 "# replies received via cadet dropped"), 366 "# replies received via cadet dropped"),
372 1, 367 1,
373 GNUNET_NO); 368 GNUNET_NO);
374 } 369 }
375} 370}
376 371
377 372
@@ -383,34 +378,34 @@ handle_reply (void *cls, const struct CadetReplyMessage *srm)
383 * @param channel channel of the disconnecting client 378 * @param channel channel of the disconnecting client
384 */ 379 */
385static void 380static void
386disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel) 381disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel)
387{ 382{
388 struct CadetHandle *mh = cls; 383 struct CadetHandle *mh = cls;
389 struct GSF_CadetRequest *sr; 384 struct GSF_CadetRequest *sr;
390 385
391 if (NULL == mh->channel) 386 if (NULL == mh->channel)
392 return; /* being destroyed elsewhere */ 387 return; /* being destroyed elsewhere */
393 GNUNET_assert (channel == mh->channel); 388 GNUNET_assert(channel == mh->channel);
394 mh->channel = NULL; 389 mh->channel = NULL;
395 while (NULL != (sr = mh->pending_head)) 390 while (NULL != (sr = mh->pending_head))
396 GSF_cadet_query_cancel (sr); 391 GSF_cadet_query_cancel(sr);
397 /* first remove `mh` from the `cadet_map`, so that if the 392 /* first remove `mh` from the `cadet_map`, so that if the
398 callback from `free_waiting_entry()` happens to re-issue 393 callback from `free_waiting_entry()` happens to re-issue
399 the request, we don't immediately have it back in the 394 the request, we don't immediately have it back in the
400 `waiting_map`. */ 395 `waiting_map`. */
401 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map, 396 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove(cadet_map,
402 &mh->target, 397 &mh->target,
403 mh)); 398 mh));
404 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, 399 GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map,
405 &free_waiting_entry, 400 &free_waiting_entry,
406 mh); 401 mh);
407 if (NULL != mh->timeout_task) 402 if (NULL != mh->timeout_task)
408 GNUNET_SCHEDULER_cancel (mh->timeout_task); 403 GNUNET_SCHEDULER_cancel(mh->timeout_task);
409 if (NULL != mh->reset_task) 404 if (NULL != mh->reset_task)
410 GNUNET_SCHEDULER_cancel (mh->reset_task); 405 GNUNET_SCHEDULER_cancel(mh->reset_task);
411 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); 406 GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map));
412 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); 407 GNUNET_CONTAINER_multihashmap_destroy(mh->waiting_map);
413 GNUNET_free (mh); 408 GNUNET_free(mh);
414} 409}
415 410
416 411
@@ -429,16 +424,16 @@ disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel)
429 * this value will be negative.. 424 * this value will be negative..
430 */ 425 */
431static void 426static void
432window_change_cb (void *cls, 427window_change_cb(void *cls,
433 const struct GNUNET_CADET_Channel *channel, 428 const struct GNUNET_CADET_Channel *channel,
434 int window_size) 429 int window_size)
435{ 430{
436 /* FIXME: for flow control, implement? */ 431 /* FIXME: for flow control, implement? */
437#if 0 432#if 0
438 /* Something like this instead of the GNUNET_MQ_notify_sent() in 433 /* Something like this instead of the GNUNET_MQ_notify_sent() in
439 transmit_pending() might be good (once the window change CB works...) */ 434 transmit_pending() might be good (once the window change CB works...) */
440 if (0 < window_size) /* test needed? */ 435 if (0 < window_size) /* test needed? */
441 transmit_pending (mh); 436 transmit_pending(mh);
442#endif 437#endif
443} 438}
444 439
@@ -449,38 +444,38 @@ window_change_cb (void *cls,
449 * @param mh cadet to reset 444 * @param mh cadet to reset
450 */ 445 */
451static void 446static void
452reset_cadet (struct CadetHandle *mh) 447reset_cadet(struct CadetHandle *mh)
453{ 448{
454 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 449 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
455 "Resetting cadet channel to %s\n", 450 "Resetting cadet channel to %s\n",
456 GNUNET_i2s (&mh->target)); 451 GNUNET_i2s(&mh->target));
457 if (NULL != mh->channel) 452 if (NULL != mh->channel)
458 { 453 {
459 GNUNET_CADET_channel_destroy (mh->channel); 454 GNUNET_CADET_channel_destroy(mh->channel);
460 mh->channel = NULL; 455 mh->channel = NULL;
461 } 456 }
462 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh); 457 GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, &move_to_pending, mh);
463 { 458 {
464 struct GNUNET_MQ_MessageHandler handlers[] = 459 struct GNUNET_MQ_MessageHandler handlers[] =
465 {GNUNET_MQ_hd_var_size (reply, 460 { GNUNET_MQ_hd_var_size(reply,
466 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 461 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
467 struct CadetReplyMessage, 462 struct CadetReplyMessage,
468 mh), 463 mh),
469 GNUNET_MQ_handler_end ()}; 464 GNUNET_MQ_handler_end() };
470 struct GNUNET_HashCode port; 465 struct GNUNET_HashCode port;
471 466
472 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 467 GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
473 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 468 strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
474 &port); 469 &port);
475 mh->channel = GNUNET_CADET_channel_create (cadet_handle, 470 mh->channel = GNUNET_CADET_channel_create(cadet_handle,
476 mh, 471 mh,
477 &mh->target, 472 &mh->target,
478 &port, 473 &port,
479 &window_change_cb, 474 &window_change_cb,
480 &disconnect_cb, 475 &disconnect_cb,
481 handlers); 476 handlers);
482 } 477 }
483 transmit_pending (mh); 478 transmit_pending(mh);
484} 479}
485 480
486 481
@@ -490,19 +485,19 @@ reset_cadet (struct CadetHandle *mh)
490 * @param cls the `struct CadetHandle` to tear down 485 * @param cls the `struct CadetHandle` to tear down
491 */ 486 */
492static void 487static void
493cadet_timeout (void *cls) 488cadet_timeout(void *cls)
494{ 489{
495 struct CadetHandle *mh = cls; 490 struct CadetHandle *mh = cls;
496 struct GNUNET_CADET_Channel *tun; 491 struct GNUNET_CADET_Channel *tun;
497 492
498 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 493 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
499 "Timeout on cadet channel to %s\n", 494 "Timeout on cadet channel to %s\n",
500 GNUNET_i2s (&mh->target)); 495 GNUNET_i2s(&mh->target));
501 mh->timeout_task = NULL; 496 mh->timeout_task = NULL;
502 tun = mh->channel; 497 tun = mh->channel;
503 mh->channel = NULL; 498 mh->channel = NULL;
504 if (NULL != tun) 499 if (NULL != tun)
505 GNUNET_CADET_channel_destroy (tun); 500 GNUNET_CADET_channel_destroy(tun);
506} 501}
507 502
508 503
@@ -512,12 +507,12 @@ cadet_timeout (void *cls)
512 * @param cls the `struct CadetHandle` to tear down 507 * @param cls the `struct CadetHandle` to tear down
513 */ 508 */
514static void 509static void
515reset_cadet_task (void *cls) 510reset_cadet_task(void *cls)
516{ 511{
517 struct CadetHandle *mh = cls; 512 struct CadetHandle *mh = cls;
518 513
519 mh->reset_task = NULL; 514 mh->reset_task = NULL;
520 reset_cadet (mh); 515 reset_cadet(mh);
521} 516}
522 517
523 518
@@ -527,36 +522,36 @@ reset_cadet_task (void *cls)
527 * @param cls `struct CadetHandle` to process 522 * @param cls `struct CadetHandle` to process
528 */ 523 */
529static void 524static void
530transmit_pending (void *cls) 525transmit_pending(void *cls)
531{ 526{
532 struct CadetHandle *mh = cls; 527 struct CadetHandle *mh = cls;
533 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel); 528 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq(mh->channel);
534 struct GSF_CadetRequest *sr; 529 struct GSF_CadetRequest *sr;
535 struct GNUNET_MQ_Envelope *env; 530 struct GNUNET_MQ_Envelope *env;
536 struct CadetQueryMessage *sqm; 531 struct CadetQueryMessage *sqm;
537 532
538 if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head))) 533 if ((0 != GNUNET_MQ_get_length(mq)) || (NULL == (sr = mh->pending_head)))
539 return; 534 return;
540 GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); 535 GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr);
541 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( 536 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put(
542 mh->waiting_map, 537 mh->waiting_map,
543 &sr->query, 538 &sr->query,
544 sr, 539 sr,
545 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 540 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
546 sr->was_transmitted = GNUNET_YES; 541 sr->was_transmitted = GNUNET_YES;
547 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 542 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
548 "Sending query for %s via cadet to %s\n", 543 "Sending query for %s via cadet to %s\n",
549 GNUNET_h2s (&sr->query), 544 GNUNET_h2s(&sr->query),
550 GNUNET_i2s (&mh->target)); 545 GNUNET_i2s(&mh->target));
551 env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); 546 env = GNUNET_MQ_msg(sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
552 GNUNET_MQ_env_set_options (env, 547 GNUNET_MQ_env_set_options(env,
553 GNUNET_MQ_PREF_GOODPUT | 548 GNUNET_MQ_PREF_GOODPUT |
554 GNUNET_MQ_PREF_CORK_ALLOWED | 549 GNUNET_MQ_PREF_CORK_ALLOWED |
555 GNUNET_MQ_PREF_OUT_OF_ORDER); 550 GNUNET_MQ_PREF_OUT_OF_ORDER);
556 sqm->type = htonl (sr->type); 551 sqm->type = htonl(sr->type);
557 sqm->query = sr->query; 552 sqm->query = sr->query;
558 GNUNET_MQ_notify_sent (env, &transmit_pending, mh); 553 GNUNET_MQ_notify_sent(env, &transmit_pending, mh);
559 GNUNET_MQ_send (mq, env); 554 GNUNET_MQ_send(mq, env);
560} 555}
561 556
562 557
@@ -566,53 +561,53 @@ transmit_pending (void *cls)
566 * @param target peer we want to communicate with 561 * @param target peer we want to communicate with
567 */ 562 */
568static struct CadetHandle * 563static struct CadetHandle *
569get_cadet (const struct GNUNET_PeerIdentity *target) 564get_cadet(const struct GNUNET_PeerIdentity *target)
570{ 565{
571 struct CadetHandle *mh; 566 struct CadetHandle *mh;
572 567
573 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target); 568 mh = GNUNET_CONTAINER_multipeermap_get(cadet_map, target);
574 if (NULL != mh) 569 if (NULL != mh)
575 {
576 if (NULL != mh->timeout_task)
577 { 570 {
578 GNUNET_SCHEDULER_cancel (mh->timeout_task); 571 if (NULL != mh->timeout_task)
579 mh->timeout_task = NULL; 572 {
573 GNUNET_SCHEDULER_cancel(mh->timeout_task);
574 mh->timeout_task = NULL;
575 }
576 return mh;
580 } 577 }
581 return mh; 578 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
582 } 579 "Creating cadet channel to %s\n",
583 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 580 GNUNET_i2s(target));
584 "Creating cadet channel to %s\n", 581 mh = GNUNET_new(struct CadetHandle);
585 GNUNET_i2s (target));
586 mh = GNUNET_new (struct CadetHandle);
587 mh->reset_task = 582 mh->reset_task =
588 GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh); 583 GNUNET_SCHEDULER_add_delayed(CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
589 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); 584 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create(16, GNUNET_YES);
590 mh->target = *target; 585 mh->target = *target;
591 GNUNET_assert (GNUNET_OK == 586 GNUNET_assert(GNUNET_OK ==
592 GNUNET_CONTAINER_multipeermap_put ( 587 GNUNET_CONTAINER_multipeermap_put(
593 cadet_map, 588 cadet_map,
594 &mh->target, 589 &mh->target,
595 mh, 590 mh,
596 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
597 { 592 {
598 struct GNUNET_MQ_MessageHandler handlers[] = 593 struct GNUNET_MQ_MessageHandler handlers[] =
599 {GNUNET_MQ_hd_var_size (reply, 594 { GNUNET_MQ_hd_var_size(reply,
600 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 595 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
601 struct CadetReplyMessage, 596 struct CadetReplyMessage,
602 mh), 597 mh),
603 GNUNET_MQ_handler_end ()}; 598 GNUNET_MQ_handler_end() };
604 struct GNUNET_HashCode port; 599 struct GNUNET_HashCode port;
605 600
606 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 601 GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
607 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 602 strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
608 &port); 603 &port);
609 mh->channel = GNUNET_CADET_channel_create (cadet_handle, 604 mh->channel = GNUNET_CADET_channel_create(cadet_handle,
610 mh, 605 mh,
611 &mh->target, 606 &mh->target,
612 &port, 607 &port,
613 &window_change_cb, 608 &window_change_cb,
614 &disconnect_cb, 609 &disconnect_cb,
615 handlers); 610 handlers);
616 } 611 }
617 return mh; 612 return mh;
618} 613}
@@ -629,28 +624,28 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
629 * @return handle to cancel the operation 624 * @return handle to cancel the operation
630 */ 625 */
631struct GSF_CadetRequest * 626struct GSF_CadetRequest *
632GSF_cadet_query (const struct GNUNET_PeerIdentity *target, 627GSF_cadet_query(const struct GNUNET_PeerIdentity *target,
633 const struct GNUNET_HashCode *query, 628 const struct GNUNET_HashCode *query,
634 enum GNUNET_BLOCK_Type type, 629 enum GNUNET_BLOCK_Type type,
635 GSF_CadetReplyProcessor proc, 630 GSF_CadetReplyProcessor proc,
636 void *proc_cls) 631 void *proc_cls)
637{ 632{
638 struct CadetHandle *mh; 633 struct CadetHandle *mh;
639 struct GSF_CadetRequest *sr; 634 struct GSF_CadetRequest *sr;
640 635
641 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 636 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
642 "Preparing to send query for %s via cadet to %s\n", 637 "Preparing to send query for %s via cadet to %s\n",
643 GNUNET_h2s (query), 638 GNUNET_h2s(query),
644 GNUNET_i2s (target)); 639 GNUNET_i2s(target));
645 mh = get_cadet (target); 640 mh = get_cadet(target);
646 sr = GNUNET_new (struct GSF_CadetRequest); 641 sr = GNUNET_new(struct GSF_CadetRequest);
647 sr->mh = mh; 642 sr->mh = mh;
648 sr->proc = proc; 643 sr->proc = proc;
649 sr->proc_cls = proc_cls; 644 sr->proc_cls = proc_cls;
650 sr->type = type; 645 sr->type = type;
651 sr->query = *query; 646 sr->query = *query;
652 GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); 647 GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr);
653 transmit_pending (mh); 648 transmit_pending(mh);
654 return sr; 649 return sr;
655} 650}
656 651
@@ -662,7 +657,7 @@ GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
662 * @param sr request to cancel 657 * @param sr request to cancel
663 */ 658 */
664void 659void
665GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) 660GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
666{ 661{
667 struct CadetHandle *mh = sr->mh; 662 struct CadetHandle *mh = sr->mh;
668 GSF_CadetReplyProcessor p; 663 GSF_CadetReplyProcessor p;
@@ -670,26 +665,26 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
670 p = sr->proc; 665 p = sr->proc;
671 sr->proc = NULL; 666 sr->proc = NULL;
672 if (NULL != p) 667 if (NULL != p)
673 { 668 {
674 /* signal failure / cancellation to callback */ 669 /* signal failure / cancellation to callback */
675 p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL); 670 p(sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
676 } 671 }
677 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 672 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
678 "Cancelled query for %s via cadet to %s\n", 673 "Cancelled query for %s via cadet to %s\n",
679 GNUNET_h2s (&sr->query), 674 GNUNET_h2s(&sr->query),
680 GNUNET_i2s (&sr->mh->target)); 675 GNUNET_i2s(&sr->mh->target));
681 if (GNUNET_YES == sr->was_transmitted) 676 if (GNUNET_YES == sr->was_transmitted)
682 GNUNET_assert ( 677 GNUNET_assert(
683 GNUNET_OK == 678 GNUNET_OK ==
684 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr)); 679 GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, &sr->query, sr));
685 else 680 else
686 GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); 681 GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr);
687 GNUNET_free (sr); 682 GNUNET_free(sr);
688 if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && 683 if ((0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)) &&
689 (NULL == mh->pending_head)) 684 (NULL == mh->pending_head))
690 mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, 685 mh->timeout_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS,
691 &cadet_timeout, 686 &cadet_timeout,
692 mh); 687 mh);
693} 688}
694 689
695 690
@@ -702,27 +697,27 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
702 * @return #GNUNET_YES (continue to iterate) 697 * @return #GNUNET_YES (continue to iterate)
703 */ 698 */
704int 699int
705GSF_cadet_release_clients (void *cls, 700GSF_cadet_release_clients(void *cls,
706 const struct GNUNET_PeerIdentity *key, 701 const struct GNUNET_PeerIdentity *key,
707 void *value) 702 void *value)
708{ 703{
709 struct CadetHandle *mh = value; 704 struct CadetHandle *mh = value;
710 705
711 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 706 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
712 "Timeout on cadet channel to %s\n", 707 "Timeout on cadet channel to %s\n",
713 GNUNET_i2s (&mh->target)); 708 GNUNET_i2s(&mh->target));
714 if (NULL != mh->channel) 709 if (NULL != mh->channel)
715 { 710 {
716 struct GNUNET_CADET_Channel *channel = mh->channel; 711 struct GNUNET_CADET_Channel *channel = mh->channel;
717 712
718 mh->channel = NULL; 713 mh->channel = NULL;
719 GNUNET_CADET_channel_destroy (channel); 714 GNUNET_CADET_channel_destroy(channel);
720 } 715 }
721 if (NULL != mh->reset_task) 716 if (NULL != mh->reset_task)
722 { 717 {
723 GNUNET_SCHEDULER_cancel (mh->reset_task); 718 GNUNET_SCHEDULER_cancel(mh->reset_task);
724 mh->reset_task = NULL; 719 mh->reset_task = NULL;
725 } 720 }
726 return GNUNET_YES; 721 return GNUNET_YES;
727} 722}
728 723