aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_client.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-02-17 14:31:38 +0100
committerChristian Grothoff <christian@grothoff.org>2017-02-17 14:31:38 +0100
commit181c039d12aa2aa99920d14070e7b64c018e8be7 (patch)
tree58725a1ff58892cbdf53983dc28b4c50c423e1ac /src/fs/gnunet-service-fs_cadet_client.c
parent69d9269f2c5887e73950ee3a7fc0fd5e050a8a86 (diff)
downloadgnunet-181c039d12aa2aa99920d14070e7b64c018e8be7.tar.gz
gnunet-181c039d12aa2aa99920d14070e7b64c018e8be7.zip
get FS test with CADET to finally pass again
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c358
1 files changed, 166 insertions, 192 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 193fe2263..55e0cbc24 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -77,7 +77,7 @@ struct GSF_CadetRequest
77 GSF_CadetReplyProcessor proc; 77 GSF_CadetReplyProcessor proc;
78 78
79 /** 79 /**
80 * Closure for 'proc' 80 * Closure for @e proc
81 */ 81 */
82 void *proc_cls; 82 void *proc_cls;
83 83
@@ -126,11 +126,6 @@ struct CadetHandle
126 struct GNUNET_CADET_Channel *channel; 126 struct GNUNET_CADET_Channel *channel;
127 127
128 /** 128 /**
129 * Handle for active write operation, or NULL.
130 */
131 struct GNUNET_CADET_TransmitHandle *wh;
132
133 /**
134 * Which peer does this cadet go to? 129 * Which peer does this cadet go to?
135 */ 130 */
136 struct GNUNET_PeerIdentity target; 131 struct GNUNET_PeerIdentity target;
@@ -140,14 +135,14 @@ struct CadetHandle
140 * a few seconds to give the application a chance to give 135 * a few seconds to give the application a chance to give
141 * us another query). 136 * us another query).
142 */ 137 */
143 struct GNUNET_SCHEDULER_Task * timeout_task; 138 struct GNUNET_SCHEDULER_Task *timeout_task;
144 139
145 /** 140 /**
146 * Task to reset cadets that had errors (asynchronously, 141 * Task to reset cadets that had errors (asynchronously,
147 * as we may not be able to do it immediately during a 142 * as we may not be able to do it immediately during a
148 * callback from the cadet API). 143 * callback from the cadet API).
149 */ 144 */
150 struct GNUNET_SCHEDULER_Task * reset_task; 145 struct GNUNET_SCHEDULER_Task *reset_task;
151 146
152}; 147};
153 148
@@ -170,10 +165,10 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
170/** 165/**
171 * Transmit pending requests via the cadet. 166 * Transmit pending requests via the cadet.
172 * 167 *
173 * @param mh cadet to process 168 * @param cls `struct CadetHandle` to process
174 */ 169 */
175static void 170static void
176transmit_pending (struct CadetHandle *mh); 171transmit_pending (void *cls);
177 172
178 173
179/** 174/**
@@ -206,65 +201,19 @@ move_to_pending (void *cls,
206 201
207 202
208/** 203/**
209 * We had a serious error, tear down and re-create cadet from scratch. 204 * Functions with this signature are called whenever a complete reply
210 * 205 * is received.
211 * @param mh cadet to reset
212 */
213static void
214reset_cadet (struct CadetHandle *mh)
215{
216 struct GNUNET_CADET_Channel *channel = mh->channel;
217 struct GNUNET_HashCode port;
218
219 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
220 "Resetting cadet channel to %s\n",
221 GNUNET_i2s (&mh->target));
222 mh->channel = NULL;
223
224 if (NULL != channel)
225 {
226 /* Avoid loop */
227 if (NULL != mh->wh)
228 {
229 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
230 mh->wh = NULL;
231 }
232 GNUNET_CADET_channel_destroy (channel);
233 }
234 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
235 &move_to_pending,
236 mh);
237 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
238 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
239 &port);
240 mh->channel = GNUNET_CADET_channel_create (cadet_handle,
241 mh,
242 &mh->target,
243 &port,
244 GNUNET_CADET_OPTION_RELIABLE);
245 transmit_pending (mh);
246}
247
248
249/**
250 * Task called when it is time to destroy an inactive cadet channel.
251 * 206 *
252 * @param cls the `struct CadetHandle` to tear down 207 * @param cls closure with the `struct CadetHandle`
208 * @param srm the actual message
209 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
253 */ 210 */
254static void 211static int
255cadet_timeout (void *cls) 212check_reply (void *cls,
213 const struct CadetReplyMessage *srm)
256{ 214{
257 struct CadetHandle *mh = cls; 215 /* We check later... */
258 struct GNUNET_CADET_Channel *tun; 216 return GNUNET_OK;
259
260 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
261 "Timeout on cadet channel to %s\n",
262 GNUNET_i2s (&mh->target));
263 mh->timeout_task = NULL;
264 tun = mh->channel;
265 mh->channel = NULL;
266 if(NULL != tun)
267 GNUNET_CADET_channel_destroy (tun);
268} 217}
269 218
270 219
@@ -274,13 +223,7 @@ cadet_timeout (void *cls)
274 * @param cls the `struct CadetHandle` to tear down 223 * @param cls the `struct CadetHandle` to tear down
275 */ 224 */
276static void 225static void
277reset_cadet_task (void *cls) 226reset_cadet_task (void *cls);
278{
279 struct CadetHandle *mh = cls;
280
281 mh->reset_task = NULL;
282 reset_cadet (mh);
283}
284 227
285 228
286/** 229/**
@@ -300,83 +243,6 @@ reset_cadet_async (struct CadetHandle *mh)
300 243
301 244
302/** 245/**
303 * Functions of this signature are called whenever we are ready to transmit
304 * query via a cadet.
305 *
306 * @param cls the struct CadetHandle for which we did the write call
307 * @param size the number of bytes that can be written to @a buf
308 * @param buf where to write the message
309 * @return number of bytes written to @a buf
310 */
311static size_t
312transmit_sqm (void *cls,
313 size_t size,
314 void *buf)
315{
316 struct CadetHandle *mh = cls;
317 struct CadetQueryMessage sqm;
318 struct GSF_CadetRequest *sr;
319
320 mh->wh = NULL;
321 if (NULL == buf)
322 {
323 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324 "Cadet channel to %s failed during transmission attempt, rebuilding\n",
325 GNUNET_i2s (&mh->target));
326 reset_cadet_async (mh);
327 return 0;
328 }
329 sr = mh->pending_head;
330 if (NULL == sr)
331 return 0;
332 GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
333 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
334 mh->pending_tail,
335 sr);
336 GNUNET_assert (GNUNET_OK ==
337 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
338 &sr->query,
339 sr,
340 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
341 sr->was_transmitted = GNUNET_YES;
342 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
343 "Sending query for %s via cadet to %s\n",
344 GNUNET_h2s (&sr->query),
345 GNUNET_i2s (&mh->target));
346 sqm.header.size = htons (sizeof (sqm));
347 sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
348 sqm.type = htonl (sr->type);
349 sqm.query = sr->query;
350 GNUNET_memcpy (buf, &sqm, sizeof (sqm));
351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352 "Successfully transmitted %u bytes via cadet to %s\n",
353 (unsigned int) size,
354 GNUNET_i2s (&mh->target));
355 transmit_pending (mh);
356 return sizeof (sqm);
357}
358
359
360/**
361 * Transmit pending requests via the cadet.
362 *
363 * @param mh cadet to process
364 */
365static void
366transmit_pending (struct CadetHandle *mh)
367{
368 if (NULL == mh->channel)
369 return;
370 if (NULL != mh->wh)
371 return;
372 mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
373 GNUNET_TIME_UNIT_FOREVER_REL,
374 sizeof (struct CadetQueryMessage),
375 &transmit_sqm, mh);
376}
377
378
379/**
380 * Closure for handle_reply(). 246 * Closure for handle_reply().
381 */ 247 */
382struct HandleReplyClosure 248struct HandleReplyClosure
@@ -393,7 +259,7 @@ struct HandleReplyClosure
393 struct GNUNET_TIME_Absolute expiration; 259 struct GNUNET_TIME_Absolute expiration;
394 260
395 /** 261 /**
396 * Number of bytes in 'data'. 262 * Number of bytes in @e data.
397 */ 263 */
398 size_t data_size; 264 size_t data_size;
399 265
@@ -439,19 +305,24 @@ process_reply (void *cls,
439 305
440 306
441/** 307/**
442 * Functions with this signature are called whenever a complete reply 308 * Iterator called on each entry in a waiting map to
443 * is received. 309 * call the 'proc' continuation and release associated
310 * resources.
444 * 311 *
445 * @param cls closure with the `struct CadetHandle` 312 * @param cls the `struct CadetHandle`
446 * @param srm the actual message 313 * @param key the key of the entry in the map (the query)
447 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing 314 * @param value the `struct GSF_CadetRequest` to clean up
315 * @return #GNUNET_YES (continue to iterate)
448 */ 316 */
449static int 317static int
450check_reply (void *cls, 318free_waiting_entry (void *cls,
451 const struct CadetReplyMessage *srm) 319 const struct GNUNET_HashCode *key,
320 void *value)
452{ 321{
453 /* We check later... */ 322 struct GSF_CadetRequest *sr = value;
454 return GNUNET_OK; 323
324 GSF_cadet_query_cancel (sr);
325 return GNUNET_YES;
455} 326}
456 327
457 328
@@ -517,28 +388,6 @@ handle_reply (void *cls,
517 388
518 389
519/** 390/**
520 * Iterator called on each entry in a waiting map to
521 * call the 'proc' continuation and release associated
522 * resources.
523 *
524 * @param cls the `struct CadetHandle`
525 * @param key the key of the entry in the map (the query)
526 * @param value the `struct GSF_CadetRequest` to clean up
527 * @return #GNUNET_YES (continue to iterate)
528 */
529static int
530free_waiting_entry (void *cls,
531 const struct GNUNET_HashCode *key,
532 void *value)
533{
534 struct GSF_CadetRequest *sr = value;
535
536 GSF_cadet_query_cancel (sr);
537 return GNUNET_YES;
538}
539
540
541/**
542 * Function called by cadet when a client disconnects. 391 * Function called by cadet when a client disconnects.
543 * Cleans up our `struct CadetClient` of that channel. 392 * Cleans up our `struct CadetClient` of that channel.
544 * 393 *
@@ -569,8 +418,6 @@ disconnect_cb (void *cls,
569 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, 418 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
570 &free_waiting_entry, 419 &free_waiting_entry,
571 mh); 420 mh);
572 if (NULL != mh->wh)
573 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
574 if (NULL != mh->timeout_task) 421 if (NULL != mh->timeout_task)
575 GNUNET_SCHEDULER_cancel (mh->timeout_task); 422 GNUNET_SCHEDULER_cancel (mh->timeout_task);
576 if (NULL != mh->reset_task) 423 if (NULL != mh->reset_task)
@@ -602,6 +449,133 @@ window_change_cb (void *cls,
602 int window_size) 449 int window_size)
603{ 450{
604 /* FIXME: for flow control, implement? */ 451 /* FIXME: for flow control, implement? */
452#if 0
453 /* Something like this instead of the GNUNET_MQ_notify_sent() in
454 transmit_pending() might be good (once the window change CB works...) */
455 if (0 < window_size) /* test needed? */
456 transmit_pending (mh);
457#endif
458}
459
460
461/**
462 * We had a serious error, tear down and re-create cadet from scratch.
463 *
464 * @param mh cadet to reset
465 */
466static void
467reset_cadet (struct CadetHandle *mh)
468{
469 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
470 "Resetting cadet channel to %s\n",
471 GNUNET_i2s (&mh->target));
472 GNUNET_CADET_channel_destroy (mh->channel);
473 mh->channel = NULL;
474 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
475 &move_to_pending,
476 mh);
477 {
478 struct GNUNET_MQ_MessageHandler handlers[] = {
479 GNUNET_MQ_hd_var_size (reply,
480 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
481 struct CadetReplyMessage,
482 mh),
483 GNUNET_MQ_handler_end ()
484 };
485 struct GNUNET_HashCode port;
486
487 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
488 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
489 &port);
490 mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
491 mh,
492 &mh->target,
493 &port,
494 GNUNET_CADET_OPTION_RELIABLE,
495 &window_change_cb,
496 &disconnect_cb,
497 handlers);
498 }
499 transmit_pending (mh);
500}
501
502
503/**
504 * Task called when it is time to destroy an inactive cadet channel.
505 *
506 * @param cls the `struct CadetHandle` to tear down
507 */
508static void
509cadet_timeout (void *cls)
510{
511 struct CadetHandle *mh = cls;
512 struct GNUNET_CADET_Channel *tun;
513
514 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
515 "Timeout on cadet channel to %s\n",
516 GNUNET_i2s (&mh->target));
517 mh->timeout_task = NULL;
518 tun = mh->channel;
519 mh->channel = NULL;
520 if (NULL != tun)
521 GNUNET_CADET_channel_destroy (tun);
522}
523
524
525/**
526 * Task called when it is time to reset an cadet.
527 *
528 * @param cls the `struct CadetHandle` to tear down
529 */
530static void
531reset_cadet_task (void *cls)
532{
533 struct CadetHandle *mh = cls;
534
535 mh->reset_task = NULL;
536 reset_cadet (mh);
537}
538
539
540/**
541 * Transmit pending requests via the cadet.
542 *
543 * @param cls `struct CadetHandle` to process
544 */
545static void
546transmit_pending (void *cls)
547{
548 struct CadetHandle *mh = cls;
549 struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
550 struct GSF_CadetRequest *sr;
551 struct GNUNET_MQ_Envelope *env;
552 struct CadetQueryMessage *sqm;
553
554 if ( (0 != GNUNET_MQ_get_length (mq)) ||
555 (NULL == (sr = mh->pending_head)) )
556 return;
557 GNUNET_CONTAINER_DLL_remove (mh->pending_head,
558 mh->pending_tail,
559 sr);
560 GNUNET_assert (GNUNET_OK ==
561 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
562 &sr->query,
563 sr,
564 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
565 sr->was_transmitted = GNUNET_YES;
566 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567 "Sending query for %s via cadet to %s\n",
568 GNUNET_h2s (&sr->query),
569 GNUNET_i2s (&mh->target));
570 env = GNUNET_MQ_msg (sqm,
571 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
572 sqm->type = htonl (sr->type);
573 sqm->query = sr->query;
574 GNUNET_MQ_notify_sent (env,
575 &transmit_pending,
576 mh);
577 GNUNET_MQ_send (mq,
578 env);
605} 579}
606 580
607 581
@@ -614,7 +588,6 @@ static struct CadetHandle *
614get_cadet (const struct GNUNET_PeerIdentity *target) 588get_cadet (const struct GNUNET_PeerIdentity *target)
615{ 589{
616 struct CadetHandle *mh; 590 struct CadetHandle *mh;
617 struct GNUNET_HashCode port;
618 591
619 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, 592 mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
620 target); 593 target);
@@ -641,10 +614,6 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
641 &mh->target, 614 &mh->target,
642 mh, 615 mh,
643 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 616 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
644 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
645 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
646 &port);
647
648 { 617 {
649 struct GNUNET_MQ_MessageHandler handlers[] = { 618 struct GNUNET_MQ_MessageHandler handlers[] = {
650 GNUNET_MQ_hd_var_size (reply, 619 GNUNET_MQ_hd_var_size (reply,
@@ -653,7 +622,11 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
653 mh), 622 mh),
654 GNUNET_MQ_handler_end () 623 GNUNET_MQ_handler_end ()
655 }; 624 };
625 struct GNUNET_HashCode port;
656 626
627 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
628 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
629 &port);
657 mh->channel = GNUNET_CADET_channel_creatE (cadet_handle, 630 mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
658 mh, 631 mh,
659 &mh->target, 632 &mh->target,
@@ -679,9 +652,10 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
679 */ 652 */
680struct GSF_CadetRequest * 653struct GSF_CadetRequest *
681GSF_cadet_query (const struct GNUNET_PeerIdentity *target, 654GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
682 const struct GNUNET_HashCode *query, 655 const struct GNUNET_HashCode *query,
683 enum GNUNET_BLOCK_Type type, 656 enum GNUNET_BLOCK_Type type,
684 GSF_CadetReplyProcessor proc, void *proc_cls) 657 GSF_CadetReplyProcessor proc,
658 void *proc_cls)
685{ 659{
686 struct CadetHandle *mh; 660 struct CadetHandle *mh;
687 struct GSF_CadetRequest *sr; 661 struct GSF_CadetRequest *sr;