aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
committerChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
commit9727e5e53721dace7abbcc5bcd28c838af4291cc (patch)
treeca32ed19cf0d4129d3497261531aa40a19599280 /src/fs
parentc793bffc39fe1445616c9d0cb071d62575dea217 (diff)
downloadgnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.tar.gz
gnunet-9727e5e53721dace7abbcc5bcd28c838af4291cc.zip
convert to new CADET API, not working due to CADET-API internal bugs
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.c2
-rw-r--r--src/fs/gnunet-service-fs_cadet.h49
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c290
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c286
4 files changed, 290 insertions, 337 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index e38fdb032..8c605c6a2 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1177,7 +1177,6 @@ handle_client_unindex (void *cls,
1177static void 1177static void
1178shutdown_task (void *cls) 1178shutdown_task (void *cls)
1179{ 1179{
1180 GSF_cadet_stop_client ();
1181 GSF_cadet_stop_server (); 1180 GSF_cadet_stop_server ();
1182 if (NULL != GSF_core) 1181 if (NULL != GSF_core)
1183 { 1182 {
@@ -1320,7 +1319,6 @@ main_init (const struct GNUNET_CONFIGURATION_Handle *c)
1320 NULL); 1319 NULL);
1321 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); 1320 datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1322 GSF_cadet_start_server (); 1321 GSF_cadet_start_server ();
1323 GSF_cadet_start_client ();
1324 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1322 GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
1325 NULL); 1323 NULL);
1326 return GNUNET_OK; 1324 return GNUNET_OK;
diff --git a/src/fs/gnunet-service-fs_cadet.h b/src/fs/gnunet-service-fs_cadet.h
index 060a3993c..1fbd3a406 100644
--- a/src/fs/gnunet-service-fs_cadet.h
+++ b/src/fs/gnunet-service-fs_cadet.h
@@ -1,6 +1,6 @@
1/* 1/*
2 This file is part of GNUnet. 2 This file is part of GNUnet.
3 Copyright (C) 2012 GNUnet e.V. 3 Copyright (C) 2012, 2017 GNUnet e.V.
4 4
5 GNUnet is free software; you can redistribute it and/or modify 5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published 6 it under the terms of the GNU General Public License as published
@@ -38,14 +38,15 @@ struct GSF_CadetRequest;
38 * @param cls closure 38 * @param cls closure
39 * @param type type of the block, ANY on error 39 * @param type type of the block, ANY on error
40 * @param expiration expiration time for the block 40 * @param expiration expiration time for the block
41 * @param data_size number of bytes in 'data', 0 on error 41 * @param data_size number of bytes in @a data, 0 on error
42 * @param data reply block data, NULL on error 42 * @param data reply block data, NULL on error
43 */ 43 */
44typedef void (*GSF_CadetReplyProcessor)(void *cls, 44typedef void
45 enum GNUNET_BLOCK_Type type, 45(*GSF_CadetReplyProcessor)(void *cls,
46 struct GNUNET_TIME_Absolute expiration, 46 enum GNUNET_BLOCK_Type type,
47 size_t data_size, 47 struct GNUNET_TIME_Absolute expiration,
48 const void *data); 48 size_t data_size,
49 const void *data);
49 50
50 51
51/** 52/**
@@ -55,14 +56,28 @@ typedef void (*GSF_CadetReplyProcessor)(void *cls,
55 * @param query hash to query for the block 56 * @param query hash to query for the block
56 * @param type desired type for the block 57 * @param type desired type for the block
57 * @param proc function to call with result 58 * @param proc function to call with result
58 * @param proc_cls closure for 'proc' 59 * @param proc_cls closure for @a proc
59 * @return handle to cancel the operation 60 * @return handle to cancel the operation
60 */ 61 */
61struct GSF_CadetRequest * 62struct GSF_CadetRequest *
62GSF_cadet_query (const struct GNUNET_PeerIdentity *target, 63GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
63 const struct GNUNET_HashCode *query, 64 const struct GNUNET_HashCode *query,
64 enum GNUNET_BLOCK_Type type, 65 enum GNUNET_BLOCK_Type type,
65 GSF_CadetReplyProcessor proc, void *proc_cls); 66 GSF_CadetReplyProcessor proc,
67 void *proc_cls);
68
69/**
70 * Function called on each active cadets to shut them down.
71 *
72 * @param cls NULL
73 * @param key target peer, unused
74 * @param value the `struct CadetHandle` to destroy
75 * @return #GNUNET_YES (continue to iterate)
76 */
77int
78GSF_cadet_release_clients (void *cls,
79 const struct GNUNET_PeerIdentity *key,
80 void *value);
66 81
67 82
68/** 83/**
@@ -89,17 +104,15 @@ void
89GSF_cadet_stop_server (void); 104GSF_cadet_stop_server (void);
90 105
91/** 106/**
92 * Initialize subsystem for non-anonymous file-sharing. 107 * Cadet channel for creating outbound channels.
93 */ 108 */
94void 109extern struct GNUNET_CADET_Handle *cadet_handle;
95GSF_cadet_start_client (void);
96
97 110
98/** 111/**
99 * Shutdown subsystem for non-anonymous file-sharing. 112 * Map from peer identities to 'struct CadetHandles' with cadet
113 * channels to those peers.
100 */ 114 */
101void 115extern struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
102GSF_cadet_stop_client (void);
103 116
104 117
105GNUNET_NETWORK_STRUCT_BEGIN 118GNUNET_NETWORK_STRUCT_BEGIN
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 4e268b93c..193fe2263 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -155,13 +155,13 @@ struct CadetHandle
155/** 155/**
156 * Cadet channel for creating outbound channels. 156 * Cadet channel for creating outbound channels.
157 */ 157 */
158static struct GNUNET_CADET_Handle *cadet_handle; 158struct GNUNET_CADET_Handle *cadet_handle;
159 159
160/** 160/**
161 * Map from peer identities to 'struct CadetHandles' with cadet 161 * Map from peer identities to 'struct CadetHandles' with cadet
162 * channels to those peers. 162 * channels to those peers.
163 */ 163 */
164static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; 164struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
165 165
166 166
167/* ********************* client-side code ************************* */ 167/* ********************* client-side code ************************* */
@@ -419,9 +419,9 @@ struct HandleReplyClosure
419 * @return #GNUNET_YES (continue to iterate) 419 * @return #GNUNET_YES (continue to iterate)
420 */ 420 */
421static int 421static int
422handle_reply (void *cls, 422process_reply (void *cls,
423 const struct GNUNET_HashCode *key, 423 const struct GNUNET_HashCode *key,
424 void *value) 424 void *value)
425{ 425{
426 struct HandleReplyClosure *hrc = cls; 426 struct HandleReplyClosure *hrc = cls;
427 struct GSF_CadetRequest *sr = value; 427 struct GSF_CadetRequest *sr = value;
@@ -443,38 +443,43 @@ handle_reply (void *cls,
443 * is received. 443 * is received.
444 * 444 *
445 * @param cls closure with the `struct CadetHandle` 445 * @param cls closure with the `struct CadetHandle`
446 * @param channel channel handle 446 * @param srm the actual message
447 * @param channel_ctx channel context
448 * @param message the actual message
449 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing 447 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
450 */ 448 */
451static int 449static int
452reply_cb (void *cls, 450check_reply (void *cls,
453 struct GNUNET_CADET_Channel *channel, 451 const struct CadetReplyMessage *srm)
454 void **channel_ctx,
455 const struct GNUNET_MessageHeader *message)
456{ 452{
457 struct CadetHandle *mh = *channel_ctx; 453 /* We check later... */
458 const struct CadetReplyMessage *srm; 454 return GNUNET_OK;
455}
456
457
458/**
459 * Functions with this signature are called whenever a complete reply
460 * is received.
461 *
462 * @param cls closure with the `struct CadetHandle`
463 * @param srm the actual message
464 */
465static void
466handle_reply (void *cls,
467 const struct CadetReplyMessage *srm)
468{
469 struct CadetHandle *mh = cls;
459 struct HandleReplyClosure hrc; 470 struct HandleReplyClosure hrc;
460 uint16_t msize; 471 uint16_t msize;
461 enum GNUNET_BLOCK_Type type; 472 enum GNUNET_BLOCK_Type type;
462 struct GNUNET_HashCode query; 473 struct GNUNET_HashCode query;
463 474
464 msize = ntohs (message->size); 475 msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
465 if (sizeof (struct CadetReplyMessage) > msize)
466 {
467 GNUNET_break_op (0);
468 reset_cadet_async (mh);
469 return GNUNET_SYSERR;
470 }
471 srm = (const struct CadetReplyMessage *) message;
472 msize -= sizeof (struct CadetReplyMessage);
473 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); 476 type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
474 if (GNUNET_YES != 477 if (GNUNET_YES !=
475 GNUNET_BLOCK_get_key (GSF_block_ctx, 478 GNUNET_BLOCK_get_key (GSF_block_ctx,
476 type, 479 type,
477 &srm[1], msize, &query)) 480 &srm[1],
481 msize,
482 &query))
478 { 483 {
479 GNUNET_break_op (0); 484 GNUNET_break_op (0);
480 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 485 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -483,13 +488,13 @@ reply_cb (void *cls,
483 msize, 488 msize,
484 GNUNET_i2s (&mh->target)); 489 GNUNET_i2s (&mh->target));
485 reset_cadet_async (mh); 490 reset_cadet_async (mh);
486 return GNUNET_SYSERR; 491 return;
487 } 492 }
488 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 493 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
489 "Received reply `%s' via cadet from peer %s\n", 494 "Received reply `%s' via cadet from peer %s\n",
490 GNUNET_h2s (&query), 495 GNUNET_h2s (&query),
491 GNUNET_i2s (&mh->target)); 496 GNUNET_i2s (&mh->target));
492 GNUNET_CADET_receive_done (channel); 497 GNUNET_CADET_receive_done (mh->channel);
493 GNUNET_STATISTICS_update (GSF_stats, 498 GNUNET_STATISTICS_update (GSF_stats,
494 gettext_noop ("# replies received via cadet"), 1, 499 gettext_noop ("# replies received via cadet"), 1,
495 GNUNET_NO); 500 GNUNET_NO);
@@ -500,16 +505,103 @@ reply_cb (void *cls,
500 hrc.found = GNUNET_NO; 505 hrc.found = GNUNET_NO;
501 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, 506 GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
502 &query, 507 &query,
503 &handle_reply, 508 &process_reply,
504 &hrc); 509 &hrc);
505 if (GNUNET_NO == hrc.found) 510 if (GNUNET_NO == hrc.found)
506 { 511 {
507 GNUNET_STATISTICS_update (GSF_stats, 512 GNUNET_STATISTICS_update (GSF_stats,
508 gettext_noop ("# replies received via cadet dropped"), 1, 513 gettext_noop ("# replies received via cadet dropped"), 1,
509 GNUNET_NO); 514 GNUNET_NO);
510 return GNUNET_OK;
511 } 515 }
512 return GNUNET_OK; 516}
517
518
519/**
520 * Iterator called on each entry in a waiting map to
521 * call the 'proc' continuation and release associated
522 * resources.
523 *
524 * @param cls the `struct CadetHandle`
525 * @param key the key of the entry in the map (the query)
526 * @param value the `struct GSF_CadetRequest` to clean up
527 * @return #GNUNET_YES (continue to iterate)
528 */
529static int
530free_waiting_entry (void *cls,
531 const struct GNUNET_HashCode *key,
532 void *value)
533{
534 struct GSF_CadetRequest *sr = value;
535
536 GSF_cadet_query_cancel (sr);
537 return GNUNET_YES;
538}
539
540
541/**
542 * Function called by cadet when a client disconnects.
543 * Cleans up our `struct CadetClient` of that channel.
544 *
545 * @param cls our `struct CadetClient`
546 * @param channel channel of the disconnecting client
547 */
548static void
549disconnect_cb (void *cls,
550 const struct GNUNET_CADET_Channel *channel)
551{
552 struct CadetHandle *mh = cls;
553 struct GSF_CadetRequest *sr;
554
555 if (NULL == mh->channel)
556 return; /* being destroyed elsewhere */
557 GNUNET_assert (channel == mh->channel);
558 mh->channel = NULL;
559 while (NULL != (sr = mh->pending_head))
560 GSF_cadet_query_cancel (sr);
561 /* first remove `mh` from the `cadet_map`, so that if the
562 callback from `free_waiting_entry()` happens to re-issue
563 the request, we don't immediately have it back in the
564 `waiting_map`. */
565 GNUNET_assert (GNUNET_OK ==
566 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
567 &mh->target,
568 mh));
569 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
570 &free_waiting_entry,
571 mh);
572 if (NULL != mh->wh)
573 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
574 if (NULL != mh->timeout_task)
575 GNUNET_SCHEDULER_cancel (mh->timeout_task);
576 if (NULL != mh->reset_task)
577 GNUNET_SCHEDULER_cancel (mh->reset_task);
578 GNUNET_assert (0 ==
579 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
580 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
581 GNUNET_free (mh);
582}
583
584
585/**
586 * Function called whenever an MQ-channel's transmission window size changes.
587 *
588 * The first callback in an outgoing channel will be with a non-zero value
589 * and will mean the channel is connected to the destination.
590 *
591 * For an incoming channel it will be called immediately after the
592 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
593 *
594 * @param cls Channel closure.
595 * @param channel Connection to the other end (henceforth invalid).
596 * @param window_size New window size. If the is more messages than buffer size
597 * this value will be negative..
598 */
599static void
600window_change_cb (void *cls,
601 const struct GNUNET_CADET_Channel *channel,
602 int window_size)
603{
604 /* FIXME: for flow control, implement? */
513} 605}
514 606
515 607
@@ -552,14 +644,25 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
552 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 644 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
553 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 645 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
554 &port); 646 &port);
555 mh->channel = GNUNET_CADET_channel_create (cadet_handle, 647
556 mh, 648 {
557 &mh->target, 649 struct GNUNET_MQ_MessageHandler handlers[] = {
558 &port, 650 GNUNET_MQ_hd_var_size (reply,
559 GNUNET_CADET_OPTION_RELIABLE); 651 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
560 GNUNET_assert (mh == 652 struct CadetReplyMessage,
561 GNUNET_CONTAINER_multipeermap_get (cadet_map, 653 mh),
562 target)); 654 GNUNET_MQ_handler_end ()
655 };
656
657 mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
658 mh,
659 &mh->target,
660 &port,
661 GNUNET_CADET_OPTION_RELIABLE,
662 &window_change_cb,
663 &disconnect_cb,
664 handlers);
665 }
563 return mh; 666 return mh;
564} 667}
565 668
@@ -646,93 +749,6 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
646 749
647 750
648/** 751/**
649 * Iterator called on each entry in a waiting map to
650 * call the 'proc' continuation and release associated
651 * resources.
652 *
653 * @param cls the `struct CadetHandle`
654 * @param key the key of the entry in the map (the query)
655 * @param value the `struct GSF_CadetRequest` to clean up
656 * @return #GNUNET_YES (continue to iterate)
657 */
658static int
659free_waiting_entry (void *cls,
660 const struct GNUNET_HashCode *key,
661 void *value)
662{
663 struct GSF_CadetRequest *sr = value;
664
665 GSF_cadet_query_cancel (sr);
666 return GNUNET_YES;
667}
668
669
670/**
671 * Function called by cadet when a client disconnects.
672 * Cleans up our `struct CadetClient` of that channel.
673 *
674 * @param cls NULL
675 * @param channel channel of the disconnecting client
676 * @param channel_ctx our `struct CadetClient`
677 */
678static void
679cleaner_cb (void *cls,
680 const struct GNUNET_CADET_Channel *channel,
681 void *channel_ctx)
682{
683 struct CadetHandle *mh = channel_ctx;
684 struct GSF_CadetRequest *sr;
685
686 if (NULL == mh->channel)
687 return; /* being destroyed elsewhere */
688 GNUNET_assert (channel == mh->channel);
689 mh->channel = NULL;
690 while (NULL != (sr = mh->pending_head))
691 GSF_cadet_query_cancel (sr);
692 /* first remove `mh` from the `cadet_map`, so that if the
693 callback from `free_waiting_entry()` happens to re-issue
694 the request, we don't immediately have it back in the
695 `waiting_map`. */
696 GNUNET_assert (GNUNET_OK ==
697 GNUNET_CONTAINER_multipeermap_remove (cadet_map,
698 &mh->target,
699 mh));
700 GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
701 &free_waiting_entry,
702 mh);
703 if (NULL != mh->wh)
704 GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
705 if (NULL != mh->timeout_task)
706 GNUNET_SCHEDULER_cancel (mh->timeout_task);
707 if (NULL != mh->reset_task)
708 GNUNET_SCHEDULER_cancel (mh->reset_task);
709 GNUNET_assert (0 ==
710 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
711 GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
712 GNUNET_free (mh);
713}
714
715
716/**
717 * Initialize subsystem for non-anonymous file-sharing.
718 */
719void
720GSF_cadet_start_client ()
721{
722 static const struct GNUNET_CADET_MessageHandler handlers[] = {
723 { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
724 { NULL, 0, 0 }
725 };
726
727 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
728 cadet_handle = GNUNET_CADET_connect (GSF_cfg,
729 NULL,
730 &cleaner_cb,
731 handlers);
732}
733
734
735/**
736 * Function called on each active cadets to shut them down. 752 * Function called on each active cadets to shut them down.
737 * 753 *
738 * @param cls NULL 754 * @param cls NULL
@@ -740,10 +756,10 @@ GSF_cadet_start_client ()
740 * @param value the `struct CadetHandle` to destroy 756 * @param value the `struct CadetHandle` to destroy
741 * @return #GNUNET_YES (continue to iterate) 757 * @return #GNUNET_YES (continue to iterate)
742 */ 758 */
743static int 759int
744release_cadets (void *cls, 760GSF_cadet_release_clients (void *cls,
745 const struct GNUNET_PeerIdentity *key, 761 const struct GNUNET_PeerIdentity *key,
746 void *value) 762 void *value)
747{ 763{
748 struct CadetHandle *mh = value; 764 struct CadetHandle *mh = value;
749 765
@@ -756,23 +772,5 @@ release_cadets (void *cls,
756} 772}
757 773
758 774
759/**
760 * Shutdown subsystem for non-anonymous file-sharing.
761 */
762void
763GSF_cadet_stop_client ()
764{
765 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
766 &release_cadets,
767 NULL);
768 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
769 cadet_map = NULL;
770 if (NULL != cadet_handle)
771 {
772 GNUNET_CADET_disconnect (cadet_handle);
773 cadet_handle = NULL;
774 }
775}
776
777 775
778/* end of gnunet-service-fs_cadet_client.c */ 776/* end of gnunet-service-fs_cadet_client.c */
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index ac86537c3..0a72a8279 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -124,9 +124,9 @@ struct CadetClient
124 124
125 125
126/** 126/**
127 * Listen channel for incoming requests. 127 * Listen port for incoming requests.
128 */ 128 */
129static struct GNUNET_CADET_Handle *listen_channel; 129static struct GNUNET_CADET_Port *cadet_port;
130 130
131/** 131/**
132 * Head of DLL of cadet clients. 132 * Head of DLL of cadet clients.
@@ -188,121 +188,29 @@ refresh_timeout_task (struct CadetClient *sc)
188 188
189 189
190/** 190/**
191 * We're done handling a request from a client, read the next one. 191 * Check if we are done with the write queue, and if so tell CADET
192 * that we are ready to read more.
192 * 193 *
193 * @param sc client to continue reading requests from 194 * @param cls where to process the write queue
194 */ 195 */
195static void 196static void
196continue_reading (struct CadetClient *sc) 197continue_writing (void *cls)
197{
198 refresh_timeout_task (sc);
199 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
200 "Finished processing cadet request from client %p, ready to receive the next one\n",
201 sc);
202 GNUNET_CADET_receive_done (sc->channel);
203}
204
205
206/**
207 * Transmit the next entry from the write queue.
208 *
209 * @param sc where to process the write queue
210 */
211static void
212continue_writing (struct CadetClient *sc);
213
214
215/**
216 * Send a reply now, cadet is ready.
217 *
218 * @param cls closure with the `struct CadetClient` which sent the query
219 * @param size number of bytes available in @a buf
220 * @param buf where to write the message
221 * @return number of bytes written to @a buf
222 */
223static size_t
224write_continuation (void *cls,
225 size_t size,
226 void *buf)
227{ 198{
228 struct CadetClient *sc = cls; 199 struct CadetClient *sc = cls;
229 struct GNUNET_CADET_Channel *tun; 200 struct GNUNET_MQ_Handle *mq;
230 struct WriteQueueItem *wqi;
231 size_t ret;
232
233 sc->wh = NULL;
234 if (NULL == (wqi = sc->wqi_head))
235 {
236 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
237 "Write queue empty, reading more requests\n");
238 return 0;
239 }
240 if ( (0 == size) ||
241 (size < wqi->msize) )
242 {
243 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
244 "Transmission of reply failed, terminating cadet\n");
245 tun = sc->channel;
246 sc->channel = NULL;
247 GNUNET_CADET_channel_destroy (tun);
248 return 0;
249 }
250 GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
251 sc->wqi_tail,
252 wqi);
253 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
254 "Transmitted %u byte reply via cadet to %p\n",
255 (unsigned int) size,
256 sc);
257 GNUNET_STATISTICS_update (GSF_stats,
258 gettext_noop ("# Blocks transferred via cadet"), 1,
259 GNUNET_NO);
260 ret = wqi->msize;
261 GNUNET_memcpy (buf, &wqi[1], ret);
262 GNUNET_free (wqi);
263 continue_writing (sc);
264 return ret;
265}
266
267
268/**
269 * Transmit the next entry from the write queue.
270 *
271 * @param sc where to process the write queue
272 */
273static void
274continue_writing (struct CadetClient *sc)
275{
276 struct WriteQueueItem *wqi;
277 struct GNUNET_CADET_Channel *tun;
278 201
279 if (NULL != sc->wh) 202 mq = GNUNET_CADET_get_mq (sc->channel);
203 if (0 != GNUNET_MQ_get_length (mq))
280 { 204 {
281 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 205 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
282 "Write pending, waiting for it to complete\n"); 206 "Write pending, waiting for it to complete\n");
283 return; /* write already pending */
284 }
285 if (NULL == (wqi = sc->wqi_head))
286 {
287 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
288 "Write queue empty, reading more requests\n");
289 continue_reading (sc);
290 return;
291 }
292 sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
293 GNUNET_TIME_UNIT_FOREVER_REL,
294 wqi->msize,
295 &write_continuation,
296 sc);
297 if (NULL == sc->wh)
298 {
299 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300 "Write failed; terminating cadet\n");
301 tun = sc->channel;
302 sc->channel = NULL;
303 GNUNET_CADET_channel_destroy (tun);
304 return; 207 return;
305 } 208 }
209 refresh_timeout_task (sc);
210 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
211 "Finished processing cadet request from client %p, ready to receive the next one\n",
212 sc);
213 GNUNET_CADET_receive_done (sc->channel);
306} 214}
307 215
308 216
@@ -333,7 +241,7 @@ handle_datastore_reply (void *cls,
333{ 241{
334 struct CadetClient *sc = cls; 242 struct CadetClient *sc = cls;
335 size_t msize = size + sizeof (struct CadetReplyMessage); 243 size_t msize = size + sizeof (struct CadetReplyMessage);
336 struct WriteQueueItem *wqi; 244 struct GNUNET_MQ_Envelope *env;
337 struct CadetReplyMessage *srm; 245 struct CadetReplyMessage *srm;
338 246
339 sc->qe = NULL; 247 sc->qe = NULL;
@@ -357,7 +265,8 @@ handle_datastore_reply (void *cls,
357 GNUNET_h2s (key)); 265 GNUNET_h2s (key));
358 } 266 }
359 GNUNET_STATISTICS_update (GSF_stats, 267 GNUNET_STATISTICS_update (GSF_stats,
360 gettext_noop ("# queries received via CADET not answered"), 1, 268 gettext_noop ("# queries received via CADET not answered"),
269 1,
361 GNUNET_NO); 270 GNUNET_NO);
362 continue_writing (sc); 271 continue_writing (sc);
363 return; 272 return;
@@ -369,9 +278,13 @@ handle_datastore_reply (void *cls,
369 GNUNET_h2s (key)); 278 GNUNET_h2s (key));
370 if (GNUNET_OK != 279 if (GNUNET_OK !=
371 GNUNET_FS_handle_on_demand_block (key, 280 GNUNET_FS_handle_on_demand_block (key,
372 size, data, type, 281 size,
373 priority, anonymity, 282 data,
374 expiration, uid, 283 type,
284 priority,
285 anonymity,
286 expiration,
287 uid,
375 &handle_datastore_reply, 288 &handle_datastore_reply,
376 sc)) 289 sc))
377 { 290 {
@@ -394,19 +307,23 @@ handle_datastore_reply (void *cls,
394 (unsigned int) type, 307 (unsigned int) type,
395 GNUNET_h2s (key), 308 GNUNET_h2s (key),
396 sc); 309 sc);
397 wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); 310 env = GNUNET_MQ_msg_extra (srm,
398 wqi->msize = msize; 311 size,
399 srm = (struct CadetReplyMessage *) &wqi[1]; 312 GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
400 srm->header.size = htons ((uint16_t) msize);
401 srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
402 srm->type = htonl (type); 313 srm->type = htonl (type);
403 srm->expiration = GNUNET_TIME_absolute_hton (expiration); 314 srm->expiration = GNUNET_TIME_absolute_hton (expiration);
404 GNUNET_memcpy (&srm[1], data, size); 315 GNUNET_memcpy (&srm[1],
405 sc->reply_size = msize; 316 data,
406 GNUNET_CONTAINER_DLL_insert (sc->wqi_head, 317 size);
407 sc->wqi_tail, 318 GNUNET_MQ_notify_sent (env,
408 wqi); 319 &continue_writing,
409 continue_writing (sc); 320 sc);
321 GNUNET_STATISTICS_update (GSF_stats,
322 gettext_noop ("# Blocks transferred via cadet"),
323 1,
324 GNUNET_NO);
325 GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
326 env);
410} 327}
411 328
412 329
@@ -414,30 +331,22 @@ handle_datastore_reply (void *cls,
414 * Functions with this signature are called whenever a 331 * Functions with this signature are called whenever a
415 * complete query message is received. 332 * complete query message is received.
416 * 333 *
417 * Do not call #GNUNET_SERVER_mst_destroy() in callback
418 *
419 * @param cls closure with the `struct CadetClient` 334 * @param cls closure with the `struct CadetClient`
420 * @param channel channel handle 335 * @param sqm the actual message
421 * @param channel_ctx channel context
422 * @param message the actual message
423 * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
424 */ 336 */
425static int 337static void
426request_cb (void *cls, 338handle_request (void *cls,
427 struct GNUNET_CADET_Channel *channel, 339 const struct CadetQueryMessage *sqm)
428 void **channel_ctx,
429 const struct GNUNET_MessageHeader *message)
430{ 340{
431 struct CadetClient *sc = *channel_ctx; 341 struct CadetClient *sc = cls;
432 const struct CadetQueryMessage *sqm;
433 342
434 sqm = (const struct CadetQueryMessage *) message;
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 343 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436 "Received query for `%s' via cadet from client %p\n", 344 "Received query for `%s' via cadet from client %p\n",
437 GNUNET_h2s (&sqm->query), 345 GNUNET_h2s (&sqm->query),
438 sc); 346 sc);
439 GNUNET_STATISTICS_update (GSF_stats, 347 GNUNET_STATISTICS_update (GSF_stats,
440 gettext_noop ("# queries received via cadet"), 1, 348 gettext_noop ("# queries received via cadet"),
349 1,
441 GNUNET_NO); 350 GNUNET_NO);
442 refresh_timeout_task (sc); 351 refresh_timeout_task (sc);
443 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, 352 sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
@@ -446,14 +355,14 @@ request_cb (void *cls,
446 ntohl (sqm->type), 355 ntohl (sqm->type),
447 0 /* priority */, 356 0 /* priority */,
448 GSF_datastore_queue_size, 357 GSF_datastore_queue_size,
449 &handle_datastore_reply, sc); 358 &handle_datastore_reply,
359 sc);
450 if (NULL == sc->qe) 360 if (NULL == sc->qe)
451 { 361 {
452 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 362 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
453 "Queueing request with datastore failed (queue full?)\n"); 363 "Queueing request with datastore failed (queue full?)\n");
454 continue_writing (sc); 364 continue_writing (sc);
455 } 365 }
456 return GNUNET_OK;
457} 366}
458 367
459 368
@@ -464,16 +373,12 @@ request_cb (void *cls,
464 * @param channel the channel representing the cadet 373 * @param channel the channel representing the cadet
465 * @param initiator the identity of the peer who wants to establish a cadet 374 * @param initiator the identity of the peer who wants to establish a cadet
466 * with us; NULL on binding error 375 * with us; NULL on binding error
467 * @param port cadet port used for the incoming connection 376 * @return initial channel context (our `struct CadetClient`)
468 * @param options channel option flags
469 * @return initial channel context (our 'struct CadetClient')
470 */ 377 */
471static void * 378static void *
472accept_cb (void *cls, 379connect_cb (void *cls,
473 struct GNUNET_CADET_Channel *channel, 380 struct GNUNET_CADET_Channel *channel,
474 const struct GNUNET_PeerIdentity *initiator, 381 const struct GNUNET_PeerIdentity *initiator)
475 const struct GNUNET_HashCode *port,
476 enum GNUNET_CADET_ChannelOption options)
477{ 382{
478 struct CadetClient *sc; 383 struct CadetClient *sc;
479 384
@@ -481,13 +386,15 @@ accept_cb (void *cls,
481 if (sc_count >= sc_count_max) 386 if (sc_count >= sc_count_max)
482 { 387 {
483 GNUNET_STATISTICS_update (GSF_stats, 388 GNUNET_STATISTICS_update (GSF_stats,
484 gettext_noop ("# cadet client connections rejected"), 1, 389 gettext_noop ("# cadet client connections rejected"),
390 1,
485 GNUNET_NO); 391 GNUNET_NO);
486 GNUNET_CADET_channel_destroy (channel); 392 GNUNET_CADET_channel_destroy (channel);
487 return NULL; 393 return NULL;
488 } 394 }
489 GNUNET_STATISTICS_update (GSF_stats, 395 GNUNET_STATISTICS_update (GSF_stats,
490 gettext_noop ("# cadet connections active"), 1, 396 gettext_noop ("# cadet connections active"),
397 1,
491 GNUNET_NO); 398 GNUNET_NO);
492 sc = GNUNET_new (struct CadetClient); 399 sc = GNUNET_new (struct CadetClient);
493 sc->channel = channel; 400 sc->channel = channel;
@@ -506,18 +413,17 @@ accept_cb (void *cls,
506 413
507/** 414/**
508 * Function called by cadet when a client disconnects. 415 * Function called by cadet when a client disconnects.
509 * Cleans up our 'struct CadetClient' of that channel. 416 * Cleans up our `struct CadetClient` of that channel.
510 * 417 *
511 * @param cls NULL 418 * @param cls our `struct CadetClient`
512 * @param channel channel of the disconnecting client 419 * @param channel channel of the disconnecting client
513 * @param channel_ctx our 'struct CadetClient' 420 * @param channel_ctx
514 */ 421 */
515static void 422static void
516cleaner_cb (void *cls, 423disconnect_cb (void *cls,
517 const struct GNUNET_CADET_Channel *channel, 424 const struct GNUNET_CADET_Channel *channel)
518 void *channel_ctx)
519{ 425{
520 struct CadetClient *sc = channel_ctx; 426 struct CadetClient *sc = cls;
521 struct WriteQueueItem *wqi; 427 struct WriteQueueItem *wqi;
522 428
523 if (NULL == sc) 429 if (NULL == sc)
@@ -552,15 +458,42 @@ cleaner_cb (void *cls,
552} 458}
553 459
554 460
461
462/**
463 * Function called whenever an MQ-channel's transmission window size changes.
464 *
465 * The first callback in an outgoing channel will be with a non-zero value
466 * and will mean the channel is connected to the destination.
467 *
468 * For an incoming channel it will be called immediately after the
469 * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
470 *
471 * @param cls Channel closure.
472 * @param channel Connection to the other end (henceforth invalid).
473 * @param window_size New window size. If the is more messages than buffer size
474 * this value will be negative..
475 */
476static void
477window_change_cb (void *cls,
478 const struct GNUNET_CADET_Channel *channel,
479 int window_size)
480{
481 /* FIXME: could do flow control here... */
482}
483
484
555/** 485/**
556 * Initialize subsystem for non-anonymous file-sharing. 486 * Initialize subsystem for non-anonymous file-sharing.
557 */ 487 */
558void 488void
559GSF_cadet_start_server () 489GSF_cadet_start_server ()
560{ 490{
561 static const struct GNUNET_CADET_MessageHandler handlers[] = { 491 struct GNUNET_MQ_MessageHandler handlers[] = {
562 { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)}, 492 GNUNET_MQ_hd_fixed_size (request,
563 { NULL, 0, 0 } 493 GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
494 struct CadetQueryMessage,
495 NULL),
496 GNUNET_MQ_handler_end ()
564 }; 497 };
565 struct GNUNET_HashCode port; 498 struct GNUNET_HashCode port;
566 499
@@ -573,18 +506,19 @@ GSF_cadet_start_server ()
573 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 506 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574 "Initializing cadet FS server with a limit of %llu connections\n", 507 "Initializing cadet FS server with a limit of %llu connections\n",
575 sc_count_max); 508 sc_count_max);
576 listen_channel = GNUNET_CADET_connect (GSF_cfg, 509 cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
577 NULL, 510 cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
578 &cleaner_cb, 511 GNUNET_assert (NULL != cadet_handle);
579 handlers);
580 GNUNET_assert (NULL != listen_channel);
581 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, 512 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
582 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), 513 strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
583 &port); 514 &port);
584 GNUNET_CADET_open_port (listen_channel, 515 cadet_port = GNUNET_CADET_open_porT (cadet_handle,
585 &port, 516 &port,
586 &accept_cb, 517 &connect_cb,
587 NULL); 518 NULL,
519 &window_change_cb,
520 &disconnect_cb,
521 handlers);
588} 522}
589 523
590 524
@@ -594,10 +528,20 @@ GSF_cadet_start_server ()
594void 528void
595GSF_cadet_stop_server () 529GSF_cadet_stop_server ()
596{ 530{
597 if (NULL != listen_channel) 531 GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
532 &GSF_cadet_release_clients,
533 NULL);
534 GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
535 cadet_map = NULL;
536 if (NULL != cadet_port)
537 {
538 GNUNET_CADET_close_port (cadet_port);
539 cadet_port = NULL;
540 }
541 if (NULL != cadet_handle)
598 { 542 {
599 GNUNET_CADET_disconnect (listen_channel); 543 GNUNET_CADET_disconnect (cadet_handle);
600 listen_channel = NULL; 544 cadet_handle = NULL;
601 } 545 }
602 GNUNET_assert (NULL == sc_head); 546 GNUNET_assert (NULL == sc_head);
603 GNUNET_assert (0 == sc_count); 547 GNUNET_assert (0 == sc_count);