aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h51
-rw-r--r--src/include/gnunet_transport_communication_service.h14
-rw-r--r--src/transport/transport.h257
-rw-r--r--src/transport/transport_api2_communication.c565
4 files changed, 860 insertions, 27 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 03b13fd48..4831c9215 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3005,9 +3005,58 @@ extern "C"
3005#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135 3005#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL 1135
3006 3006
3007 3007
3008/*******************************************************
3009 NEW (TNG) Transport service
3010 ******************************************************* */
3008 3011
3009/** 3012/**
3010 * Next available: 1200 3013 * @brief inform transport to add an address of this peer
3014 */
3015#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS 1200
3016
3017/**
3018 * @brief inform transport to delete an address of this peer
3019 */
3020#define GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS 1201
3021
3022/**
3023 * @brief inform transport about an incoming message
3024 */
3025#define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG 1202
3026
3027/**
3028 * @brief transport acknowledges processing an incoming message
3029 */
3030#define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK 1203
3031
3032/**
3033 * @brief inform transport that a queue was setup to talk to some peer
3034 */
3035#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP 1204
3036
3037/**
3038 * @brief inform transport that a queue was torn down
3039 */
3040#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN 1205
3041
3042/**
3043 * @brief transport tells communicator it wants a queue
3044 */
3045#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE 1206
3046
3047/**
3048 * @brief transport tells communicator it wants to transmit
3049 */
3050#define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG 1207
3051
3052/**
3053 * @brief communicator tells transports that message was sent
3054 */
3055#define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK 1208
3056
3057
3058/**
3059 * Next available: 1300
3011 */ 3060 */
3012 3061
3013 3062
diff --git a/src/include/gnunet_transport_communication_service.h b/src/include/gnunet_transport_communication_service.h
index 94d15af22..d93d5134e 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -137,8 +137,8 @@ typedef void
137 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was 137 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
138 * immediately dropped due to memory limitations (communicator 138 * immediately dropped due to memory limitations (communicator
139 * should try to apply back pressure), 139 * should try to apply back pressure),
140 * #GNUNET_SYSERR if the message is ill formed and communicator 140 * #GNUNET_SYSERR if the message could not be delivered because
141 * should try to reset stream 141 * the tranport service is not yet up
142 */ 142 */
143int 143int
144GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, 144GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *handle,
@@ -162,7 +162,7 @@ struct GNUNET_TRANSPORT_QueueHandle;
162 * "inbound" connection or because the communicator discovered the 162 * "inbound" connection or because the communicator discovered the
163 * presence of another peer. 163 * presence of another peer.
164 * 164 *
165 * @param handle connection to transport service 165 * @param ch connection to transport service
166 * @param peer peer with which we can now communicate 166 * @param peer peer with which we can now communicate
167 * @param address address in human-readable format, 0-terminated, UTF-8 167 * @param address address in human-readable format, 0-terminated, UTF-8
168 * @param nt which network type does the @a address belong to? 168 * @param nt which network type does the @a address belong to?
@@ -170,7 +170,7 @@ struct GNUNET_TRANSPORT_QueueHandle;
170 * @return API handle identifying the new MQ 170 * @return API handle identifying the new MQ
171 */ 171 */
172struct GNUNET_TRANSPORT_QueueHandle * 172struct GNUNET_TRANSPORT_QueueHandle *
173GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, 173GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
174 const struct GNUNET_PeerIdentity *peer, 174 const struct GNUNET_PeerIdentity *peer,
175 const char *address, 175 const char *address,
176 enum GNUNET_ATS_Network_Type nt, 176 enum GNUNET_ATS_Network_Type nt,
@@ -198,16 +198,16 @@ struct GNUNET_TRANSPORT_AddressIdentifier;
198 * Notify transport service about an address that this communicator 198 * Notify transport service about an address that this communicator
199 * provides for this peer. 199 * provides for this peer.
200 * 200 *
201 * @param handle connection to transport service 201 * @param ch connection to transport service
202 * @param address our address in human-readable format, 0-terminated, UTF-8 202 * @param address our address in human-readable format, 0-terminated, UTF-8
203 * @param nt which network type does the address belong to? 203 * @param nt which network type does the address belong to?
204 * @param expiration when does the communicator forsee this address expiring? 204 * @param expiration when does the communicator forsee this address expiring?
205 */ 205 */
206struct GNUNET_TRANSPORT_AddressIdentifier * 206struct GNUNET_TRANSPORT_AddressIdentifier *
207GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *handle, 207GNUNET_TRANSPORT_communicator_address_add (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
208 const char *address, 208 const char *address,
209 enum GNUNET_ATS_Network_Type nt, 209 enum GNUNET_ATS_Network_Type nt,
210 struct GNUNET_TIME_Absolute expiration); 210 struct GNUNET_TIME_Relative expiration);
211 211
212 212
213/** 213/**
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 75726e462..ec373286d 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -644,6 +644,263 @@ struct TransportPluginMonitorMessage
644}; 644};
645 645
646 646
647
648
649
650
651
652
653
654/* *********************** TNG messages ***************** */
655
656/**
657 * Add address to the list.
658 */
659struct GNUNET_TRANSPORT_AddAddressMessage
660{
661
662 /**
663 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS.
664 */
665 struct GNUNET_MessageHeader header;
666
667 /**
668 * Address identifier (used during deletion).
669 */
670 uint32_t aid GNUNET_PACKED;
671
672 /**
673 * When does the address expire?
674 */
675 struct GNUNET_TIME_RelativeNBO expiration;
676
677 /**
678 * An `enum GNUNET_ATS_Network_Type` in NBO.
679 */
680 uint32_t nt;
681
682 /* followed by UTF-8 encoded, 0-terminated human-readable address */
683};
684
685
686/**
687 * Remove address from the list.
688 */
689struct GNUNET_TRANSPORT_DelAddressMessage
690{
691
692 /**
693 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS.
694 */
695 struct GNUNET_MessageHeader header;
696
697 /**
698 * Address identifier.
699 */
700 uint32_t aid GNUNET_PACKED;
701
702};
703
704
705/**
706 * Inform transport about an incoming message.
707 */
708struct GNUNET_TRANSPORT_IncomingMessage
709{
710
711 /**
712 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG.
713 */
714 struct GNUNET_MessageHeader header;
715
716 /**
717 * Do we use flow control or not?
718 */
719 uint32_t fc_on GNUNET_PACKED;
720
721 /**
722 * 64-bit number to identify the matching ACK.
723 */
724 uint64_t fc_id GNUNET_PACKED;
725
726 /**
727 * Sender identifier.
728 */
729 struct GNUNET_PeerIdentity sender GNUNET_PACKED;
730
731 /* followed by the message */
732};
733
734
735/**
736 * Transport informs us about being done with an incoming message.
737 * (only sent if fc_on was set).
738 */
739struct GNUNET_TRANSPORT_IncomingMessageAck
740{
741
742 /**
743 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK.
744 */
745 struct GNUNET_MessageHeader header;
746
747 /**
748 * Reserved (0)
749 */
750 uint32_t reserved GNUNET_PACKED;
751
752 /**
753 * Which message is being ACKed?
754 */
755 uint64_t fc_id GNUNET_PACKED;
756
757 /**
758 * Sender identifier of the original message.
759 */
760 struct GNUNET_PeerIdentity sender GNUNET_PACKED;
761
762};
763
764
765/**
766 * Add queue to the transport
767 */
768struct GNUNET_TRANSPORT_AddQueueMessage
769{
770
771 /**
772 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE.
773 */
774 struct GNUNET_MessageHeader header;
775
776 /**
777 * Queue identifier (used to identify the queue).
778 */
779 uint32_t qid GNUNET_PACKED;
780
781 /**
782 * Receiver that can be addressed via the queue.
783 */
784 struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
785
786 /**
787 * An `enum GNUNET_ATS_Network_Type` in NBO.
788 */
789 uint32_t nt;
790
791 /* followed by UTF-8 encoded, 0-terminated human-readable address */
792};
793
794
795/**
796 * Remove queue, it is no longer available.
797 */
798struct GNUNET_TRANSPORT_DelQueueMessage
799{
800
801 /**
802 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE.
803 */
804 struct GNUNET_MessageHeader header;
805
806 /**
807 * Address identifier.
808 */
809 uint32_t qid GNUNET_PACKED;
810
811 /**
812 * Receiver that can be addressed via the queue.
813 */
814 struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
815
816};
817
818
819/**
820 * Transport tells communicator that it wants a new queue.
821 */
822struct GNUNET_TRANSPORT_CreateQueue
823{
824
825 /**
826 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE.
827 */
828 struct GNUNET_MessageHeader header;
829
830 /**
831 * Always zero.
832 */
833 uint32_t reserved GNUNET_PACKED;
834
835 /**
836 * Receiver that can be addressed via the queue.
837 */
838 struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
839
840 /* followed by UTF-8 encoded, 0-terminated human-readable address */
841};
842
843
844/**
845 * Inform communicator about transport's desire to send a message.
846 */
847struct GNUNET_TRANSPORT_SendMessageTo
848{
849
850 /**
851 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG.
852 */
853 struct GNUNET_MessageHeader header;
854
855 /**
856 * Which queue should we use?
857 */
858 uint32_t qid GNUNET_PACKED;
859
860 /**
861 * Message ID, used for flow control.
862 */
863 uint64_t mid GNUNET_PACKED;
864
865 /**
866 * Receiver identifier.
867 */
868 struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
869
870 /* followed by the message */
871};
872
873
874/**
875 * Inform transport that message was sent.
876 */
877struct GNUNET_TRANSPORT_SendMessageToAck
878{
879
880 /**
881 * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK.
882 */
883 struct GNUNET_MessageHeader header;
884
885 /**
886 * Success (#GNUNET_OK), failure (#GNUNET_SYSERR).
887 */
888 uint32_t status GNUNET_PACKED;
889
890 /**
891 * Message ID of the original message.
892 */
893 uint64_t mid GNUNET_PACKED;
894
895 /**
896 * Receiver identifier.
897 */
898 struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
899
900};
901
902
903
647GNUNET_NETWORK_STRUCT_END 904GNUNET_NETWORK_STRUCT_END
648 905
649/* end of transport.h */ 906/* end of transport.h */
diff --git a/src/transport/transport_api2_communication.c b/src/transport/transport_api2_communication.c
index e33c5f444..d446516bd 100644
--- a/src/transport/transport_api2_communication.c
+++ b/src/transport/transport_api2_communication.c
@@ -29,6 +29,79 @@
29 29
30 30
31/** 31/**
32 * How many messages do we keep at most in the queue to the
33 * transport service before we start to drop (default,
34 * can be changed via the configuration file).
35 */
36#define DEFAULT_MAX_QUEUE_LENGTH 16
37
38
39/**
40 * Information we track per packet to enable flow control.
41 */
42struct FlowControl
43{
44 /**
45 * Kept in a DLL.
46 */
47 struct FlowControl *next;
48
49 /**
50 * Kept in a DLL.
51 */
52 struct FlowControl *prev;
53
54 /**
55 * Function to call once the message was processed.
56 */
57 GNUNET_TRANSPORT_MessageCompletedCallback cb;
58
59 /**
60 * Closure for @e cb
61 */
62 void *cb_cls;
63
64 /**
65 * Which peer is this about?
66 */
67 struct GNUNET_PeerIdentity sender;
68
69 /**
70 * More-or-less unique ID for the message.
71 */
72 uint64_t id;
73};
74
75
76/**
77 * Information we track per message to tell the transport about
78 * success or failures.
79 */
80struct AckPending
81{
82 /**
83 * Kept in a DLL.
84 */
85 struct AckPending *next;
86
87 /**
88 * Kept in a DLL.
89 */
90 struct AckPending *prev;
91
92 /**
93 * Which peer is this about?
94 */
95 struct GNUNET_PeerIdentity receiver;
96
97 /**
98 * More-or-less unique ID for the message.
99 */
100 uint64_t mid;
101};
102
103
104/**
32 * Opaque handle to the transport service for communicators. 105 * Opaque handle to the transport service for communicators.
33 */ 106 */
34struct GNUNET_TRANSPORT_CommunicatorHandle 107struct GNUNET_TRANSPORT_CommunicatorHandle
@@ -44,6 +117,36 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
44 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail; 117 struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
45 118
46 /** 119 /**
120 * DLL of messages awaiting flow control confirmation (ack).
121 */
122 struct FlowControl *fc_head;
123
124 /**
125 * DLL of messages awaiting flow control confirmation (ack).
126 */
127 struct FlowControl *fc_tail;
128
129 /**
130 * DLL of messages awaiting transmission confirmation (ack).
131 */
132 struct AckPending *ap_head;
133
134 /**
135 * DLL of messages awaiting transmission confirmation (ack).
136 */
137 struct AckPending *ac_tail;
138
139 /**
140 * DLL of queues we offer.
141 */
142 struct QueueHandle *queue_head;
143
144 /**
145 * DLL of queues we offer.
146 */
147 struct QueueHandle *queue_tail;
148
149 /**
47 * Our configuration. 150 * Our configuration.
48 */ 151 */
49 const struct GNUNET_CONFIGURATION_Handle *cfg; 152 const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -65,6 +168,16 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
65 void *mq_init_cls; 168 void *mq_init_cls;
66 169
67 /** 170 /**
171 * Maximum permissable queue length.
172 */
173 unsigned long long max_queue_length;
174
175 /**
176 * Flow-control identifier generator.
177 */
178 uint64_t fc_gen;
179
180 /**
68 * MTU of the communicator 181 * MTU of the communicator
69 */ 182 */
70 size_t mtu; 183 size_t mtu;
@@ -74,10 +187,53 @@ struct GNUNET_TRANSPORT_CommunicatorHandle
74 * transport service. 187 * transport service.
75 */ 188 */
76 uint32_t aid_gen; 189 uint32_t aid_gen;
190
191 /**
192 * Queue identifier generator.
193 */
194 uint32_t queue_gen;
77 195
78}; 196};
79 197
80 198
199/**
200 * Handle returned to identify the internal data structure the transport
201 * API has created to manage a message queue to a particular peer.
202 */
203struct GNUNET_TRANSPORT_QueueHandle
204{
205 /**
206 * Handle this queue belongs to.
207 */
208 struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
209
210 /**
211 * Which peer we can communciate with.
212 */
213 struct GNUNET_PeerIdentity peer;
214
215 /**
216 * Address used by the communication queue.
217 */
218 char *address;
219
220 /**
221 * Network type of the communciation queue.
222 */
223 enum GNUNET_ATS_Network_Type nt;
224
225 /**
226 * The queue itself.
227 */
228 struct GNUNET_MQ_Handle *mq;
229
230 /**
231 * ID for this queue when talking to the transport service.
232 */
233 uint32_t queue_id;
234
235};
236
81 237
82/** 238/**
83 * Internal representation of an address a communicator is 239 * Internal representation of an address a communicator is
@@ -185,6 +341,100 @@ send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
185 341
186 342
187/** 343/**
344 * Send message to the transport service about queue @a qh
345 * being now available.
346 *
347 * @param qh queue to add
348 */
349static void
350send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
351{
352 struct GNUNET_MQ_Envelope *env;
353 struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
354
355 if (NULL == ai->ch->mq)
356 return;
357 env = GNUNET_MQ_msg_extra (aqm,
358 strlen (ai->address) + 1,
359 GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
360 aqm.receiver = qh->peer;
361 aqm.nt = htonl ((uint32_t) qh->nt);
362 aqm.qid = htonl (qh->qid);
363 memcpy (&aqm[1],
364 ai->address,
365 strlen (ai->address) + 1);
366 GNUNET_MQ_send (ai->ch->mq,
367 env);
368}
369
370
371/**
372 * Send message to the transport service about queue @a qh
373 * being no longer available.
374 *
375 * @param qh queue to delete
376 */
377static void
378send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
379{
380 struct GNUNET_MQ_Envelope *env;
381 struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
382
383 if (NULL == ai->ch->mq)
384 return;
385 env = GNUNET_MQ_msg (dqm,
386 GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
387 dqm.qid = htonl (qh->qid);
388 dqm.receiver = qh->peer;
389 GNUNET_MQ_send (ai->ch->mq,
390 env);
391}
392
393
394/**
395 * Disconnect from the transport service. Purges
396 * all flow control entries as we will no longer receive
397 * the ACKs. Purges the ack pending entries as the
398 * transport will no longer expect the confirmations.
399 *
400 * @param ch service to disconnect from
401 */
402static void
403disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
404{
405 struct FlowControl *fcn;
406 struct AckPending *apn;
407
408 for (struct FlowControl *fc = ch->fc_head;
409 NULL != fc;
410 fc = fcn)
411 {
412 fcn = fc->next;
413 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
414 ch->fc_tail,
415 fc);
416 fc->cb (fc->cb_cls,
417 GNUNET_SYSERR);
418 GNUNET_free (fc);
419 }
420 for (struct AckPending *ap = ch->ap_head;
421 NULL != ap;
422 ap = apn)
423 {
424 apn = ap->next;
425 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
426 ch->ap_tail,
427 ap);
428 GNUNET_free (ap);
429 }
430 if (NULL == ch->mq)
431 return;
432 GNUNET_MQ_destroy (ch->mq);
433 ch->mq = NULL;
434}
435
436
437/**
188 * Function called on MQ errors. 438 * Function called on MQ errors.
189 */ 439 */
190static void 440static void
@@ -192,15 +442,230 @@ error_handler (void *cls,
192 enum GNUNET_MQ_Error error) 442 enum GNUNET_MQ_Error error)
193{ 443{
194 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls; 444 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
445
446 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
447 "MQ failure, reconnecting to transport service.\n");
448 disconnect (ch);
449 /* TODO: maybe do this with exponential backoff/delay */
450 reconnect (ch);
451}
452
453
454/**
455 * Transport service acknowledged a message we gave it
456 * (with flow control enabled). Tell the communicator.
457 *
458 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
459 * @param incoming_ack the ack
460 */
461static void
462handle_incoming_ack (void *cls,
463 struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
464{
465 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
195 466
196 GNUNET_MQ_destroy (ch->mq); 467 for (struct FlowControl *fc = ch->fc_head;
197 ch->mq = NULL; 468 NULL != fc;
469 fc = fc->next)
470 {
471 if ( (fc->id == incoming_ack->fc_id) &&
472 (0 == memcmp (&fc->sender,
473 incoming_ack->sender,
474 sizeof (struct GNUNET_PeerIdentity))) )
475 {
476 GNUNET_CONTAINER_DLL_remove (ch->fc_head,
477 ch->fc_tail,
478 fc);
479 fc->cb (fc->cb_cls,
480 GNUNET_OK);
481 GNUNET_free (fc);
482 return;
483 }
484 }
485 GNUNET_break (0);
486 disconnect (ch);
198 /* TODO: maybe do this with exponential backoff/delay */ 487 /* TODO: maybe do this with exponential backoff/delay */
199 reconnect (ch); 488 reconnect (ch);
200} 489}
201 490
202 491
203/** 492/**
493 * Transport service wants us to create a queue. Check if @a cq
494 * is well-formed.
495 *
496 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
497 * @param cq the queue creation request
498 * @return #GNUNET_OK if @a smt is well-formed
499 */
500static int
501check_create_queue (void *cls,
502 struct GNUNET_TRANSPORT_CreateQueue *cq)
503{
504 uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
505 const char *addr = (const char *) &cq[1];
506
507 if ( (0 == len) ||
508 ('\0' != addr[len-1]) )
509 {
510 GNUNET_break (0);
511 return GNUNET_SYSERR;
512 }
513 return GNUNET_OK;
514}
515
516
517/**
518 * Transport service wants us to create a queue. Tell the communicator.
519 *
520 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
521 * @param cq the queue creation request
522 */
523static void
524handle_create_queue (void *cls,
525 struct GNUNET_TRANSPORT_CreateQueue *cq)
526{
527 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
528 const char *addr = (const char *) &cq[1];
529
530 if (GNUNET_OK !=
531 ch->mq_init (ch->mq_init_cls,
532 &cq->receiver,
533 addr))
534 {
535 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
536 "Address `%s' invalid for this communicator\n",
537 addr);
538 // TODO: do we notify the transport!?
539 }
540}
541
542
543/**
544 * Transport service wants us to send a message. Check if @a smt
545 * is well-formed.
546 *
547 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
548 * @param smt the transmission request
549 * @return #GNUNET_OK if @a smt is well-formed
550 */
551static int
552check_send_msg (void *cls,
553 struct GNUNET_TRANSPORT_SendMessageTo *smt)
554{
555 uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
556 const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader *) &smt[1];
557
558 if (ntohs (mh->size) != len)
559 {
560 GNUNET_break (0);
561 return GNUNET_SYSERR;
562 }
563 return GNUNET_OK;
564}
565
566
567/**
568 * Notify transport service about @a status of a message with
569 * @a mid sent to @a receiver.
570 *
571 * @param ch handle
572 * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
573 * @param receiver which peer was the receiver
574 * @param mid message that the ack is about
575 */
576static void
577send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
578 int status,
579 const struct GNUNET_PeerIdentity *receiver,
580 uint64_t mid)
581{
582 struct GNUNET_MQ_Envelope *env;
583 struct GNUNET_TRANSPORT_SendMessageToAck *ack;
584
585 env = GNUNET_MQ_msg (ack,
586 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
587 ack->status = htonl (GNUNET_OK);
588 ack->mid = ap->mid;
589 ack->receiver = ap->receiver;
590 GNUNET_MQ_send (ch->mq,
591 env);
592}
593
594
595/**
596 * Message queue transmission by communicator was successful,
597 * notify transport service.
598 *
599 * @param cls an `struct AckPending *`
600 */
601static void
602send_ack_cb (void *cls)
603{
604 struct AckPending *ap = cls;
605 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
606
607 GNUNET_CONTAINER_DLL_remove (ch->ap_head,
608 ch->ap_tail,
609 ap);
610 send_ack (ch,
611 GNUNET_OK,
612 &ap->receiver,
613 ap->mid);
614 GNUNET_free (ap);
615}
616
617
618/**
619 * Transport service wants us to send a message. Tell the communicator.
620 *
621 * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
622 * @param smt the transmission request
623 */
624static void
625handle_send_msg (void *cls,
626 struct GNUNET_TRANSPORT_SendMessageTo *smt)
627{
628 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
629 const struct GNUNET_MessageHeader *mh;
630 struct GNUNET_MQ_Envelope *env;
631 struct AckPending *ap;
632 struct QueueHandle *qh;
633
634 for (qh = ch->queue_head;NULL != qh; qh = qh->next)
635 if ( (qh->queue_id == smt->qid) &&
636 (0 == memcmp (&qh->peer,
637 &smt->target,
638 sizeof (struct GNUNET_PeerIdentity))) )
639 break;
640 if (NULL == qh)
641 {
642 /* queue is already gone, tell transport this one failed */
643 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
644 "Transmission failed, queue no longer exists.\n");
645 send_ack (ch,
646 GNUNET_NO,
647 &smt->receiver,
648 smt->mid);
649 return;
650 }
651 ap = GNUNET_new (struct AckPending);
652 ap->ch = ch;
653 ap->receiver = smt->receiver;
654 ap->mid = smt->mid;
655 GNUNET_CONTAINER_DLL_insert (ch->ap_head,
656 cp->ap_tail,
657 ap);
658 mh = (const struct GNUNET_MessageHeader *) &smt[1];
659 env = GNUNET_MQ_msg_copy (mh);
660 GNUNET_MQ_notify_sent (env,
661 &send_ack_cb,
662 ap);
663 GNUNET_MQ_send (qh->mq,
664 env);
665}
666
667
668/**
204 * (re)connect our communicator to the transport service 669 * (re)connect our communicator to the transport service
205 * 670 *
206 * @param ch handle to reconnect 671 * @param ch handle to reconnect
@@ -209,6 +674,18 @@ static void
209reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 674reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
210{ 675{
211 struct GNUNET_MQ_MessageHandler handlers[] = { 676 struct GNUNET_MQ_MessageHandler handlers[] = {
677 GNUNET_MQ_hd_fixed_size (incoming_ack,
678 GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
679 struct GNUNET_TRANSPORT_IncomingMessageAck,
680 ch),
681 GNUNET_MQ_hd_var_size (create_queue,
682 GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
683 struct GNUNET_TRANSPORT_CreateQueue,
684 ch),
685 GNUNET_MQ_hd_var_size (send_msg,
686 GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
687 struct GNUNET_TRANSPORT_SendMessageTo,
688 ch),
212 GNUNET_MQ_handler_end() 689 GNUNET_MQ_handler_end()
213 }; 690 };
214 691
@@ -217,10 +694,14 @@ reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
217 handlers, 694 handlers,
218 &error_handler, 695 &error_handler,
219 ch); 696 ch);
220 for (struct GNUNET_TRANSPORT_AddressIdentifier ai = ch->ai_head; 697 for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
221 NULL != ai; 698 NULL != ai;
222 ai = ai->next) 699 ai = ai->next)
223 send_add_address (ai); 700 send_add_address (ai);
701 for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
702 NULL != qh;
703 qh = qh->next)
704 send_add_queue (qh);
224} 705}
225 706
226 707
@@ -253,6 +734,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
253 ch->mq_init = mq_init; 734 ch->mq_init = mq_init;
254 ch->mq_init_cls = mq_init_cls; 735 ch->mq_init_cls = mq_init_cls;
255 reconnect (ch); 736 reconnect (ch);
737 if (GNUNET_OK !=
738 GNUNET_CONFIGURATION_get_value_number (cfg,
739 name,
740 "MAX_QUEUE_LENGTH",
741 &ch->max_queue_length))
742 ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
256 if (NULL == ch->mq) 743 if (NULL == ch->mq)
257 { 744 {
258 GNUNET_free (ch); 745 GNUNET_free (ch);
@@ -270,12 +757,12 @@ GNUNET_TRANSPORT_communicator_connect (const struct GNUNET_CONFIGURATION_Handle
270void 757void
271GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch) 758GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
272{ 759{
760 disconnect (ch);
273 while (NULL != ch->ai_head) 761 while (NULL != ch->ai_head)
274 { 762 {
275 GNUNET_break (0); /* communicator forgot to remove address, warn! */ 763 GNUNET_break (0); /* communicator forgot to remove address, warn! */
276 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head); 764 GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
277 } 765 }
278 GNUNET_MQ_destroy (ch->mq);
279 GNUNET_free (ch); 766 GNUNET_free (ch);
280} 767}
281 768
@@ -297,8 +784,8 @@ GNUNET_TRANSPORT_communicator_disconnect (struct GNUNET_TRANSPORT_CommunicatorHa
297 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was 784 * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
298 * immediately dropped due to memory limitations (communicator 785 * immediately dropped due to memory limitations (communicator
299 * should try to apply back pressure), 786 * should try to apply back pressure),
300 * #GNUNET_SYSERR if the message is ill formed and communicator 787 * #GNUNET_SYSERR if the message could not be delivered because
301 * should try to reset stream 788 * the tranport service is not yet up
302 */ 789 */
303int 790int
304GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch, 791GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
@@ -312,7 +799,33 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
312 uint16_t msize; 799 uint16_t msize;
313 800
314 if (NULL == ai->ch->mq) 801 if (NULL == ai->ch->mq)
315 return; 802 return GNUNET_SYSERR;
803 if (NULL != cb)
804 {
805 struct FlowControl *fc;
806
807 im->fc_on = htonl (GNUNET_YES);
808 im->fc_id = ai->ch->fc_gen++;
809 fc = GNUNET_new (struct FlowControl);
810 fc->sender = *sender;
811 fc->id = im->fc_id;
812 fc->cb = cb;
813 fc->cb_cls = cb_cls;
814 GNUNET_CONTAINER_DLL_insert (ch->fc_head,
815 ch->fc_tail,
816 fc);
817 }
818 else
819 {
820 if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
821 {
822 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
823 "Dropping message: transprot is too slow, queue length %u exceeded\n",
824 ch->max_queue_length);
825 return GNUNET_NO;
826 }
827 }
828
316 msize = ntohs (msg->size); 829 msize = ntohs (msg->size);
317 env = GNUNET_MQ_msg_extra (im, 830 env = GNUNET_MQ_msg_extra (im,
318 msize, 831 msize,
@@ -320,7 +833,7 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
320 if (NULL == env) 833 if (NULL == env)
321 { 834 {
322 GNUNET_break (0); 835 GNUNET_break (0);
323 return; 836 return GNUNET_SYSERR;
324 } 837 }
325 im->sender = *sender; 838 im->sender = *sender;
326 memcpy (&im[1], 839 memcpy (&im[1],
@@ -328,19 +841,12 @@ GNUNET_TRANSPORT_communicator_receive (struct GNUNET_TRANSPORT_CommunicatorHandl
328 msize); 841 msize);
329 GNUNET_MQ_send (ai->ch->mq, 842 GNUNET_MQ_send (ai->ch->mq,
330 env); 843 env);
844 return GNUNET_OK;
331} 845}
332 846
333 847
334/* ************************* Discovery *************************** */ 848/* ************************* Discovery *************************** */
335 849
336/**
337 * Handle returned to identify the internal data structure the transport
338 * API has created to manage a message queue to a particular peer.
339 */
340struct GNUNET_TRANSPORT_QueueHandle
341{
342};
343
344 850
345/** 851/**
346 * Notify transport service that an MQ became available due to an 852 * Notify transport service that an MQ became available due to an
@@ -361,6 +867,20 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle
361 enum GNUNET_ATS_Network_Type nt, 867 enum GNUNET_ATS_Network_Type nt,
362 struct GNUNET_MQ_Handle *mq) 868 struct GNUNET_MQ_Handle *mq)
363{ 869{
870 struct GNUNET_TRANSPORT_QueueHandle *qh;
871
872 qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
873 qh->ch = ch;
874 qh->peer = *peer;
875 qh->address = GNUNET_strdup (address);
876 qh->nt = nt;
877 qh->mq = mq;
878 qh->queue_id = ch->queue_gen++;
879 GNUNET_CONTAINER_DLL_insert (ch->queue_head,
880 ch->queue_tail,
881 qh);
882 send_add_queue (qh);
883 return qh;
364} 884}
365 885
366 886
@@ -373,11 +893,18 @@ GNUNET_TRANSPORT_communicator_mq_add (struct GNUNET_TRANSPORT_CommunicatorHandle
373void 893void
374GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh) 894GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
375{ 895{
896 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
897
898 send_del_queue (qh);
899 GNUNET_CONTAINER_DLL_remove (ch->queue_head,
900 ch->queue_tail,
901 qh);
902 GNUNET_MQ_destroy (qh->mq);
903 GNUNET_free (qh->address);
904 GNUNET_free (qh);
376} 905}
377 906
378 907
379
380
381/** 908/**
382 * Notify transport service about an address that this communicator 909 * Notify transport service about an address that this communicator
383 * provides for this peer. 910 * provides for this peer.
@@ -421,10 +948,10 @@ GNUNET_TRANSPORT_communicator_address_remove (struct GNUNET_TRANSPORT_AddressIde
421 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch; 948 struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
422 949
423 send_del_address (ai); 950 send_del_address (ai);
424 GNUNET_free (ai->address);
425 GNUNET_CONTAINER_DLL_remove (ch->ai_head, 951 GNUNET_CONTAINER_DLL_remove (ch->ai_head,
426 ch->ai_tail, 952 ch->ai_tail,
427 ai); 953 ai);
954 GNUNET_free (ai->address);
428 GNUNET_free (ai); 955 GNUNET_free (ai);
429} 956}
430 957