aboutsummaryrefslogtreecommitdiff
path: root/src/cadet
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-01-20 20:39:51 +0100
committerChristian Grothoff <christian@grothoff.org>2017-01-20 20:39:51 +0100
commitf52fc4b001758430bb911759c755d0f06d3eb693 (patch)
tree185370e26928136dfb2786843d06ed0a90f3ffd5 /src/cadet
parent6a828bdf078d44531cc8e7de70a88e9b6f438da9 (diff)
downloadgnunet-f52fc4b001758430bb911759c755d0f06d3eb693.tar.gz
gnunet-f52fc4b001758430bb911759c755d0f06d3eb693.zip
working on channel passing data to clients
Diffstat (limited to 'src/cadet')
-rw-r--r--src/cadet/cadet.h2
-rw-r--r--src/cadet/cadet_api.c8
-rw-r--r--src/cadet/gnunet-service-cadet-new.c2
-rw-r--r--src/cadet/gnunet-service-cadet-new.h1
-rw-r--r--src/cadet/gnunet-service-cadet-new_channel.c215
-rw-r--r--src/cadet/gnunet-service-cadet_local.c4
6 files changed, 205 insertions, 27 deletions
diff --git a/src/cadet/cadet.h b/src/cadet/cadet.h
index c16fb2917..9d154fb99 100644
--- a/src/cadet/cadet.h
+++ b/src/cadet/cadet.h
@@ -198,7 +198,7 @@ struct GNUNET_CADET_LocalData
198 /** 198 /**
199 * ID of the channel 199 * ID of the channel
200 */ 200 */
201 struct GNUNET_CADET_ClientChannelNumber id; 201 struct GNUNET_CADET_ClientChannelNumber channel_id;
202 202
203 /** 203 /**
204 * Payload follows 204 * Payload follows
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 8f1274d63..5dcf43e46 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -569,7 +569,7 @@ request_data (void *cls)
569 env = GNUNET_MQ_msg_extra (msg, 569 env = GNUNET_MQ_msg_extra (msg,
570 th->size, 570 th->size,
571 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); 571 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
572 msg->id = th->channel->chid; 572 msg->channel_id = th->channel->chid;
573 osize = th->notify (th->notify_cls, 573 osize = th->notify (th->notify_cls,
574 th->size, 574 th->size,
575 &msg[1]); 575 &msg[1]);
@@ -697,7 +697,7 @@ check_local_data (void *cls,
697 } 697 }
698 698
699 ch = retrieve_channel (h, 699 ch = retrieve_channel (h,
700 message->id); 700 message->channel_id);
701 if (NULL == ch) 701 if (NULL == ch)
702 { 702 {
703 GNUNET_break_op (0); 703 GNUNET_break_op (0);
@@ -727,7 +727,7 @@ handle_local_data (void *cls,
727 727
728 LOG (GNUNET_ERROR_TYPE_DEBUG, 728 LOG (GNUNET_ERROR_TYPE_DEBUG,
729 "Got a data message!\n"); 729 "Got a data message!\n");
730 ch = retrieve_channel (h, message->id); 730 ch = retrieve_channel (h, message->channel_id);
731 GNUNET_assert (NULL != ch); 731 GNUNET_assert (NULL != ch);
732 732
733 payload = (struct GNUNET_MessageHeader *) &message[1]; 733 payload = (struct GNUNET_MessageHeader *) &message[1];
@@ -735,7 +735,7 @@ handle_local_data (void *cls,
735 GC_f2s (ntohl (ch->chid.channel_of_client) >= 735 GC_f2s (ntohl (ch->chid.channel_of_client) >=
736 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI), 736 GNUNET_CADET_LOCAL_CHANNEL_ID_CLI),
737 GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), 737 GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)),
738 ntohl (message->id.channel_of_client)); 738 ntohl (message->channel_id.channel_of_client));
739 739
740 type = ntohs (payload->type); 740 type = ntohs (payload->type);
741 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type)); 741 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type));
diff --git a/src/cadet/gnunet-service-cadet-new.c b/src/cadet/gnunet-service-cadet-new.c
index 7b4a0e95b..7801708c1 100644
--- a/src/cadet/gnunet-service-cadet-new.c
+++ b/src/cadet/gnunet-service-cadet-new.c
@@ -628,7 +628,7 @@ handle_data (void *cls,
628 struct CadetChannel *ch; 628 struct CadetChannel *ch;
629 const struct GNUNET_MessageHeader *payload; 629 const struct GNUNET_MessageHeader *payload;
630 630
631 chid = msg->id; 631 chid = msg->channel_id;
632 map = get_map_by_chid (c, 632 map = get_map_by_chid (c,
633 chid); 633 chid);
634 ch = GNUNET_CONTAINER_multihashmap32_get (map, 634 ch = GNUNET_CONTAINER_multihashmap32_get (map,
diff --git a/src/cadet/gnunet-service-cadet-new.h b/src/cadet/gnunet-service-cadet-new.h
index 9f4667e23..b3bb85d85 100644
--- a/src/cadet/gnunet-service-cadet-new.h
+++ b/src/cadet/gnunet-service-cadet-new.h
@@ -220,7 +220,6 @@ extern unsigned long long ratchet_messages;
220extern struct GNUNET_TIME_Relative ratchet_time; 220extern struct GNUNET_TIME_Relative ratchet_time;
221 221
222 222
223
224/** 223/**
225 * Send a message to a client. 224 * Send a message to a client.
226 * 225 *
diff --git a/src/cadet/gnunet-service-cadet-new_channel.c b/src/cadet/gnunet-service-cadet-new_channel.c
index 5d2eba618..75ec81992 100644
--- a/src/cadet/gnunet-service-cadet-new_channel.c
+++ b/src/cadet/gnunet-service-cadet-new_channel.c
@@ -25,14 +25,12 @@
25 * @author Christian Grothoff 25 * @author Christian Grothoff
26 * 26 *
27 * TODO: 27 * TODO:
28 * - handle CREATE_ACK
29 * - handle plaintext data
30 * - handle plaintext ACK
31 * - handle destroy 28 * - handle destroy
32 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL! 29 * - estimate max bandwidth using bursts and use to for CONGESTION CONTROL!
33 * - check that '0xFFULL' really is sufficient for flow control! 30 * - check that '0xFFULL' really is sufficient for flow control!
34 * - what about the 'no buffer' option? 31 * - revisit handling of 'unreliable' traffic!
35 * - what about the 'out-of-order' option? 32 * - revisit handling of 'out-of-order' option, especially in combination with/without 'reliable'.
33 * - figure out flow control without ACKs (unreliable traffic!)
36 */ 34 */
37#include "platform.h" 35#include "platform.h"
38#include "gnunet_util_lib.h" 36#include "gnunet_util_lib.h"
@@ -147,7 +145,8 @@ struct CadetOutOfOrderMessage
147 struct CadetOutOfOrderMessage *prev; 145 struct CadetOutOfOrderMessage *prev;
148 146
149 /** 147 /**
150 * ID of the message (ACK needed to free) 148 * ID of the message (messages up to this point needed
149 * before we give this one to the client).
151 */ 150 */
152 struct ChannelMessageIdentifier mid; 151 struct ChannelMessageIdentifier mid;
153 152
@@ -311,7 +310,6 @@ struct CadetChannel
311}; 310};
312 311
313 312
314
315/** 313/**
316 * Get the static string for identification of the channel. 314 * Get the static string for identification of the channel.
317 * 315 *
@@ -480,8 +478,10 @@ GCCH_channel_local_new (struct CadetClient *owner,
480 struct CadetChannel *ch; 478 struct CadetChannel *ch;
481 479
482 ch = GNUNET_new (struct CadetChannel); 480 ch = GNUNET_new (struct CadetChannel);
483 ch->max_pending_messages = 32; /* FIXME: allow control via options 481 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
484 or adjust dynamically... */ 482 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
483 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
484 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
485 ch->owner = owner; 485 ch->owner = owner;
486 ch->lid = owner_id; 486 ch->lid = owner_id;
487 ch->port = *port; 487 ch->port = *port;
@@ -490,9 +490,6 @@ GCCH_channel_local_new (struct CadetClient *owner,
490 ch->chid = GCT_add_channel (ch->t, 490 ch->chid = GCT_add_channel (ch->t,
491 ch); 491 ch);
492 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME; 492 ch->retry_time = CADET_INITIAL_RETRANSMIT_TIME;
493 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
494 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
495 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
496 ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create, 493 ch->retry_task = GNUNET_SCHEDULER_add_now (&send_create,
497 ch); 494 ch);
498 GNUNET_STATISTICS_update (stats, 495 GNUNET_STATISTICS_update (stats,
@@ -538,8 +535,6 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
538 struct CadetClient *c; 535 struct CadetClient *c;
539 536
540 ch = GNUNET_new (struct CadetChannel); 537 ch = GNUNET_new (struct CadetChannel);
541 ch->max_pending_messages = 32; /* FIXME: allow control via options
542 or adjust dynamically... */
543 ch->port = *port; 538 ch->port = *port;
544 ch->t = t; 539 ch->t = t;
545 ch->chid = chid; 540 ch->chid = chid;
@@ -547,6 +542,7 @@ GCCH_channel_incoming_new (struct CadetTunnel *t,
547 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER)); 542 ch->nobuffer = (0 != (options & GNUNET_CADET_OPTION_NOBUFFER));
548 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE)); 543 ch->reliable = (0 != (options & GNUNET_CADET_OPTION_RELIABLE));
549 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER)); 544 ch->out_of_order = (0 != (options & GNUNET_CADET_OPTION_OUT_OF_ORDER));
545 ch->max_pending_messages = (ch->nobuffer) ? 1 : 32; /* FIXME: 32!? Do not hardcode! */
550 GNUNET_STATISTICS_update (stats, 546 GNUNET_STATISTICS_update (stats,
551 "# channels", 547 "# channels",
552 1, 548 1,
@@ -635,6 +631,27 @@ send_connect_ack (void *cls)
635 631
636 632
637/** 633/**
634 * Send a LOCAL ACK to the client to solicit more messages.
635 *
636 * @param ch channel the ack is for
637 * @param c client to send the ACK to
638 */
639static void
640send_ack_to_client (struct CadetChannel *ch,
641 struct CadetClient *c)
642{
643 struct GNUNET_MQ_Envelope *env;
644 struct GNUNET_CADET_LocalAck *ack;
645
646 env = GNUNET_MQ_msg (ack,
647 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
648 ack->channel_id = ch->lid;
649 GSC_send_to_client (c,
650 env);
651}
652
653
654/**
638 * A client is bound to the port that we have a channel 655 * A client is bound to the port that we have a channel
639 * open to. Send the acknowledgement for the connection 656 * open to. Send the acknowledgement for the connection
640 * request and establish the link with the client. 657 * request and establish the link with the client.
@@ -672,6 +689,10 @@ GCCH_bind (struct CadetChannel *ch,
672 /* notify other peer that we accepted the connection */ 689 /* notify other peer that we accepted the connection */
673 ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack, 690 ch->retry_task = GNUNET_SCHEDULER_add_now (&send_connect_ack,
674 ch); 691 ch);
692 /* give client it's initial supply of ACKs */
693 for (unsigned int i=0;i<ch->max_pending_messages;i++)
694 send_ack_to_client (ch,
695 ch->owner);
675} 696}
676 697
677 698
@@ -742,12 +763,75 @@ GCCH_channel_incoming_destroy (struct CadetChannel *ch)
742void 763void
743GCCH_handle_channel_create_ack (struct CadetChannel *ch) 764GCCH_handle_channel_create_ack (struct CadetChannel *ch)
744{ 765{
745 GNUNET_break (0); // FIXME! 766 switch (ch->state)
767 {
768 case CADET_CHANNEL_NEW:
769 /* this should be impossible */
770 GNUNET_break (0);
771 break;
772 case CADET_CHANNEL_CREATE_SENT:
773 if (NULL == ch->owner)
774 {
775 /* We're not the owner, wrong direction! */
776 GNUNET_break_op (0);
777 return;
778 }
779 ch->state = CADET_CHANNEL_READY;
780 /* On first connect, send client as many ACKs as we allow messages
781 to be buffered! */
782 for (unsigned int i=0;i<ch->max_pending_messages;i++)
783 send_ack_to_client (ch,
784 ch->owner);
785 break;
786 case CADET_CHANNEL_READY:
787 /* duplicate ACK, maybe we retried the CREATE. Ignore. */
788 GNUNET_STATISTICS_update (stats,
789 "# duplicate CREATE_ACKs",
790 1,
791 GNUNET_NO);
792 break;
793 }
746} 794}
747 795
748 796
749/** 797/**
750 * We got payload data for a channel. Pass it on to the client. 798 * Test if element @a e1 comes before element @a e2.
799 *
800 * TODO: use opportunity to create generic list insertion sort
801 * logic in container!
802 *
803 * @param cls closure, our `struct CadetChannel`
804 * @param e1 an element of to sort
805 * @param e2 another element to sort
806 * @return #GNUNET_YES if @e1 < @e2, otherwise #GNUNET_NO
807 */
808static int
809is_before (void *cls,
810 void *e1,
811 void *e2)
812{
813 struct CadetOutOfOrderMessage *m1 = e1;
814 struct CadetOutOfOrderMessage *m2 = e2;
815 uint32_t v1 = ntohl (m1->mid.mid);
816 uint32_t v2 = ntohl (m2->mid.mid);
817 uint32_t delta;
818
819 delta = v1 - v2;
820 if (delta > (uint32_t) INT_MAX)
821 {
822 /* in overflow range, we can safely assume we wrapped around */
823 return GNUNET_NO;
824 }
825 else
826 {
827 return GNUNET_YES;
828 }
829}
830
831
832/**
833 * We got payload data for a channel. Pass it on to the client
834 * and send an ACK to the other end (once flow control allows it!)
751 * 835 *
752 * @param ch channel that got data 836 * @param ch channel that got data
753 */ 837 */
@@ -755,7 +839,70 @@ void
755GCCH_handle_channel_plaintext_data (struct CadetChannel *ch, 839GCCH_handle_channel_plaintext_data (struct CadetChannel *ch,
756 const struct GNUNET_CADET_ChannelAppDataMessage *msg) 840 const struct GNUNET_CADET_ChannelAppDataMessage *msg)
757{ 841{
758 GNUNET_break (0); // FIXME! 842 struct GNUNET_MQ_Envelope *env;
843 struct GNUNET_CADET_LocalData *ld;
844 struct CadetOutOfOrderMessage *com;
845 size_t payload_size;
846
847 payload_size = ntohs (msg->header.size) - sizeof (*msg);
848 env = GNUNET_MQ_msg_extra (ld,
849 payload_size,
850 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
851 ld->channel_id = ch->lid;
852 GNUNET_memcpy (&ld[1],
853 &msg[1],
854 payload_size);
855 if ( (GNUNET_YES == ch->client_ready) &&
856 ( (GNUNET_YES == ch->out_of_order) ||
857 (msg->mid.mid == ch->mid_recv.mid) ) )
858 {
859 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
860 env);
861 ch->mid_recv.mid = htonl (1 + ntohl (ch->mid_recv.mid));
862 ch->mid_futures >>= 1;
863 }
864 else
865 {
866 /* FIXME-SECURITY: if the element is WAY too far ahead,
867 drop it (can't buffer too much!) */
868 com = GNUNET_new (struct CadetOutOfOrderMessage);
869 com->mid = msg->mid;
870 com->env = env;
871 /* sort into list ordered by "is_before" */
872 if ( (NULL == ch->head_recv) ||
873 (GNUNET_YES == is_before (ch,
874 com,
875 ch->head_recv)) )
876 {
877 GNUNET_CONTAINER_DLL_insert (ch->head_recv,
878 ch->tail_recv,
879 com);
880 }
881 else
882 {
883 struct CadetOutOfOrderMessage *pos;
884
885 for (pos = ch->head_recv;
886 NULL != pos;
887 pos = pos->next)
888 {
889 if (GNUNET_YES !=
890 is_before (ch,
891 pos,
892 com))
893 break;
894 }
895 if (NULL == pos)
896 GNUNET_CONTAINER_DLL_insert_tail (ch->head_recv,
897 ch->tail_recv,
898 com);
899 else
900 GNUNET_CONTAINER_DLL_insert_after (ch->head_recv,
901 ch->tail_recv,
902 com,
903 pos->prev);
904 }
905 }
759} 906}
760 907
761 908
@@ -770,7 +917,37 @@ void
770GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch, 917GCCH_handle_channel_plaintext_data_ack (struct CadetChannel *ch,
771 const struct GNUNET_CADET_ChannelDataAckMessage *ack) 918 const struct GNUNET_CADET_ChannelDataAckMessage *ack)
772{ 919{
773 GNUNET_break (0); // FIXME! 920 struct CadetReliableMessage *crm;
921
922 if (GNUNET_NO == ch->reliable)
923 {
924 /* not expecting ACKs on unreliable channel, odd */
925 GNUNET_break_op (0);
926 return;
927 }
928 for (crm = ch->head_sent;
929 NULL != crm;
930 crm = crm->next)
931 if (ack->mid.mid == crm->data_message.mid.mid)
932 break;
933 if (NULL == crm)
934 {
935 /* ACK for message we already dropped, might have been a
936 duplicate ACK? Ignore. */
937 GNUNET_STATISTICS_update (stats,
938 "# duplicate CHANNEL_DATA_ACKs",
939 1,
940 GNUNET_NO);
941 return;
942 }
943 GNUNET_CONTAINER_DLL_remove (ch->head_sent,
944 ch->tail_sent,
945 crm);
946 ch->pending_messages--;
947 GNUNET_free (crm);
948 GNUNET_assert (ch->pending_messages < ch->max_pending_messages);
949 send_ack_to_client (ch,
950 (NULL == ch->owner) ? ch->dest : ch->owner);
774} 951}
775 952
776 953
@@ -1026,6 +1203,8 @@ send_client_buffered_data (struct CadetChannel *ch)
1026 GNUNET_CONTAINER_DLL_remove (ch->head_recv, 1203 GNUNET_CONTAINER_DLL_remove (ch->head_recv,
1027 ch->tail_recv, 1204 ch->tail_recv,
1028 com); 1205 com);
1206 /* FIXME: if unreliable, this is not aggressive
1207 enough, as it would be OK to have lost some! */
1029 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid)); 1208 ch->mid_recv.mid = htonl (1 + ntohl (com->mid.mid));
1030 ch->mid_futures >>= 1; /* equivalent to division by 2 */ 1209 ch->mid_futures >>= 1; /* equivalent to division by 2 */
1031 GSC_send_to_client (ch->owner ? ch->owner : ch->dest, 1210 GSC_send_to_client (ch->owner ? ch->owner : ch->dest,
diff --git a/src/cadet/gnunet-service-cadet_local.c b/src/cadet/gnunet-service-cadet_local.c
index e1f6ac4c3..c476f6ac2 100644
--- a/src/cadet/gnunet-service-cadet_local.c
+++ b/src/cadet/gnunet-service-cadet_local.c
@@ -586,7 +586,7 @@ handle_data (void *cls, struct GNUNET_SERVER_Client *client,
586 return; 586 return;
587 } 587 }
588 588
589 chid = msg->id; 589 chid = msg->channel_id;
590 LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes (%u payload) by client %u\n", 590 LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes (%u payload) by client %u\n",
591 payload_size, payload_claimed_size, c->id); 591 payload_size, payload_claimed_size, c->id);
592 592
@@ -1531,7 +1531,7 @@ GML_send_data (struct CadetClient *c,
1531 GNUNET_memcpy (&copy[1], &msg[1], size); 1531 GNUNET_memcpy (&copy[1], &msg[1], size);
1532 copy->header.size = htons (sizeof (struct GNUNET_CADET_LocalData) + size); 1532 copy->header.size = htons (sizeof (struct GNUNET_CADET_LocalData) + size);
1533 copy->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); 1533 copy->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1534 copy->id = id; 1534 copy->channel_id = id;
1535 GNUNET_SERVER_notification_context_unicast (nc, c->handle, 1535 GNUNET_SERVER_notification_context_unicast (nc, c->handle,
1536 &copy->header, GNUNET_NO); 1536 &copy->header, GNUNET_NO);
1537} 1537}