aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/cadet/cadet_api.c1044
-rw-r--r--src/include/gnunet_cadet_service.h4
2 files changed, 425 insertions, 623 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 1ca8bad9d..a9793a7f8 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -25,12 +25,12 @@
25 25
26#include "platform.h" 26#include "platform.h"
27#include "gnunet_util_lib.h" 27#include "gnunet_util_lib.h"
28#include "gnunet_constants.h"
28#include "gnunet_cadet_service.h" 29#include "gnunet_cadet_service.h"
29#include "cadet.h" 30#include "cadet.h"
30#include "cadet_protocol.h" 31#include "cadet_protocol.h"
31 32
32#define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__) 33#define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__)
33#define DATA_OVERHEAD sizeof(struct GNUNET_CADET_LocalData)
34 34
35/******************************************************************************/ 35/******************************************************************************/
36/************************ DATA STRUCTURES ****************************/ 36/************************ DATA STRUCTURES ****************************/
@@ -41,7 +41,6 @@
41 */ 41 */
42struct GNUNET_CADET_TransmitHandle 42struct GNUNET_CADET_TransmitHandle
43{ 43{
44
45 /** 44 /**
46 * Double Linked list 45 * Double Linked list
47 */ 46 */
@@ -58,6 +57,11 @@ struct GNUNET_CADET_TransmitHandle
58 struct GNUNET_CADET_Channel *channel; 57 struct GNUNET_CADET_Channel *channel;
59 58
60 /** 59 /**
60 * Request data task.
61 */
62 struct GNUNET_SCHEDULER_Task *request_data_task;
63
64 /**
61 * Callback to obtain the message to transmit, or NULL if we 65 * Callback to obtain the message to transmit, or NULL if we
62 * got the message in 'data'. Notice that messages built 66 * got the message in 'data'. Notice that messages built
63 * by 'notify' need to be encapsulated with information about 67 * by 'notify' need to be encapsulated with information about
@@ -71,20 +75,7 @@ struct GNUNET_CADET_TransmitHandle
71 void *notify_cls; 75 void *notify_cls;
72 76
73 /** 77 /**
74 * How long is this message valid. Once the timeout has been 78 * Size of the payload.
75 * reached, the message must no longer be sent. If this
76 * is a message with a 'notify' callback set, the 'notify'
77 * function should be called with 'buf' NULL and size 0.
78 */
79 struct GNUNET_TIME_Absolute timeout;
80
81 /**
82 * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER.
83 */
84 struct GNUNET_SCHEDULER_Task * timeout_task;
85
86 /**
87 * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL.
88 */ 79 */
89 size_t size; 80 size_t size;
90}; 81};
@@ -123,11 +114,10 @@ union CadetInfoCB {
123 */ 114 */
124struct GNUNET_CADET_Handle 115struct GNUNET_CADET_Handle
125{ 116{
126
127 /** 117 /**
128 * Handle to the server connection, to send messages later 118 * Message queue (if available).
129 */ 119 */
130 struct GNUNET_CLIENT_Connection *client; 120 struct GNUNET_MQ_Handle *mq;
131 121
132 /** 122 /**
133 * Set of handlers used for processing incoming messages in the channels 123 * Set of handlers used for processing incoming messages in the channels
@@ -160,11 +150,6 @@ struct GNUNET_CADET_Handle
160 GNUNET_CADET_ChannelEndHandler *cleaner; 150 GNUNET_CADET_ChannelEndHandler *cleaner;
161 151
162 /** 152 /**
163 * Handle to cancel pending transmissions in case of disconnection
164 */
165 struct GNUNET_CLIENT_TransmitHandle *th;
166
167 /**
168 * Closure for all the handlers given by the client 153 * Closure for all the handlers given by the client
169 */ 154 */
170 void *cls; 155 void *cls;
@@ -184,12 +169,6 @@ struct GNUNET_CADET_Handle
184 */ 169 */
185 CADET_ChannelNumber next_chid; 170 CADET_ChannelNumber next_chid;
186 171
187 /**
188 * Have we started the task to receive messages from the service
189 * yet? We do this after we send the 'CADET_LOCAL_CONNECT' message.
190 */
191 int in_receive;
192
193 /** 172 /**
194 * Configuration given by the client, in case of reconnection 173 * Configuration given by the client, in case of reconnection
195 */ 174 */
@@ -337,24 +316,6 @@ struct CadetMQState
337 316
338 317
339/******************************************************************************/ 318/******************************************************************************/
340/*********************** DECLARATIONS *************************/
341/******************************************************************************/
342
343/**
344 * Function called to send a message to the service.
345 * "buf" will be NULL and "size" zero if the socket was closed for writing in
346 * the meantime.
347 *
348 * @param cls closure, the cadet handle
349 * @param size number of bytes available in buf
350 * @param buf where the callee should write the connect message
351 * @return number of bytes written to buf
352 */
353static size_t
354send_callback (void *cls, size_t size, void *buf);
355
356
357/******************************************************************************/
358/*********************** AUXILIARY FUNCTIONS *************************/ 319/*********************** AUXILIARY FUNCTIONS *************************/
359/******************************************************************************/ 320/******************************************************************************/
360 321
@@ -392,29 +353,6 @@ find_port (const struct GNUNET_CADET_Handle *h,
392 return p; 353 return p;
393} 354}
394 355
395/**
396 * Check whether there is any message ready in the queue and find the size.
397 *
398 * @param h Cadet handle.
399 *
400 * @return The size of the first ready message in the queue, including overhead.
401 * 0 if there is none.
402 */
403static size_t
404message_ready_size (struct GNUNET_CADET_Handle *h)
405{
406 struct GNUNET_CADET_TransmitHandle *th;
407 struct GNUNET_CADET_Channel *ch;
408
409 for (th = h->th_head; NULL != th; th = th->next)
410 {
411 ch = th->channel;
412 if (GNUNET_NO == th_is_payload (th) || GNUNET_YES == ch->allow_send)
413 return th->size;
414 }
415 return 0;
416}
417
418 356
419/** 357/**
420 * Get the channel handler for the channel specified by id from the given handle 358 * Get the channel handler for the channel specified by id from the given handle
@@ -520,22 +458,8 @@ destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner)
520 * Management traffic should be ok, as clients can't cancel that. 458 * Management traffic should be ok, as clients can't cancel that.
521 * If the service crashed and we are reconnecting, it's ok. 459 * If the service crashed and we are reconnecting, it's ok.
522 */ 460 */
523 GNUNET_break (GNUNET_NO == th_is_payload (th) 461 GNUNET_break (GNUNET_NO == th_is_payload (th));
524 || GNUNET_NO == h->in_receive); 462 GNUNET_CADET_notify_transmit_ready_cancel (th);
525 GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
526
527 /* clean up request */
528 if (NULL != th->timeout_task)
529 GNUNET_SCHEDULER_cancel (th->timeout_task);
530 GNUNET_free (th);
531 }
532
533 /* if there are no more pending requests with cadet service, cancel active request */
534 /* Note: this should be unnecessary... */
535 if ((0 == message_ready_size (h)) && (NULL != h->th))
536 {
537 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
538 h->th = NULL;
539 } 463 }
540 464
541 if (0 != ch->peer) 465 if (0 != ch->peer)
@@ -546,32 +470,6 @@ destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner)
546 470
547 471
548/** 472/**
549 * Notify client that the transmission has timed out
550 *
551 * @param cls closure
552 */
553static void
554timeout_transmission (void *cls)
555{
556 struct GNUNET_CADET_TransmitHandle *th = cls;
557 struct GNUNET_CADET_Handle *cadet = th->channel->cadet;
558
559 th->timeout_task = NULL;
560 th->channel->packet_size = 0;
561 GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th);
562 if (GNUNET_YES == th_is_payload (th))
563 GNUNET_break (0 == th->notify (th->notify_cls, 0, NULL));
564 GNUNET_free (th);
565 if ((0 == message_ready_size (cadet)) && (NULL != cadet->th))
566 {
567 /* nothing ready to transmit, no point in asking for transmission */
568 GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th);
569 cadet->th = NULL;
570 }
571}
572
573
574/**
575 * Add a transmit handle to the transmission queue and set the 473 * Add a transmit handle to the transmission queue and set the
576 * timeout if needed. 474 * timeout if needed.
577 * 475 *
@@ -583,30 +481,10 @@ add_to_queue (struct GNUNET_CADET_Handle *h,
583 struct GNUNET_CADET_TransmitHandle *th) 481 struct GNUNET_CADET_TransmitHandle *th)
584{ 482{
585 GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th); 483 GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th);
586 if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us)
587 return;
588 th->timeout_task =
589 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
590 (th->timeout), &timeout_transmission, th);
591} 484}
592 485
593 486
594/** 487/**
595 * Auxiliary function to send an already constructed packet to the service.
596 * Takes care of creating a new queue element, copying the message and
597 * calling the tmt_rdy function if necessary.
598 *
599 * @param h cadet handle
600 * @param msg message to transmit
601 * @param channel channel this send is related to (NULL if N/A)
602 */
603static void
604send_packet (struct GNUNET_CADET_Handle *h,
605 const struct GNUNET_MessageHeader *msg,
606 struct GNUNET_CADET_Channel *channel);
607
608
609/**
610 * Send an ack on the channel to confirm the processing of a message. 488 * Send an ack on the channel to confirm the processing of a message.
611 * 489 *
612 * @param ch Channel on which to send the ACK. 490 * @param ch Channel on which to send the ACK.
@@ -614,124 +492,54 @@ send_packet (struct GNUNET_CADET_Handle *h,
614static void 492static void
615send_ack (struct GNUNET_CADET_Channel *ch) 493send_ack (struct GNUNET_CADET_Channel *ch)
616{ 494{
617 struct GNUNET_CADET_LocalAck msg; 495 struct GNUNET_CADET_LocalAck *msg;
496 struct GNUNET_MQ_Envelope *env;
497
498 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK);
618 499
619 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid); 500 LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid);
620 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); 501 msg->channel_id = htonl (ch->chid);
621 msg.header.size = htons (sizeof (msg)); 502 GNUNET_MQ_send (ch->cadet->mq, env);
622 msg.channel_id = htonl (ch->chid);
623 503
624 send_packet (ch->cadet, &msg.header, ch);
625 return; 504 return;
626} 505}
627 506
628 507
629 508
630/** 509/******************************************************************************/
631 * Reconnect callback: tries to reconnect again after a failer previous 510/*********************** RECEIVE HANDLERS ****************************/
632 * reconnection. 511/******************************************************************************/
633 *
634 * @param cls closure (cadet handle)
635 */
636static void
637reconnect_cbk (void *cls);
638
639
640/**
641 * Reconnect to the service, retransmit all infomation to try to restore the
642 * original state.
643 *
644 * @param h handle to the cadet
645 *
646 * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
647 */
648static int
649do_reconnect (struct GNUNET_CADET_Handle *h)
650{
651 LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
652 LOG (GNUNET_ERROR_TYPE_DEBUG, "******* RECONNECT *******\n");
653 LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
654 LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h);
655 LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n");
656
657 /* disconnect */
658 if (NULL != h->th)
659 {
660 GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
661 h->th = NULL;
662 }
663 if (NULL != h->client)
664 {
665 GNUNET_CLIENT_disconnect (h->client);
666 }
667
668 /* connect again */
669 h->client = GNUNET_CLIENT_connect ("cadet", h->cfg);
670 if (h->client == NULL)
671 {
672 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
673 &reconnect_cbk, h);
674 h->reconnect_time =
675 GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
676 GNUNET_TIME_relative_multiply
677 (h->reconnect_time, 2));
678 LOG (GNUNET_ERROR_TYPE_DEBUG, "Next retry in %s\n",
679 GNUNET_STRINGS_relative_time_to_string (h->reconnect_time,
680 GNUNET_NO));
681 GNUNET_break (0);
682 return GNUNET_NO;
683 }
684 else
685 {
686 h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
687 }
688 return GNUNET_YES;
689}
690
691/**
692 * Reconnect callback: tries to reconnect again after a failer previous
693 * reconnecttion
694 *
695 * @param cls closure (cadet handle)
696 */
697static void
698reconnect_cbk (void *cls)
699{
700 struct GNUNET_CADET_Handle *h = cls;
701
702 h->reconnect_task = NULL;
703 do_reconnect (h);
704}
705 512
706 513
707/** 514/**
708 * Reconnect to the service, retransmit all infomation to try to restore the 515 * Call the @a notify callback given to #GNUNET_CADET_notify_transmit_ready to
709 * original state. 516 * request the data to send over MQ. Since MQ manages the queue, this function
517 * is scheduled immediatly after a transmit ready notification.
710 * 518 *
711 * @param h handle to the cadet 519 * @param cls Closure (transmit handle).
712 *
713 * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
714 */ 520 */
715static void 521static void
716reconnect (struct GNUNET_CADET_Handle *h) 522request_data (void *cls)
717{ 523{
718 struct GNUNET_CADET_Channel *ch; 524 struct GNUNET_CADET_TransmitHandle *th = cls;
525 struct GNUNET_CADET_LocalData *msg;
526 struct GNUNET_MQ_Envelope *env;
527 size_t osize;
719 528
720 LOG (GNUNET_ERROR_TYPE_DEBUG, 529 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting Data: %u bytes\n", th->size);
721 "Requested RECONNECT, destroying all channels\n"); 530 th->request_data_task = NULL;
722 h->in_receive = GNUNET_NO; 531 th->channel->packet_size = 0;
723 for (ch = h->channels_head; NULL != ch; ch = h->channels_head) 532 env = GNUNET_MQ_msg_extra (msg, th->size,
724 destroy_channel (ch, GNUNET_YES); 533 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
725 if (NULL == h->reconnect_task) 534 msg->id = htonl (th->channel->chid);
726 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, 535 osize = th->notify (th->notify_cls, th->size, &msg[1]);
727 &reconnect_cbk, h); 536 GNUNET_assert (osize == th->size);
537 th->channel->allow_send = GNUNET_NO;
538 GNUNET_MQ_send (th->channel->cadet->mq, env);
539 GNUNET_CADET_notify_transmit_ready_cancel (th);
728} 540}
729 541
730 542
731/******************************************************************************/
732/*********************** RECEIVE HANDLERS ****************************/
733/******************************************************************************/
734
735/** 543/**
736 * Process the new channel notification and add it to the channels in the handle 544 * Process the new channel notification and add it to the channels in the handle
737 * 545 *
@@ -739,9 +547,10 @@ reconnect (struct GNUNET_CADET_Handle *h)
739 * @param msg A message with the details of the new incoming channel 547 * @param msg A message with the details of the new incoming channel
740 */ 548 */
741static void 549static void
742process_channel_created (struct GNUNET_CADET_Handle *h, 550handle_channel_created (void *cls,
743 const struct GNUNET_CADET_ChannelCreateMessage *msg) 551 const struct GNUNET_CADET_ChannelCreateMessage *msg)
744{ 552{
553 struct GNUNET_CADET_Handle *h = cls;
745 struct GNUNET_CADET_Channel *ch; 554 struct GNUNET_CADET_Channel *ch;
746 struct GNUNET_CADET_Port *port; 555 struct GNUNET_CADET_Port *port;
747 const struct GNUNET_HashCode *port_number; 556 const struct GNUNET_HashCode *port_number;
@@ -777,15 +586,13 @@ process_channel_created (struct GNUNET_CADET_Handle *h,
777 } 586 }
778 else 587 else
779 { 588 {
780 struct GNUNET_CADET_ChannelDestroyMessage d_msg; 589 struct GNUNET_CADET_ChannelDestroyMessage *d_msg;
590 struct GNUNET_MQ_Envelope *env;
781 591
782 LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n"); 592 LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n");
783 593 env = GNUNET_MQ_msg (d_msg, GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
784 d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); 594 d_msg->channel_id = msg->channel_id;
785 d_msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelDestroyMessage)); 595 GNUNET_MQ_send (h->mq, env);
786 d_msg.channel_id = msg->channel_id;
787
788 send_packet (h, &d_msg.header, NULL);
789 } 596 }
790 return; 597 return;
791} 598}
@@ -798,9 +605,10 @@ process_channel_created (struct GNUNET_CADET_Handle *h,
798 * @param msg A message with the details of the channel being destroyed 605 * @param msg A message with the details of the channel being destroyed
799 */ 606 */
800static void 607static void
801process_channel_destroy (struct GNUNET_CADET_Handle *h, 608handle_channel_destroy (void *cls,
802 const struct GNUNET_CADET_ChannelDestroyMessage *msg) 609 const struct GNUNET_CADET_ChannelDestroyMessage *msg)
803{ 610{
611 struct GNUNET_CADET_Handle *h = cls;
804 struct GNUNET_CADET_Channel *ch; 612 struct GNUNET_CADET_Channel *ch;
805 CADET_ChannelNumber chid; 613 CADET_ChannelNumber chid;
806 614
@@ -818,42 +626,66 @@ process_channel_destroy (struct GNUNET_CADET_Handle *h,
818 626
819 627
820/** 628/**
629 * Check that message received from CADET service is well-formed.
630 *
631 * @param cls the `struct GNUNET_CADET_Handle`
632 * @param message the message we got
633 * @return #GNUNET_OK if the message is well-formed,
634 * #GNUNET_SYSERR otherwise
635 */
636static int
637check_local_data (void *cls,
638 const struct GNUNET_CADET_LocalData *message)
639{
640 struct GNUNET_CADET_Handle *h = cls;
641 struct GNUNET_CADET_Channel *ch;
642 uint16_t size;
643
644 size = ntohs (message->header.size);
645 if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
646 {
647 GNUNET_break_op (0);
648 return GNUNET_SYSERR;
649 }
650
651 ch = retrieve_channel (h, ntohl (message->id));
652 if (NULL == ch)
653 {
654 GNUNET_break_op (0);
655 return GNUNET_SYSERR;
656 }
657
658 return GNUNET_OK;
659}
660
661
662/**
821 * Process the incoming data packets, call appropriate handlers. 663 * Process the incoming data packets, call appropriate handlers.
822 * 664 *
823 * @param h The cadet handle 665 * @param h The cadet handle
824 * @param message A message encapsulating the data 666 * @param message A message encapsulating the data
825 */ 667 */
826static void 668static void
827process_incoming_data (struct GNUNET_CADET_Handle *h, 669handle_local_data (void *cls,
828 const struct GNUNET_MessageHeader *message) 670 const struct GNUNET_CADET_LocalData *message)
829{ 671{
672 struct GNUNET_CADET_Handle *h = cls;
830 const struct GNUNET_MessageHeader *payload; 673 const struct GNUNET_MessageHeader *payload;
831 const struct GNUNET_CADET_MessageHandler *handler; 674 const struct GNUNET_CADET_MessageHandler *handler;
832 struct GNUNET_CADET_LocalData *dmsg;
833 struct GNUNET_CADET_Channel *ch; 675 struct GNUNET_CADET_Channel *ch;
834 size_t size;
835 unsigned int i; 676 unsigned int i;
836 uint16_t type; 677 uint16_t type;
837 678
838 LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n"); 679 LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n");
839 dmsg = (struct GNUNET_CADET_LocalData *) message; 680 ch = retrieve_channel (h, ntohl (message->id));
840 ch = retrieve_channel (h, ntohl (dmsg->id)); 681 GNUNET_assert (NULL != ch);
841 if (NULL == ch)
842 {
843 GNUNET_break (0);
844 return;
845 }
846 682
847 payload = (struct GNUNET_MessageHeader *) &dmsg[1]; 683 payload = (struct GNUNET_MessageHeader *) &message[1];
848 LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n", 684 LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n",
849 GC_f2s (ch->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV), 685 GC_f2s (ch->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV),
850 GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (dmsg->id)); 686 GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (message->id));
851
852 size = ntohs (message->size);
853 LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes\n", size);
854 687
855 type = ntohs (payload->type); 688 type = ntohs (payload->type);
856 size = ntohs (payload->size);
857 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type)); 689 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type));
858 for (i = 0; i < h->n_handlers; i++) 690 for (i = 0; i < h->n_handlers; i++)
859 { 691 {
@@ -867,13 +699,13 @@ process_incoming_data (struct GNUNET_CADET_Handle *h,
867 { 699 {
868 LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n"); 700 LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n");
869 GNUNET_CADET_channel_destroy (ch); 701 GNUNET_CADET_channel_destroy (ch);
870 return; 702 break;
871 } 703 }
872 else 704 else
873 { 705 {
874 LOG (GNUNET_ERROR_TYPE_DEBUG, 706 LOG (GNUNET_ERROR_TYPE_DEBUG,
875 "callback completed successfully\n"); 707 "callback completed successfully\n");
876 return; 708 break;
877 } 709 }
878 } 710 }
879 } 711 }
@@ -888,16 +720,15 @@ process_incoming_data (struct GNUNET_CADET_Handle *h,
888 * @param message Message itself. 720 * @param message Message itself.
889 */ 721 */
890static void 722static void
891process_ack (struct GNUNET_CADET_Handle *h, 723handle_local_ack (void *cls,
892 const struct GNUNET_MessageHeader *message) 724 const struct GNUNET_CADET_LocalAck *message)
893{ 725{
894 struct GNUNET_CADET_LocalAck *msg; 726 struct GNUNET_CADET_Handle *h = cls;
895 struct GNUNET_CADET_Channel *ch; 727 struct GNUNET_CADET_Channel *ch;
896 CADET_ChannelNumber chid; 728 CADET_ChannelNumber chid;
897 729
898 LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n"); 730 LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n");
899 msg = (struct GNUNET_CADET_LocalAck *) message; 731 chid = ntohl (message->channel_id);
900 chid = ntohl (msg->channel_id);
901 ch = retrieve_channel (h, chid); 732 ch = retrieve_channel (h, chid);
902 if (NULL == ch) 733 if (NULL == ch)
903 { 734 {
@@ -906,13 +737,162 @@ process_ack (struct GNUNET_CADET_Handle *h,
906 } 737 }
907 LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X!\n", ch->chid); 738 LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X!\n", ch->chid);
908 ch->allow_send = GNUNET_YES; 739 ch->allow_send = GNUNET_YES;
909 if (NULL == h->th && 0 < ch->packet_size) 740 if (0 < ch->packet_size)
741 {
742 struct GNUNET_CADET_TransmitHandle *th;
743 struct GNUNET_CADET_TransmitHandle *next;
744 LOG (GNUNET_ERROR_TYPE_DEBUG,
745 " pending data, sending %U bytes!\n",
746 ch->packet_size);
747 for (th = h->th_head; NULL != th; th = next)
748 {
749 next = th->next;
750 if (th->channel == ch)
751 {
752 th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th);
753 GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
754 break;
755 }
756 }
757 /* Complain if we got thru all th without sending anything, ch was wrong */
758 GNUNET_break (NULL != th);
759 }
760}
761
762/**
763 * Reconnect to the service, retransmit all infomation to try to restore the
764 * original state.
765 *
766 * @param h handle to the cadet
767 *
768 * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
769 */
770static void
771reconnect (struct GNUNET_CADET_Handle *h);
772
773
774/**
775 * Reconnect callback: tries to reconnect again after a failer previous
776 * reconnection.
777 *
778 * @param cls closure (cadet handle)
779 */
780static void
781reconnect_cbk (void *cls);
782
783
784/**
785 * Generic error handler, called with the appropriate error code and
786 * the same closure specified at the creation of the message queue.
787 * Not every message queue implementation supports an error handler.
788 *
789 * @param cls closure, a `struct GNUNET_CORE_Handle *`
790 * @param error error code
791 */
792static void
793handle_mq_error (void *cls,
794 enum GNUNET_MQ_Error error)
795{
796 struct GNUNET_CADET_Handle *h = cls;
797
798 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error);
799 GNUNET_MQ_destroy (h->mq);
800 h->mq = NULL;
801 reconnect (h);
802}
803
804
805/**
806 * Reconnect to the service, retransmit all infomation to try to restore the
807 * original state.
808 *
809 * @param h handle to the cadet
810 *
811 * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...)
812 */
813static int
814do_reconnect (struct GNUNET_CADET_Handle *h)
815{
816 GNUNET_MQ_hd_fixed_size (channel_created,
817 GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE,
818 struct GNUNET_CADET_ChannelCreateMessage);
819 GNUNET_MQ_hd_fixed_size (channel_destroy,
820 GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY,
821 struct GNUNET_CADET_ChannelDestroyMessage);
822 GNUNET_MQ_hd_var_size (local_data,
823 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
824 struct GNUNET_CADET_LocalData);
825 GNUNET_MQ_hd_fixed_size (local_ack,
826 GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
827 struct GNUNET_CADET_LocalAck);
828 // FIXME
829// GNUNET_MQ_hd_fixed_Y size (channel_destroyed,
830// GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK,
831// struct GNUNET_CADET_ChannelDestroyMessage);
832 struct GNUNET_MQ_MessageHandler handlers[] = {
833 make_channel_created_handler (h),
834 make_channel_destroy_handler (h),
835 make_local_data_handler (h),
836 make_local_ack_handler (h),
837 GNUNET_MQ_handler_end ()
838 };
839
840 LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
841
842 GNUNET_assert (NULL == h->mq);
843 h->mq = GNUNET_CLIENT_connecT (h->cfg,
844 "cadet",
845 handlers,
846 &handle_mq_error,
847 h);
848 if (NULL == h->mq)
910 { 849 {
911 LOG (GNUNET_ERROR_TYPE_DEBUG, " tmt rdy was NULL, requesting!\n"); 850 reconnect (h);
912 h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, ch->packet_size, 851 return GNUNET_NO;
913 GNUNET_TIME_UNIT_FOREVER_REL,
914 GNUNET_YES, &send_callback, h);
915 } 852 }
853 else
854 {
855 h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
856 }
857 return GNUNET_YES;
858}
859
860/**
861 * Reconnect callback: tries to reconnect again after a failer previous
862 * reconnecttion
863 *
864 * @param cls closure (cadet handle)
865 */
866static void
867reconnect_cbk (void *cls)
868{
869 struct GNUNET_CADET_Handle *h = cls;
870
871 h->reconnect_task = NULL;
872 do_reconnect (h);
873}
874
875
876/**
877 * Reconnect to the service, retransmit all infomation to try to restore the
878 * original state.
879 *
880 * @param h handle to the cadet
881 *
882 * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
883 */
884static void
885reconnect (struct GNUNET_CADET_Handle *h)
886{
887 struct GNUNET_CADET_Channel *ch;
888
889 LOG (GNUNET_ERROR_TYPE_DEBUG,
890 "Requested RECONNECT, destroying all channels\n");
891 for (ch = h->channels_head; NULL != ch; ch = h->channels_head)
892 destroy_channel (ch, GNUNET_YES);
893 if (NULL == h->reconnect_task)
894 h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
895 &reconnect_cbk, h);
916} 896}
917 897
918 898
@@ -1237,253 +1217,74 @@ clean_cls:
1237} 1217}
1238 1218
1239 1219
1240/** 1220// FIXME: add monitor messages to mq
1241 * Function to process all messages received from the service 1221// static void
1242 * 1222// msg_received (void *cls, const struct GNUNET_MessageHeader *msg)
1243 * @param cls closure 1223// {
1244 * @param msg message received, NULL on timeout or fatal error 1224// struct GNUNET_CADET_Handle *h = cls;
1245 */ 1225// uint16_t type;
1246static void 1226//
1247msg_received (void *cls, const struct GNUNET_MessageHeader *msg) 1227// if (msg == NULL)
1248{ 1228// {
1249 struct GNUNET_CADET_Handle *h = cls; 1229// LOG (GNUNET_ERROR_TYPE_DEBUG,
1250 uint16_t type; 1230// "Cadet service disconnected, reconnecting\n", h);
1251 1231// reconnect (h);
1252 if (msg == NULL) 1232// return;
1253 { 1233// }
1254 LOG (GNUNET_ERROR_TYPE_DEBUG, 1234// type = ntohs (msg->type);
1255 "Cadet service disconnected, reconnecting\n", h); 1235// LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1256 reconnect (h); 1236// LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n",
1257 return; 1237// GC_m2s (type));
1258 } 1238// switch (type)
1259 type = ntohs (msg->type); 1239// {
1260 LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); 1240// /* Notify of a new incoming channel */
1261 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n", 1241// case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1262 GC_m2s (type)); 1242// // process_channel_created (h,
1263 switch (type) 1243// // (struct GNUNET_CADET_ChannelCreateMessage *) msg);
1264 {
1265 /* Notify of a new incoming channel */
1266 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE:
1267 process_channel_created (h,
1268 (struct GNUNET_CADET_ChannelCreateMessage *) msg);
1269 break;
1270 /* Notify of a channel disconnection */
1271 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: /* TODO separate(gid problem)*/
1272 case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
1273 process_channel_destroy (h,
1274 (struct GNUNET_CADET_ChannelDestroyMessage *) msg);
1275 break;
1276 case GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA:
1277 process_incoming_data (h, msg);
1278 break;
1279 case GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK:
1280 process_ack (h, msg);
1281 break;
1282// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1283// process_get_channels (h, msg);
1284// break; 1244// break;
1285// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: 1245// /* Notify of a channel disconnection */
1286// process_show_channel (h, msg); 1246// case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: /* TODO separate(gid problem)*/
1247// case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK:
1248// // process_channel_destroy (h,
1249// // (struct GNUNET_CADET_ChannelDestroyMessage *) msg);
1287// break; 1250// break;
1288 case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS: 1251// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA:
1289 process_get_peers (h, msg); 1252// // process_incoming_data (h, msg);
1290 break;
1291 case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1292 process_get_peer (h, msg);
1293 break;
1294 case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1295 process_get_tunnels (h, msg);
1296 break;
1297 case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1298 process_get_tunnel (h, msg);
1299 break;
1300// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1301// process_show_channel (h, msg);
1302// break; 1253// break;
1303 default: 1254// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK:
1304 /* We shouldn't get any other packages, log and ignore */ 1255// // process_ack (h, msg);
1305 LOG (GNUNET_ERROR_TYPE_WARNING, 1256// break;
1306 "unsolicited message form service (type %s)\n", 1257// // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS:
1307 GC_m2s (ntohs (msg->type))); 1258// // process_get_channels (h, msg);
1308 } 1259// // break;
1309 LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n"); 1260// // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1310 if (GNUNET_YES == h->in_receive) 1261// // process_show_channel (h, msg);
1311 { 1262// // break;
1312 GNUNET_CLIENT_receive (h->client, &msg_received, h, 1263// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS:
1313 GNUNET_TIME_UNIT_FOREVER_REL); 1264// process_get_peers (h, msg);
1314 } 1265// break;
1315 else 1266// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER:
1316 { 1267// process_get_peer (h, msg);
1317 LOG (GNUNET_ERROR_TYPE_DEBUG, 1268// break;
1318 "in receive off, not calling CLIENT_receive\n"); 1269// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS:
1319 } 1270// process_get_tunnels (h, msg);
1320} 1271// break;
1321 1272// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL:
1322 1273// process_get_tunnel (h, msg);
1323/******************************************************************************/ 1274// break;
1324/************************ SEND FUNCTIONS ****************************/ 1275// // case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL:
1325/******************************************************************************/ 1276// // process_show_channel (h, msg);
1326 1277// // break;
1327/** 1278// default:
1328 * Function called to send a message to the service. 1279// /* We shouldn't get any other packages, log and ignore */
1329 * "buf" will be NULL and "size" zero if the socket was closed for writing in 1280// LOG (GNUNET_ERROR_TYPE_WARNING,
1330 * the meantime. 1281// "unsolicited message form service (type %s)\n",
1331 * 1282// GC_m2s (ntohs (msg->type)));
1332 * @param cls closure, the cadet handle 1283// }
1333 * @param size number of bytes available in buf 1284// LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n");
1334 * @param buf where the callee should write the connect message 1285// GNUNET_CLIENT_receive (h->client, &msg_received, h,
1335 * @return number of bytes written to buf 1286// GNUNET_TIME_UNIT_FOREVER_REL);
1336 */ 1287// }
1337static size_t
1338send_callback (void *cls, size_t size, void *buf)
1339{
1340 struct GNUNET_CADET_Handle *h = cls;
1341 struct GNUNET_CADET_TransmitHandle *th;
1342 struct GNUNET_CADET_TransmitHandle *next;
1343 struct GNUNET_CADET_Channel *ch;
1344 char *cbuf = buf;
1345 size_t tsize;
1346 size_t psize;
1347 size_t nsize;
1348
1349 LOG (GNUNET_ERROR_TYPE_DEBUG, "\n");
1350 LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback, buffer %u\n", size);
1351 if ((0 == size) || (NULL == buf))
1352 {
1353 LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h);
1354 reconnect (h);
1355 h->th = NULL;
1356 return 0;
1357 }
1358 tsize = 0;
1359 next = h->th_head;
1360 nsize = message_ready_size (h);
1361 while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize))
1362 {
1363 ch = th->channel;
1364 if (GNUNET_YES == th_is_payload (th))
1365 {
1366 struct GNUNET_CADET_LocalData *dmsg;
1367 struct GNUNET_MessageHeader *mh;
1368
1369 LOG (GNUNET_ERROR_TYPE_DEBUG, "# payload, %u bytes on %X (%p)\n",
1370 th->size, ch->chid, ch);
1371 if (GNUNET_NO == ch->allow_send)
1372 {
1373 /* This channel is not ready to transmit yet, Try the next message */
1374 next = th->next;
1375 continue;
1376 }
1377 ch->packet_size = 0;
1378 GNUNET_assert (size >= th->size);
1379 dmsg = (struct GNUNET_CADET_LocalData *) cbuf;
1380 mh = (struct GNUNET_MessageHeader *) &dmsg[1];
1381 psize = th->notify (th->notify_cls, size - DATA_OVERHEAD, mh);
1382
1383 if (psize > 0)
1384 {
1385 GNUNET_assert (sizeof (struct GNUNET_MessageHeader) <= psize);
1386 psize += DATA_OVERHEAD;
1387 GNUNET_assert (size >= psize);
1388 dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA);
1389 dmsg->header.size = htons (psize);
1390 dmsg->id = htonl (ch->chid);
1391 LOG (GNUNET_ERROR_TYPE_DEBUG, "# sending, type %s\n",
1392 GC_m2s (ntohs (mh->type)));
1393 ch->allow_send = GNUNET_NO;
1394 }
1395 else
1396 {
1397 LOG (GNUNET_ERROR_TYPE_DEBUG,
1398 "# callback returned size 0, "
1399 "application canceled transmission\n");
1400 }
1401 }
1402 else
1403 {
1404 const struct GNUNET_MessageHeader *mh;
1405
1406 mh = (const struct GNUNET_MessageHeader *) &th[1];
1407 LOG (GNUNET_ERROR_TYPE_DEBUG, "# cadet internal traffic, type %s\n",
1408 GC_m2s (ntohs (mh->type)));
1409 GNUNET_memcpy (cbuf, &th[1], th->size);
1410 psize = th->size;
1411 }
1412 GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= psize);
1413 if (th->timeout_task != NULL)
1414 GNUNET_SCHEDULER_cancel (th->timeout_task);
1415 next = th->next;
1416 GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1417 GNUNET_free (th);
1418 nsize = message_ready_size (h);
1419 cbuf += psize;
1420 size -= psize;
1421 tsize += psize;
1422 }
1423 LOG (GNUNET_ERROR_TYPE_DEBUG, "# total size: %u\n", tsize);
1424 h->th = NULL;
1425 size = message_ready_size (h);
1426 if (0 != size)
1427 {
1428 LOG (GNUNET_ERROR_TYPE_DEBUG, "# next size: %u\n", size);
1429 h->th =
1430 GNUNET_CLIENT_notify_transmit_ready (h->client, size,
1431 GNUNET_TIME_UNIT_FOREVER_REL,
1432 GNUNET_YES, &send_callback, h);
1433 }
1434 else
1435 {
1436 if (NULL != h->th_head)
1437 LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing ready to transmit\n");
1438 else
1439 LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing left to transmit\n");
1440 }
1441 if (GNUNET_NO == h->in_receive)
1442 {
1443 LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n");
1444 h->in_receive = GNUNET_YES;
1445 GNUNET_CLIENT_receive (h->client, &msg_received, h,
1446 GNUNET_TIME_UNIT_FOREVER_REL);
1447 }
1448 LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback() END\n");
1449 return tsize;
1450}
1451
1452
1453/**
1454 * Auxiliary function to send an already constructed packet to the service.
1455 * Takes care of creating a new queue element, copying the message and
1456 * calling the tmt_rdy function if necessary.
1457 *
1458 * @param h cadet handle
1459 * @param msg message to transmit
1460 * @param channel channel this send is related to (NULL if N/A)
1461 */
1462static void
1463send_packet (struct GNUNET_CADET_Handle *h,
1464 const struct GNUNET_MessageHeader *msg,
1465 struct GNUNET_CADET_Channel *channel)
1466{
1467 struct GNUNET_CADET_TransmitHandle *th;
1468 size_t msize;
1469
1470 LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n",
1471 GC_m2s(ntohs(msg->type)));
1472 msize = ntohs (msg->size);
1473 th = GNUNET_malloc (sizeof (struct GNUNET_CADET_TransmitHandle) + msize);
1474 th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
1475 th->size = msize;
1476 th->channel = channel;
1477 GNUNET_memcpy (&th[1], msg, msize);
1478 add_to_queue (h, th);
1479 if (NULL != h->th)
1480 return;
1481 LOG (GNUNET_ERROR_TYPE_DEBUG, " calling ntfy tmt rdy for %u bytes\n", msize);
1482 h->th =
1483 GNUNET_CLIENT_notify_transmit_ready (h->client, msize,
1484 GNUNET_TIME_UNIT_FOREVER_REL,
1485 GNUNET_YES, &send_callback, h);
1486}
1487 1288
1488 1289
1489/******************************************************************************/ 1290/******************************************************************************/
@@ -1503,11 +1304,11 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls,
1503 h->cfg = cfg; 1304 h->cfg = cfg;
1504 h->cleaner = cleaner; 1305 h->cleaner = cleaner;
1505 h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES); 1306 h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
1506 h->client = GNUNET_CLIENT_connect ("cadet", cfg); 1307 do_reconnect (h);
1507 if (h->client == NULL) 1308 if (h->mq == NULL)
1508 { 1309 {
1509 GNUNET_break (0); 1310 GNUNET_break (0);
1510 GNUNET_free (h); 1311 GNUNET_CADET_disconnect (h);
1511 return NULL; 1312 return NULL;
1512 } 1313 }
1513 h->cls = cls; 1314 h->cls = cls;
@@ -1574,19 +1375,13 @@ GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle)
1574 GC_m2s (ntohs(msg->type))); 1375 GC_m2s (ntohs(msg->type)));
1575 } 1376 }
1576 1377
1577 GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th); 1378 GNUNET_CADET_notify_transmit_ready_cancel (th);
1578 GNUNET_free (th);
1579 } 1379 }
1580 1380
1581 if (NULL != handle->th) 1381 if (NULL != handle->mq)
1582 {
1583 GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
1584 handle->th = NULL;
1585 }
1586 if (NULL != handle->client)
1587 { 1382 {
1588 GNUNET_CLIENT_disconnect (handle->client); 1383 GNUNET_MQ_destroy (handle->mq);
1589 handle->client = NULL; 1384 handle->mq = NULL;
1590 } 1385 }
1591 if (NULL != handle->reconnect_task) 1386 if (NULL != handle->reconnect_task)
1592 { 1387 {
@@ -1617,8 +1412,9 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1617 new_channel, 1412 new_channel,
1618 void *new_channel_cls) 1413 void *new_channel_cls)
1619{ 1414{
1415 struct GNUNET_CADET_PortMessage *msg;
1416 struct GNUNET_MQ_Envelope *env;
1620 struct GNUNET_CADET_Port *p; 1417 struct GNUNET_CADET_Port *p;
1621 struct GNUNET_CADET_PortMessage msg;
1622 1418
1623 GNUNET_assert (NULL != new_channel); 1419 GNUNET_assert (NULL != new_channel);
1624 p = GNUNET_new (struct GNUNET_CADET_Port); 1420 p = GNUNET_new (struct GNUNET_CADET_Port);
@@ -1633,10 +1429,9 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1633 p, 1429 p,
1634 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); 1430 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1635 1431
1636 msg.header.size = htons (sizeof (msg)); 1432 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
1637 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); 1433 msg->port = *p->hash;
1638 msg.port = *p->hash; 1434 GNUNET_MQ_send (h->mq, env);
1639 send_packet (p->cadet, &msg.header, NULL);
1640 1435
1641 return p; 1436 return p;
1642} 1437}
@@ -1650,12 +1445,13 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
1650void 1445void
1651GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p) 1446GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
1652{ 1447{
1653 struct GNUNET_CADET_PortMessage msg; 1448 struct GNUNET_CADET_PortMessage *msg;
1449 struct GNUNET_MQ_Envelope *env;
1654 1450
1655 msg.header.size = htons (sizeof (msg)); 1451 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
1656 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE); 1452
1657 msg.port = *p->hash; 1453 msg->port = *p->hash;
1658 send_packet (p->cadet, &msg.header, NULL); 1454 GNUNET_MQ_send (p->cadet->mq, env);
1659 GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, p->hash, p); 1455 GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, p->hash, p);
1660 GNUNET_free (p->hash); 1456 GNUNET_free (p->hash);
1661 GNUNET_free (p); 1457 GNUNET_free (p);
@@ -1684,8 +1480,9 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1684 const struct GNUNET_HashCode *port, 1480 const struct GNUNET_HashCode *port,
1685 enum GNUNET_CADET_ChannelOption options) 1481 enum GNUNET_CADET_ChannelOption options)
1686{ 1482{
1483 struct GNUNET_CADET_ChannelCreateMessage *msg;
1484 struct GNUNET_MQ_Envelope *env;
1687 struct GNUNET_CADET_Channel *ch; 1485 struct GNUNET_CADET_Channel *ch;
1688 struct GNUNET_CADET_ChannelCreateMessage msg;
1689 1486
1690 LOG (GNUNET_ERROR_TYPE_DEBUG, 1487 LOG (GNUNET_ERROR_TYPE_DEBUG,
1691 "Creating new channel to %s:%u\n", 1488 "Creating new channel to %s:%u\n",
@@ -1696,14 +1493,13 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
1696 ch->ctx = channel_ctx; 1493 ch->ctx = channel_ctx;
1697 ch->peer = GNUNET_PEER_intern (peer); 1494 ch->peer = GNUNET_PEER_intern (peer);
1698 1495
1699 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE); 1496 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE);
1700 msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelCreateMessage)); 1497 msg->channel_id = htonl (ch->chid);
1701 msg.channel_id = htonl (ch->chid); 1498 msg->port = *port;
1702 msg.port = *port; 1499 msg->peer = *peer;
1703 msg.peer = *peer; 1500 msg->opt = htonl (options);
1704 msg.opt = htonl (options);
1705 ch->allow_send = GNUNET_NO; 1501 ch->allow_send = GNUNET_NO;
1706 send_packet (h, &msg.header, ch); 1502 GNUNET_MQ_send (h->mq, env);
1707 1503
1708 return ch; 1504 return ch;
1709} 1505}
@@ -1713,39 +1509,40 @@ void
1713GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel) 1509GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
1714{ 1510{
1715 struct GNUNET_CADET_Handle *h; 1511 struct GNUNET_CADET_Handle *h;
1716 struct GNUNET_CADET_ChannelDestroyMessage msg; 1512 struct GNUNET_CADET_ChannelDestroyMessage *msg;
1513 struct GNUNET_MQ_Envelope *env;
1717 struct GNUNET_CADET_TransmitHandle *th; 1514 struct GNUNET_CADET_TransmitHandle *th;
1515 struct GNUNET_CADET_TransmitHandle *next;
1718 1516
1719 LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n"); 1517 LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n");
1720 h = channel->cadet; 1518 h = channel->cadet;
1721 1519 for (th = h->th_head; th != NULL; th = next)
1722 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
1723 msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelDestroyMessage));
1724 msg.channel_id = htonl (channel->chid);
1725 th = h->th_head;
1726 while (th != NULL)
1727 { 1520 {
1728 struct GNUNET_CADET_TransmitHandle *aux; 1521 next = th->next;
1729 if (th->channel == channel) 1522 if (th->channel == channel)
1730 { 1523 {
1731 aux = th->next; 1524 GNUNET_break (0);
1732 if (GNUNET_YES == th_is_payload (th)) 1525 if (GNUNET_YES == th_is_payload (th))
1733 { 1526 {
1734 /* applications should cancel before destroying channel */ 1527 /* applications should cancel before destroying channel */
1735 GNUNET_break (0);
1736 LOG (GNUNET_ERROR_TYPE_WARNING, 1528 LOG (GNUNET_ERROR_TYPE_WARNING,
1737 "Channel destroyed without cancelling transmission requests\n"); 1529 "Channel destroyed without cancelling transmission requests\n");
1738 th->notify (th->notify_cls, 0, NULL); 1530 th->notify (th->notify_cls, 0, NULL);
1739 } 1531 }
1532 else
1533 {
1534 LOG (GNUNET_ERROR_TYPE_WARNING, "no meta-traffic should be queued\n");
1535 }
1536 GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1740 GNUNET_CADET_notify_transmit_ready_cancel (th); 1537 GNUNET_CADET_notify_transmit_ready_cancel (th);
1741 th = aux;
1742 } 1538 }
1743 else
1744 th = th->next;
1745 } 1539 }
1746 1540
1541 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY);
1542 msg->channel_id = htonl (channel->chid);
1543 GNUNET_MQ_send (h->mq, env);
1544
1747 destroy_channel (channel, GNUNET_YES); 1545 destroy_channel (channel, GNUNET_YES);
1748 send_packet (h, &msg.header, NULL);
1749} 1546}
1750 1547
1751 1548
@@ -1787,8 +1584,10 @@ GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel,
1787 return ret; 1584 return ret;
1788} 1585}
1789 1586
1587
1790struct GNUNET_CADET_TransmitHandle * 1588struct GNUNET_CADET_TransmitHandle *
1791GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int cork, 1589GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
1590 int cork,
1792 struct GNUNET_TIME_Relative maxdelay, 1591 struct GNUNET_TIME_Relative maxdelay,
1793 size_t notify_size, 1592 size_t notify_size,
1794 GNUNET_CONNECTION_TransmitReadyNotify notify, 1593 GNUNET_CONNECTION_TransmitReadyNotify notify,
@@ -1808,25 +1607,25 @@ GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int co
1808 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size); 1607 LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size);
1809 GNUNET_assert (NULL != notify); 1608 GNUNET_assert (NULL != notify);
1810 GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed 1609 GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed
1610
1611 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != maxdelay.rel_value_us)
1612 {
1613 LOG (GNUNET_ERROR_TYPE_WARNING,
1614 "CADET transmit ready timeout is deprected (has no effect)\n");
1615 }
1616
1811 th = GNUNET_new (struct GNUNET_CADET_TransmitHandle); 1617 th = GNUNET_new (struct GNUNET_CADET_TransmitHandle);
1812 th->channel = channel; 1618 th->channel = channel;
1813 th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); 1619 th->size = notify_size;
1814 th->size = notify_size + DATA_OVERHEAD;
1815 channel->packet_size = th->size; 1620 channel->packet_size = th->size;
1816 LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size); 1621 LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size);
1817 th->notify = notify; 1622 th->notify = notify;
1818 th->notify_cls = notify_cls; 1623 th->notify_cls = notify_cls;
1819 add_to_queue (channel->cadet, th); 1624 if (GNUNET_YES == channel->allow_send)
1820 if (NULL != channel->cadet->th) 1625 th->request_data_task = GNUNET_SCHEDULER_add_now (&request_data, th);
1821 return th; 1626 else
1822 if (GNUNET_NO == channel->allow_send) 1627 add_to_queue (channel->cadet, th);
1823 return th; 1628
1824 LOG (GNUNET_ERROR_TYPE_DEBUG, " call client notify tmt rdy\n");
1825 channel->cadet->th =
1826 GNUNET_CLIENT_notify_transmit_ready (channel->cadet->client, th->size,
1827 GNUNET_TIME_UNIT_FOREVER_REL,
1828 GNUNET_YES, &send_callback,
1829 channel->cadet);
1830 LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n"); 1629 LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n");
1831 return th; 1630 return th;
1832} 1631}
@@ -1835,25 +1634,20 @@ GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int co
1835void 1634void
1836GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th) 1635GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th)
1837{ 1636{
1838 struct GNUNET_CADET_Handle *cadet; 1637 if (NULL != th->request_data_task)
1638 {
1639 GNUNET_SCHEDULER_cancel (th->request_data_task);
1640 }
1641 th->request_data_task = NULL;
1839 1642
1840 LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL\n"); 1643 /* It might or might not have been queued (rarely not), but check anyway. */
1841 LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X (%p)\n", 1644 if (NULL != th->next)
1842 th->channel->chid, th->channel);
1843 LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u bytes\n", th->size);
1844 th->channel->packet_size = 0;
1845 cadet = th->channel->cadet;
1846 if (th->timeout_task != NULL)
1847 GNUNET_SCHEDULER_cancel (th->timeout_task);
1848 GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th);
1849 GNUNET_free (th);
1850 if ((0 == message_ready_size (cadet)) && (NULL != cadet->th))
1851 { 1645 {
1852 /* queue empty, no point in asking for transmission */ 1646 struct GNUNET_CADET_Handle *h;
1853 GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th); 1647 h = th->channel->cadet;
1854 cadet->th = NULL; 1648 GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th);
1855 } 1649 }
1856 LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL END\n"); 1650 GNUNET_free (th);
1857} 1651}
1858 1652
1859 1653
@@ -1867,11 +1661,15 @@ GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel)
1867static void 1661static void
1868send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type) 1662send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type)
1869{ 1663{
1870 struct GNUNET_MessageHeader msg; 1664 struct GNUNET_MessageHeader *msg;
1665 struct GNUNET_MQ_Envelope *env;
1666
1667 env = GNUNET_MQ_msg (msg, type);
1668 GNUNET_MQ_send (h->mq, env);
1871 1669
1872 msg.size = htons (sizeof (msg)); 1670 LOG (GNUNET_ERROR_TYPE_DEBUG,
1873 msg.type = htons (type); 1671 " Sending %s message to service\n",
1874 send_packet (h, &msg, NULL); 1672 GC_m2s(type));
1875} 1673}
1876 1674
1877 1675
@@ -1958,11 +1756,12 @@ GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h)
1958 */ 1756 */
1959int 1757int
1960GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h, 1758GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
1961 const struct GNUNET_PeerIdentity *id, 1759 const struct GNUNET_PeerIdentity *id,
1962 GNUNET_CADET_PeerCB callback, 1760 GNUNET_CADET_PeerCB callback,
1963 void *callback_cls) 1761 void *callback_cls)
1964{ 1762{
1965 struct GNUNET_CADET_LocalInfo msg; 1763 struct GNUNET_CADET_LocalInfo *msg;
1764 struct GNUNET_MQ_Envelope *env;
1966 1765
1967 if (NULL != h->info_cb.peer_cb) 1766 if (NULL != h->info_cb.peer_cb)
1968 { 1767 {
@@ -1970,11 +1769,10 @@ GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h,
1970 return GNUNET_SYSERR; 1769 return GNUNET_SYSERR;
1971 } 1770 }
1972 1771
1973 memset (&msg, 0, sizeof (msg)); 1772 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER);
1974 msg.header.size = htons (sizeof (msg)); 1773 msg->peer = *id;
1975 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER); 1774 GNUNET_MQ_send (h->mq, env);
1976 msg.peer = *id; 1775
1977 send_packet (h, &msg.header, NULL);
1978 h->info_cb.peer_cb = callback; 1776 h->info_cb.peer_cb = callback;
1979 h->info_cls = callback_cls; 1777 h->info_cls = callback_cls;
1980 return GNUNET_OK; 1778 return GNUNET_OK;
@@ -2052,7 +1850,8 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2052 GNUNET_CADET_TunnelCB callback, 1850 GNUNET_CADET_TunnelCB callback,
2053 void *callback_cls) 1851 void *callback_cls)
2054{ 1852{
2055 struct GNUNET_CADET_LocalInfo msg; 1853 struct GNUNET_CADET_LocalInfo *msg;
1854 struct GNUNET_MQ_Envelope *env;
2056 1855
2057 if (NULL != h->info_cb.tunnel_cb) 1856 if (NULL != h->info_cb.tunnel_cb)
2058 { 1857 {
@@ -2060,11 +1859,10 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2060 return GNUNET_SYSERR; 1859 return GNUNET_SYSERR;
2061 } 1860 }
2062 1861
2063 memset (&msg, 0, sizeof (msg)); 1862 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL);
2064 msg.header.size = htons (sizeof (msg)); 1863 msg->peer = *id;
2065 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL); 1864 GNUNET_MQ_send (h->mq, env);
2066 msg.peer = *id; 1865
2067 send_packet (h, &msg.header, NULL);
2068 h->info_cb.tunnel_cb = callback; 1866 h->info_cb.tunnel_cb = callback;
2069 h->info_cls = callback_cls; 1867 h->info_cls = callback_cls;
2070 return GNUNET_OK; 1868 return GNUNET_OK;
@@ -2087,12 +1885,13 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h,
2087 */ 1885 */
2088int 1886int
2089GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h, 1887GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2090 struct GNUNET_PeerIdentity *initiator, 1888 struct GNUNET_PeerIdentity *initiator,
2091 unsigned int channel_number, 1889 unsigned int channel_number,
2092 GNUNET_CADET_ChannelCB callback, 1890 GNUNET_CADET_ChannelCB callback,
2093 void *callback_cls) 1891 void *callback_cls)
2094{ 1892{
2095 struct GNUNET_CADET_LocalInfo msg; 1893 struct GNUNET_CADET_LocalInfo *msg;
1894 struct GNUNET_MQ_Envelope *env;
2096 1895
2097 if (NULL != h->info_cb.channel_cb) 1896 if (NULL != h->info_cb.channel_cb)
2098 { 1897 {
@@ -2100,12 +1899,11 @@ GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h,
2100 return GNUNET_SYSERR; 1899 return GNUNET_SYSERR;
2101 } 1900 }
2102 1901
2103 msg.header.size = htons (sizeof (msg)); 1902 env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL);
2104 msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL); 1903 msg->peer = *initiator;
2105 msg.peer = *initiator; 1904 msg->channel_id = htonl (channel_number);
2106 msg.channel_id = htonl (channel_number); 1905 GNUNET_MQ_send (h->mq, env);
2107// msg.reserved = 0; 1906
2108 send_packet (h, &msg.header, NULL);
2109 h->info_cb.channel_cb = callback; 1907 h->info_cb.channel_cb = callback;
2110 h->info_cls = callback_cls; 1908 h->info_cls = callback_cls;
2111 return GNUNET_OK; 1909 return GNUNET_OK;
diff --git a/src/include/gnunet_cadet_service.h b/src/include/gnunet_cadet_service.h
index 1c440fc46..c32311643 100644
--- a/src/include/gnunet_cadet_service.h
+++ b/src/include/gnunet_cadet_service.h
@@ -372,6 +372,10 @@ GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel,
372/** 372/**
373 * Cancel the specified transmission-ready notification. 373 * Cancel the specified transmission-ready notification.
374 * 374 *
375 * #DEPRECATED
376 * Since soon we will send immediately with mq (via request_data),
377 * there will be time or need to cancel a "pending" transmission.
378 *
375 * @param th handle that was returned by "notify_transmit_ready". 379 * @param th handle that was returned by "notify_transmit_ready".
376 */ 380 */
377void 381void