aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_cadet_client.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
committerChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
commit9727e5e53721dace7abbcc5bcd28c838af4291cc (patch)
treeca32ed19cf0d4129d3497261531aa40a19599280 /src/fs/gnunet-service-fs_cadet_client.c
parentc793bffc39fe1445616c9d0cb071d62575dea217 (diff)
downloadgnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.tar.gz
gnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.zip
convert to new CADET API, not working due to CADET-API internal bugs
Diffstat (limited to 'src/fs/gnunet-service-fs_cadet_client.c')
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c290
1 files changed, 144 insertions, 146 deletions
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 4e268b93c..193fe2263 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -155,13 +155,13 @@ struct CadetHandle
155/** 155/**
156 * Cadet channel for creating outbound channels. 156 * Cadet channel for creating outbound channels.
157 */ 157 */
158static struct GNUNET_CADET_Handle *cadet_handle; 158struct GNUNET_CADET_Handle *cadet_handle;
159 159
160/** 160/**
161 * Map from peer identities to 'struct CadetHandles' with cadet 161 * Map from peer identities to 'struct CadetHandles' with cadet
162 * channels to those peers. 162 * channels to those peers.
163 */ 163 */
164static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; 164struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
165 165
166 166
167/* ********************* client-side code ************************* */ 167/* ********************* client-side code ************************* */
@@ -419,9 +419,9 @@ struct HandleReplyClosure
419 * @return #GNUNET_YES (continue to iterate) 419 * @return #GNUNET_YES (continue to iterate)
420 */ 420 */
421static int 421static int
422handle_reply (void *cls, 422process_reply (void *cls,
423 const struct GNUNET_HashCode *key, 423 const struct GNUNET_HashCode *key,
424 void *value) 424 void *value)
425{ 425{
426 struct HandleReplyClosure *hrc = cls; 426 struct HandleReplyClosure *hrc = cls;
427 struct GSF_CadetRequest *sr = value; 427 struct GSF_CadetRequest *sr = value;
@@ -443,38 +443,43 @@ handle_reply (void *cls,
443 * is received. 443 * is received.
444 * 444 *
445 * @param cls closure with the `struct CadetHandle` 445 * @param cls closure with the `struct CadetHandle`
446 * @param channel channel handle 446 * @param srm the actual message
447 * @param channel_ctx channel context
448 * @param message the actual message
449 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing 447 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
450 */ 448 */
451static int 449static int
452reply_cb (void *cls, 450check_reply (void *cls,
453 struct GNUNET_CADET_Channel *channel, 451 const struct CadetReplyMessage *srm)
454 void **channel_ctx,
455 const struct GNUNET_MessageHeader *message)
456{ 452{
457 struct CadetHandle *mh = *channel_ctx; 453 /* We check later... */
458 const struct CadetReplyMessage *srm; 454 return GNUNET_OK;
455}
456
457
458/**
459 * Functions with this signature are called whenever a complete reply
460 * is received.
461 *
462 * @param cls closure with the `struct CadetHandle`
463 * @param srm the actual message
464 */
465static void
466handle_reply (void *cls,
467 const struct CadetReplyMessage *srm)
468{
469 struct CadetHandle *mh = cls;
459 struct HandleReplyClosure hrc; 470 struct HandleReplyClosure hrc;
460 uint16_t msize; 471 uint16_t msize;
461 enum GNUNET_BLOCK_Type type; 472 enum GNUNET_BLOCK_Type type;
462 struct GNUNET_HashCode query; 473 struct GNUNET_HashCode query;
463 474
464 msize = ntohs (message->size); 475 msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
465 if (sizeof (struct CadetReplyMessage) > msize)
466 {
467 GNUNET_break_op (0);
468 reset_cadet_async (mh);
469 return GNUNET_SYSERR;
470 }
471 srm = (const struct CadetReplyMessage *) message;
472 msize -= sizeof (struct CadetReplyMessage);
473 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); 476 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
474 if (GNUNET_YES != 477 if (GNUNET_YES !=
475 GNUNET_BLOCK_get_key (GSF_block_ctx, 478 GNUNET_BLOCK_get_key (GSF_block_ctx,
476 type, 479 type,
477 &srm[1], msize, &query)) 480 &srm[1],
481 msize,
482 &query))
478 { 483 {
479 GNUNET_break_op (0); 484 GNUNET_break_op (0);
480 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 485 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -483,13 +488,13 @@ reply_cb (void *cls,
483 msize, 488 msize,
484 GNUNET_i2s (&mh->target)); 489 GNUNET_i2s (&mh->target));
485 reset_cadet_async (mh); 490 reset_cadet_async (mh);
486 return GNUNET_SYSERR; 491 return;
487 } 492 }
488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 493 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 "Received reply `%s' via cadet from peer %s\n", 494 "Received reply `%s' via cadet from peer %s\n",
490 GNUNET_h2s (&query), 495 GNUNET_h2s (&query),
491 GNUNET_i2s (&mh->target)); 496 GNUNET_i2s (&mh->target));
492 GNUNET_CADET_receive_done (channel); 497 GNUNET_CADET_receive_done (mh->channel);
493 GNUNET_STATISTICS_update (GSF_stats, 498 GNUNET_STATISTICS_update (GSF_stats,
494 gettext_noop ("# replies received via cadet"), 1, 499 gettext_noop ("# replies received via cadet"), 1,
495 GNUNET_NO); 500 GNUNET_NO);
@@ -500,16 +505,103 @@ reply_cb (void *cls,
500 hrc.found = GNUNET_NO; 505 hrc.found = GNUNET_NO;
501 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, 506 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
502 &query, 507 &query,
503 &handle_reply, 508 &process_reply,
504 &hrc); 509 &hrc);
505 if (GNUNET_NO == hrc.found) 510 if (GNUNET_NO == hrc.found)
506 { 511 {
507 GNUNET_STATISTICS_update (GSF_stats, 512 GNUNET_STATISTICS_update (GSF_stats,
508 gettext_noop ("# replies received via cadet dropped"), 1, 513 gettext_noop ("# replies received via cadet dropped"), 1,
509 GNUNET_NO); 514 GNUNET_NO);
510 return GNUNET_OK;
511 } 515 }
512 return GNUNET_OK; 516}
517
518
519/**
520 * Iterator called on each entry in a waiting map to
521 * call the 'proc' continuation and release associated
522 * resources.
523 *
524 * @param cls the `struct CadetHandle`
525 * @param key the key of the entry in the map (the query)
526 * @param value the `struct GSF_CadetRequest` to clean up
527 * @return #GNUNET_YES (continue to iterate)
528 */
529static int
530free_waiting_entry (void *cls,
531 const struct GNUNET_HashCode *key,
532 void *value)
533{
534 struct GSF_CadetRequest *sr = value;
535
536 GSF_cadet_query_cancel (sr);
537 return GNUNET_YES;
538}
539
540
541/**
542 * Function called by cadet when a client disconnects.
543 * Cleans up our `struct CadetClient` of that channel.
544 *
545 * @param cls our `struct CadetClient`
546 * @param channel channel of the disconnecting client
547 */
548static void
549disconnect_cb (void *cls,
550 const struct GNUNET_CADET_Channel *channel)
551{
552 struct CadetHandle *mh = cls;
553 struct GSF_CadetRequest *sr;
554
555 if (NULL == mh->channel)
556 return; /* being destroyed elsewhere */
557 GNUNET_assert (channel == mh->channel);
558 mh->channel = NULL;
559 while (NULL != (sr = mh->pending_head))
560 GSF_cadet_query_cancel (sr);
561 /* first remove `mh` from the `cadet_map`, so that if the
562 callback from `free_waiting_entry()` happens to re-issue
563 the request, we don't immediately have it back in the
564 `waiting_map`. */
565 GNUNET_assert (GNUNET_OK ==
566 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
567 &mh->target,
568 mh));
569 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
570 &free_waiting_entry,
571 mh);
572 if (NULL != mh->wh)
573 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
574 if (NULL != mh->timeout_task)
575 GNUNET_SCHEDULER_cancel (mh->timeout_task);
576 if (NULL != mh->reset_task)
577 GNUNET_SCHEDULER_cancel (mh->reset_task);
578 GNUNET_assert (0 ==
579 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
580 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
581 GNUNET_free (mh);
582}
583
584
585/**
586 * Function called whenever an MQ-channel's transmission window size changes.
587 *
588 * The first callback in an outgoing channel will be with a non-zero value
589 * and will mean the channel is connected to the destination.
590 *
591 * For an incoming channel it will be called immediately after the
592 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
593 *
594 * @param cls Channel closure.
595 * @param channel Connection to the other end (henceforth invalid).
596 * @param window_size New window size. If the is more messages than buffer size
597 * this value will be negative..
598 */
599static void
600window_change_cb (void *cls,
601 const struct GNUNET_CADET_Channel *channel,
602 int window_size)
603{
604 /* FIXME: for flow control, implement? */
513} 605}
514 606
515 607
@@ -552,14 +644,25 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
552 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 644 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
553 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 645 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
554 &port); 646 &port);
555 mh->channel = GNUNET_CADET_channel_create (cadet_handle, 647
556 mh, 648 {
557 &mh->target, 649 struct GNUNET_MQ_MessageHandler handlers[] = {
558 &port, 650 GNUNET_MQ_hd_var_size (reply,
559 GNUNET_CADET_OPTION_RELIABLE); 651 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
560 GNUNET_assert (mh == 652 struct CadetReplyMessage,
561 GNUNET_CONTAINER_multipeermap_get (cadet_map, 653 mh),
562 target)); 654 GNUNET_MQ_handler_end ()
655 };
656
657 mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
658 mh,
659 &mh->target,
660 &port,
661 GNUNET_CADET_OPTION_RELIABLE,
662 &window_change_cb,
663 &disconnect_cb,
664 handlers);
665 }
563 return mh; 666 return mh;
564} 667}
565 668
@@ -646,93 +749,6 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
646 749
647 750
648/** 751/**
649 * Iterator called on each entry in a waiting map to
650 * call the 'proc' continuation and release associated
651 * resources.
652 *
653 * @param cls the `struct CadetHandle`
654 * @param key the key of the entry in the map (the query)
655 * @param value the `struct GSF_CadetRequest` to clean up
656 * @return #GNUNET_YES (continue to iterate)
657 */
658static int
659free_waiting_entry (void *cls,
660 const struct GNUNET_HashCode *key,
661 void *value)
662{
663 struct GSF_CadetRequest *sr = value;
664
665 GSF_cadet_query_cancel (sr);
666 return GNUNET_YES;
667}
668
669
670/**
671 * Function called by cadet when a client disconnects.
672 * Cleans up our `struct CadetClient` of that channel.
673 *
674 * @param cls NULL
675 * @param channel channel of the disconnecting client
676 * @param channel_ctx our `struct CadetClient`
677 */
678static void
679cleaner_cb (void *cls,
680 const struct GNUNET_CADET_Channel *channel,
681 void *channel_ctx)
682{
683 struct CadetHandle *mh = channel_ctx;
684 struct GSF_CadetRequest *sr;
685
686 if (NULL == mh->channel)
687 return; /* being destroyed elsewhere */
688 GNUNET_assert (channel == mh->channel);
689 mh->channel = NULL;
690 while (NULL != (sr = mh->pending_head))
691 GSF_cadet_query_cancel (sr);
692 /* first remove `mh` from the `cadet_map`, so that if the
693 callback from `free_waiting_entry()` happens to re-issue
694 the request, we don't immediately have it back in the
695 `waiting_map`. */
696 GNUNET_assert (GNUNET_OK ==
697 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
698 &mh->target,
699 mh));
700 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
701 &free_waiting_entry,
702 mh);
703 if (NULL != mh->wh)
704 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
705 if (NULL != mh->timeout_task)
706 GNUNET_SCHEDULER_cancel (mh->timeout_task);
707 if (NULL != mh->reset_task)
708 GNUNET_SCHEDULER_cancel (mh->reset_task);
709 GNUNET_assert (0 ==
710 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
711 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
712 GNUNET_free (mh);
713}
714
715
716/**
717 * Initialize subsystem for non-anonymous file-sharing.
718 */
719void
720GSF_cadet_start_client ()
721{
722 static const struct GNUNET_CADET_MessageHandler handlers[] = {
723 { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
724 { NULL, 0, 0 }
725 };
726
727 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
728 cadet_handle = GNUNET_CADET_connect (GSF_cfg,
729 NULL,
730 &cleaner_cb,
731 handlers);
732}
733
734
735/**
736 * Function called on each active cadets to shut them down. 752 * Function called on each active cadets to shut them down.
737 * 753 *
738 * @param cls NULL 754 * @param cls NULL
@@ -740,10 +756,10 @@ GSF_cadet_start_client ()
740 * @param value the `struct CadetHandle` to destroy 756 * @param value the `struct CadetHandle` to destroy
741 * @return #GNUNET_YES (continue to iterate) 757 * @return #GNUNET_YES (continue to iterate)
742 */ 758 */
743static int 759int
744release_cadets (void *cls, 760GSF_cadet_release_clients (void *cls,
745 const struct GNUNET_PeerIdentity *key, 761 const struct GNUNET_PeerIdentity *key,
746 void *value) 762 void *value)
747{ 763{
748 struct CadetHandle *mh = value; 764 struct CadetHandle *mh = value;
749 765
@@ -756,23 +772,5 @@ release_cadets (void *cls,
756} 772}
757 773
758 774
759/**
760 * Shutdown subsystem for non-anonymous file-sharing.
761 */
762void
763GSF_cadet_stop_client ()
764{
765 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
766 &release_cadets,
767 NULL);
768 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
769 cadet_map = NULL;
770 if (NULL != cadet_handle)
771 {
772 GNUNET_CADET_disconnect (cadet_handle);
773 cadet_handle = NULL;
774 }
775}
776
777 775
778/* end of gnunet-service-fs_cadet_client.c */ 776/* end of gnunet-service-fs_cadet_client.c */