diff options
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r-- | src/fs/gnunet-service-fs_cadet_client.c | 423 |
1 files changed, 213 insertions, 210 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c index a494ba751..96ccf8232 100644 --- a/src/fs/gnunet-service-fs_cadet_client.c +++ b/src/fs/gnunet-service-fs_cadet_client.c | |||
@@ -42,7 +42,7 @@ | |||
42 | * After how long do we reset connections without replies? | 42 | * After how long do we reset connections without replies? |
43 | */ | 43 | */ |
44 | #define CLIENT_RETRY_TIMEOUT \ | 44 | #define CLIENT_RETRY_TIMEOUT \ |
45 | GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) | 45 | GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) |
46 | 46 | ||
47 | 47 | ||
48 | /** | 48 | /** |
@@ -54,7 +54,8 @@ struct CadetHandle; | |||
54 | /** | 54 | /** |
55 | * Handle for a request that is going out via cadet API. | 55 | * Handle for a request that is going out via cadet API. |
56 | */ | 56 | */ |
57 | struct GSF_CadetRequest { | 57 | struct GSF_CadetRequest |
58 | { | ||
58 | /** | 59 | /** |
59 | * DLL. | 60 | * DLL. |
60 | */ | 61 | */ |
@@ -101,7 +102,8 @@ struct GSF_CadetRequest { | |||
101 | /** | 102 | /** |
102 | * Handle for a cadet to another peer. | 103 | * Handle for a cadet to another peer. |
103 | */ | 104 | */ |
104 | struct CadetHandle { | 105 | struct CadetHandle |
106 | { | ||
105 | /** | 107 | /** |
106 | * Head of DLL of pending requests on this cadet. | 108 | * Head of DLL of pending requests on this cadet. |
107 | */ | 109 | */ |
@@ -165,7 +167,7 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; | |||
165 | * @param cls `struct CadetHandle` to process | 167 | * @param cls `struct CadetHandle` to process |
166 | */ | 168 | */ |
167 | static void | 169 | static void |
168 | transmit_pending(void *cls); | 170 | transmit_pending (void *cls); |
169 | 171 | ||
170 | 172 | ||
171 | /** | 173 | /** |
@@ -178,15 +180,15 @@ transmit_pending(void *cls); | |||
178 | * @return #GNUNET_YES (continue to iterate) | 180 | * @return #GNUNET_YES (continue to iterate) |
179 | */ | 181 | */ |
180 | static int | 182 | static int |
181 | move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value) | 183 | move_to_pending (void *cls, const struct GNUNET_HashCode *key, void *value) |
182 | { | 184 | { |
183 | struct CadetHandle *mh = cls; | 185 | struct CadetHandle *mh = cls; |
184 | struct GSF_CadetRequest *sr = value; | 186 | struct GSF_CadetRequest *sr = value; |
185 | 187 | ||
186 | GNUNET_assert( | 188 | GNUNET_assert ( |
187 | GNUNET_YES == | 189 | GNUNET_YES == |
188 | GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, key, value)); | 190 | GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, key, value)); |
189 | GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr); | 191 | GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); |
190 | sr->was_transmitted = GNUNET_NO; | 192 | sr->was_transmitted = GNUNET_NO; |
191 | return GNUNET_YES; | 193 | return GNUNET_YES; |
192 | } | 194 | } |
@@ -201,7 +203,7 @@ move_to_pending(void *cls, const struct GNUNET_HashCode *key, void *value) | |||
201 | * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing | 203 | * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing |
202 | */ | 204 | */ |
203 | static int | 205 | static int |
204 | check_reply(void *cls, const struct CadetReplyMessage *srm) | 206 | check_reply (void *cls, const struct CadetReplyMessage *srm) |
205 | { | 207 | { |
206 | /* We check later... */ | 208 | /* We check later... */ |
207 | return GNUNET_OK; | 209 | return GNUNET_OK; |
@@ -214,7 +216,7 @@ check_reply(void *cls, const struct CadetReplyMessage *srm) | |||
214 | * @param cls the `struct CadetHandle` to tear down | 216 | * @param cls the `struct CadetHandle` to tear down |
215 | */ | 217 | */ |
216 | static void | 218 | static void |
217 | reset_cadet_task(void *cls); | 219 | reset_cadet_task (void *cls); |
218 | 220 | ||
219 | 221 | ||
220 | /** | 222 | /** |
@@ -224,18 +226,19 @@ reset_cadet_task(void *cls); | |||
224 | * @param mh cadet to reset | 226 | * @param mh cadet to reset |
225 | */ | 227 | */ |
226 | static void | 228 | static void |
227 | reset_cadet_async(struct CadetHandle *mh) | 229 | reset_cadet_async (struct CadetHandle *mh) |
228 | { | 230 | { |
229 | if (NULL != mh->reset_task) | 231 | if (NULL != mh->reset_task) |
230 | GNUNET_SCHEDULER_cancel(mh->reset_task); | 232 | GNUNET_SCHEDULER_cancel (mh->reset_task); |
231 | mh->reset_task = GNUNET_SCHEDULER_add_now(&reset_cadet_task, mh); | 233 | mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh); |
232 | } | 234 | } |
233 | 235 | ||
234 | 236 | ||
235 | /** | 237 | /** |
236 | * Closure for handle_reply(). | 238 | * Closure for handle_reply(). |
237 | */ | 239 | */ |
238 | struct HandleReplyClosure { | 240 | struct HandleReplyClosure |
241 | { | ||
239 | /** | 242 | /** |
240 | * Reply payload. | 243 | * Reply payload. |
241 | */ | 244 | */ |
@@ -273,18 +276,18 @@ struct HandleReplyClosure { | |||
273 | * @return #GNUNET_YES (continue to iterate) | 276 | * @return #GNUNET_YES (continue to iterate) |
274 | */ | 277 | */ |
275 | static int | 278 | static int |
276 | process_reply(void *cls, const struct GNUNET_HashCode *key, void *value) | 279 | process_reply (void *cls, const struct GNUNET_HashCode *key, void *value) |
277 | { | 280 | { |
278 | struct HandleReplyClosure *hrc = cls; | 281 | struct HandleReplyClosure *hrc = cls; |
279 | struct GSF_CadetRequest *sr = value; | 282 | struct GSF_CadetRequest *sr = value; |
280 | 283 | ||
281 | sr->proc(sr->proc_cls, | 284 | sr->proc (sr->proc_cls, |
282 | hrc->type, | 285 | hrc->type, |
283 | hrc->expiration, | 286 | hrc->expiration, |
284 | hrc->data_size, | 287 | hrc->data_size, |
285 | hrc->data); | 288 | hrc->data); |
286 | sr->proc = NULL; | 289 | sr->proc = NULL; |
287 | GSF_cadet_query_cancel(sr); | 290 | GSF_cadet_query_cancel (sr); |
288 | hrc->found = GNUNET_YES; | 291 | hrc->found = GNUNET_YES; |
289 | return GNUNET_YES; | 292 | return GNUNET_YES; |
290 | } | 293 | } |
@@ -301,11 +304,11 @@ process_reply(void *cls, const struct GNUNET_HashCode *key, void *value) | |||
301 | * @return #GNUNET_YES (continue to iterate) | 304 | * @return #GNUNET_YES (continue to iterate) |
302 | */ | 305 | */ |
303 | static int | 306 | static int |
304 | free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value) | 307 | free_waiting_entry (void *cls, const struct GNUNET_HashCode *key, void *value) |
305 | { | 308 | { |
306 | struct GSF_CadetRequest *sr = value; | 309 | struct GSF_CadetRequest *sr = value; |
307 | 310 | ||
308 | GSF_cadet_query_cancel(sr); | 311 | GSF_cadet_query_cancel (sr); |
309 | return GNUNET_YES; | 312 | return GNUNET_YES; |
310 | } | 313 | } |
311 | 314 | ||
@@ -318,7 +321,7 @@ free_waiting_entry(void *cls, const struct GNUNET_HashCode *key, void *value) | |||
318 | * @param srm the actual message | 321 | * @param srm the actual message |
319 | */ | 322 | */ |
320 | static void | 323 | static void |
321 | handle_reply(void *cls, const struct CadetReplyMessage *srm) | 324 | handle_reply (void *cls, const struct CadetReplyMessage *srm) |
322 | { | 325 | { |
323 | struct CadetHandle *mh = cls; | 326 | struct CadetHandle *mh = cls; |
324 | struct HandleReplyClosure hrc; | 327 | struct HandleReplyClosure hrc; |
@@ -326,47 +329,47 @@ handle_reply(void *cls, const struct CadetReplyMessage *srm) | |||
326 | enum GNUNET_BLOCK_Type type; | 329 | enum GNUNET_BLOCK_Type type; |
327 | struct GNUNET_HashCode query; | 330 | struct GNUNET_HashCode query; |
328 | 331 | ||
329 | msize = ntohs(srm->header.size) - sizeof(struct CadetReplyMessage); | 332 | msize = ntohs (srm->header.size) - sizeof(struct CadetReplyMessage); |
330 | type = (enum GNUNET_BLOCK_Type)ntohl(srm->type); | 333 | type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); |
331 | if (GNUNET_YES != | 334 | if (GNUNET_YES != |
332 | GNUNET_BLOCK_get_key(GSF_block_ctx, type, &srm[1], msize, &query)) | 335 | GNUNET_BLOCK_get_key (GSF_block_ctx, type, &srm[1], msize, &query)) |
333 | { | 336 | { |
334 | GNUNET_break_op(0); | 337 | GNUNET_break_op (0); |
335 | GNUNET_log( | 338 | GNUNET_log ( |
336 | GNUNET_ERROR_TYPE_WARNING, | 339 | GNUNET_ERROR_TYPE_WARNING, |
337 | "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", | 340 | "Received bogus reply of type %u with %u bytes via cadet from peer %s\n", |
338 | type, | 341 | type, |
339 | msize, | 342 | msize, |
340 | GNUNET_i2s(&mh->target)); | 343 | GNUNET_i2s (&mh->target)); |
341 | reset_cadet_async(mh); | 344 | reset_cadet_async (mh); |
342 | return; | 345 | return; |
343 | } | 346 | } |
344 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 347 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
345 | "Received reply `%s' via cadet from peer %s\n", | 348 | "Received reply `%s' via cadet from peer %s\n", |
346 | GNUNET_h2s(&query), | 349 | GNUNET_h2s (&query), |
347 | GNUNET_i2s(&mh->target)); | 350 | GNUNET_i2s (&mh->target)); |
348 | GNUNET_CADET_receive_done(mh->channel); | 351 | GNUNET_CADET_receive_done (mh->channel); |
349 | GNUNET_STATISTICS_update(GSF_stats, | 352 | GNUNET_STATISTICS_update (GSF_stats, |
350 | gettext_noop("# replies received via cadet"), | 353 | gettext_noop ("# replies received via cadet"), |
351 | 1, | 354 | 1, |
352 | GNUNET_NO); | 355 | GNUNET_NO); |
353 | hrc.data = &srm[1]; | 356 | hrc.data = &srm[1]; |
354 | hrc.data_size = msize; | 357 | hrc.data_size = msize; |
355 | hrc.expiration = GNUNET_TIME_absolute_ntoh(srm->expiration); | 358 | hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); |
356 | hrc.type = type; | 359 | hrc.type = type; |
357 | hrc.found = GNUNET_NO; | 360 | hrc.found = GNUNET_NO; |
358 | GNUNET_CONTAINER_multihashmap_get_multiple(mh->waiting_map, | 361 | GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, |
359 | &query, | 362 | &query, |
360 | &process_reply, | 363 | &process_reply, |
361 | &hrc); | 364 | &hrc); |
362 | if (GNUNET_NO == hrc.found) | 365 | if (GNUNET_NO == hrc.found) |
363 | { | 366 | { |
364 | GNUNET_STATISTICS_update(GSF_stats, | 367 | GNUNET_STATISTICS_update (GSF_stats, |
365 | gettext_noop( | 368 | gettext_noop ( |
366 | "# replies received via cadet dropped"), | 369 | "# replies received via cadet dropped"), |
367 | 1, | 370 | 1, |
368 | GNUNET_NO); | 371 | GNUNET_NO); |
369 | } | 372 | } |
370 | } | 373 | } |
371 | 374 | ||
372 | 375 | ||
@@ -378,34 +381,34 @@ handle_reply(void *cls, const struct CadetReplyMessage *srm) | |||
378 | * @param channel channel of the disconnecting client | 381 | * @param channel channel of the disconnecting client |
379 | */ | 382 | */ |
380 | static void | 383 | static void |
381 | disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel) | 384 | disconnect_cb (void *cls, const struct GNUNET_CADET_Channel *channel) |
382 | { | 385 | { |
383 | struct CadetHandle *mh = cls; | 386 | struct CadetHandle *mh = cls; |
384 | struct GSF_CadetRequest *sr; | 387 | struct GSF_CadetRequest *sr; |
385 | 388 | ||
386 | if (NULL == mh->channel) | 389 | if (NULL == mh->channel) |
387 | return; /* being destroyed elsewhere */ | 390 | return; /* being destroyed elsewhere */ |
388 | GNUNET_assert(channel == mh->channel); | 391 | GNUNET_assert (channel == mh->channel); |
389 | mh->channel = NULL; | 392 | mh->channel = NULL; |
390 | while (NULL != (sr = mh->pending_head)) | 393 | while (NULL != (sr = mh->pending_head)) |
391 | GSF_cadet_query_cancel(sr); | 394 | GSF_cadet_query_cancel (sr); |
392 | /* first remove `mh` from the `cadet_map`, so that if the | 395 | /* first remove `mh` from the `cadet_map`, so that if the |
393 | callback from `free_waiting_entry()` happens to re-issue | 396 | callback from `free_waiting_entry()` happens to re-issue |
394 | the request, we don't immediately have it back in the | 397 | the request, we don't immediately have it back in the |
395 | `waiting_map`. */ | 398 | `waiting_map`. */ |
396 | GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove(cadet_map, | 399 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multipeermap_remove (cadet_map, |
397 | &mh->target, | 400 | &mh->target, |
398 | mh)); | 401 | mh)); |
399 | GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, | 402 | GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, |
400 | &free_waiting_entry, | 403 | &free_waiting_entry, |
401 | mh); | 404 | mh); |
402 | if (NULL != mh->timeout_task) | 405 | if (NULL != mh->timeout_task) |
403 | GNUNET_SCHEDULER_cancel(mh->timeout_task); | 406 | GNUNET_SCHEDULER_cancel (mh->timeout_task); |
404 | if (NULL != mh->reset_task) | 407 | if (NULL != mh->reset_task) |
405 | GNUNET_SCHEDULER_cancel(mh->reset_task); | 408 | GNUNET_SCHEDULER_cancel (mh->reset_task); |
406 | GNUNET_assert(0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)); | 409 | GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); |
407 | GNUNET_CONTAINER_multihashmap_destroy(mh->waiting_map); | 410 | GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); |
408 | GNUNET_free(mh); | 411 | GNUNET_free (mh); |
409 | } | 412 | } |
410 | 413 | ||
411 | 414 | ||
@@ -424,16 +427,16 @@ disconnect_cb(void *cls, const struct GNUNET_CADET_Channel *channel) | |||
424 | * this value will be negative.. | 427 | * this value will be negative.. |
425 | */ | 428 | */ |
426 | static void | 429 | static void |
427 | window_change_cb(void *cls, | 430 | window_change_cb (void *cls, |
428 | const struct GNUNET_CADET_Channel *channel, | 431 | const struct GNUNET_CADET_Channel *channel, |
429 | int window_size) | 432 | int window_size) |
430 | { | 433 | { |
431 | /* FIXME: for flow control, implement? */ | 434 | /* FIXME: for flow control, implement? */ |
432 | #if 0 | 435 | #if 0 |
433 | /* Something like this instead of the GNUNET_MQ_notify_sent() in | 436 | /* Something like this instead of the GNUNET_MQ_notify_sent() in |
434 | transmit_pending() might be good (once the window change CB works...) */ | 437 | transmit_pending() might be good (once the window change CB works...) */ |
435 | if (0 < window_size) /* test needed? */ | 438 | if (0 < window_size) /* test needed? */ |
436 | transmit_pending(mh); | 439 | transmit_pending (mh); |
437 | #endif | 440 | #endif |
438 | } | 441 | } |
439 | 442 | ||
@@ -444,38 +447,38 @@ window_change_cb(void *cls, | |||
444 | * @param mh cadet to reset | 447 | * @param mh cadet to reset |
445 | */ | 448 | */ |
446 | static void | 449 | static void |
447 | reset_cadet(struct CadetHandle *mh) | 450 | reset_cadet (struct CadetHandle *mh) |
448 | { | 451 | { |
449 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 452 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
450 | "Resetting cadet channel to %s\n", | 453 | "Resetting cadet channel to %s\n", |
451 | GNUNET_i2s(&mh->target)); | 454 | GNUNET_i2s (&mh->target)); |
452 | if (NULL != mh->channel) | 455 | if (NULL != mh->channel) |
453 | { | 456 | { |
454 | GNUNET_CADET_channel_destroy(mh->channel); | 457 | GNUNET_CADET_channel_destroy (mh->channel); |
455 | mh->channel = NULL; | 458 | mh->channel = NULL; |
456 | } | 459 | } |
457 | GNUNET_CONTAINER_multihashmap_iterate(mh->waiting_map, &move_to_pending, mh); | 460 | GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, &move_to_pending, mh); |
458 | { | 461 | { |
459 | struct GNUNET_MQ_MessageHandler handlers[] = | 462 | struct GNUNET_MQ_MessageHandler handlers[] = |
460 | { GNUNET_MQ_hd_var_size(reply, | 463 | { GNUNET_MQ_hd_var_size (reply, |
461 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, | 464 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, |
462 | struct CadetReplyMessage, | 465 | struct CadetReplyMessage, |
463 | mh), | 466 | mh), |
464 | GNUNET_MQ_handler_end() }; | 467 | GNUNET_MQ_handler_end () }; |
465 | struct GNUNET_HashCode port; | 468 | struct GNUNET_HashCode port; |
466 | 469 | ||
467 | GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, | 470 | GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, |
468 | strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), | 471 | strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), |
469 | &port); | 472 | &port); |
470 | mh->channel = GNUNET_CADET_channel_create(cadet_handle, | 473 | mh->channel = GNUNET_CADET_channel_create (cadet_handle, |
471 | mh, | 474 | mh, |
472 | &mh->target, | 475 | &mh->target, |
473 | &port, | 476 | &port, |
474 | &window_change_cb, | 477 | &window_change_cb, |
475 | &disconnect_cb, | 478 | &disconnect_cb, |
476 | handlers); | 479 | handlers); |
477 | } | 480 | } |
478 | transmit_pending(mh); | 481 | transmit_pending (mh); |
479 | } | 482 | } |
480 | 483 | ||
481 | 484 | ||
@@ -485,19 +488,19 @@ reset_cadet(struct CadetHandle *mh) | |||
485 | * @param cls the `struct CadetHandle` to tear down | 488 | * @param cls the `struct CadetHandle` to tear down |
486 | */ | 489 | */ |
487 | static void | 490 | static void |
488 | cadet_timeout(void *cls) | 491 | cadet_timeout (void *cls) |
489 | { | 492 | { |
490 | struct CadetHandle *mh = cls; | 493 | struct CadetHandle *mh = cls; |
491 | struct GNUNET_CADET_Channel *tun; | 494 | struct GNUNET_CADET_Channel *tun; |
492 | 495 | ||
493 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 496 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
494 | "Timeout on cadet channel to %s\n", | 497 | "Timeout on cadet channel to %s\n", |
495 | GNUNET_i2s(&mh->target)); | 498 | GNUNET_i2s (&mh->target)); |
496 | mh->timeout_task = NULL; | 499 | mh->timeout_task = NULL; |
497 | tun = mh->channel; | 500 | tun = mh->channel; |
498 | mh->channel = NULL; | 501 | mh->channel = NULL; |
499 | if (NULL != tun) | 502 | if (NULL != tun) |
500 | GNUNET_CADET_channel_destroy(tun); | 503 | GNUNET_CADET_channel_destroy (tun); |
501 | } | 504 | } |
502 | 505 | ||
503 | 506 | ||
@@ -507,12 +510,12 @@ cadet_timeout(void *cls) | |||
507 | * @param cls the `struct CadetHandle` to tear down | 510 | * @param cls the `struct CadetHandle` to tear down |
508 | */ | 511 | */ |
509 | static void | 512 | static void |
510 | reset_cadet_task(void *cls) | 513 | reset_cadet_task (void *cls) |
511 | { | 514 | { |
512 | struct CadetHandle *mh = cls; | 515 | struct CadetHandle *mh = cls; |
513 | 516 | ||
514 | mh->reset_task = NULL; | 517 | mh->reset_task = NULL; |
515 | reset_cadet(mh); | 518 | reset_cadet (mh); |
516 | } | 519 | } |
517 | 520 | ||
518 | 521 | ||
@@ -522,36 +525,36 @@ reset_cadet_task(void *cls) | |||
522 | * @param cls `struct CadetHandle` to process | 525 | * @param cls `struct CadetHandle` to process |
523 | */ | 526 | */ |
524 | static void | 527 | static void |
525 | transmit_pending(void *cls) | 528 | transmit_pending (void *cls) |
526 | { | 529 | { |
527 | struct CadetHandle *mh = cls; | 530 | struct CadetHandle *mh = cls; |
528 | struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq(mh->channel); | 531 | struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel); |
529 | struct GSF_CadetRequest *sr; | 532 | struct GSF_CadetRequest *sr; |
530 | struct GNUNET_MQ_Envelope *env; | 533 | struct GNUNET_MQ_Envelope *env; |
531 | struct CadetQueryMessage *sqm; | 534 | struct CadetQueryMessage *sqm; |
532 | 535 | ||
533 | if ((0 != GNUNET_MQ_get_length(mq)) || (NULL == (sr = mh->pending_head))) | 536 | if ((0 != GNUNET_MQ_get_length (mq)) || (NULL == (sr = mh->pending_head))) |
534 | return; | 537 | return; |
535 | GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr); | 538 | GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); |
536 | GNUNET_assert(GNUNET_OK == GNUNET_CONTAINER_multihashmap_put( | 539 | GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put ( |
537 | mh->waiting_map, | 540 | mh->waiting_map, |
538 | &sr->query, | 541 | &sr->query, |
539 | sr, | 542 | sr, |
540 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 543 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
541 | sr->was_transmitted = GNUNET_YES; | 544 | sr->was_transmitted = GNUNET_YES; |
542 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 545 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
543 | "Sending query for %s via cadet to %s\n", | 546 | "Sending query for %s via cadet to %s\n", |
544 | GNUNET_h2s(&sr->query), | 547 | GNUNET_h2s (&sr->query), |
545 | GNUNET_i2s(&mh->target)); | 548 | GNUNET_i2s (&mh->target)); |
546 | env = GNUNET_MQ_msg(sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); | 549 | env = GNUNET_MQ_msg (sqm, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY); |
547 | GNUNET_MQ_env_set_options(env, | 550 | GNUNET_MQ_env_set_options (env, |
548 | GNUNET_MQ_PREF_GOODPUT | | 551 | GNUNET_MQ_PREF_GOODPUT |
549 | GNUNET_MQ_PREF_CORK_ALLOWED | | 552 | | GNUNET_MQ_PREF_CORK_ALLOWED |
550 | GNUNET_MQ_PREF_OUT_OF_ORDER); | 553 | | GNUNET_MQ_PREF_OUT_OF_ORDER); |
551 | sqm->type = htonl(sr->type); | 554 | sqm->type = htonl (sr->type); |
552 | sqm->query = sr->query; | 555 | sqm->query = sr->query; |
553 | GNUNET_MQ_notify_sent(env, &transmit_pending, mh); | 556 | GNUNET_MQ_notify_sent (env, &transmit_pending, mh); |
554 | GNUNET_MQ_send(mq, env); | 557 | GNUNET_MQ_send (mq, env); |
555 | } | 558 | } |
556 | 559 | ||
557 | 560 | ||
@@ -561,53 +564,53 @@ transmit_pending(void *cls) | |||
561 | * @param target peer we want to communicate with | 564 | * @param target peer we want to communicate with |
562 | */ | 565 | */ |
563 | static struct CadetHandle * | 566 | static struct CadetHandle * |
564 | get_cadet(const struct GNUNET_PeerIdentity *target) | 567 | get_cadet (const struct GNUNET_PeerIdentity *target) |
565 | { | 568 | { |
566 | struct CadetHandle *mh; | 569 | struct CadetHandle *mh; |
567 | 570 | ||
568 | mh = GNUNET_CONTAINER_multipeermap_get(cadet_map, target); | 571 | mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target); |
569 | if (NULL != mh) | 572 | if (NULL != mh) |
573 | { | ||
574 | if (NULL != mh->timeout_task) | ||
570 | { | 575 | { |
571 | if (NULL != mh->timeout_task) | 576 | GNUNET_SCHEDULER_cancel (mh->timeout_task); |
572 | { | 577 | mh->timeout_task = NULL; |
573 | GNUNET_SCHEDULER_cancel(mh->timeout_task); | ||
574 | mh->timeout_task = NULL; | ||
575 | } | ||
576 | return mh; | ||
577 | } | 578 | } |
578 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 579 | return mh; |
579 | "Creating cadet channel to %s\n", | 580 | } |
580 | GNUNET_i2s(target)); | 581 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
581 | mh = GNUNET_new(struct CadetHandle); | 582 | "Creating cadet channel to %s\n", |
583 | GNUNET_i2s (target)); | ||
584 | mh = GNUNET_new (struct CadetHandle); | ||
582 | mh->reset_task = | 585 | mh->reset_task = |
583 | GNUNET_SCHEDULER_add_delayed(CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh); | 586 | GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, &reset_cadet_task, mh); |
584 | mh->waiting_map = GNUNET_CONTAINER_multihashmap_create(16, GNUNET_YES); | 587 | mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); |
585 | mh->target = *target; | 588 | mh->target = *target; |
586 | GNUNET_assert(GNUNET_OK == | 589 | GNUNET_assert (GNUNET_OK == |
587 | GNUNET_CONTAINER_multipeermap_put( | 590 | GNUNET_CONTAINER_multipeermap_put ( |
588 | cadet_map, | 591 | cadet_map, |
589 | &mh->target, | 592 | &mh->target, |
590 | mh, | 593 | mh, |
591 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); | 594 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); |
592 | { | 595 | { |
593 | struct GNUNET_MQ_MessageHandler handlers[] = | 596 | struct GNUNET_MQ_MessageHandler handlers[] = |
594 | { GNUNET_MQ_hd_var_size(reply, | 597 | { GNUNET_MQ_hd_var_size (reply, |
595 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, | 598 | GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, |
596 | struct CadetReplyMessage, | 599 | struct CadetReplyMessage, |
597 | mh), | 600 | mh), |
598 | GNUNET_MQ_handler_end() }; | 601 | GNUNET_MQ_handler_end () }; |
599 | struct GNUNET_HashCode port; | 602 | struct GNUNET_HashCode port; |
600 | 603 | ||
601 | GNUNET_CRYPTO_hash(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, | 604 | GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, |
602 | strlen(GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), | 605 | strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), |
603 | &port); | 606 | &port); |
604 | mh->channel = GNUNET_CADET_channel_create(cadet_handle, | 607 | mh->channel = GNUNET_CADET_channel_create (cadet_handle, |
605 | mh, | 608 | mh, |
606 | &mh->target, | 609 | &mh->target, |
607 | &port, | 610 | &port, |
608 | &window_change_cb, | 611 | &window_change_cb, |
609 | &disconnect_cb, | 612 | &disconnect_cb, |
610 | handlers); | 613 | handlers); |
611 | } | 614 | } |
612 | return mh; | 615 | return mh; |
613 | } | 616 | } |
@@ -624,28 +627,28 @@ get_cadet(const struct GNUNET_PeerIdentity *target) | |||
624 | * @return handle to cancel the operation | 627 | * @return handle to cancel the operation |
625 | */ | 628 | */ |
626 | struct GSF_CadetRequest * | 629 | struct GSF_CadetRequest * |
627 | GSF_cadet_query(const struct GNUNET_PeerIdentity *target, | 630 | GSF_cadet_query (const struct GNUNET_PeerIdentity *target, |
628 | const struct GNUNET_HashCode *query, | 631 | const struct GNUNET_HashCode *query, |
629 | enum GNUNET_BLOCK_Type type, | 632 | enum GNUNET_BLOCK_Type type, |
630 | GSF_CadetReplyProcessor proc, | 633 | GSF_CadetReplyProcessor proc, |
631 | void *proc_cls) | 634 | void *proc_cls) |
632 | { | 635 | { |
633 | struct CadetHandle *mh; | 636 | struct CadetHandle *mh; |
634 | struct GSF_CadetRequest *sr; | 637 | struct GSF_CadetRequest *sr; |
635 | 638 | ||
636 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 639 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
637 | "Preparing to send query for %s via cadet to %s\n", | 640 | "Preparing to send query for %s via cadet to %s\n", |
638 | GNUNET_h2s(query), | 641 | GNUNET_h2s (query), |
639 | GNUNET_i2s(target)); | 642 | GNUNET_i2s (target)); |
640 | mh = get_cadet(target); | 643 | mh = get_cadet (target); |
641 | sr = GNUNET_new(struct GSF_CadetRequest); | 644 | sr = GNUNET_new (struct GSF_CadetRequest); |
642 | sr->mh = mh; | 645 | sr->mh = mh; |
643 | sr->proc = proc; | 646 | sr->proc = proc; |
644 | sr->proc_cls = proc_cls; | 647 | sr->proc_cls = proc_cls; |
645 | sr->type = type; | 648 | sr->type = type; |
646 | sr->query = *query; | 649 | sr->query = *query; |
647 | GNUNET_CONTAINER_DLL_insert(mh->pending_head, mh->pending_tail, sr); | 650 | GNUNET_CONTAINER_DLL_insert (mh->pending_head, mh->pending_tail, sr); |
648 | transmit_pending(mh); | 651 | transmit_pending (mh); |
649 | return sr; | 652 | return sr; |
650 | } | 653 | } |
651 | 654 | ||
@@ -657,7 +660,7 @@ GSF_cadet_query(const struct GNUNET_PeerIdentity *target, | |||
657 | * @param sr request to cancel | 660 | * @param sr request to cancel |
658 | */ | 661 | */ |
659 | void | 662 | void |
660 | GSF_cadet_query_cancel(struct GSF_CadetRequest *sr) | 663 | GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) |
661 | { | 664 | { |
662 | struct CadetHandle *mh = sr->mh; | 665 | struct CadetHandle *mh = sr->mh; |
663 | GSF_CadetReplyProcessor p; | 666 | GSF_CadetReplyProcessor p; |
@@ -665,26 +668,26 @@ GSF_cadet_query_cancel(struct GSF_CadetRequest *sr) | |||
665 | p = sr->proc; | 668 | p = sr->proc; |
666 | sr->proc = NULL; | 669 | sr->proc = NULL; |
667 | if (NULL != p) | 670 | if (NULL != p) |
668 | { | 671 | { |
669 | /* signal failure / cancellation to callback */ | 672 | /* signal failure / cancellation to callback */ |
670 | p(sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL); | 673 | p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, GNUNET_TIME_UNIT_ZERO_ABS, 0, NULL); |
671 | } | 674 | } |
672 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 675 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
673 | "Cancelled query for %s via cadet to %s\n", | 676 | "Cancelled query for %s via cadet to %s\n", |
674 | GNUNET_h2s(&sr->query), | 677 | GNUNET_h2s (&sr->query), |
675 | GNUNET_i2s(&sr->mh->target)); | 678 | GNUNET_i2s (&sr->mh->target)); |
676 | if (GNUNET_YES == sr->was_transmitted) | 679 | if (GNUNET_YES == sr->was_transmitted) |
677 | GNUNET_assert( | 680 | GNUNET_assert ( |
678 | GNUNET_OK == | 681 | GNUNET_OK == |
679 | GNUNET_CONTAINER_multihashmap_remove(mh->waiting_map, &sr->query, sr)); | 682 | GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map, &sr->query, sr)); |
680 | else | 683 | else |
681 | GNUNET_CONTAINER_DLL_remove(mh->pending_head, mh->pending_tail, sr); | 684 | GNUNET_CONTAINER_DLL_remove (mh->pending_head, mh->pending_tail, sr); |
682 | GNUNET_free(sr); | 685 | GNUNET_free (sr); |
683 | if ((0 == GNUNET_CONTAINER_multihashmap_size(mh->waiting_map)) && | 686 | if ((0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) && |
684 | (NULL == mh->pending_head)) | 687 | (NULL == mh->pending_head)) |
685 | mh->timeout_task = GNUNET_SCHEDULER_add_delayed(GNUNET_TIME_UNIT_SECONDS, | 688 | mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, |
686 | &cadet_timeout, | 689 | &cadet_timeout, |
687 | mh); | 690 | mh); |
688 | } | 691 | } |
689 | 692 | ||
690 | 693 | ||
@@ -697,27 +700,27 @@ GSF_cadet_query_cancel(struct GSF_CadetRequest *sr) | |||
697 | * @return #GNUNET_YES (continue to iterate) | 700 | * @return #GNUNET_YES (continue to iterate) |
698 | */ | 701 | */ |
699 | int | 702 | int |
700 | GSF_cadet_release_clients(void *cls, | 703 | GSF_cadet_release_clients (void *cls, |
701 | const struct GNUNET_PeerIdentity *key, | 704 | const struct GNUNET_PeerIdentity *key, |
702 | void *value) | 705 | void *value) |
703 | { | 706 | { |
704 | struct CadetHandle *mh = value; | 707 | struct CadetHandle *mh = value; |
705 | 708 | ||
706 | GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, | 709 | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, |
707 | "Timeout on cadet channel to %s\n", | 710 | "Timeout on cadet channel to %s\n", |
708 | GNUNET_i2s(&mh->target)); | 711 | GNUNET_i2s (&mh->target)); |
709 | if (NULL != mh->channel) | 712 | if (NULL != mh->channel) |
710 | { | 713 | { |
711 | struct GNUNET_CADET_Channel *channel = mh->channel; | 714 | struct GNUNET_CADET_Channel *channel = mh->channel; |
712 | 715 | ||
713 | mh->channel = NULL; | 716 | mh->channel = NULL; |
714 | GNUNET_CADET_channel_destroy(channel); | 717 | GNUNET_CADET_channel_destroy (channel); |
715 | } | 718 | } |
716 | if (NULL != mh->reset_task) | 719 | if (NULL != mh->reset_task) |
717 | { | 720 | { |
718 | GNUNET_SCHEDULER_cancel(mh->reset_task); | 721 | GNUNET_SCHEDULER_cancel (mh->reset_task); |
719 | mh->reset_task = NULL; | 722 | mh->reset_task = NULL; |
720 | } | 723 | } |
721 | return GNUNET_YES; | 724 | return GNUNET_YES; |
722 | } | 725 | } |
723 | 726 | ||