aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_client.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-10-05 15:09:28 +0200
committerChristian Grothoff <christian@grothoff.org>2019-10-05 15:09:28 +0200
commitc4e9ba925ffd758aaa3feee2ccfc0b76f26fe207 (patch)
treecac3ce030d77b4cbe7c7dc62ed58cfe6d24f73e1 /src/fs/gnunet-service-fs_cadet_client.c
parentfbb71d527c7d6babf269a8fefce1db291b9f7068 (diff)
downloadgnunet-c4e9ba925ffd758aaa3feee2ccfc0b76f26fe207.tar.gz
gnunet-c4e9ba925ffd758aaa3feee2ccfc0b76f26fe207.zip
global reindent, now with uncrustify hook enabled
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c423
1 files changed, 213 insertions, 210 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index a494ba751..96ccf8232 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -42,7 +42,7 @@
42 * After how long do we reset connections without replies? 42 * After how long do we reset connections without replies?
43 */ 43 */
44#define CLIENT_RETRY_TIMEOUT \ 44#define CLIENT_RETRY_TIMEOUT \
45 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) 45 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
46 46
47 47
48/** 48/**
@@ -54,7 +54,8 @@ struct CadetHandle;
54/** 54/**
55 * Handle for a request that is going out via cadet API. 55 * Handle for a request that is going out via cadet API.
56 */ 56 */
57struct GSF_CadetRequest { 57struct GSF_CadetRequest
58{
58 /** 59 /**
59 * DLL. 60 * DLL.
60 */ 61 */
@@ -101,7 +102,8 @@ struct GSF_CadetRequest {
101/** 102/**
102 * Handle for a cadet to another peer. 103 * Handle for a cadet to another peer.
103 */ 104 */
104struct CadetHandle { 105struct CadetHandle
106{
105 /** 107 /**
106 * Head of DLL of pending requests on this cadet. 108 * Head of DLL of pending requests on this cadet.
107 */ 109 */
@@ -165,7 +167,7 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
165 * @param cls `struct CadetHandle` to process 167 * @param cls `struct CadetHandle` to process
166 */ 168 */
167static void 169static void
168transmit_pending(void *cls); 170transmit_pending (void *cls);
169 171
170 172
171/** 173/**
@@ -178,15 +180,15 @@ transmit_pending(void *cls);
178 * @return #GNUNET_YES (continue to iterate) 180 * @return #GNUNET_YES (continue to iterate)
179 */ 181 */
180static int 182static int
181move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value) 183move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value)
182{ 184{
183 struct CadetHandle *mh = cls; 185 struct CadetHandle *mh = cls;
184 struct GSF_CadetRequest *sr = value; 186 struct GSF_CadetRequest *sr = value;
185 187
186 GNUNET_assert( 188 GNUNET_assert (
187 GNUNET_YES == 189 GNUNET_YES ==
188 GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, key, value)); 190 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value));
189 GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr); 191 GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
190 sr->was_transmitted = GNUNET_NO; 192 sr->was_transmitted = GNUNET_NO;
191 return GNUNET_YES; 193 return GNUNET_YES;
192} 194}
@@ -201,7 +203,7 @@ move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value)
201 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing 203 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
202 */ 204 */
203static int 205static int
204check_reply(void *cls, const struct CadetReplyMessage *srm) 206check_reply (void *cls, const struct CadetReplyMessage *srm)
205{ 207{
206 /* We check later... */ 208 /* We check later... */
207 return GNUNET_OK; 209 return GNUNET_OK;
@@ -214,7 +216,7 @@ check_reply(void *cls, const struct CadetReplyMessage *srm)
214 * @param cls the `struct CadetHandle` to tear down 216 * @param cls the `struct CadetHandle` to tear down
215 */ 217 */
216static void 218static void
217reset_cadet_task(void *cls); 219reset_cadet_task (void *cls);
218 220
219 221
220/** 222/**
@@ -224,18 +226,19 @@ reset_cadet_task(void *cls);
224 * @param mh cadet to reset 226 * @param mh cadet to reset
225 */ 227 */
226static void 228static void
227reset_cadet_async(struct CadetHandle *mh) 229reset_cadet_async (struct CadetHandle *mh)
228{ 230{
229 if (NULL != mh->reset_task) 231 if (NULL != mh->reset_task)
230 GNUNET_SCHEDULER_cancel(mh->reset_task); 232 GNUNET_SCHEDULER_cancel (mh->reset_task);
231 mh->reset_task = GNUNET_SCHEDULER_add_now(&reset_cadet_task, mh); 233 mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh);
232} 234}
233 235
234 236
235/** 237/**
236 * Closure for handle_reply(). 238 * Closure for handle_reply().
237 */ 239 */
238struct HandleReplyClosure { 240struct HandleReplyClosure
241{
239 /** 242 /**
240 * Reply payload. 243 * Reply payload.
241 */ 244 */
@@ -273,18 +276,18 @@ struct HandleReplyClosure {
273 * @return #GNUNET_YES (continue to iterate) 276 * @return #GNUNET_YES (continue to iterate)
274 */ 277 */
275static int 278static int
276process_reply(void *cls, const struct GNUNET_HashCode *key, void *value) 279process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
277{ 280{
278 struct HandleReplyClosure *hrc = cls; 281 struct HandleReplyClosure *hrc = cls;
279 struct GSF_CadetRequest *sr = value; 282 struct GSF_CadetRequest *sr = value;
280 283
281 sr->proc(sr->proc_cls, 284 sr->proc (sr->proc_cls,
282 hrc->type, 285 hrc->type,
283 hrc->expiration, 286 hrc->expiration,
284 hrc->data_size, 287 hrc->data_size,
285 hrc->data); 288 hrc->data);
286 sr->proc = NULL; 289 sr->proc = NULL;
287 GSF_cadet_query_cancel(sr); 290 GSF_cadet_query_cancel (sr);
288 hrc->found = GNUNET_YES; 291 hrc->found = GNUNET_YES;
289 return GNUNET_YES; 292 return GNUNET_YES;
290} 293}
@@ -301,11 +304,11 @@ process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
301 * @return #GNUNET_YES (continue to iterate) 304 * @return #GNUNET_YES (continue to iterate)
302 */ 305 */
303static int 306static int
304free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value) 307free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value)
305{ 308{
306 struct GSF_CadetRequest *sr = value; 309 struct GSF_CadetRequest *sr = value;
307 310
308 GSF_cadet_query_cancel(sr); 311 GSF_cadet_query_cancel (sr);
309 return GNUNET_YES; 312 return GNUNET_YES;
310} 313}
311 314
@@ -318,7 +321,7 @@ free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value)
318 * @param srm the actual message 321 * @param srm the actual message
319 */ 322 */
320static void 323static void
321handle_reply(void *cls, const struct CadetReplyMessage *srm) 324handle_reply (void *cls, const struct CadetReplyMessage *srm)
322{ 325{
323 struct CadetHandle *mh = cls; 326 struct CadetHandle *mh = cls;
324 struct HandleReplyClosure hrc; 327 struct HandleReplyClosure hrc;
@@ -326,47 +329,47 @@ handle_reply(void *cls, const struct CadetReplyMessage *srm)
326 enum GNUNET_BLOCK_Type type; 329 enum GNUNET_BLOCK_Type type;
327 struct GNUNET_HashCode query; 330 struct GNUNET_HashCode query;
328 331
329 msize = ntohs(srm->header.size) - sizeof(struct CadetReplyMessage); 332 msize = ntohs (srm->header.size) - sizeof(struct CadetReplyMessage);
330 type = (enum GNUNET_BLOCK_Type)ntohl(srm->type); 333 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
331 if (GNUNET_YES != 334 if (GNUNET_YES !=
332 GNUNET_BLOCK_get_key(GSF_block_ctx, type, &srm[1], msize, &query)) 335 GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query))
333 { 336 {
334 GNUNET_break_op(0); 337 GNUNET_break_op (0);
335 GNUNET_log( 338 GNUNET_log (
336 GNUNET_ERROR_TYPE_WARNING, 339 GNUNET_ERROR_TYPE_WARNING,
337 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", 340 "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
338 type, 341 type,
339 msize, 342 msize,
340 GNUNET_i2s(&mh->target)); 343 GNUNET_i2s (&mh->target));
341 reset_cadet_async(mh); 344 reset_cadet_async (mh);
342 return; 345 return;
343 } 346 }
344 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 347 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
345 "Received reply `%s' via cadet from peer %s\n", 348 "Received reply `%s' via cadet from peer %s\n",
346 GNUNET_h2s(&query), 349 GNUNET_h2s (&query),
347 GNUNET_i2s(&mh->target)); 350 GNUNET_i2s (&mh->target));
348 GNUNET_CADET_receive_done(mh->channel); 351 GNUNET_CADET_receive_done (mh->channel);
349 GNUNET_STATISTICS_update(GSF_stats, 352 GNUNET_STATISTICS_update (GSF_stats,
350 gettext_noop("# replies received via cadet"), 353 gettext_noop ("# replies received via cadet"),
351 1, 354 1,
352 GNUNET_NO); 355 GNUNET_NO);
353 hrc.data = &srm[1]; 356 hrc.data = &srm[1];
354 hrc.data_size = msize; 357 hrc.data_size = msize;
355 hrc.expiration = GNUNET_TIME_absolute_ntoh(srm->expiration); 358 hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
356 hrc.type = type; 359 hrc.type = type;
357 hrc.found = GNUNET_NO; 360 hrc.found = GNUNET_NO;
358 GNUNET_CONTAINER_multihashmap_get_multiple(mh->waiting_map, 361 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
359 &query, 362 &query,
360 &process_reply, 363 &process_reply,
361 &hrc); 364 &hrc);
362 if (GNUNET_NO == hrc.found) 365 if (GNUNET_NO == hrc.found)
363 { 366 {
364 GNUNET_STATISTICS_update(GSF_stats, 367 GNUNET_STATISTICS_update (GSF_stats,
365 gettext_noop( 368 gettext_noop (
366 "# replies received via cadet dropped"), 369 "# replies received via cadet dropped"),
367 1, 370 1,
368 GNUNET_NO); 371 GNUNET_NO);
369 } 372 }
370} 373}
371 374
372 375
@@ -378,34 +381,34 @@ handle_reply(void *cls, const struct CadetReplyMessage *srm)
378 * @param channel channel of the disconnecting client 381 * @param channel channel of the disconnecting client
379 */ 382 */
380static void 383static void
381disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel) 384disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel)
382{ 385{
383 struct CadetHandle *mh = cls; 386 struct CadetHandle *mh = cls;
384 struct GSF_CadetRequest *sr; 387 struct GSF_CadetRequest *sr;
385 388
386 if (NULL == mh->channel) 389 if (NULL == mh->channel)
387 return; /* being destroyed elsewhere */ 390 return; /* being destroyed elsewhere */
388 GNUNET_assert(channel == mh->channel); 391 GNUNET_assert (channel == mh->channel);
389 mh->channel = NULL; 392 mh->channel = NULL;
390 while (NULL != (sr = mh->pending_head)) 393 while (NULL != (sr = mh->pending_head))
391 GSF_cadet_query_cancel(sr); 394 GSF_cadet_query_cancel (sr);
392 /* first remove `mh` from the `cadet_map`, so that if the 395 /* first remove `mh` from the `cadet_map`, so that if the
393 callback from `free_waiting_entry()` happens to re-issue 396 callback from `free_waiting_entry()` happens to re-issue
394 the request, we don't immediately have it back in the 397 the request, we don't immediately have it back in the
395 `waiting_map`. */ 398 `waiting_map`. */
396 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove(cadet_map, 399 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map,
397 &mh->target, 400 &mh->target,
398 mh)); 401 mh));
399 GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, 402 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
400 &free_waiting_entry, 403 &free_waiting_entry,
401 mh); 404 mh);
402 if (NULL != mh->timeout_task) 405 if (NULL != mh->timeout_task)
403 GNUNET_SCHEDULER_cancel(mh->timeout_task); 406 GNUNET_SCHEDULER_cancel (mh->timeout_task);
404 if (NULL != mh->reset_task) 407 if (NULL != mh->reset_task)
405 GNUNET_SCHEDULER_cancel(mh->reset_task); 408 GNUNET_SCHEDULER_cancel (mh->reset_task);
406 GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)); 409 GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
407 GNUNET_CONTAINER_multihashmap_destroy(mh->waiting_map); 410 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
408 GNUNET_free(mh); 411 GNUNET_free (mh);
409} 412}
410 413
411 414
@@ -424,16 +427,16 @@ disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel)
424 * this value will be negative.. 427 * this value will be negative..
425 */ 428 */
426static void 429static void
427window_change_cb(void *cls, 430window_change_cb (void *cls,
428 const struct GNUNET_CADET_Channel *channel, 431 const struct GNUNET_CADET_Channel *channel,
429 int window_size) 432 int window_size)
430{ 433{
431 /* FIXME: for flow control, implement? */ 434 /* FIXME: for flow control, implement? */
432#if 0 435#if 0
433 /* Something like this instead of the GNUNET_MQ_notify_sent() in 436 /* Something like this instead of the GNUNET_MQ_notify_sent() in
434 transmit_pending() might be good (once the window change CB works...) */ 437 transmit_pending() might be good (once the window change CB works...) */
435 if (0 < window_size) /* test needed? */ 438 if (0 < window_size) /* test needed? */
436 transmit_pending(mh); 439 transmit_pending (mh);
437#endif 440#endif
438} 441}
439 442
@@ -444,38 +447,38 @@ window_change_cb(void *cls,
444 * @param mh cadet to reset 447 * @param mh cadet to reset
445 */ 448 */
446static void 449static void
447reset_cadet(struct CadetHandle *mh) 450reset_cadet (struct CadetHandle *mh)
448{ 451{
449 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
450 "Resetting cadet channel to %s\n", 453 "Resetting cadet channel to %s\n",
451 GNUNET_i2s(&mh->target)); 454 GNUNET_i2s (&mh->target));
452 if (NULL != mh->channel) 455 if (NULL != mh->channel)
453 { 456 {
454 GNUNET_CADET_channel_destroy(mh->channel); 457 GNUNET_CADET_channel_destroy (mh->channel);
455 mh->channel = NULL; 458 mh->channel = NULL;
456 } 459 }
457 GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, &move_to_pending, mh); 460 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh);
458 { 461 {
459 struct GNUNET_MQ_MessageHandler handlers[] = 462 struct GNUNET_MQ_MessageHandler handlers[] =
460 { GNUNET_MQ_hd_var_size(reply, 463 { GNUNET_MQ_hd_var_size (reply,
461 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 464 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
462 struct CadetReplyMessage, 465 struct CadetReplyMessage,
463 mh), 466 mh),
464 GNUNET_MQ_handler_end() }; 467 GNUNET_MQ_handler_end () };
465 struct GNUNET_HashCode port; 468 struct GNUNET_HashCode port;
466 469
467 GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 470 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
468 strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 471 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
469 &port); 472 &port);
470 mh->channel = GNUNET_CADET_channel_create(cadet_handle, 473 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
471 mh, 474 mh,
472 &mh->target, 475 &mh->target,
473 &port, 476 &port,
474 &window_change_cb, 477 &window_change_cb,
475 &disconnect_cb, 478 &disconnect_cb,
476 handlers); 479 handlers);
477 } 480 }
478 transmit_pending(mh); 481 transmit_pending (mh);
479} 482}
480 483
481 484
@@ -485,19 +488,19 @@ reset_cadet(struct CadetHandle *mh)
485 * @param cls the `struct CadetHandle` to tear down 488 * @param cls the `struct CadetHandle` to tear down
486 */ 489 */
487static void 490static void
488cadet_timeout(void *cls) 491cadet_timeout (void *cls)
489{ 492{
490 struct CadetHandle *mh = cls; 493 struct CadetHandle *mh = cls;
491 struct GNUNET_CADET_Channel *tun; 494 struct GNUNET_CADET_Channel *tun;
492 495
493 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 496 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
494 "Timeout on cadet channel to %s\n", 497 "Timeout on cadet channel to %s\n",
495 GNUNET_i2s(&mh->target)); 498 GNUNET_i2s (&mh->target));
496 mh->timeout_task = NULL; 499 mh->timeout_task = NULL;
497 tun = mh->channel; 500 tun = mh->channel;
498 mh->channel = NULL; 501 mh->channel = NULL;
499 if (NULL != tun) 502 if (NULL != tun)
500 GNUNET_CADET_channel_destroy(tun); 503 GNUNET_CADET_channel_destroy (tun);
501} 504}
502 505
503 506
@@ -507,12 +510,12 @@ cadet_timeout(void *cls)
507 * @param cls the `struct CadetHandle` to tear down 510 * @param cls the `struct CadetHandle` to tear down
508 */ 511 */
509static void 512static void
510reset_cadet_task(void *cls) 513reset_cadet_task (void *cls)
511{ 514{
512 struct CadetHandle *mh = cls; 515 struct CadetHandle *mh = cls;
513 516
514 mh->reset_task = NULL; 517 mh->reset_task = NULL;
515 reset_cadet(mh); 518 reset_cadet (mh);
516} 519}
517 520
518 521
@@ -522,36 +525,36 @@ reset_cadet_task(void *cls)
522 * @param cls `struct CadetHandle` to process 525 * @param cls `struct CadetHandle` to process
523 */ 526 */
524static void 527static void
525transmit_pending(void *cls) 528transmit_pending (void *cls)
526{ 529{
527 struct CadetHandle *mh = cls; 530 struct CadetHandle *mh = cls;
528 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq(mh->channel); 531 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
529 struct GSF_CadetRequest *sr; 532 struct GSF_CadetRequest *sr;
530 struct GNUNET_MQ_Envelope *env; 533 struct GNUNET_MQ_Envelope *env;
531 struct CadetQueryMessage *sqm; 534 struct CadetQueryMessage *sqm;
532 535
533 if ((0 != GNUNET_MQ_get_length(mq)) || (NULL == (sr = mh->pending_head))) 536 if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head)))
534 return; 537 return;
535 GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr); 538 GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
536 GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put( 539 GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (
537 mh->waiting_map, 540 mh->waiting_map,
538 &sr->query, 541 &sr->query,
539 sr, 542 sr,
540 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 543 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
541 sr->was_transmitted = GNUNET_YES; 544 sr->was_transmitted = GNUNET_YES;
542 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 545 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543 "Sending query for %s via cadet to %s\n", 546 "Sending query for %s via cadet to %s\n",
544 GNUNET_h2s(&sr->query), 547 GNUNET_h2s (&sr->query),
545 GNUNET_i2s(&mh->target)); 548 GNUNET_i2s (&mh->target));
546 env = GNUNET_MQ_msg(sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); 549 env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
547 GNUNET_MQ_env_set_options(env, 550 GNUNET_MQ_env_set_options (env,
548 GNUNET_MQ_PREF_GOODPUT | 551 GNUNET_MQ_PREF_GOODPUT
549 GNUNET_MQ_PREF_CORK_ALLOWED | 552 | GNUNET_MQ_PREF_CORK_ALLOWED
550 GNUNET_MQ_PREF_OUT_OF_ORDER); 553 | GNUNET_MQ_PREF_OUT_OF_ORDER);
551 sqm->type = htonl(sr->type); 554 sqm->type = htonl (sr->type);
552 sqm->query = sr->query; 555 sqm->query = sr->query;
553 GNUNET_MQ_notify_sent(env, &transmit_pending, mh); 556 GNUNET_MQ_notify_sent (env, &transmit_pending, mh);
554 GNUNET_MQ_send(mq, env); 557 GNUNET_MQ_send (mq, env);
555} 558}
556 559
557 560
@@ -561,53 +564,53 @@ transmit_pending(void *cls)
561 * @param target peer we want to communicate with 564 * @param target peer we want to communicate with
562 */ 565 */
563static struct CadetHandle * 566static struct CadetHandle *
564get_cadet(const struct GNUNET_PeerIdentity *target) 567get_cadet (const struct GNUNET_PeerIdentity *target)
565{ 568{
566 struct CadetHandle *mh; 569 struct CadetHandle *mh;
567 570
568 mh = GNUNET_CONTAINER_multipeermap_get(cadet_map, target); 571 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target);
569 if (NULL != mh) 572 if (NULL != mh)
573 {
574 if (NULL != mh->timeout_task)
570 { 575 {
571 if (NULL != mh->timeout_task) 576 GNUNET_SCHEDULER_cancel (mh->timeout_task);
572 { 577 mh->timeout_task = NULL;
573 GNUNET_SCHEDULER_cancel(mh->timeout_task);
574 mh->timeout_task = NULL;
575 }
576 return mh;
577 } 578 }
578 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 579 return mh;
579 "Creating cadet channel to %s\n", 580 }
580 GNUNET_i2s(target)); 581 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
581 mh = GNUNET_new(struct CadetHandle); 582 "Creating cadet channel to %s\n",
583 GNUNET_i2s (target));
584 mh = GNUNET_new (struct CadetHandle);
582 mh->reset_task = 585 mh->reset_task =
583 GNUNET_SCHEDULER_add_delayed(CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh); 586 GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh);
584 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create(16, GNUNET_YES); 587 mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
585 mh->target = *target; 588 mh->target = *target;
586 GNUNET_assert(GNUNET_OK == 589 GNUNET_assert (GNUNET_OK ==
587 GNUNET_CONTAINER_multipeermap_put( 590 GNUNET_CONTAINER_multipeermap_put (
588 cadet_map, 591 cadet_map,
589 &mh->target, 592 &mh->target,
590 mh, 593 mh,
591 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 594 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
592 { 595 {
593 struct GNUNET_MQ_MessageHandler handlers[] = 596 struct GNUNET_MQ_MessageHandler handlers[] =
594 { GNUNET_MQ_hd_var_size(reply, 597 { GNUNET_MQ_hd_var_size (reply,
595 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 598 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
596 struct CadetReplyMessage, 599 struct CadetReplyMessage,
597 mh), 600 mh),
598 GNUNET_MQ_handler_end() }; 601 GNUNET_MQ_handler_end () };
599 struct GNUNET_HashCode port; 602 struct GNUNET_HashCode port;
600 603
601 GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 604 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
602 strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 605 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
603 &port); 606 &port);
604 mh->channel = GNUNET_CADET_channel_create(cadet_handle, 607 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
605 mh, 608 mh,
606 &mh->target, 609 &mh->target,
607 &port, 610 &port,
608 &window_change_cb, 611 &window_change_cb,
609 &disconnect_cb, 612 &disconnect_cb,
610 handlers); 613 handlers);
611 } 614 }
612 return mh; 615 return mh;
613} 616}
@@ -624,28 +627,28 @@ get_cadet(const struct GNUNET_PeerIdentity *target)
624 * @return handle to cancel the operation 627 * @return handle to cancel the operation
625 */ 628 */
626struct GSF_CadetRequest * 629struct GSF_CadetRequest *
627GSF_cadet_query(const struct GNUNET_PeerIdentity *target, 630GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
628 const struct GNUNET_HashCode *query, 631 const struct GNUNET_HashCode *query,
629 enum GNUNET_BLOCK_Type type, 632 enum GNUNET_BLOCK_Type type,
630 GSF_CadetReplyProcessor proc, 633 GSF_CadetReplyProcessor proc,
631 void *proc_cls) 634 void *proc_cls)
632{ 635{
633 struct CadetHandle *mh; 636 struct CadetHandle *mh;
634 struct GSF_CadetRequest *sr; 637 struct GSF_CadetRequest *sr;
635 638
636 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 639 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637 "Preparing to send query for %s via cadet to %s\n", 640 "Preparing to send query for %s via cadet to %s\n",
638 GNUNET_h2s(query), 641 GNUNET_h2s (query),
639 GNUNET_i2s(target)); 642 GNUNET_i2s (target));
640 mh = get_cadet(target); 643 mh = get_cadet (target);
641 sr = GNUNET_new(struct GSF_CadetRequest); 644 sr = GNUNET_new (struct GSF_CadetRequest);
642 sr->mh = mh; 645 sr->mh = mh;
643 sr->proc = proc; 646 sr->proc = proc;
644 sr->proc_cls = proc_cls; 647 sr->proc_cls = proc_cls;
645 sr->type = type; 648 sr->type = type;
646 sr->query = *query; 649 sr->query = *query;
647 GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr); 650 GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr);
648 transmit_pending(mh); 651 transmit_pending (mh);
649 return sr; 652 return sr;
650} 653}
651 654
@@ -657,7 +660,7 @@ GSF_cadet_query(const struct GNUNET_PeerIdentity *target,
657 * @param sr request to cancel 660 * @param sr request to cancel
658 */ 661 */
659void 662void
660GSF_cadet_query_cancel(struct GSF_CadetRequest *sr) 663GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
661{ 664{
662 struct CadetHandle *mh = sr->mh; 665 struct CadetHandle *mh = sr->mh;
663 GSF_CadetReplyProcessor p; 666 GSF_CadetReplyProcessor p;
@@ -665,26 +668,26 @@ GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
665 p = sr->proc; 668 p = sr->proc;
666 sr->proc = NULL; 669 sr->proc = NULL;
667 if (NULL != p) 670 if (NULL != p)
668 { 671 {
669 /* signal failure / cancellation to callback */ 672 /* signal failure / cancellation to callback */
670 p(sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL); 673 p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL);
671 } 674 }
672 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 675 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
673 "Cancelled query for %s via cadet to %s\n", 676 "Cancelled query for %s via cadet to %s\n",
674 GNUNET_h2s(&sr->query), 677 GNUNET_h2s (&sr->query),
675 GNUNET_i2s(&sr->mh->target)); 678 GNUNET_i2s (&sr->mh->target));
676 if (GNUNET_YES == sr->was_transmitted) 679 if (GNUNET_YES == sr->was_transmitted)
677 GNUNET_assert( 680 GNUNET_assert (
678 GNUNET_OK == 681 GNUNET_OK ==
679 GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, &sr->query, sr)); 682 GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr));
680 else 683 else
681 GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr); 684 GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr);
682 GNUNET_free(sr); 685 GNUNET_free (sr);
683 if ((0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)) && 686 if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
684 (NULL == mh->pending_head)) 687 (NULL == mh->pending_head))
685 mh->timeout_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS, 688 mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
686 &cadet_timeout, 689 &cadet_timeout,
687 mh); 690 mh);
688} 691}
689 692
690 693
@@ -697,27 +700,27 @@ GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
697 * @return #GNUNET_YES (continue to iterate) 700 * @return #GNUNET_YES (continue to iterate)
698 */ 701 */
699int 702int
700GSF_cadet_release_clients(void *cls, 703GSF_cadet_release_clients (void *cls,
701 const struct GNUNET_PeerIdentity *key, 704 const struct GNUNET_PeerIdentity *key,
702 void *value) 705 void *value)
703{ 706{
704 struct CadetHandle *mh = value; 707 struct CadetHandle *mh = value;
705 708
706 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, 709 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
707 "Timeout on cadet channel to %s\n", 710 "Timeout on cadet channel to %s\n",
708 GNUNET_i2s(&mh->target)); 711 GNUNET_i2s (&mh->target));
709 if (NULL != mh->channel) 712 if (NULL != mh->channel)
710 { 713 {
711 struct GNUNET_CADET_Channel *channel = mh->channel; 714 struct GNUNET_CADET_Channel *channel = mh->channel;
712 715
713 mh->channel = NULL; 716 mh->channel = NULL;
714 GNUNET_CADET_channel_destroy(channel); 717 GNUNET_CADET_channel_destroy (channel);
715 } 718 }
716 if (NULL != mh->reset_task) 719 if (NULL != mh->reset_task)
717 { 720 {
718 GNUNET_SCHEDULER_cancel(mh->reset_task); 721 GNUNET_SCHEDULER_cancel (mh->reset_task);
719 mh->reset_task = NULL; 722 mh->reset_task = NULL;
720 } 723 }
721 return GNUNET_YES; 724 return GNUNET_YES;
722} 725}
723 726